From 07f988bc1f4cedd672ba075718ab4069bacdd002 Mon Sep 17 00:00:00 2001 From: aitbc Date: Sat, 2 May 2026 16:17:15 +0200 Subject: [PATCH] fix: address gateway resilience and readiness review items --- apps/ai-service/src/ai_service/domain/jobs.py | 2 +- apps/api-gateway/src/api_gateway/main.py | 69 ++++++++++--------- .../src/governance_service/main.py | 6 +- apps/gpu-service/src/gpu_service/main.py | 6 +- .../src/marketplace_service/main.py | 6 +- .../src/monitoring_service/main.py | 10 +-- .../src/trading_service/main.py | 6 +- 7 files changed, 64 insertions(+), 41 deletions(-) diff --git a/apps/ai-service/src/ai_service/domain/jobs.py b/apps/ai-service/src/ai_service/domain/jobs.py index d5303a48..6b4f8dcf 100644 --- a/apps/ai-service/src/ai_service/domain/jobs.py +++ b/apps/ai-service/src/ai_service/domain/jobs.py @@ -40,7 +40,7 @@ class Job(SQLModel, table=True): payment_status: str = Field(default="none", index=True) # Timestamps - created_at: datetime = Field(default_factory=datetime.utcnow, nullable=False, index=True) + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), nullable=False, index=True) requested_at: datetime | None = Field(default=None) started_at: datetime | None = Field(default=None) completed_at: datetime | None = Field(default=None) diff --git a/apps/api-gateway/src/api_gateway/main.py b/apps/api-gateway/src/api_gateway/main.py index dde4cfc9..3116b456 100644 --- a/apps/api-gateway/src/api_gateway/main.py +++ b/apps/api-gateway/src/api_gateway/main.py @@ -4,6 +4,7 @@ Routes requests to microservices """ import os +import hmac import httpx import asyncio from contextlib import asynccontextmanager @@ -42,6 +43,12 @@ if SLOWAPI_AVAILABLE: else: limiter = None + +def rate_limit(limit: str): + if limiter is None: + return lambda func: func + return limiter.limit(limit) + # Authentication setup security = HTTPBearer(auto_error=False) API_KEY = os.getenv("API_GATEWAY_KEY", "") @@ -145,7 +152,7 @@ def verify_auth(credentials: HTTPAuthorizationCredentials = Depends(security)) - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="API gateway key not configured", ) - if credentials.credentials != API_KEY: + if not hmac.compare_digest(credentials.credentials, API_KEY): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Invalid authentication credentials", @@ -154,35 +161,40 @@ def verify_auth(credentials: HTTPAuthorizationCredentials = Depends(security)) - # Circuit breaker state circuit_breaker_state = { - "failures": 0, - "last_failure_time": None, - "is_open": False, + name: { + "failures": 0, + "last_failure_time": None, + "is_open": False, + } + for name in SERVICES } CIRCUIT_BREAKER_THRESHOLD = 5 CIRCUIT_BREAKER_TIMEOUT = 60 # seconds -def check_circuit_breaker() -> bool: +def check_circuit_breaker(service_name: str) -> bool: """Check if circuit breaker is open.""" - if circuit_breaker_state["is_open"]: + state = circuit_breaker_state[service_name] + if state["is_open"]: # Check if timeout has elapsed - if (asyncio.get_event_loop().time() - circuit_breaker_state["last_failure_time"]) > CIRCUIT_BREAKER_TIMEOUT: - circuit_breaker_state["is_open"] = False - circuit_breaker_state["failures"] = 0 - logger.info("Circuit breaker reset") + if (asyncio.get_event_loop().time() - state["last_failure_time"]) > CIRCUIT_BREAKER_TIMEOUT: + state["is_open"] = False + state["failures"] = 0 + logger.info("Circuit breaker reset", service=service_name) return True return False return True -def record_failure(): +def record_failure(service_name: str): """Record a failure for circuit breaker.""" - circuit_breaker_state["failures"] += 1 - circuit_breaker_state["last_failure_time"] = asyncio.get_event_loop().time() - if circuit_breaker_state["failures"] >= CIRCUIT_BREAKER_THRESHOLD: - circuit_breaker_state["is_open"] = True - logger.warning("Circuit breaker opened") + state = circuit_breaker_state[service_name] + state["failures"] += 1 + state["last_failure_time"] = asyncio.get_event_loop().time() + if state["failures"] >= CIRCUIT_BREAKER_THRESHOLD: + state["is_open"] = True + logger.warning("Circuit breaker opened", service=service_name) @app.get("/health") @@ -234,23 +246,13 @@ async def proxy_with_retry(client, method, url, **kwargs): @app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"]) +@rate_limit("100/minute") async def proxy_request( path: str, request: Request, authenticated: bool = Depends(verify_auth), ) -> Response: """Proxy request to appropriate microservice with rate limiting and circuit breaker.""" - # Apply rate limiting if available - if SLOWAPI_AVAILABLE: - # Rate limit: 100 requests per minute per IP - limiter.limit("100/minute")(lambda: None)() - - # Check circuit breaker - if not check_circuit_breaker(): - return JSONResponse( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - content={"error": "Circuit breaker is open, service temporarily unavailable"}, - ) # Determine which service should handle the request service_name = None for name, config in SERVICES.items(): @@ -262,6 +264,13 @@ async def proxy_request( # Default to coordinator-api for unknown paths service_name = "coordinator" + # Check circuit breaker + if not check_circuit_breaker(service_name): + return JSONResponse( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + content={"error": f"Circuit breaker is open for {service_name}, service temporarily unavailable"}, + ) + service_config = SERVICES[service_name] # Build target URL @@ -271,8 +280,6 @@ async def proxy_request( target_path = path[len(prefix):].lstrip("/") target_url = f"{service_config['base_url']}/{target_path}" - if request.url.query: - target_url += f"?{request.url.query}" # Proxy the request using pooled HTTP client with retry logic client = app.state.http_client @@ -304,7 +311,7 @@ async def proxy_request( target_url=target_url, error=str(e), ) - record_failure() + record_failure(service_name) return JSONResponse( status_code=503, content={ @@ -321,7 +328,7 @@ async def proxy_request( service=service_name, error=str(e), ) - record_failure() + record_failure(service_name) return JSONResponse( status_code=500, content={ diff --git a/apps/governance-service/src/governance_service/main.py b/apps/governance-service/src/governance_service/main.py index cb725a78..a5218ba3 100644 --- a/apps/governance-service/src/governance_service/main.py +++ b/apps/governance-service/src/governance_service/main.py @@ -7,6 +7,7 @@ from contextlib import asynccontextmanager from typing import AsyncIterator from fastapi import FastAPI, Depends +from fastapi.responses import JSONResponse from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession @@ -78,7 +79,10 @@ async def ready() -> dict[str, str]: return {"status": "ready", "service": "governance-service"} except Exception as e: logger.error(f"Readiness check failed: {e}") - return {"status": "not_ready", "service": "governance-service", "error": str(e)} + return JSONResponse( + status_code=503, + content={"status": "not_ready", "service": "governance-service", "error": str(e)}, + ) @app.get("/live") diff --git a/apps/gpu-service/src/gpu_service/main.py b/apps/gpu-service/src/gpu_service/main.py index e02ad70f..620061b4 100644 --- a/apps/gpu-service/src/gpu_service/main.py +++ b/apps/gpu-service/src/gpu_service/main.py @@ -7,6 +7,7 @@ from contextlib import asynccontextmanager from typing import AsyncIterator from fastapi import FastAPI, Depends +from fastapi.responses import JSONResponse from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession @@ -73,7 +74,10 @@ async def ready() -> dict[str, str]: return {"status": "ready", "service": "gpu-service"} except Exception as e: logger.error(f"Readiness check failed: {e}") - return {"status": "not_ready", "service": "gpu-service", "error": str(e)} + return JSONResponse( + status_code=503, + content={"status": "not_ready", "service": "gpu-service", "error": str(e)}, + ) @app.get("/live") diff --git a/apps/marketplace-service/src/marketplace_service/main.py b/apps/marketplace-service/src/marketplace_service/main.py index b33b56bc..223fd305 100644 --- a/apps/marketplace-service/src/marketplace_service/main.py +++ b/apps/marketplace-service/src/marketplace_service/main.py @@ -7,6 +7,7 @@ from contextlib import asynccontextmanager from typing import AsyncIterator from fastapi import FastAPI, Depends +from fastapi.responses import JSONResponse from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession @@ -79,7 +80,10 @@ async def ready() -> dict[str, str]: return {"status": "ready", "service": "marketplace-service"} except Exception as e: logger.error(f"Readiness check failed: {e}") - return {"status": "not_ready", "service": "marketplace-service", "error": str(e)} + return JSONResponse( + status_code=503, + content={"status": "not_ready", "service": "marketplace-service", "error": str(e)}, + ) @app.get("/live") diff --git a/apps/monitoring-service/src/monitoring_service/main.py b/apps/monitoring-service/src/monitoring_service/main.py index 00f4ddb5..df7d1f59 100644 --- a/apps/monitoring-service/src/monitoring_service/main.py +++ b/apps/monitoring-service/src/monitoring_service/main.py @@ -122,7 +122,7 @@ async def services_summary() -> dict[str, Any]: try: health_data = await collect_all_health_data() - summary = {"timestamp": datetime.utcnow().isoformat(), "services": {}} + summary = {"timestamp": datetime.now(timezone.utc).isoformat(), "services": {}} for service_id, service_info in SERVICES.items(): health = health_data.get(service_id, {}) @@ -138,7 +138,7 @@ async def services_summary() -> dict[str, Any]: except Exception as e: logger.error(f"Failed to generate services summary: {e}") - return {"error": "Failed to generate summary", "timestamp": datetime.utcnow().isoformat()} + return {"error": "Failed to generate summary", "timestamp": datetime.now(timezone.utc).isoformat()} @app.get("/dashboard/metrics") @@ -185,7 +185,7 @@ async def system_metrics() -> dict[str, Any]: except Exception as e: logger.error(f"Failed to collect system metrics: {e}") - return {"error": "Failed to collect metrics", "timestamp": datetime.utcnow().isoformat()} + return {"error": "Failed to collect metrics", "timestamp": datetime.now(timezone.utc).isoformat()} async def collect_all_health_data() -> dict[str, Any]: @@ -224,7 +224,7 @@ async def check_service_health(service_name: str, service_config: dict[str, Any] return { "status": "healthy", "response_time": 0.1, - "last_check": datetime.utcnow().isoformat(), + "last_check": datetime.now(timezone.utc).isoformat(), "details": response.json(), } except Exception as e: @@ -232,7 +232,7 @@ async def check_service_health(service_name: str, service_config: dict[str, Any] return { "status": "unhealthy", "error": str(e), - "last_check": datetime.utcnow().isoformat(), + "last_check": datetime.now(timezone.utc).isoformat(), } diff --git a/apps/trading-service/src/trading_service/main.py b/apps/trading-service/src/trading_service/main.py index 270711b1..855e34d3 100644 --- a/apps/trading-service/src/trading_service/main.py +++ b/apps/trading-service/src/trading_service/main.py @@ -7,6 +7,7 @@ from contextlib import asynccontextmanager from typing import AsyncIterator from fastapi import FastAPI, Depends +from fastapi.responses import JSONResponse from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession @@ -79,7 +80,10 @@ async def ready() -> dict[str, str]: return {"status": "ready", "service": "trading-service"} except Exception as e: logger.error(f"Readiness check failed: {e}") - return {"status": "not_ready", "service": "trading-service", "error": str(e)} + return JSONResponse( + status_code=503, + content={"status": "not_ready", "service": "trading-service", "error": str(e)}, + ) @app.get("/live")