Convert API gateway to old Poetry format and add service routing for new microservices
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Deploy to Testnet / notify-deployment (push) Has been cancelled
Documentation Validation / validate-docs (push) Has been cancelled
Documentation Validation / validate-policies-strict (push) Has been cancelled
Integration Tests / test-service-integration (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Node Failover Simulation / failover-test (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled
API Endpoint Tests / test-api-endpoints (push) Successful in 1m55s
Blockchain Synchronization Verification / sync-verification (push) Failing after 11s
CLI Tests / test-cli (push) Failing after 8s
Cross-Chain Functionality Tests / test-cross-chain-sync (push) Failing after 12s
Cross-Chain Functionality Tests / test-cross-chain-transactions (push) Successful in 13s
Cross-Chain Functionality Tests / test-cross-chain-bridge (push) Has been skipped
Cross-Chain Functionality Tests / test-multi-chain-consensus (push) Failing after 13s
Cross-Chain Functionality Tests / aggregate-results (push) Has been skipped
P2P Network Verification / p2p-verification (push) Successful in 6s
Package Tests / Python package - aitbc-agent-sdk (push) Failing after 32s
Package Tests / Python package - aitbc-core (push) Successful in 15s
Package Tests / Python package - aitbc-crypto (push) Successful in 11s
Package Tests / Python package - aitbc-sdk (push) Successful in 11s
Package Tests / JavaScript package - aitbc-sdk-js (push) Successful in 26s
Package Tests / JavaScript package - aitbc-token (push) Successful in 25s
Production Tests / Production Integration Tests (push) Failing after 1m15s
Smart Contract Tests / test-solidity (map[name:aitbc-contracts path:contracts]) (push) Failing after 2m5s
Smart Contract Tests / test-solidity (map[name:aitbc-token path:packages/solidity/aitbc-token]) (push) Successful in 31s
Smart Contract Tests / test-foundry (push) Failing after 19s
Smart Contract Tests / lint-solidity (push) Successful in 17s
Smart Contract Tests / deploy-contracts (push) Successful in 1m24s
Staking Tests / test-staking-service (push) Failing after 14s
Staking Tests / test-staking-integration (push) Has been skipped
Staking Tests / test-staking-contract (push) Has been skipped
Staking Tests / run-staking-test-runner (push) Has been skipped
Systemd Sync / sync-systemd (push) Successful in 22s
Multi-Node Blockchain Health Monitoring / health-check (push) Failing after 14m13s

- Convert api-gateway pyproject.toml to old Poetry format for workspace compatibility
- Add routing configuration for AI service (port 8106)
- Add routing configuration for Monitoring service (port 8107)
- Add routing configuration for OpenClaw service (port 8108)
- Add routing configuration for Plugin service (port 8109)
- Remove duplicate middleware implementations from coordinator-api (app_logging.py, error… [list truncated in this view])
This commit is contained in:
aitbc
2026-04-30 16:15:05 +02:00
parent 11030a3980
commit 8602732d46
52 changed files with 12793 additions and 937 deletions

View File

@@ -1,41 +0,0 @@
"""
Logging utilities for AITBC coordinator API
Uses structlog for structured logging
"""
import structlog
import logging
import sys
def configure_logging(level: str = "INFO") -> None:
    """Set up structlog and the stdlib root logger for JSON log output.

    Args:
        level: Name of a stdlib logging level; it is upper-cased and looked
            up as an attribute of the ``logging`` module (e.g. ``"INFO"``).
    """
    # Processors run in list order; JSONRenderer is the final entry.
    processor_chain = [
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer(),
    ]
    structlog.configure(
        processors=processor_chain,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
        wrapper_class=structlog.stdlib.BoundLogger,
    )

    # The stdlib handler only passes the message through ("%(message)s")
    # and writes it to stdout.
    stdlib_level = getattr(logging, level.upper())
    logging.basicConfig(format="%(message)s", stream=sys.stdout, level=stdlib_level)
def get_logger(name: str) -> structlog.stdlib.BoundLogger:
    """Return the structlog logger for ``name``."""
    bound_logger = structlog.get_logger(name)
    return bound_logger

View File

@@ -1 +0,0 @@
"""Middleware components for the coordinator API."""

View File

@@ -1,61 +0,0 @@
"""
Standardized error response middleware for FastAPI
"""
from typing import Callable
from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp
from ..app_logging import get_logger
# Module-level structured logger, obtained from the package's app_logging helper.
logger = get_logger(__name__)
class ErrorHandlerMiddleware(BaseHTTPMiddleware):
    """Convert exceptions escaping ``call_next`` into a JSON error envelope."""

    @staticmethod
    def _error_body(kind: str, message, status_code: int, path: str) -> dict:
        """Build the ``{"error": {...}}`` payload shared by both failure paths."""
        return {
            "error": {
                "type": kind,
                "message": message,
                "status_code": status_code,
                "path": path,
            }
        }

    async def dispatch(self, request: Request, call_next: Callable) -> JSONResponse:
        """Run the downstream handler and translate failures to JSON.

        Returns the downstream response untouched on success.  An
        ``HTTPException`` is logged at warning level and answered with its
        own status code; any other exception is logged with traceback and
        answered with a generic 500 that does not expose the error text.
        """
        try:
            return await call_next(request)
        except HTTPException as http_exc:
            logger.warning(
                "HTTP exception",
                status_code=http_exc.status_code,
                detail=http_exc.detail,
                path=request.url.path,
                method=request.method,
            )
            body = self._error_body(
                "http_error",
                http_exc.detail,
                http_exc.status_code,
                request.url.path,
            )
            return JSONResponse(status_code=http_exc.status_code, content=body)
        except Exception as unexpected:
            logger.error(
                "Unhandled exception",
                error=str(unexpected),
                path=request.url.path,
                method=request.method,
                exc_info=True,
            )
            body = self._error_body(
                "internal_error",
                "An internal server error occurred",
                500,
                request.url.path,
            )
            return JSONResponse(status_code=500, content=body)

View File

@@ -1,41 +0,0 @@
"""
Performance logging middleware for tracking request timing
"""
import time
from typing import Callable
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp
from ..app_logging import get_logger
# Module-level structured logger, obtained from the package's app_logging helper.
logger = get_logger(__name__)
# ``Response`` is named in the ``dispatch`` return annotation, which Python
# evaluates when the method is defined.  The module's import block does not
# provide it, so it is imported here to keep the module importable.
from starlette.responses import Response


class PerformanceLoggingMiddleware(BaseHTTPMiddleware):
    """Log the wall-clock duration of each request and expose it as a header."""

    async def dispatch(self, request: Request, call_next: Callable) -> Response:
        """Time the downstream handler, log the result and tag the response.

        Args:
            request: The incoming request.
            call_next: Callable that forwards the request down the stack.

        Returns:
            The downstream response with an ``X-Process-Time`` header added.
        """
        started = time.perf_counter()
        response = await call_next(request)
        duration = time.perf_counter() - started

        logger.info(
            "Request performance",
            method=request.method,
            path=request.url.path,
            status_code=response.status_code,
            duration_ms=round(duration * 1000, 2),
        )

        # The header carries seconds (3 decimals); the log field above is in
        # milliseconds.
        response.headers["X-Process-Time"] = f"{duration:.3f}"
        return response

View File

@@ -1,54 +0,0 @@
"""
Request ID correlation middleware for structured logging
"""
import uuid
from typing import Callable
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp
from ..app_logging import get_logger
# Module-level structured logger, obtained from the package's app_logging helper.
logger = get_logger(__name__)
# ``Response`` is named in the ``dispatch`` return annotation, which Python
# evaluates when the method is defined.  The module's import block does not
# provide it, so it is imported here to keep the module importable.
from starlette.responses import Response


class RequestIDMiddleware(BaseHTTPMiddleware):
    """Attach a correlation ID to every request, its logs and its response."""

    def __init__(self, app: ASGIApp) -> None:
        """Wrap ``app`` and fix the correlation header name to ``X-Request-ID``."""
        super().__init__(app)
        self.header_name = "X-Request-ID"

    async def dispatch(self, request: Request, call_next: Callable) -> Response:
        """Propagate or create a request ID around the downstream call.

        The ID is taken from the incoming ``X-Request-ID`` header when it is
        present and non-empty, otherwise a fresh UUID4 string is generated.
        It is stored on ``request.state.request_id``, bound to the logger used
        for the start/completion log lines, and echoed back in the response
        header.
        """
        request_id = request.headers.get(self.header_name) or str(uuid.uuid4())
        request.state.request_id = request_id

        # A per-request bound logger, so both log lines carry the same ID.
        request_logger = get_logger(__name__).bind(request_id=request_id)
        request_logger.info(
            "Incoming request",
            method=request.method,
            path=request.url.path,
            client=request.client.host if request.client else "unknown",
        )

        response = await call_next(request)
        response.headers[self.header_name] = request_id

        request_logger.info(
            "Request completed",
            status_code=response.status_code,
        )
        return response

View File

@@ -1,306 +0,0 @@
"""
Tenant context middleware for multi-tenant isolation
"""
import hashlib
from collections.abc import Callable
from contextvars import ContextVar
from datetime import datetime, UTC
from fastapi import HTTPException, Request, status
from sqlalchemy import and_, event, select
from sqlalchemy.orm import Session
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
from ..exceptions import TenantError
from ..models.multitenant import Tenant, TenantApiKey
from ..services.tenant_management import TenantManagementService
from ..storage.db_pg import get_db
# Context variables carrying the active tenant; both default to None,
# meaning no tenant is bound in the current context.
current_tenant: ContextVar[Tenant | None] = ContextVar("current_tenant", default=None)
# The tenant's id as a string, tracked separately from the Tenant object.
current_tenant_id: ContextVar[str | None] = ContextVar("current_tenant_id", default=None)
def get_current_tenant() -> Tenant | None:
    """Return the tenant bound to the active context, or ``None`` if unset."""
    active_tenant = current_tenant.get()
    return active_tenant
def get_current_tenant_id() -> str | None:
    """Return the tenant ID bound to the active context, or ``None`` if unset."""
    active_tenant_id = current_tenant_id.get()
    return active_tenant_id
class TenantContextMiddleware(BaseHTTPMiddleware):
    """Resolve the tenant for each request and expose it to downstream code.

    The tenant is looked up, in order, from the host subdomain, the
    ``X-Tenant-ID`` header, a bearer API key and a bearer JWT.  The first
    method that yields a tenant wins.  The result is published through the
    ``current_tenant`` / ``current_tenant_id`` context variables and on
    ``request.state``.
    """

    def __init__(self, app, excluded_paths: list | None = None):
        """Wrap ``app``.

        Args:
            app: The wrapped ASGI application.
            excluded_paths: Path prefixes that bypass tenant resolution.  A
                built-in default list is used when omitted or empty.
        """
        import logging

        super().__init__(app)
        self.excluded_paths = excluded_paths or ["/health", "/metrics", "/docs", "/openapi.json", "/favicon.ico", "/static"]
        self.logger = logging.getLogger(f"aitbc.{self.__class__.__name__}")

    async def dispatch(self, request: Request, call_next: Callable) -> Response:
        """Bind the tenant context around the downstream call.

        Raises:
            HTTPException: 401 when no tenant can be resolved, 403 when the
                tenant's status is neither ``"active"`` nor ``"trial"``.
        """
        if self._should_exclude(request.url.path):
            return await call_next(request)

        tenant = await self._extract_tenant(request)
        if not tenant:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Tenant not found or invalid")

        if tenant.status not in ["active", "trial"]:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f"Tenant is {tenant.status}")

        tenant_id = str(tenant.id)
        tenant_token = current_tenant.set(tenant)
        tenant_id_token = current_tenant_id.set(tenant_id)

        # Mirror the tenant on request.state for easy access in endpoints.
        request.state.tenant = tenant
        request.state.tenant_id = tenant_id

        try:
            return await call_next(request)
        finally:
            # Restore the previous values even when the downstream handler
            # raises, so a failed request cannot leave a tenant bound.
            current_tenant_id.reset(tenant_id_token)
            current_tenant.reset(tenant_token)

    def _should_exclude(self, path: str) -> bool:
        """Return True when ``path`` starts with any excluded prefix."""
        return any(path.startswith(prefix) for prefix in self.excluded_paths)

    async def _extract_tenant(self, request: Request) -> Tenant | None:
        """Try each extraction method in priority order; return the first hit."""
        extractors = (
            self._extract_from_subdomain,  # Method 1: subdomain
            self._extract_from_header,  # Method 2: custom header
            self._extract_from_api_key,  # Method 3: API key
            self._extract_from_token,  # Method 4: JWT token
        )
        for extract in extractors:
            tenant = await extract(request)
            if tenant:
                return tenant
        return None

    async def _load_tenant(self, tenant_id: str) -> Tenant | None:
        """Fetch a tenant by ID on a short-lived DB session that is always closed."""
        db = next(get_db())
        try:
            return await TenantManagementService(db).get_tenant(tenant_id)
        finally:
            db.close()

    async def _extract_from_subdomain(self, request: Request) -> Tenant | None:
        """Resolve the tenant from the first label of the ``Host`` header.

        Only hosts with more than two dot-separated labels are considered,
        and the labels ``www``, ``api``, ``admin`` and ``app`` are skipped.
        """
        host = request.headers.get("host", "").split(":")[0]
        labels = host.split(".")
        if len(labels) <= 2:
            return None

        subdomain = labels[0]
        if subdomain in ["www", "api", "admin", "app"]:
            return None

        # Look up the tenant by subdomain/slug.
        db = next(get_db())
        try:
            return await TenantManagementService(db).get_tenant_by_slug(subdomain)
        finally:
            db.close()

    async def _extract_from_header(self, request: Request) -> Tenant | None:
        """Resolve the tenant from the ``X-Tenant-ID`` header, if present."""
        tenant_id = request.headers.get("X-Tenant-ID")
        if not tenant_id:
            return None
        return await self._load_tenant(tenant_id)

    async def _extract_from_api_key(self, request: Request) -> Tenant | None:
        """Resolve the tenant from a bearer API key.

        The key is hashed with HMAC-SHA256 under ``API_KEY_HASH_SECRET`` and
        matched against active ``TenantApiKey`` rows.  A matching, unexpired
        key has its ``last_used_at`` updated before the tenant is loaded.

        Raises:
            HTTPException: 500 when ``API_KEY_HASH_SECRET`` is not set.
        """
        import hmac
        import os

        auth_header = request.headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            return None
        api_key = auth_header[7:]  # Remove "Bearer "

        # SECURITY: keyed HMAC instead of plain sha256, so stored hashes
        # cannot be attacked with precomputed tables.
        secret_key = os.environ.get("API_KEY_HASH_SECRET")
        if not secret_key:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="API_KEY_HASH_SECRET environment variable not set"
            )
        key_hash = hmac.new(secret_key.encode(), api_key.encode(), hashlib.sha256).hexdigest()

        db = next(get_db())
        try:
            stmt = select(TenantApiKey).where(and_(TenantApiKey.key_hash == key_hash, TenantApiKey.is_active))
            api_key_record = db.execute(stmt).scalar_one_or_none()
            if not api_key_record:
                return None

            # NOTE(review): assumes expires_at is stored timezone-aware; a
            # naive value would fail this comparison - confirm against the model.
            now = datetime.now(UTC)
            if api_key_record.expires_at and api_key_record.expires_at < now:
                return None

            api_key_record.last_used_at = now
            db.commit()

            service = TenantManagementService(db)
            return await service.get_tenant(str(api_key_record.tenant_id))
        finally:
            db.close()

    async def _extract_from_token(self, request: Request) -> Tenant | None:
        """Resolve the tenant from the ``tenant_id`` claim of a bearer token.

        The token must have three dot-separated parts and a signature that
        matches the HMAC-SHA256 of ``header.payload`` under
        ``app.state.jwt_secret``.  Any failure yields ``None``.
        """
        import base64 as _b64
        import hmac as _hmac
        import json

        auth_header = request.headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            return None

        parts = auth_header[7:].split(".")
        if len(parts) != 3:
            return None

        try:
            secret = getattr(request.app.state, "jwt_secret", "")
            if not secret:
                return None

            # NOTE(review): the signature is compared against a *hex* digest,
            # whereas standard HS256 JWTs carry a base64url-encoded digest,
            # and no expiry claim is checked here - confirm against the
            # token issuer before relying on this path.
            expected_sig = _hmac.new(secret.encode(), f"{parts[0]}.{parts[1]}".encode(), "sha256").hexdigest()
            if not _hmac.compare_digest(parts[2], expected_sig):
                return None

            # Re-add the base64 padding that the token format strips.
            padded = parts[1] + "=" * (-len(parts[1]) % 4)
            payload = json.loads(_b64.urlsafe_b64decode(padded))
            tenant_id = payload.get("tenant_id")
            if not tenant_id:
                return None

            return await self._load_tenant(tenant_id)
        except Exception:
            # Best-effort: a malformed token must not fail the request here,
            # but the cause is kept available for debugging.
            self.logger.debug("JWT tenant extraction failed", exc_info=True)
            return None
class TenantRowLevelSecurity:
    """Set and clear the ``aitbc.current_tenant_id`` setting on a DB session."""

    def __init__(self, db: Session):
        """Keep a reference to ``db``, the session the setting is applied to."""
        import logging

        self.db = db
        self.logger = logging.getLogger(f"aitbc.{self.__class__.__name__}")

    def enable_rls(self):
        """Publish the current tenant ID to the database session.

        Raises:
            TenantError: When no tenant ID is bound in the current context.
        """
        from sqlalchemy import text

        tenant_id = get_current_tenant_id()
        if not tenant_id:
            raise TenantError("No tenant context found")

        # set_config(name, value, false) is the function form of
        # ``SET SESSION name = value``.  Unlike SET it accepts a bound
        # parameter, and text() makes the statement valid for SQLAlchemy 2.x.
        self.db.execute(
            text("SELECT set_config('aitbc.current_tenant_id', :tenant_id, false)"),
            {"tenant_id": tenant_id},
        )
        self.logger.debug(f"Enabled RLS for tenant: {tenant_id}")

    def disable_rls(self):
        """Reset the tenant setting on the database session."""
        from sqlalchemy import text

        self.db.execute(text("RESET aitbc.current_tenant_id"))
        self.logger.debug("Disabled RLS")
# Database event listeners for automatic RLS
@event.listens_for(Session, "after_begin")
def on_session_begin(session, transaction, connection=None):
    """Publish the current tenant ID to the database when a transaction begins.

    SQLAlchemy calls ``after_begin`` listeners with
    ``(session, transaction, connection)``, so the listener must accept the
    third argument.  ``connection`` defaults to ``None`` only so that
    two-argument calls keep working; in that case the statement is issued
    through ``session`` instead.

    Failures are logged and swallowed so that a session can still begin.
    """
    import logging

    from sqlalchemy import text

    try:
        tenant_id = get_current_tenant_id()
        if tenant_id:
            executor = connection if connection is not None else session
            # set_config(..., false) is the function form of ``SET SESSION``
            # and accepts a bound parameter.
            # NOTE(review): a session-scoped setting stays on a pooled
            # connection after the transaction ends - confirm whether a
            # transaction-local setting (is_local = true) is wanted instead.
            executor.execute(
                text("SELECT set_config('aitbc.current_tenant_id', :tenant_id, false)"),
                {"tenant_id": tenant_id},
            )
    except Exception as e:
        # Log error but don't fail
        logging.getLogger(__name__).error(f"Failed to set tenant context: {e}")
# Decorator for tenant-aware endpoints
def requires_tenant(func):
    """Wrap an async callable so it runs only when a tenant context is bound.

    The wrapper keeps ``func``'s name, docstring and signature metadata (via
    ``functools.wraps``), so frameworks that introspect the endpoint still
    see the original parameters rather than ``*args, **kwargs``.

    Raises (from the wrapper):
        HTTPException: 401 when no tenant is bound in the current context.
    """
    import functools

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        tenant = get_current_tenant()
        if not tenant:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Tenant context required")
        return await func(*args, **kwargs)

    return wrapper
# Dependency for FastAPI
async def get_current_tenant_dependency(request: Request) -> Tenant:
    """Return the tenant stored on ``request.state``.

    Raises:
        HTTPException: 401 when ``request.state`` has no truthy ``tenant``.
    """
    resolved = getattr(request.state, "tenant", None)
    if resolved:
        return resolved
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Tenant not found")
# Utility functions
# Imported next to its only use; without this decorator the function returns
# a bare generator that cannot be used in a ``with`` statement.
from contextlib import contextmanager


@contextmanager
def with_tenant_context(tenant_id: str):
    """Temporarily bind ``tenant_id`` as the current tenant ID.

    Usage: ``with with_tenant_context("tenant-1"): ...``.  Only
    ``current_tenant_id`` is set; ``current_tenant`` is left untouched.  The
    previous value is restored on exit, including when the body raises.

    Args:
        tenant_id: The tenant ID to bind for the duration of the block.
    """
    token = current_tenant_id.set(tenant_id)
    try:
        yield
    finally:
        current_tenant_id.reset(token)
def is_tenant_admin(user_permissions: list) -> bool:
    """Return True when the permissions include ``tenant:admin`` or ``admin``."""
    admin_grants = ("tenant:admin", "admin")
    return any(grant in user_permissions for grant in admin_grants)
def has_tenant_permission(permission: str, user_permissions: list) -> bool:
    """Return True when ``permission`` is held directly or via ``tenant:admin``."""
    accepted = (permission, "tenant:admin")
    return any(grant in user_permissions for grant in accepted)

View File

@@ -1,66 +0,0 @@
"""
Request validation middleware for FastAPI
"""
from typing import Callable
from fastapi import Request, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp
from ..app_logging import get_logger
# Module-level structured logger, obtained from the package's app_logging helper.
logger = get_logger(__name__)
# ``Response`` is named in the ``dispatch`` return annotation, which Python
# evaluates when the method is defined.  The module's import block does not
# provide it, so it is imported here to keep the module importable.
from starlette.responses import Response


class RequestValidationMiddleware(BaseHTTPMiddleware):
    """Reject oversized requests and responses based on their size."""

    def __init__(
        self,
        app: ASGIApp,
        max_request_size: int = 10 * 1024 * 1024,  # 10MB default
        max_response_size: int = 10 * 1024 * 1024,  # 10MB default
    ) -> None:
        """Wrap ``app``.

        Args:
            app: The wrapped ASGI application.
            max_request_size: Largest accepted request ``Content-Length``, in bytes.
            max_response_size: Largest accepted response size, in bytes.
        """
        super().__init__(app)
        self.max_request_size = max_request_size
        self.max_response_size = max_response_size

    @staticmethod
    def _response_size(response) -> int | None:
        """Return the response size in bytes, or ``None`` when it is unknown.

        The response handed back by ``call_next`` is streamed and has no
        ``body`` attribute, so the declared ``Content-Length`` header is
        used whenever a materialized body is not available.
        """
        body = getattr(response, "body", None)
        if body is not None:
            return len(body)
        declared = response.headers.get("content-length")
        if declared is None:
            return None
        try:
            return int(declared)
        except ValueError:
            return None

    async def dispatch(self, request: Request, call_next: Callable) -> Response:
        """Validate request size, run the handler, then validate response size.

        Raises:
            HTTPException: 413 when the declared request size exceeds
                ``max_request_size``; 500 when the response size is known
                and exceeds ``max_response_size``.
        """
        content_length = request.headers.get("content-length")
        if content_length:
            try:
                size = int(content_length)
            except ValueError:
                # A malformed header is logged but does not block the request.
                logger.warning("Invalid content-length header", content_length=content_length)
            else:
                if size > self.max_request_size:
                    logger.warning(
                        "Request too large",
                        content_length=size,
                        max_size=self.max_request_size,
                        client=request.client.host if request.client else "unknown",
                    )
                    raise HTTPException(
                        status_code=413,
                        detail=f"Request too large. Maximum size is {self.max_request_size} bytes",
                    )

        response = await call_next(request)

        # Responses of unknown size (no body, no Content-Length) are passed
        # through unchecked rather than failing the request.
        response_size = self._response_size(response)
        if response_size is not None and response_size > self.max_response_size:
            logger.warning(
                "Response too large",
                response_size=response_size,
                max_size=self.max_response_size,
                path=request.url.path,
            )
            raise HTTPException(
                status_code=500,
                detail="Response too large",
            )
        return response

View File

@@ -1,60 +0,0 @@
from typing import Annotated, Any
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from ..domain.gpu_marketplace import ConsumerGPUProfile, EdgeGPUMetrics, GPUArchitecture
from ..services.edge_gpu_service import EdgeGPUService
from ..storage import get_session
# Router for the edge-GPU endpoints; every route below is mounted under
# the /v1/marketplace/edge-gpu prefix and tagged "edge-gpu".
router = APIRouter(prefix="/v1/marketplace/edge-gpu", tags=["edge-gpu"])
def get_edge_service(session: Annotated[Session, Depends(get_session)]) -> EdgeGPUService:
    """FastAPI dependency: build an ``EdgeGPUService`` on the injected session."""
    service = EdgeGPUService(session)
    return service
@router.get("/profiles", response_model=list[ConsumerGPUProfile])
async def get_consumer_gpu_profiles(
    architecture: GPUArchitecture | None = Query(default=None),
    edge_optimized: bool | None = Query(default=None),
    min_memory_gb: int | None = Query(default=None),
    svc: EdgeGPUService = Depends(get_edge_service),
) -> list[ConsumerGPUProfile]:
    """List consumer GPU profiles, passing the optional query filters to the service."""
    filters = {
        "architecture": architecture,
        "edge_optimized": edge_optimized,
        "min_memory_gb": min_memory_gb,
    }
    return svc.list_profiles(**filters)
@router.get("/metrics/{gpu_id}", response_model=list[EdgeGPUMetrics])
async def get_edge_gpu_metrics(
    gpu_id: str,
    limit: int = Query(default=100, ge=1, le=500),
    svc: EdgeGPUService = Depends(get_edge_service),
) -> list[EdgeGPUMetrics]:
    """Return metrics for ``gpu_id``; ``limit`` (1-500, default 100) is passed to the service."""
    metrics = svc.list_metrics(gpu_id=gpu_id, limit=limit)
    return metrics
@router.post("/scan/{miner_id}")
async def scan_edge_gpus(miner_id: str, svc: EdgeGPUService = Depends(get_edge_service)) -> dict[str, Any]:
    """Scan and register edge GPUs for a miner.

    Returns:
        A summary with the miner ID, the number of GPUs discovered, and the
        ``registered`` / ``edge_optimized`` values reported by the service.

    Raises:
        HTTPException: 500 carrying the error text when discovery fails.
    """
    try:
        result = await svc.discover_and_register_edge_gpus(miner_id)
        return {
            "miner_id": miner_id,
            "gpus_discovered": len(result["gpus"]),
            "gpus_registered": result["registered"],
            "edge_optimized": result["edge_optimized"],
        }
    except Exception as exc:
        # Chain the original error so its traceback survives the re-raise.
        # NOTE(review): str(exc) is returned to the client - confirm that
        # exposing internal error text is acceptable for this endpoint.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
@router.post("/optimize/inference/{gpu_id}")
async def optimize_inference(
    gpu_id: str, model_name: str, request_data: dict, svc: EdgeGPUService = Depends(get_edge_service)
) -> dict[str, Any]:
    """Optimize an ML inference request for an edge GPU.

    Returns:
        Whatever ``EdgeGPUService.optimize_inference_for_edge`` produces for
        the given GPU, model name and request data.

    Raises:
        HTTPException: 500 carrying the error text when optimization fails.
    """
    try:
        return await svc.optimize_inference_for_edge(gpu_id, model_name, request_data)
    except Exception as exc:
        # Chain the original error so its traceback survives the re-raise.
        # NOTE(review): str(exc) is returned to the client - confirm that
        # exposing internal error text is acceptable for this endpoint.
        raise HTTPException(status_code=500, detail=str(exc)) from exc