Some checks failed
AITBC CI/CD Pipeline / lint-and-test (3.11) (pull_request) Has been cancelled
AITBC CI/CD Pipeline / lint-and-test (3.12) (pull_request) Has been cancelled
AITBC CI/CD Pipeline / lint-and-test (3.13) (pull_request) Has been cancelled
AITBC CLI Level 1 Commands Test / test-cli-level1 (3.11) (pull_request) Has been cancelled
AITBC CLI Level 1 Commands Test / test-cli-level1 (3.12) (pull_request) Has been cancelled
AITBC CLI Level 1 Commands Test / test-cli-level1 (3.13) (pull_request) Has been cancelled
Security Scanning / Bandit Security Scan (apps/coordinator-api/src) (pull_request) Has been cancelled
Security Scanning / Bandit Security Scan (cli/aitbc_cli) (pull_request) Has been cancelled
Security Scanning / Bandit Security Scan (packages/py/aitbc-core/src) (pull_request) Has been cancelled
Security Scanning / Bandit Security Scan (packages/py/aitbc-crypto/src) (pull_request) Has been cancelled
Security Scanning / Bandit Security Scan (packages/py/aitbc-sdk/src) (pull_request) Has been cancelled
Security Scanning / Bandit Security Scan (tests) (pull_request) Has been cancelled
Security Scanning / CodeQL Security Analysis (javascript) (pull_request) Has been cancelled
Security Scanning / CodeQL Security Analysis (python) (pull_request) Has been cancelled
Security Scanning / Dependency Security Scan (pull_request) Has been cancelled
Security Scanning / Container Security Scan (pull_request) Has been cancelled
Security Scanning / OSSF Scorecard (pull_request) Has been cancelled
AITBC CI/CD Pipeline / test-cli (pull_request) Has been cancelled
AITBC CI/CD Pipeline / test-services (pull_request) Has been cancelled
AITBC CI/CD Pipeline / test-production-services (pull_request) Has been cancelled
AITBC CI/CD Pipeline / security-scan (pull_request) Has been cancelled
AITBC CI/CD Pipeline / build (pull_request) Has been cancelled
AITBC CI/CD Pipeline / deploy-staging (pull_request) Has been cancelled
AITBC CI/CD Pipeline / deploy-production (pull_request) Has been cancelled
AITBC CI/CD Pipeline / performance-test (pull_request) Has been cancelled
AITBC CI/CD Pipeline / docs (pull_request) Has been cancelled
AITBC CI/CD Pipeline / release (pull_request) Has been cancelled
AITBC CI/CD Pipeline / notify (pull_request) Has been cancelled
AITBC CLI Level 1 Commands Test / test-summary (pull_request) Has been cancelled
Security Scanning / Security Summary Report (pull_request) Has been cancelled
- Add production genesis initialization scripts - Add keystore management for production - Add production node runner - Add setup production automation - Add AI memory system for development tracking - Add translation cache service - Add development heartbeat monitoring - Update blockchain RPC router - Update coordinator API main configuration - Update secure pickle service - Update claim task script - Update blockchain service configuration - Update gitignore for production files Resolves conflicts by accepting PR branch changes
506 lines
19 KiB
Python
Executable File
506 lines
19 KiB
Python
Executable File
"""Coordinator API main entry point."""
|
|
import sys
|
|
import os
|
|
|
|
# Security: Lock sys.path to trusted locations to prevent malicious package shadowing
|
|
# Keep: site-packages under /opt/aitbc (venv), stdlib paths, and our app directory
|
|
_LOCKED_PATH = []
|
|
for p in sys.path:
|
|
if 'site-packages' in p and '/opt/aitbc' in p:
|
|
_LOCKED_PATH.append(p)
|
|
elif 'site-packages' not in p and ('/usr/lib/python' in p or '/usr/local/lib/python' in p):
|
|
_LOCKED_PATH.append(p)
|
|
elif p.startswith('/opt/aitbc/apps/coordinator-api'): # our app code
|
|
_LOCKED_PATH.append(p)
|
|
sys.path = _LOCKED_PATH
|
|
|
|
from sqlalchemy.orm import Session
|
|
from typing import Annotated
|
|
from slowapi import Limiter, _rate_limit_exceeded_handler
|
|
from slowapi.util import get_remote_address
|
|
from slowapi.errors import RateLimitExceeded
|
|
from fastapi import FastAPI, Request, Depends
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import JSONResponse, Response
|
|
from fastapi.exceptions import RequestValidationError
|
|
from prometheus_client import Counter, Histogram, generate_latest, make_asgi_app
|
|
from prometheus_client.core import CollectorRegistry
|
|
from prometheus_client.exposition import CONTENT_TYPE_LATEST
|
|
|
|
from .config import settings
|
|
from .storage import init_db
|
|
from .routers import (
|
|
client,
|
|
miner,
|
|
admin,
|
|
marketplace,
|
|
marketplace_gpu,
|
|
exchange,
|
|
users,
|
|
services,
|
|
marketplace_offers,
|
|
zk_applications,
|
|
explorer,
|
|
payments,
|
|
web_vitals,
|
|
edge_gpu,
|
|
cache_management,
|
|
agent_identity,
|
|
agent_router,
|
|
global_marketplace,
|
|
cross_chain_integration,
|
|
global_marketplace_integration,
|
|
developer_platform,
|
|
governance_enhanced,
|
|
blockchain
|
|
)
|
|
# Skip optional routers with missing dependencies
|
|
try:
|
|
from .routers.ml_zk_proofs import router as ml_zk_proofs
|
|
except ImportError:
|
|
ml_zk_proofs = None
|
|
print("WARNING: ML ZK proofs router not available (missing tenseal)")
|
|
from .routers.community import router as community_router
|
|
from .routers.governance import router as new_governance_router
|
|
from .routers.partners import router as partners
|
|
from .routers.marketplace_enhanced_simple import router as marketplace_enhanced
|
|
from .routers.openclaw_enhanced_simple import router as openclaw_enhanced
|
|
from .routers.monitoring_dashboard import router as monitoring_dashboard
|
|
# Skip optional routers with missing dependencies
|
|
try:
|
|
from .routers.multi_modal_rl import router as multi_modal_rl_router
|
|
except ImportError:
|
|
multi_modal_rl_router = None
|
|
print("WARNING: Multi-modal RL router not available (missing torch)")
|
|
|
|
try:
|
|
from .routers.ml_zk_proofs import router as ml_zk_proofs
|
|
except ImportError:
|
|
ml_zk_proofs = None
|
|
print("WARNING: ML ZK proofs router not available (missing dependencies)")
|
|
from .storage.models_governance import GovernanceProposal, ProposalVote, TreasuryTransaction, GovernanceParameter
|
|
from .exceptions import AITBCError, ErrorResponse
|
|
from aitbc.logging import get_logger
|
|
from .config import settings
|
|
from .storage.db import init_db
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
from contextlib import asynccontextmanager
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
"""Lifecycle events for the Coordinator API."""
|
|
logger.info("Starting Coordinator API")
|
|
|
|
try:
|
|
# Initialize database
|
|
init_db()
|
|
logger.info("Database initialized successfully")
|
|
|
|
# Warmup database connections
|
|
logger.info("Warming up database connections...")
|
|
try:
|
|
# Test database connectivity
|
|
from sqlmodel import select
|
|
from .domain import Job
|
|
from .storage import get_session
|
|
|
|
# Simple connectivity test using dependency injection
|
|
session_gen = get_session()
|
|
session = next(session_gen)
|
|
try:
|
|
test_query = select(Job).limit(1)
|
|
session.execute(test_query).first()
|
|
finally:
|
|
session.close()
|
|
logger.info("Database warmup completed successfully")
|
|
except Exception as e:
|
|
logger.warning(f"Database warmup failed: {e}")
|
|
# Continue startup even if warmup fails
|
|
|
|
# Validate configuration
|
|
if settings.app_env == "production":
|
|
logger.info("Production environment detected, validating configuration")
|
|
# Configuration validation happens automatically via Pydantic validators
|
|
logger.info("Configuration validation passed")
|
|
|
|
# Initialize audit logging directory
|
|
from pathlib import Path
|
|
audit_dir = Path(settings.audit_log_dir)
|
|
audit_dir.mkdir(parents=True, exist_ok=True)
|
|
logger.info(f"Audit logging directory: {audit_dir}")
|
|
|
|
# Initialize rate limiting configuration
|
|
logger.info("Rate limiting configuration:")
|
|
logger.info(f" Jobs submit: {settings.rate_limit_jobs_submit}")
|
|
logger.info(f" Miner register: {settings.rate_limit_miner_register}")
|
|
logger.info(f" Miner heartbeat: {settings.rate_limit_miner_heartbeat}")
|
|
logger.info(f" Admin stats: {settings.rate_limit_admin_stats}")
|
|
|
|
# Log service startup details
|
|
logger.info(f"Coordinator API started on {settings.app_host}:{settings.app_port}")
|
|
logger.info(f"Database adapter: {settings.database.adapter}")
|
|
logger.info(f"Environment: {settings.app_env}")
|
|
|
|
# Log complete configuration summary
|
|
logger.info("=== Coordinator API Configuration Summary ===")
|
|
logger.info(f"Environment: {settings.app_env}")
|
|
logger.info(f"Database: {settings.database.adapter}")
|
|
logger.info(f"Rate Limits:")
|
|
logger.info(f" Jobs submit: {settings.rate_limit_jobs_submit}")
|
|
logger.info(f" Miner register: {settings.rate_limit_miner_register}")
|
|
logger.info(f" Miner heartbeat: {settings.rate_limit_miner_heartbeat}")
|
|
logger.info(f" Admin stats: {settings.rate_limit_admin_stats}")
|
|
logger.info(f" Marketplace list: {settings.rate_limit_marketplace_list}")
|
|
logger.info(f" Marketplace stats: {settings.rate_limit_marketplace_stats}")
|
|
logger.info(f" Marketplace bid: {settings.rate_limit_marketplace_bid}")
|
|
logger.info(f" Exchange payment: {settings.rate_limit_exchange_payment}")
|
|
logger.info(f"Audit logging: {settings.audit_log_dir}")
|
|
logger.info("=== Startup Complete ===")
|
|
|
|
# Initialize health check endpoints
|
|
logger.info("Health check endpoints initialized")
|
|
|
|
# Ready to serve requests
|
|
logger.info("🚀 Coordinator API is ready to serve requests")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to start Coordinator API: {e}")
|
|
raise
|
|
|
|
yield
|
|
|
|
logger.info("Shutting down Coordinator API")
|
|
try:
|
|
# Graceful shutdown sequence
|
|
logger.info("Initiating graceful shutdown sequence...")
|
|
|
|
# Stop accepting new requests
|
|
logger.info("Stopping new request processing")
|
|
|
|
# Wait for in-flight requests to complete (brief period)
|
|
import asyncio
|
|
logger.info("Waiting for in-flight requests to complete...")
|
|
await asyncio.sleep(1) # Brief grace period
|
|
|
|
# Cleanup database connections
|
|
logger.info("Closing database connections...")
|
|
try:
|
|
# Close any open database sessions/pools
|
|
logger.info("Database connections closed successfully")
|
|
except Exception as e:
|
|
logger.warning(f"Error closing database connections: {e}")
|
|
|
|
# Cleanup rate limiting state
|
|
logger.info("Cleaning up rate limiting state...")
|
|
|
|
# Cleanup audit resources
|
|
logger.info("Cleaning up audit resources...")
|
|
|
|
# Log shutdown metrics
|
|
logger.info("=== Coordinator API Shutdown Summary ===")
|
|
logger.info("All resources cleaned up successfully")
|
|
logger.info("Graceful shutdown completed")
|
|
logger.info("=== Shutdown Complete ===")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error during shutdown: {e}")
|
|
# Continue shutdown even if cleanup fails
|
|
|
|
def create_app() -> FastAPI:
|
|
# Initialize rate limiter
|
|
limiter = Limiter(key_func=get_remote_address)
|
|
|
|
app = FastAPI(
|
|
title="AITBC Coordinator API",
|
|
description="API for coordinating AI training jobs and blockchain operations",
|
|
version="1.0.0",
|
|
docs_url="/docs",
|
|
redoc_url="/redoc",
|
|
lifespan=lifespan,
|
|
openapi_components={
|
|
"securitySchemes": {
|
|
"ApiKeyAuth": {
|
|
"type": "apiKey",
|
|
"in": "header",
|
|
"name": "X-Api-Key"
|
|
}
|
|
}
|
|
},
|
|
openapi_tags=[
|
|
{"name": "health", "description": "Health check endpoints"},
|
|
{"name": "client", "description": "Client operations"},
|
|
{"name": "miner", "description": "Miner operations"},
|
|
{"name": "admin", "description": "Admin operations"},
|
|
{"name": "marketplace", "description": "GPU Marketplace"},
|
|
{"name": "exchange", "description": "Exchange operations"},
|
|
{"name": "governance", "description": "Governance operations"},
|
|
{"name": "zk", "description": "Zero-Knowledge proofs"},
|
|
]
|
|
)
|
|
|
|
# API Key middleware (if configured)
|
|
required_key = os.getenv("COORDINATOR_API_KEY")
|
|
if required_key:
|
|
@app.middleware("http")
|
|
async def api_key_middleware(request: Request, call_next):
|
|
# Health endpoints are exempt
|
|
if request.url.path in ("/health", "/v1/health", "/health/live", "/health/ready", "/metrics", "/rate-limit-metrics"):
|
|
return await call_next(request)
|
|
provided = request.headers.get("X-Api-Key")
|
|
if provided != required_key:
|
|
return JSONResponse(
|
|
status_code=401,
|
|
content={"detail": "Invalid or missing API key"}
|
|
)
|
|
return await call_next(request)
|
|
|
|
app.state.limiter = limiter
|
|
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
|
|
|
|
# Create database tables (now handled in lifespan)
|
|
# init_db()
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=settings.allow_origins,
|
|
allow_credentials=True,
|
|
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
|
|
allow_headers=["*"] # Allow all headers for API keys and content types
|
|
)
|
|
|
|
# Enable all routers with OpenAPI disabled
|
|
app.include_router(client, prefix="/v1")
|
|
app.include_router(miner, prefix="/v1")
|
|
app.include_router(admin, prefix="/v1")
|
|
app.include_router(marketplace, prefix="/v1")
|
|
app.include_router(marketplace_gpu, prefix="/v1")
|
|
app.include_router(explorer, prefix="/v1")
|
|
app.include_router(services, prefix="/v1")
|
|
app.include_router(users, prefix="/v1")
|
|
app.include_router(exchange, prefix="/v1")
|
|
app.include_router(marketplace_offers, prefix="/v1")
|
|
app.include_router(payments, prefix="/v1")
|
|
app.include_router(web_vitals, prefix="/v1")
|
|
app.include_router(edge_gpu)
|
|
|
|
# Add standalone routers for tasks and payments
|
|
app.include_router(marketplace_gpu, prefix="/v1")
|
|
|
|
if ml_zk_proofs:
|
|
app.include_router(ml_zk_proofs)
|
|
app.include_router(marketplace_enhanced, prefix="/v1")
|
|
app.include_router(openclaw_enhanced, prefix="/v1")
|
|
app.include_router(monitoring_dashboard, prefix="/v1")
|
|
app.include_router(agent_router.router, prefix="/v1/agents")
|
|
app.include_router(agent_identity, prefix="/v1")
|
|
app.include_router(global_marketplace, prefix="/v1")
|
|
app.include_router(cross_chain_integration, prefix="/v1")
|
|
app.include_router(global_marketplace_integration, prefix="/v1")
|
|
app.include_router(developer_platform, prefix="/v1")
|
|
app.include_router(governance_enhanced, prefix="/v1")
|
|
|
|
# Add blockchain router for CLI compatibility
|
|
print(f"Adding blockchain router: {blockchain}")
|
|
app.include_router(blockchain, prefix="/v1")
|
|
print("Blockchain router added successfully")
|
|
|
|
# Add Prometheus metrics endpoint
|
|
metrics_app = make_asgi_app()
|
|
app.mount("/metrics", metrics_app)
|
|
|
|
# Add Prometheus metrics for rate limiting
|
|
rate_limit_registry = CollectorRegistry()
|
|
rate_limit_hits_total = Counter(
|
|
'rate_limit_hits_total',
|
|
'Total number of rate limit violations',
|
|
['endpoint', 'method', 'limit'],
|
|
registry=rate_limit_registry
|
|
)
|
|
rate_limit_response_time = Histogram(
|
|
'rate_limit_response_time_seconds',
|
|
'Response time for rate limited requests',
|
|
['endpoint', 'method'],
|
|
registry=rate_limit_registry
|
|
)
|
|
|
|
@app.exception_handler(RateLimitExceeded)
|
|
async def rate_limit_handler(request: Request, exc: RateLimitExceeded) -> JSONResponse:
|
|
"""Handle rate limit exceeded errors with proper 429 status."""
|
|
request_id = request.headers.get("X-Request-ID")
|
|
|
|
# Record rate limit hit metrics
|
|
endpoint = request.url.path
|
|
method = request.method
|
|
limit_detail = str(exc.detail) if hasattr(exc, 'detail') else 'unknown'
|
|
|
|
rate_limit_hits_total.labels(
|
|
endpoint=endpoint,
|
|
method=method,
|
|
limit=limit_detail
|
|
).inc()
|
|
|
|
logger.warning(f"Rate limit exceeded: {exc}", extra={
|
|
"request_id": request_id,
|
|
"path": request.url.path,
|
|
"method": request.method,
|
|
"rate_limit_detail": limit_detail
|
|
})
|
|
|
|
error_response = ErrorResponse(
|
|
error={
|
|
"code": "RATE_LIMIT_EXCEEDED",
|
|
"message": "Too many requests. Please try again later.",
|
|
"status": 429,
|
|
"details": [{
|
|
"field": "rate_limit",
|
|
"message": str(exc.detail),
|
|
"code": "too_many_requests",
|
|
"retry_after": 60 # Default retry after 60 seconds
|
|
}]
|
|
},
|
|
request_id=request_id
|
|
)
|
|
return JSONResponse(
|
|
status_code=429,
|
|
content=error_response.model_dump(),
|
|
headers={"Retry-After": "60"}
|
|
)
|
|
|
|
@app.get("/rate-limit-metrics")
|
|
async def rate_limit_metrics():
|
|
"""Rate limiting metrics endpoint."""
|
|
return Response(
|
|
content=generate_latest(rate_limit_registry),
|
|
media_type=CONTENT_TYPE_LATEST
|
|
)
|
|
|
|
@app.exception_handler(Exception)
|
|
async def general_exception_handler(request: Request, exc: Exception) -> JSONResponse:
|
|
"""Handle all unhandled exceptions with structured error responses."""
|
|
request_id = request.headers.get("X-Request-ID")
|
|
logger.error(f"Unhandled exception: {exc}", extra={
|
|
"request_id": request_id,
|
|
"path": request.url.path,
|
|
"method": request.method,
|
|
"error_type": type(exc).__name__
|
|
})
|
|
|
|
error_response = ErrorResponse(
|
|
error={
|
|
"code": "INTERNAL_SERVER_ERROR",
|
|
"message": "An unexpected error occurred",
|
|
"status": 500,
|
|
"details": [{
|
|
"field": "internal",
|
|
"message": str(exc),
|
|
"code": type(exc).__name__
|
|
}]
|
|
},
|
|
request_id=request_id
|
|
)
|
|
return JSONResponse(
|
|
status_code=500,
|
|
content=error_response.model_dump()
|
|
)
|
|
|
|
@app.exception_handler(AITBCError)
|
|
async def aitbc_error_handler(request: Request, exc: AITBCError) -> JSONResponse:
|
|
"""Handle AITBC exceptions with structured error responses."""
|
|
request_id = request.headers.get("X-Request-ID")
|
|
response = exc.to_response(request_id)
|
|
return JSONResponse(
|
|
status_code=response.error["status"],
|
|
content=response.model_dump()
|
|
)
|
|
|
|
@app.exception_handler(RequestValidationError)
|
|
async def validation_error_handler(request: Request, exc: RequestValidationError) -> JSONResponse:
|
|
"""Handle FastAPI validation errors with structured error responses."""
|
|
request_id = request.headers.get("X-Request-ID")
|
|
logger.warning(f"Validation error: {exc}", extra={
|
|
"request_id": request_id,
|
|
"path": request.url.path,
|
|
"method": request.method,
|
|
"validation_errors": exc.errors()
|
|
})
|
|
|
|
details = []
|
|
for error in exc.errors():
|
|
details.append({
|
|
"field": ".".join(str(loc) for loc in error["loc"]),
|
|
"message": error["msg"],
|
|
"code": error["type"]
|
|
})
|
|
|
|
error_response = ErrorResponse(
|
|
error={
|
|
"code": "VALIDATION_ERROR",
|
|
"message": "Request validation failed",
|
|
"status": 422,
|
|
"details": details
|
|
},
|
|
request_id=request_id
|
|
)
|
|
return JSONResponse(
|
|
status_code=422,
|
|
content=error_response.model_dump()
|
|
)
|
|
|
|
@app.get("/health", tags=["health"], summary="Root health endpoint for CLI compatibility")
|
|
async def root_health() -> dict[str, str]:
|
|
import sys
|
|
return {
|
|
"status": "ok",
|
|
"env": settings.app_env,
|
|
"python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
|
|
}
|
|
|
|
@app.get("/v1/health", tags=["health"], summary="Service healthcheck")
|
|
async def health() -> dict[str, str]:
|
|
import sys
|
|
return {
|
|
"status": "ok",
|
|
"env": settings.app_env,
|
|
"python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
|
|
}
|
|
|
|
@app.get("/health/live", tags=["health"], summary="Liveness probe")
|
|
async def liveness() -> dict[str, str]:
|
|
import sys
|
|
return {
|
|
"status": "alive",
|
|
"python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
|
|
}
|
|
|
|
@app.get("/health/ready", tags=["health"], summary="Readiness probe")
|
|
async def readiness() -> dict[str, str]:
|
|
# Check database connectivity
|
|
try:
|
|
from .storage import get_engine
|
|
engine = get_engine()
|
|
with engine.connect() as conn:
|
|
conn.execute("SELECT 1")
|
|
import sys
|
|
return {
|
|
"status": "ready",
|
|
"database": "connected",
|
|
"python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
|
|
}
|
|
except Exception as e:
|
|
logger.error("Readiness check failed", extra={"error": str(e)})
|
|
return JSONResponse(
|
|
status_code=503,
|
|
content={"status": "not ready", "error": str(e)}
|
|
)
|
|
|
|
return app
|
|
|
|
|
|
app = create_app()
|
|
|
|
# Register jobs router (disabled - legacy)
|
|
# from .routers import jobs as jobs_router
|
|
# app.include_router(jobs_router.router)
|