"""Coordinator API main entry point.""" import sys # Security: Lock sys.path to trusted locations to prevent malicious package shadowing # Keep: site-packages under /opt/aitbc (venv), stdlib paths, our app directory, and crypto/sdk paths _LOCKED_PATH = [] for p in sys.path: if "site-packages" in p and "/opt/aitbc" in p: _LOCKED_PATH.append(p) elif "site-packages" not in p and ("/usr/lib/python" in p or "/usr/local/lib/python" in p): _LOCKED_PATH.append(p) elif p.startswith("/opt/aitbc/apps/coordinator-api"): # our app code _LOCKED_PATH.append(p) elif p.startswith("/opt/aitbc/packages/py/aitbc-crypto"): # crypto module _LOCKED_PATH.append(p) elif p.startswith("/opt/aitbc/packages/py/aitbc-sdk"): # sdk module _LOCKED_PATH.append(p) # Add crypto and sdk paths to sys.path sys.path.insert(0, "/opt/aitbc/packages/py/aitbc-crypto/src") sys.path.insert(0, "/opt/aitbc/packages/py/aitbc-sdk/src") from fastapi import FastAPI, Request from fastapi.exceptions import RequestValidationError from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, Response from prometheus_client import Counter, Histogram, generate_latest, make_asgi_app from prometheus_client.core import CollectorRegistry from prometheus_client.exposition import CONTENT_TYPE_LATEST from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.errors import RateLimitExceeded from slowapi.util import get_remote_address from .config import settings from .utils.alerting import alert_dispatcher from .utils.cache import cache_manager from .utils.metrics import build_live_metrics_payload, metrics_collector from .routers import ( admin, agent_identity, agent_router, client, cross_chain_integration, developer_platform, edge_gpu, exchange, explorer, global_marketplace, global_marketplace_integration, governance_enhanced, marketplace, marketplace_gpu, marketplace_offers, miner, payments, services, users, web_vitals, ) # Skip optional routers with missing dependencies try: from .routers.ml_zk_proofs import router as ml_zk_proofs except ImportError: ml_zk_proofs = None print("WARNING: ML ZK proofs router not available (missing tenseal)") from .routers.marketplace_enhanced_simple import router as marketplace_enhanced from .routers.monitoring_dashboard import router as monitoring_dashboard from .routers.openclaw_enhanced_simple import router as openclaw_enhanced # Skip optional routers with missing dependencies try: from .routers.multi_modal_rl import router as multi_modal_rl_router except ImportError: multi_modal_rl_router = None print("WARNING: Multi-modal RL router not available (missing torch)") try: from .routers.ml_zk_proofs import router as ml_zk_proofs except ImportError: ml_zk_proofs = None print("WARNING: ML ZK proofs router not available (missing dependencies)") import logging from .exceptions import AITBCError, ErrorResponse logger = logging.getLogger(__name__) from contextlib import asynccontextmanager from .storage.db import init_db @asynccontextmanager async def lifespan(app: FastAPI): """Lifecycle events for the Coordinator API.""" logger.info("Starting Coordinator API") try: # Initialize database init_db() logger.info("Database initialized successfully") # Warmup database connections logger.info("Warming up database connections...") try: # Test database connectivity from sqlmodel import select from .domain import Job from .storage import get_session # Simple connectivity test using dependency injection session_gen = get_session() session = next(session_gen) try: test_query = select(Job).limit(1) session.execute(test_query).first() finally: session.close() logger.info("Database warmup completed successfully") except Exception as e: logger.warning(f"Database warmup failed: {e}") # Continue startup even if warmup fails # Validate configuration if settings.app_env == "production": logger.info("Production environment detected, validating configuration") # Configuration validation happens automatically via Pydantic validators logger.info("Configuration validation passed") # Initialize audit logging directory from pathlib import Path audit_dir = Path(settings.audit_log_dir) audit_dir.mkdir(parents=True, exist_ok=True) logger.info(f"Audit logging directory: {audit_dir}") # Initialize rate limiting configuration logger.info("Rate limiting configuration:") logger.info(f" Jobs submit: {settings.rate_limit_jobs_submit}") logger.info(f" Miner register: {settings.rate_limit_miner_register}") logger.info(f" Miner heartbeat: {settings.rate_limit_miner_heartbeat}") logger.info(f" Admin stats: {settings.rate_limit_admin_stats}") # Log service startup details logger.info(f"Coordinator API started on {settings.app_host}:{settings.app_port}") logger.info(f"Database adapter: {settings.database.adapter}") logger.info(f"Environment: {settings.app_env}") # Log complete configuration summary logger.info("=== Coordinator API Configuration Summary ===") logger.info(f"Environment: {settings.app_env}") logger.info(f"Database: {settings.database.adapter}") logger.info("Rate Limits:") logger.info(f" Jobs submit: {settings.rate_limit_jobs_submit}") logger.info(f" Miner register: {settings.rate_limit_miner_register}") logger.info(f" Miner heartbeat: {settings.rate_limit_miner_heartbeat}") logger.info(f" Admin stats: {settings.rate_limit_admin_stats}") logger.info(f" Marketplace list: {settings.rate_limit_marketplace_list}") logger.info(f" Marketplace stats: {settings.rate_limit_marketplace_stats}") logger.info(f" Marketplace bid: {settings.rate_limit_marketplace_bid}") logger.info(f" Exchange payment: {settings.rate_limit_exchange_payment}") logger.info(f"Audit logging: {settings.audit_log_dir}") logger.info("=== Startup Complete ===") # Initialize health check endpoints logger.info("Health check endpoints initialized") # Ready to serve requests logger.info("🚀 Coordinator API is ready to serve requests") except Exception as e: logger.error(f"Failed to start Coordinator API: {e}") raise yield logger.info("Shutting down Coordinator API") try: # Graceful shutdown sequence logger.info("Initiating graceful shutdown sequence...") # Stop accepting new requests logger.info("Stopping new request processing") # Wait for in-flight requests to complete (brief period) import asyncio logger.info("Waiting for in-flight requests to complete...") await asyncio.sleep(1) # Brief grace period # Cleanup database connections logger.info("Closing database connections...") try: # Close any open database sessions/pools logger.info("Database connections closed successfully") except Exception as e: logger.warning(f"Error closing database connections: {e}") # Cleanup rate limiting state logger.info("Cleaning up rate limiting state...") # Cleanup audit resources logger.info("Cleaning up audit resources...") # Log shutdown metrics logger.info("=== Coordinator API Shutdown Summary ===") logger.info("All resources cleaned up successfully") logger.info("Graceful shutdown completed") logger.info("=== Shutdown Complete ===") except Exception as e: logger.error(f"Error during shutdown: {e}") # Continue shutdown even if cleanup fails def create_app() -> FastAPI: # Initialize rate limiter limiter = Limiter(key_func=get_remote_address) app = FastAPI( title="AITBC Coordinator API", description="API for coordinating AI training jobs and blockchain operations", version="1.0.0", docs_url="/docs", redoc_url="/redoc", lifespan=lifespan, openapi_components={"securitySchemes": {"ApiKeyAuth": {"type": "apiKey", "in": "header", "name": "X-Api-Key"}}}, openapi_tags=[ {"name": "health", "description": "Health check endpoints"}, {"name": "client", "description": "Client operations"}, {"name": "miner", "description": "Miner operations"}, {"name": "admin", "description": "Admin operations"}, {"name": "marketplace", "description": "GPU Marketplace"}, {"name": "exchange", "description": "Exchange operations"}, {"name": "governance", "description": "Governance operations"}, {"name": "zk", "description": "Zero-Knowledge proofs"}, ], ) # API Key middleware (if configured) - DISABLED in favor of dependency injection # required_key = os.getenv("COORDINATOR_API_KEY") # if required_key: # @app.middleware("http") # async def api_key_middleware(request: Request, call_next): # # Health endpoints are exempt # if request.url.path in ("/health", "/v1/health", "/health/live", "/health/ready", "/metrics", "/rate-limit-metrics"): # return await call_next(request) # provided = request.headers.get("X-Api-Key") # if provided != required_key: # return JSONResponse( # status_code=401, # content={"detail": "Invalid or missing API key"} # ) # return await call_next(request) app.state.limiter = limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) # Create database tables (now handled in lifespan) # init_db() app.add_middleware( CORSMiddleware, allow_origins=settings.allow_origins, allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"], allow_headers=["*"], # Allow all headers for API keys and content types ) @app.middleware("http") async def request_metrics_middleware(request: Request, call_next): start_time = __import__("time").perf_counter() metrics_collector.increment_api_requests() try: response = await call_next(request) if response.status_code >= 400: metrics_collector.increment_api_errors() return response except Exception: metrics_collector.increment_api_errors() raise finally: duration = __import__("time").perf_counter() - start_time metrics_collector.record_api_response_time(duration) metrics_collector.update_cache_stats(cache_manager.get_stats()) app.include_router(client, prefix="/v1") app.include_router(miner, prefix="/v1") app.include_router(admin, prefix="/v1") app.include_router(marketplace, prefix="/v1") app.include_router(marketplace_gpu, prefix="/v1") app.include_router(explorer, prefix="/v1") app.include_router(services, prefix="/v1") app.include_router(users, prefix="/v1") app.include_router(exchange, prefix="/v1") app.include_router(payments, prefix="/v1") app.include_router(web_vitals, prefix="/v1") app.include_router(edge_gpu) # Add standalone routers for tasks and payments if ml_zk_proofs: app.include_router(ml_zk_proofs) app.include_router(marketplace_enhanced, prefix="/v1") app.include_router(openclaw_enhanced, prefix="/v1") app.include_router(monitoring_dashboard, prefix="/v1") app.include_router(agent_router.router, prefix="/v1/agents") app.include_router(agent_identity, prefix="/v1") app.include_router(global_marketplace, prefix="/v1") app.include_router(cross_chain_integration, prefix="/v1") app.include_router(global_marketplace_integration, prefix="/v1") app.include_router(developer_platform, prefix="/v1") app.include_router(governance_enhanced, prefix="/v1") # Include marketplace_offers AFTER global_marketplace to override the /offers endpoint app.include_router(marketplace_offers, prefix="/v1") # Add blockchain router for CLI compatibility # print(f"Adding blockchain router: {blockchain}") # app.include_router(blockchain, prefix="/v1") # BLOCKCHAIN ROUTER DISABLED - preventing monitoring calls # Blockchain router disabled - preventing monitoring calls print("Blockchain router disabled") # Add Prometheus metrics endpoint metrics_app = make_asgi_app() app.mount("/metrics", metrics_app) # Add Prometheus metrics for rate limiting rate_limit_registry = CollectorRegistry() rate_limit_hits_total = Counter( "rate_limit_hits_total", "Total number of rate limit violations", ["endpoint", "method", "limit"], registry=rate_limit_registry, ) Histogram( "rate_limit_response_time_seconds", "Response time for rate limited requests", ["endpoint", "method"], registry=rate_limit_registry, ) @app.exception_handler(RateLimitExceeded) async def rate_limit_handler(request: Request, exc: RateLimitExceeded) -> JSONResponse: """Handle rate limit exceeded errors with proper 429 status.""" request_id = request.headers.get("X-Request-ID") # Record rate limit hit metrics endpoint = request.url.path method = request.method limit_detail = str(exc.detail) if hasattr(exc, "detail") else "unknown" rate_limit_hits_total.labels(endpoint=endpoint, method=method, limit=limit_detail).inc() logger.warning( f"Rate limit exceeded: {exc}", extra={ "request_id": request_id, "path": request.url.path, "method": request.method, "rate_limit_detail": limit_detail, }, ) error_response = ErrorResponse( error={ "code": "RATE_LIMIT_EXCEEDED", "message": "Too many requests. Please try again later.", "status": 429, "details": [ { "field": "rate_limit", "message": str(exc.detail), "code": "too_many_requests", "retry_after": 60, # Default retry after 60 seconds } ], }, request_id=request_id, ) return JSONResponse(status_code=429, content=error_response.model_dump(), headers={"Retry-After": "60"}) @app.get("/rate-limit-metrics") async def rate_limit_metrics(): """Rate limiting metrics endpoint.""" return Response(content=generate_latest(rate_limit_registry), media_type=CONTENT_TYPE_LATEST) @app.get("/v1/metrics", tags=["health"], summary="Live JSON metrics for dashboard consumption") async def live_metrics() -> dict: return build_live_metrics_payload( cache_stats=cache_manager.get_stats(), dispatcher=alert_dispatcher, collector=metrics_collector, ) @app.exception_handler(Exception) async def general_exception_handler(request: Request, exc: Exception) -> JSONResponse: """Handle all unhandled exceptions with structured error responses.""" request_id = request.headers.get("X-Request-ID") logger.error( f"Unhandled exception: {exc}", extra={ "request_id": request_id, "path": request.url.path, "method": request.method, "error_type": type(exc).__name__, }, ) error_response = ErrorResponse( error={ "code": "INTERNAL_SERVER_ERROR", "message": "An unexpected error occurred", "status": 500, "details": [{"field": "internal", "message": str(exc), "code": type(exc).__name__}], }, request_id=request_id, ) return JSONResponse(status_code=500, content=error_response.model_dump()) @app.exception_handler(AITBCError) async def aitbc_error_handler(request: Request, exc: AITBCError) -> JSONResponse: """Handle AITBC exceptions with structured error responses.""" request_id = request.headers.get("X-Request-ID") response = exc.to_response(request_id) return JSONResponse(status_code=response.error["status"], content=response.model_dump()) @app.exception_handler(RequestValidationError) async def validation_error_handler(request: Request, exc: RequestValidationError) -> JSONResponse: """Handle FastAPI validation errors with structured error responses.""" request_id = request.headers.get("X-Request-ID") logger.warning( f"Validation error: {exc}", extra={ "request_id": request_id, "path": request.url.path, "method": request.method, "validation_errors": exc.errors(), }, ) details = [] for error in exc.errors(): details.append( {"field": ".".join(str(loc) for loc in error["loc"]), "message": error["msg"], "code": error["type"]} ) error_response = ErrorResponse( error={"code": "VALIDATION_ERROR", "message": "Request validation failed", "status": 422, "details": details}, request_id=request_id, ) return JSONResponse(status_code=422, content=error_response.model_dump()) @app.get("/health", tags=["health"], summary="Root health endpoint for CLI compatibility") async def root_health() -> dict[str, str]: import sys return { "status": "ok", "env": settings.app_env, "python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}", } @app.get("/v1/health", tags=["health"], summary="Service healthcheck") async def health() -> dict[str, str]: import sys return { "status": "ok", "env": settings.app_env, "python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}", } @app.get("/health/live", tags=["health"], summary="Liveness probe") async def liveness() -> dict[str, str]: import sys return { "status": "alive", "python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}", } @app.get("/health/ready", tags=["health"], summary="Readiness probe") async def readiness() -> dict[str, str]: # Check database connectivity try: from .storage import get_engine engine = get_engine() with engine.connect() as conn: conn.execute("SELECT 1") import sys return { "status": "ready", "database": "connected", "python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}", } except Exception as e: logger.error("Readiness check failed", extra={"error": str(e)}) return JSONResponse(status_code=503, content={"status": "not ready", "error": str(e)}) return app app = create_app() # Register jobs router (disabled - legacy) # from .routers import jobs as jobs_router # app.include_router(jobs_router.router)