fix: address gateway resilience and readiness review items
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Successful in 10s
Deploy to Testnet / deploy-testnet (push) Successful in 1m13s
Integration Tests / test-service-integration (push) Successful in 1m37s
Multi-Node Stress Testing / stress-test (push) Successful in 1s
Node Failover Simulation / failover-test (push) Failing after 3s
Python Tests / test-python (push) Successful in 20s
Security Scanning / security-scan (push) Failing after 26s
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Successful in 10s
Deploy to Testnet / deploy-testnet (push) Successful in 1m13s
Integration Tests / test-service-integration (push) Successful in 1m37s
Multi-Node Stress Testing / stress-test (push) Successful in 1s
Node Failover Simulation / failover-test (push) Failing after 3s
Python Tests / test-python (push) Successful in 20s
Security Scanning / security-scan (push) Failing after 26s
This commit is contained in:
@@ -40,7 +40,7 @@ class Job(SQLModel, table=True):
|
||||
payment_status: str = Field(default="none", index=True)
|
||||
|
||||
# Timestamps
|
||||
created_at: datetime = Field(default_factory=datetime.utcnow, nullable=False, index=True)
|
||||
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), nullable=False, index=True)
|
||||
requested_at: datetime | None = Field(default=None)
|
||||
started_at: datetime | None = Field(default=None)
|
||||
completed_at: datetime | None = Field(default=None)
|
||||
|
||||
@@ -4,6 +4,7 @@ Routes requests to microservices
|
||||
"""
|
||||
|
||||
import os
|
||||
import hmac
|
||||
import httpx
|
||||
import asyncio
|
||||
from contextlib import asynccontextmanager
|
||||
@@ -42,6 +43,12 @@ if SLOWAPI_AVAILABLE:
|
||||
else:
|
||||
limiter = None
|
||||
|
||||
|
||||
def rate_limit(limit: str):
|
||||
if limiter is None:
|
||||
return lambda func: func
|
||||
return limiter.limit(limit)
|
||||
|
||||
# Authentication setup
|
||||
security = HTTPBearer(auto_error=False)
|
||||
API_KEY = os.getenv("API_GATEWAY_KEY", "")
|
||||
@@ -145,7 +152,7 @@ def verify_auth(credentials: HTTPAuthorizationCredentials = Depends(security)) -
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail="API gateway key not configured",
|
||||
)
|
||||
if credentials.credentials != API_KEY:
|
||||
if not hmac.compare_digest(credentials.credentials, API_KEY):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Invalid authentication credentials",
|
||||
@@ -154,35 +161,40 @@ def verify_auth(credentials: HTTPAuthorizationCredentials = Depends(security)) -
|
||||
|
||||
# Circuit breaker state
|
||||
circuit_breaker_state = {
|
||||
"failures": 0,
|
||||
"last_failure_time": None,
|
||||
"is_open": False,
|
||||
name: {
|
||||
"failures": 0,
|
||||
"last_failure_time": None,
|
||||
"is_open": False,
|
||||
}
|
||||
for name in SERVICES
|
||||
}
|
||||
|
||||
CIRCUIT_BREAKER_THRESHOLD = 5
|
||||
CIRCUIT_BREAKER_TIMEOUT = 60 # seconds
|
||||
|
||||
|
||||
def check_circuit_breaker() -> bool:
|
||||
def check_circuit_breaker(service_name: str) -> bool:
|
||||
"""Check if circuit breaker is open."""
|
||||
if circuit_breaker_state["is_open"]:
|
||||
state = circuit_breaker_state[service_name]
|
||||
if state["is_open"]:
|
||||
# Check if timeout has elapsed
|
||||
if (asyncio.get_event_loop().time() - circuit_breaker_state["last_failure_time"]) > CIRCUIT_BREAKER_TIMEOUT:
|
||||
circuit_breaker_state["is_open"] = False
|
||||
circuit_breaker_state["failures"] = 0
|
||||
logger.info("Circuit breaker reset")
|
||||
if (asyncio.get_event_loop().time() - state["last_failure_time"]) > CIRCUIT_BREAKER_TIMEOUT:
|
||||
state["is_open"] = False
|
||||
state["failures"] = 0
|
||||
logger.info("Circuit breaker reset", service=service_name)
|
||||
return True
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def record_failure():
|
||||
def record_failure(service_name: str):
|
||||
"""Record a failure for circuit breaker."""
|
||||
circuit_breaker_state["failures"] += 1
|
||||
circuit_breaker_state["last_failure_time"] = asyncio.get_event_loop().time()
|
||||
if circuit_breaker_state["failures"] >= CIRCUIT_BREAKER_THRESHOLD:
|
||||
circuit_breaker_state["is_open"] = True
|
||||
logger.warning("Circuit breaker opened")
|
||||
state = circuit_breaker_state[service_name]
|
||||
state["failures"] += 1
|
||||
state["last_failure_time"] = asyncio.get_event_loop().time()
|
||||
if state["failures"] >= CIRCUIT_BREAKER_THRESHOLD:
|
||||
state["is_open"] = True
|
||||
logger.warning("Circuit breaker opened", service=service_name)
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
@@ -234,23 +246,13 @@ async def proxy_with_retry(client, method, url, **kwargs):
|
||||
|
||||
|
||||
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])
|
||||
@rate_limit("100/minute")
|
||||
async def proxy_request(
|
||||
path: str,
|
||||
request: Request,
|
||||
authenticated: bool = Depends(verify_auth),
|
||||
) -> Response:
|
||||
"""Proxy request to appropriate microservice with rate limiting and circuit breaker."""
|
||||
# Apply rate limiting if available
|
||||
if SLOWAPI_AVAILABLE:
|
||||
# Rate limit: 100 requests per minute per IP
|
||||
limiter.limit("100/minute")(lambda: None)()
|
||||
|
||||
# Check circuit breaker
|
||||
if not check_circuit_breaker():
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
|
||||
content={"error": "Circuit breaker is open, service temporarily unavailable"},
|
||||
)
|
||||
# Determine which service should handle the request
|
||||
service_name = None
|
||||
for name, config in SERVICES.items():
|
||||
@@ -262,6 +264,13 @@ async def proxy_request(
|
||||
# Default to coordinator-api for unknown paths
|
||||
service_name = "coordinator"
|
||||
|
||||
# Check circuit breaker
|
||||
if not check_circuit_breaker(service_name):
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
|
||||
content={"error": f"Circuit breaker is open for {service_name}, service temporarily unavailable"},
|
||||
)
|
||||
|
||||
service_config = SERVICES[service_name]
|
||||
|
||||
# Build target URL
|
||||
@@ -271,8 +280,6 @@ async def proxy_request(
|
||||
target_path = path[len(prefix):].lstrip("/")
|
||||
|
||||
target_url = f"{service_config['base_url']}/{target_path}"
|
||||
if request.url.query:
|
||||
target_url += f"?{request.url.query}"
|
||||
|
||||
# Proxy the request using pooled HTTP client with retry logic
|
||||
client = app.state.http_client
|
||||
@@ -304,7 +311,7 @@ async def proxy_request(
|
||||
target_url=target_url,
|
||||
error=str(e),
|
||||
)
|
||||
record_failure()
|
||||
record_failure(service_name)
|
||||
return JSONResponse(
|
||||
status_code=503,
|
||||
content={
|
||||
@@ -321,7 +328,7 @@ async def proxy_request(
|
||||
service=service_name,
|
||||
error=str(e),
|
||||
)
|
||||
record_failure()
|
||||
record_failure(service_name)
|
||||
return JSONResponse(
|
||||
status_code=500,
|
||||
content={
|
||||
|
||||
@@ -7,6 +7,7 @@ from contextlib import asynccontextmanager
|
||||
from typing import AsyncIterator
|
||||
|
||||
from fastapi import FastAPI, Depends
|
||||
from fastapi.responses import JSONResponse
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
@@ -78,7 +79,10 @@ async def ready() -> dict[str, str]:
|
||||
return {"status": "ready", "service": "governance-service"}
|
||||
except Exception as e:
|
||||
logger.error(f"Readiness check failed: {e}")
|
||||
return {"status": "not_ready", "service": "governance-service", "error": str(e)}
|
||||
return JSONResponse(
|
||||
status_code=503,
|
||||
content={"status": "not_ready", "service": "governance-service", "error": str(e)},
|
||||
)
|
||||
|
||||
|
||||
@app.get("/live")
|
||||
|
||||
@@ -7,6 +7,7 @@ from contextlib import asynccontextmanager
|
||||
from typing import AsyncIterator
|
||||
|
||||
from fastapi import FastAPI, Depends
|
||||
from fastapi.responses import JSONResponse
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
@@ -73,7 +74,10 @@ async def ready() -> dict[str, str]:
|
||||
return {"status": "ready", "service": "gpu-service"}
|
||||
except Exception as e:
|
||||
logger.error(f"Readiness check failed: {e}")
|
||||
return {"status": "not_ready", "service": "gpu-service", "error": str(e)}
|
||||
return JSONResponse(
|
||||
status_code=503,
|
||||
content={"status": "not_ready", "service": "gpu-service", "error": str(e)},
|
||||
)
|
||||
|
||||
|
||||
@app.get("/live")
|
||||
|
||||
@@ -7,6 +7,7 @@ from contextlib import asynccontextmanager
|
||||
from typing import AsyncIterator
|
||||
|
||||
from fastapi import FastAPI, Depends
|
||||
from fastapi.responses import JSONResponse
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
@@ -79,7 +80,10 @@ async def ready() -> dict[str, str]:
|
||||
return {"status": "ready", "service": "marketplace-service"}
|
||||
except Exception as e:
|
||||
logger.error(f"Readiness check failed: {e}")
|
||||
return {"status": "not_ready", "service": "marketplace-service", "error": str(e)}
|
||||
return JSONResponse(
|
||||
status_code=503,
|
||||
content={"status": "not_ready", "service": "marketplace-service", "error": str(e)},
|
||||
)
|
||||
|
||||
|
||||
@app.get("/live")
|
||||
|
||||
@@ -122,7 +122,7 @@ async def services_summary() -> dict[str, Any]:
|
||||
try:
|
||||
health_data = await collect_all_health_data()
|
||||
|
||||
summary = {"timestamp": datetime.utcnow().isoformat(), "services": {}}
|
||||
summary = {"timestamp": datetime.now(timezone.utc).isoformat(), "services": {}}
|
||||
|
||||
for service_id, service_info in SERVICES.items():
|
||||
health = health_data.get(service_id, {})
|
||||
@@ -138,7 +138,7 @@ async def services_summary() -> dict[str, Any]:
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to generate services summary: {e}")
|
||||
return {"error": "Failed to generate summary", "timestamp": datetime.utcnow().isoformat()}
|
||||
return {"error": "Failed to generate summary", "timestamp": datetime.now(timezone.utc).isoformat()}
|
||||
|
||||
|
||||
@app.get("/dashboard/metrics")
|
||||
@@ -185,7 +185,7 @@ async def system_metrics() -> dict[str, Any]:
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to collect system metrics: {e}")
|
||||
return {"error": "Failed to collect metrics", "timestamp": datetime.utcnow().isoformat()}
|
||||
return {"error": "Failed to collect metrics", "timestamp": datetime.now(timezone.utc).isoformat()}
|
||||
|
||||
|
||||
async def collect_all_health_data() -> dict[str, Any]:
|
||||
@@ -224,7 +224,7 @@ async def check_service_health(service_name: str, service_config: dict[str, Any]
|
||||
return {
|
||||
"status": "healthy",
|
||||
"response_time": 0.1,
|
||||
"last_check": datetime.utcnow().isoformat(),
|
||||
"last_check": datetime.now(timezone.utc).isoformat(),
|
||||
"details": response.json(),
|
||||
}
|
||||
except Exception as e:
|
||||
@@ -232,7 +232,7 @@ async def check_service_health(service_name: str, service_config: dict[str, Any]
|
||||
return {
|
||||
"status": "unhealthy",
|
||||
"error": str(e),
|
||||
"last_check": datetime.utcnow().isoformat(),
|
||||
"last_check": datetime.now(timezone.utc).isoformat(),
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ from contextlib import asynccontextmanager
|
||||
from typing import AsyncIterator
|
||||
|
||||
from fastapi import FastAPI, Depends
|
||||
from fastapi.responses import JSONResponse
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
@@ -79,7 +80,10 @@ async def ready() -> dict[str, str]:
|
||||
return {"status": "ready", "service": "trading-service"}
|
||||
except Exception as e:
|
||||
logger.error(f"Readiness check failed: {e}")
|
||||
return {"status": "not_ready", "service": "trading-service", "error": str(e)}
|
||||
return JSONResponse(
|
||||
status_code=503,
|
||||
content={"status": "not_ready", "service": "trading-service", "error": str(e)},
|
||||
)
|
||||
|
||||
|
||||
@app.get("/live")
|
||||
|
||||
Reference in New Issue
Block a user