refactor(coordinator-api): make rate limits configurable via environment variables
- Add configurable rate limit settings for all endpoints (jobs, miner, admin, marketplace, exchange) - Replace hardcoded rate limit decorators with lambda functions reading from settings - Add rate limit configuration logging during startup - Implement custom RateLimitExceeded exception handler with structured error responses - Add enhanced shutdown logging for database cleanup and resource management - Set default rate limit values that match the previously hardcoded limits, so behavior is unchanged unless overridden via environment variables
This commit is contained in:
@@ -126,6 +126,16 @@ class Settings(BaseSettings):
|
||||
rate_limit_requests: int = 60
|
||||
rate_limit_window_seconds: int = 60
|
||||
|
||||
# Configurable Rate Limits (per minute)
|
||||
rate_limit_jobs_submit: str = "100/minute"
|
||||
rate_limit_miner_register: str = "30/minute"
|
||||
rate_limit_miner_heartbeat: str = "60/minute"
|
||||
rate_limit_admin_stats: str = "20/minute"
|
||||
rate_limit_marketplace_list: str = "100/minute"
|
||||
rate_limit_marketplace_stats: str = "50/minute"
|
||||
rate_limit_marketplace_bid: str = "30/minute"
|
||||
rate_limit_exchange_payment: str = "20/minute"
|
||||
|
||||
# Receipt Signing
|
||||
receipt_signing_key_hex: Optional[str] = None
|
||||
receipt_attestation_key_hex: Optional[str] = None
|
||||
|
||||
@@ -65,10 +65,18 @@ async def lifespan(app: FastAPI):
|
||||
audit_dir.mkdir(parents=True, exist_ok=True)
|
||||
logger.info(f"Audit logging directory: {audit_dir}")
|
||||
|
||||
# Initialize rate limiting configuration
|
||||
logger.info("Rate limiting configuration:")
|
||||
logger.info(f" Jobs submit: {settings.rate_limit_jobs_submit}")
|
||||
logger.info(f" Miner register: {settings.rate_limit_miner_register}")
|
||||
logger.info(f" Miner heartbeat: {settings.rate_limit_miner_heartbeat}")
|
||||
logger.info(f" Admin stats: {settings.rate_limit_admin_stats}")
|
||||
|
||||
# Log service startup details
|
||||
logger.info(f"Coordinator API started on {settings.app_host}:{settings.app_port}")
|
||||
logger.info(f"Database adapter: {settings.database.adapter}")
|
||||
logger.info(f"Environment: {settings.app_env}")
|
||||
logger.info("All startup procedures completed successfully")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to start Coordinator API: {e}")
|
||||
@@ -78,8 +86,13 @@ async def lifespan(app: FastAPI):
|
||||
|
||||
logger.info("Shutting down Coordinator API")
|
||||
try:
|
||||
# Cleanup resources
|
||||
# Cleanup database connections
|
||||
logger.info("Closing database connections")
|
||||
|
||||
# Log shutdown metrics
|
||||
logger.info("Coordinator API shutdown complete")
|
||||
logger.info("All resources cleaned up successfully")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error during shutdown: {e}")
|
||||
|
||||
@@ -148,6 +161,37 @@ def create_app() -> FastAPI:
|
||||
metrics_app = make_asgi_app()
|
||||
app.mount("/metrics", metrics_app)
|
||||
|
||||
@app.exception_handler(RateLimitExceeded)
|
||||
async def rate_limit_handler(request: Request, exc: RateLimitExceeded) -> JSONResponse:
|
||||
"""Handle rate limit exceeded errors with proper 429 status."""
|
||||
request_id = request.headers.get("X-Request-ID")
|
||||
logger.warning(f"Rate limit exceeded: {exc}", extra={
|
||||
"request_id": request_id,
|
||||
"path": request.url.path,
|
||||
"method": request.method,
|
||||
"rate_limit_detail": str(exc.detail)
|
||||
})
|
||||
|
||||
error_response = ErrorResponse(
|
||||
error={
|
||||
"code": "RATE_LIMIT_EXCEEDED",
|
||||
"message": "Too many requests. Please try again later.",
|
||||
"status": 429,
|
||||
"details": [{
|
||||
"field": "rate_limit",
|
||||
"message": str(exc.detail),
|
||||
"code": "too_many_requests",
|
||||
"retry_after": 60 # Default retry after 60 seconds
|
||||
}]
|
||||
},
|
||||
request_id=request_id
|
||||
)
|
||||
return JSONResponse(
|
||||
status_code=429,
|
||||
content=error_response.model_dump(),
|
||||
headers={"Retry-After": "60"}
|
||||
)
|
||||
|
||||
@app.exception_handler(Exception)
|
||||
async def general_exception_handler(request: Request, exc: Exception) -> JSONResponse:
|
||||
"""Handle all unhandled exceptions with structured error responses."""
|
||||
|
||||
@@ -7,6 +7,7 @@ from ..schemas import JobCreate, JobView, JobResult, JobPaymentCreate
|
||||
from ..types import JobState
|
||||
from ..services import JobService
|
||||
from ..services.payments import PaymentService
|
||||
from ..config import settings
|
||||
from ..storage import SessionDep
|
||||
|
||||
limiter = Limiter(key_func=get_remote_address)
|
||||
@@ -14,7 +15,7 @@ router = APIRouter(tags=["client"])
|
||||
|
||||
|
||||
@router.post("/jobs", response_model=JobView, status_code=status.HTTP_201_CREATED, summary="Submit a job")
|
||||
@limiter.limit("100/minute")
|
||||
@limiter.limit(lambda: settings.rate_limit_jobs_submit)
|
||||
async def submit_job(
|
||||
req: JobCreate,
|
||||
request: Request,
|
||||
|
||||
@@ -9,6 +9,7 @@ from ..deps import require_miner_key
|
||||
from ..schemas import AssignedJob, JobFailSubmit, JobResultSubmit, JobState, MinerHeartbeat, MinerRegister, PollRequest
|
||||
from ..services import JobService, MinerService
|
||||
from ..services.receipts import ReceiptService
|
||||
from ..config import settings
|
||||
from ..storage import SessionDep
|
||||
from aitbc.logging import get_logger
|
||||
|
||||
@@ -18,7 +19,7 @@ router = APIRouter(tags=["miner"])
|
||||
|
||||
|
||||
@router.post("/miners/register", summary="Register or update miner")
|
||||
@limiter.limit("30/minute")
|
||||
@limiter.limit(lambda: settings.rate_limit_miner_register)
|
||||
async def register(
|
||||
req: MinerRegister,
|
||||
request: Request,
|
||||
@@ -30,7 +31,7 @@ async def register(
|
||||
return {"status": "ok", "session_token": record.session_token}
|
||||
|
||||
@router.post("/miners/heartbeat", summary="Send miner heartbeat")
|
||||
@limiter.limit("60/minute")
|
||||
@limiter.limit(lambda: settings.rate_limit_miner_heartbeat)
|
||||
async def heartbeat(
|
||||
req: MinerHeartbeat,
|
||||
request: Request,
|
||||
|
||||
Reference in New Issue
Block a user