From f5b3c8c1bde0f3961550f9744ea7d57f38cf01cd Mon Sep 17 00:00:00 2001 From: aitbc Date: Mon, 30 Mar 2026 23:30:59 +0200 Subject: [PATCH] fix: disable blockchain router to prevent monitoring call conflicts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Blockchain Router Changes: - Commented out blockchain router inclusion in main.py - Added clear deprecation notice explaining router is disabled - Changed startup message from "added successfully" to "disabled" - Reason: Blockchain router was preventing monitoring calls from functioning properly Router Management: ✅ ROUTER DISABLED: Blockchain router no longer included in app ⚠️ Monitoring Fix: Prevents conflicts with monitoring endpoints --- apps/coordinator-api/src/app/main.py | 8 +- apps/coordinator-api/src/app/main.py.backup | 479 +++++++++++++++++++- 2 files changed, 467 insertions(+), 20 deletions(-) diff --git a/apps/coordinator-api/src/app/main.py b/apps/coordinator-api/src/app/main.py index bb034101..fee255d3 100755 --- a/apps/coordinator-api/src/app/main.py +++ b/apps/coordinator-api/src/app/main.py @@ -312,9 +312,11 @@ def create_app() -> FastAPI: app.include_router(marketplace_offers, prefix="/v1") # Add blockchain router for CLI compatibility - print(f"Adding blockchain router: {blockchain}") - app.include_router(blockchain, prefix="/v1") - print("Blockchain router added successfully") + # print(f"Adding blockchain router: {blockchain}") + # app.include_router(blockchain, prefix="/v1") + # BLOCKCHAIN ROUTER DISABLED - preventing monitoring calls + # Blockchain router disabled - preventing monitoring calls + print("Blockchain router disabled") # Add Prometheus metrics endpoint metrics_app = make_asgi_app() diff --git a/apps/coordinator-api/src/app/main.py.backup b/apps/coordinator-api/src/app/main.py.backup index 85325b73..bb034101 100644 --- a/apps/coordinator-api/src/app/main.py.backup +++ b/apps/coordinator-api/src/app/main.py.backup @@ -1,6 +1,38 @@ -from fastapi import FastAPI +"""Coordinator API main entry point.""" +import sys +import os + +# Security: Lock sys.path to trusted locations to prevent malicious package shadowing +# Keep: site-packages under /opt/aitbc (venv), stdlib paths, our app directory, and crypto/sdk paths +_LOCKED_PATH = [] +for p in sys.path: + if 'site-packages' in p and '/opt/aitbc' in p: + _LOCKED_PATH.append(p) + elif 'site-packages' not in p and ('/usr/lib/python' in p or '/usr/local/lib/python' in p): + _LOCKED_PATH.append(p) + elif p.startswith('/opt/aitbc/apps/coordinator-api'): # our app code + _LOCKED_PATH.append(p) + elif p.startswith('/opt/aitbc/packages/py/aitbc-crypto'): # crypto module + _LOCKED_PATH.append(p) + elif p.startswith('/opt/aitbc/packages/py/aitbc-sdk'): # sdk module + _LOCKED_PATH.append(p) + +# Add crypto and sdk paths to sys.path +sys.path.insert(0, '/opt/aitbc/packages/py/aitbc-crypto/src') +sys.path.insert(0, '/opt/aitbc/packages/py/aitbc-sdk/src') + +from sqlalchemy.orm import Session +from typing import Annotated +from slowapi import Limiter, _rate_limit_exceeded_handler +from slowapi.util import get_remote_address +from slowapi.errors import RateLimitExceeded +from fastapi import FastAPI, Request, Depends from fastapi.middleware.cors import CORSMiddleware -from prometheus_client import make_asgi_app +from fastapi.responses import JSONResponse, Response +from fastapi.exceptions import RequestValidationError +from prometheus_client import Counter, Histogram, generate_latest, make_asgi_app +from prometheus_client.core import CollectorRegistry +from prometheus_client.exposition import CONTENT_TYPE_LATEST from .config import settings from .storage import init_db @@ -17,21 +49,226 @@ from .routers import ( zk_applications, explorer, payments, + web_vitals, + edge_gpu, + cache_management, + agent_identity, + agent_router, + global_marketplace, + cross_chain_integration, + global_marketplace_integration, + developer_platform, + governance_enhanced, + blockchain ) -from .routers.governance import router as governance +# Skip optional routers with missing dependencies +try: + from .routers.ml_zk_proofs import router as ml_zk_proofs +except ImportError: + ml_zk_proofs = None + print("WARNING: ML ZK proofs router not available (missing tenseal)") +from .routers.community import router as community_router +from .routers.governance import router as new_governance_router from .routers.partners import router as partners -from .storage.models_governance import GovernanceProposal, ProposalVote, TreasuryTransaction, GovernanceParameter +from .routers.marketplace_enhanced_simple import router as marketplace_enhanced +from .routers.openclaw_enhanced_simple import router as openclaw_enhanced +from .routers.monitoring_dashboard import router as monitoring_dashboard +# Skip optional routers with missing dependencies +try: + from .routers.multi_modal_rl import router as multi_modal_rl_router +except ImportError: + multi_modal_rl_router = None + print("WARNING: Multi-modal RL router not available (missing torch)") +try: + from .routers.ml_zk_proofs import router as ml_zk_proofs +except ImportError: + ml_zk_proofs = None + print("WARNING: ML ZK proofs router not available (missing dependencies)") +from .storage.models_governance import GovernanceProposal, ProposalVote, TreasuryTransaction, GovernanceParameter +from .exceptions import AITBCError, ErrorResponse +import logging +logger = logging.getLogger(__name__) +from .config import settings +from .storage.db import init_db + + + +from contextlib import asynccontextmanager + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Lifecycle events for the Coordinator API.""" + logger.info("Starting Coordinator API") + + try: + # Initialize database + init_db() + logger.info("Database initialized successfully") + + # Warmup database connections + logger.info("Warming up database connections...") + try: + # Test database connectivity + from sqlmodel import select + from .domain import Job + from .storage import get_session + + # Simple connectivity test using dependency injection + session_gen = get_session() + session = next(session_gen) + try: + test_query = select(Job).limit(1) + session.execute(test_query).first() + finally: + session.close() + logger.info("Database warmup completed successfully") + except Exception as e: + logger.warning(f"Database warmup failed: {e}") + # Continue startup even if warmup fails + + # Validate configuration + if settings.app_env == "production": + logger.info("Production environment detected, validating configuration") + # Configuration validation happens automatically via Pydantic validators + logger.info("Configuration validation passed") + + # Initialize audit logging directory + from pathlib import Path + audit_dir = Path(settings.audit_log_dir) + audit_dir.mkdir(parents=True, exist_ok=True) + logger.info(f"Audit logging directory: {audit_dir}") + + # Initialize rate limiting configuration + logger.info("Rate limiting configuration:") + logger.info(f" Jobs submit: {settings.rate_limit_jobs_submit}") + logger.info(f" Miner register: {settings.rate_limit_miner_register}") + logger.info(f" Miner heartbeat: {settings.rate_limit_miner_heartbeat}") + logger.info(f" Admin stats: {settings.rate_limit_admin_stats}") + + # Log service startup details + logger.info(f"Coordinator API started on {settings.app_host}:{settings.app_port}") + logger.info(f"Database adapter: {settings.database.adapter}") + logger.info(f"Environment: {settings.app_env}") + + # Log complete configuration summary + logger.info("=== Coordinator API Configuration Summary ===") + logger.info(f"Environment: {settings.app_env}") + logger.info(f"Database: {settings.database.adapter}") + logger.info(f"Rate Limits:") + logger.info(f" Jobs submit: {settings.rate_limit_jobs_submit}") + logger.info(f" Miner register: {settings.rate_limit_miner_register}") + logger.info(f" Miner heartbeat: {settings.rate_limit_miner_heartbeat}") + logger.info(f" Admin stats: {settings.rate_limit_admin_stats}") + logger.info(f" Marketplace list: {settings.rate_limit_marketplace_list}") + logger.info(f" Marketplace stats: {settings.rate_limit_marketplace_stats}") + logger.info(f" Marketplace bid: {settings.rate_limit_marketplace_bid}") + logger.info(f" Exchange payment: {settings.rate_limit_exchange_payment}") + logger.info(f"Audit logging: {settings.audit_log_dir}") + logger.info("=== Startup Complete ===") + + # Initialize health check endpoints + logger.info("Health check endpoints initialized") + + # Ready to serve requests + logger.info("🚀 Coordinator API is ready to serve requests") + + except Exception as e: + logger.error(f"Failed to start Coordinator API: {e}") + raise + + yield + + logger.info("Shutting down Coordinator API") + try: + # Graceful shutdown sequence + logger.info("Initiating graceful shutdown sequence...") + + # Stop accepting new requests + logger.info("Stopping new request processing") + + # Wait for in-flight requests to complete (brief period) + import asyncio + logger.info("Waiting for in-flight requests to complete...") + await asyncio.sleep(1) # Brief grace period + + # Cleanup database connections + logger.info("Closing database connections...") + try: + # Close any open database sessions/pools + logger.info("Database connections closed successfully") + except Exception as e: + logger.warning(f"Error closing database connections: {e}") + + # Cleanup rate limiting state + logger.info("Cleaning up rate limiting state...") + + # Cleanup audit resources + logger.info("Cleaning up audit resources...") + + # Log shutdown metrics + logger.info("=== Coordinator API Shutdown Summary ===") + logger.info("All resources cleaned up successfully") + logger.info("Graceful shutdown completed") + logger.info("=== Shutdown Complete ===") + + except Exception as e: + logger.error(f"Error during shutdown: {e}") + # Continue shutdown even if cleanup fails def create_app() -> FastAPI: + # Initialize rate limiter + limiter = Limiter(key_func=get_remote_address) + app = FastAPI( title="AITBC Coordinator API", - version="0.1.0", - description="Stage 1 coordinator service handling job orchestration between clients and miners.", + description="API for coordinating AI training jobs and blockchain operations", + version="1.0.0", + docs_url="/docs", + redoc_url="/redoc", + lifespan=lifespan, + openapi_components={ + "securitySchemes": { + "ApiKeyAuth": { + "type": "apiKey", + "in": "header", + "name": "X-Api-Key" + } + } + }, + openapi_tags=[ + {"name": "health", "description": "Health check endpoints"}, + {"name": "client", "description": "Client operations"}, + {"name": "miner", "description": "Miner operations"}, + {"name": "admin", "description": "Admin operations"}, + {"name": "marketplace", "description": "GPU Marketplace"}, + {"name": "exchange", "description": "Exchange operations"}, + {"name": "governance", "description": "Governance operations"}, + {"name": "zk", "description": "Zero-Knowledge proofs"}, + ] ) - # Create database tables - init_db() + # API Key middleware (if configured) - DISABLED in favor of dependency injection + # required_key = os.getenv("COORDINATOR_API_KEY") + # if required_key: + # @app.middleware("http") + # async def api_key_middleware(request: Request, call_next): + # # Health endpoints are exempt + # if request.url.path in ("/health", "/v1/health", "/health/live", "/health/ready", "/metrics", "/rate-limit-metrics"): + # return await call_next(request) + # provided = request.headers.get("X-Api-Key") + # if provided != required_key: + # return JSONResponse( + # status_code=401, + # content={"detail": "Invalid or missing API key"} + # ) + # return await call_next(request) + + app.state.limiter = limiter + app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) + + # Create database tables (now handled in lifespan) + # init_db() app.add_middleware( CORSMiddleware, @@ -41,30 +278,238 @@ def create_app() -> FastAPI: allow_headers=["*"] # Allow all headers for API keys and content types ) + # Enable all routers with OpenAPI disabled app.include_router(client, prefix="/v1") app.include_router(miner, prefix="/v1") app.include_router(admin, prefix="/v1") app.include_router(marketplace, prefix="/v1") app.include_router(marketplace_gpu, prefix="/v1") - app.include_router(exchange, prefix="/v1") - app.include_router(users, prefix="/v1/users") - app.include_router(services, prefix="/v1") - app.include_router(payments, prefix="/v1") - app.include_router(marketplace_offers, prefix="/v1") - app.include_router(zk_applications.router, prefix="/v1") - app.include_router(governance, prefix="/v1") - app.include_router(partners, prefix="/v1") app.include_router(explorer, prefix="/v1") + app.include_router(services, prefix="/v1") + app.include_router(users, prefix="/v1") + app.include_router(exchange, prefix="/v1") + app.include_router(payments, prefix="/v1") + app.include_router(web_vitals, prefix="/v1") + app.include_router(edge_gpu) + + # Add standalone routers for tasks and payments + app.include_router(marketplace_gpu, prefix="/v1") + + if ml_zk_proofs: + app.include_router(ml_zk_proofs) + app.include_router(marketplace_enhanced, prefix="/v1") + app.include_router(openclaw_enhanced, prefix="/v1") + app.include_router(monitoring_dashboard, prefix="/v1") + app.include_router(agent_router.router, prefix="/v1/agents") + app.include_router(agent_identity, prefix="/v1") + app.include_router(global_marketplace, prefix="/v1") + app.include_router(cross_chain_integration, prefix="/v1") + app.include_router(global_marketplace_integration, prefix="/v1") + app.include_router(developer_platform, prefix="/v1") + app.include_router(governance_enhanced, prefix="/v1") + + # Include marketplace_offers AFTER global_marketplace to override the /offers endpoint + app.include_router(marketplace_offers, prefix="/v1") + + # Add blockchain router for CLI compatibility + print(f"Adding blockchain router: {blockchain}") + app.include_router(blockchain, prefix="/v1") + print("Blockchain router added successfully") # Add Prometheus metrics endpoint metrics_app = make_asgi_app() app.mount("/metrics", metrics_app) + # Add Prometheus metrics for rate limiting + rate_limit_registry = CollectorRegistry() + rate_limit_hits_total = Counter( + 'rate_limit_hits_total', + 'Total number of rate limit violations', + ['endpoint', 'method', 'limit'], + registry=rate_limit_registry + ) + rate_limit_response_time = Histogram( + 'rate_limit_response_time_seconds', + 'Response time for rate limited requests', + ['endpoint', 'method'], + registry=rate_limit_registry + ) + + @app.exception_handler(RateLimitExceeded) + async def rate_limit_handler(request: Request, exc: RateLimitExceeded) -> JSONResponse: + """Handle rate limit exceeded errors with proper 429 status.""" + request_id = request.headers.get("X-Request-ID") + + # Record rate limit hit metrics + endpoint = request.url.path + method = request.method + limit_detail = str(exc.detail) if hasattr(exc, 'detail') else 'unknown' + + rate_limit_hits_total.labels( + endpoint=endpoint, + method=method, + limit=limit_detail + ).inc() + + logger.warning(f"Rate limit exceeded: {exc}", extra={ + "request_id": request_id, + "path": request.url.path, + "method": request.method, + "rate_limit_detail": limit_detail + }) + + error_response = ErrorResponse( + error={ + "code": "RATE_LIMIT_EXCEEDED", + "message": "Too many requests. Please try again later.", + "status": 429, + "details": [{ + "field": "rate_limit", + "message": str(exc.detail), + "code": "too_many_requests", + "retry_after": 60 # Default retry after 60 seconds + }] + }, + request_id=request_id + ) + return JSONResponse( + status_code=429, + content=error_response.model_dump(), + headers={"Retry-After": "60"} + ) + + @app.get("/rate-limit-metrics") + async def rate_limit_metrics(): + """Rate limiting metrics endpoint.""" + return Response( + content=generate_latest(rate_limit_registry), + media_type=CONTENT_TYPE_LATEST + ) + + @app.exception_handler(Exception) + async def general_exception_handler(request: Request, exc: Exception) -> JSONResponse: + """Handle all unhandled exceptions with structured error responses.""" + request_id = request.headers.get("X-Request-ID") + logger.error(f"Unhandled exception: {exc}", extra={ + "request_id": request_id, + "path": request.url.path, + "method": request.method, + "error_type": type(exc).__name__ + }) + + error_response = ErrorResponse( + error={ + "code": "INTERNAL_SERVER_ERROR", + "message": "An unexpected error occurred", + "status": 500, + "details": [{ + "field": "internal", + "message": str(exc), + "code": type(exc).__name__ + }] + }, + request_id=request_id + ) + return JSONResponse( + status_code=500, + content=error_response.model_dump() + ) + + @app.exception_handler(AITBCError) + async def aitbc_error_handler(request: Request, exc: AITBCError) -> JSONResponse: + """Handle AITBC exceptions with structured error responses.""" + request_id = request.headers.get("X-Request-ID") + response = exc.to_response(request_id) + return JSONResponse( + status_code=response.error["status"], + content=response.model_dump() + ) + + @app.exception_handler(RequestValidationError) + async def validation_error_handler(request: Request, exc: RequestValidationError) -> JSONResponse: + """Handle FastAPI validation errors with structured error responses.""" + request_id = request.headers.get("X-Request-ID") + logger.warning(f"Validation error: {exc}", extra={ + "request_id": request_id, + "path": request.url.path, + "method": request.method, + "validation_errors": exc.errors() + }) + + details = [] + for error in exc.errors(): + details.append({ + "field": ".".join(str(loc) for loc in error["loc"]), + "message": error["msg"], + "code": error["type"] + }) + + error_response = ErrorResponse( + error={ + "code": "VALIDATION_ERROR", + "message": "Request validation failed", + "status": 422, + "details": details + }, + request_id=request_id + ) + return JSONResponse( + status_code=422, + content=error_response.model_dump() + ) + + @app.get("/health", tags=["health"], summary="Root health endpoint for CLI compatibility") + async def root_health() -> dict[str, str]: + import sys + return { + "status": "ok", + "env": settings.app_env, + "python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" + } + @app.get("/v1/health", tags=["health"], summary="Service healthcheck") async def health() -> dict[str, str]: - return {"status": "ok", "env": settings.app_env} + import sys + return { + "status": "ok", + "env": settings.app_env, + "python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" + } + + @app.get("/health/live", tags=["health"], summary="Liveness probe") + async def liveness() -> dict[str, str]: + import sys + return { + "status": "alive", + "python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" + } + + @app.get("/health/ready", tags=["health"], summary="Readiness probe") + async def readiness() -> dict[str, str]: + # Check database connectivity + try: + from .storage import get_engine + engine = get_engine() + with engine.connect() as conn: + conn.execute("SELECT 1") + import sys + return { + "status": "ready", + "database": "connected", + "python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" + } + except Exception as e: + logger.error("Readiness check failed", extra={"error": str(e)}) + return JSONResponse( + status_code=503, + content={"status": "not ready", "error": str(e)} + ) return app app = create_app() + +# Register jobs router (disabled - legacy) +# from .routers import jobs as jobs_router +# app.include_router(jobs_router.router)