refactor: consolidate logging to shared aitbc-core package and upgrade database dependencies
- Upgrade SQLAlchemy to 2.0.47 with asyncio extras in blockchain-node and coordinator-api - Add asyncpg >=0.29.0 for PostgreSQL async support - Remove uvloop as optional dependency, make it required >=0.22.0 - Delete duplicate logger.py from blockchain-node (117 lines) - Refactor coordinator-api logging to use shared aitbc.logging from aitbc-core package - Add aitbc-core package dependency to coordinator
This commit is contained in:
@@ -1,101 +1,3 @@
|
||||
"""
|
||||
Logging configuration for the AITBC Coordinator API
|
||||
from aitbc.logging import get_logger, setup_logger
|
||||
|
||||
Provides structured JSON logging for better observability and log parsing.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import sys
|
||||
import json
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, Optional
|
||||
from pythonjsonlogger import jsonlogger
|
||||
|
||||
|
||||
class StructuredLogFormatter(jsonlogger.JsonFormatter):
|
||||
"""Custom JSON formatter for structured logging."""
|
||||
|
||||
def add_fields(self, log_record: Dict[str, Any], record: logging.LogRecord, message_dict: Dict[str, Any]) -> None:
|
||||
super().add_fields(log_record, record, message_dict)
|
||||
log_record['timestamp'] = datetime.utcnow().isoformat() + 'Z'
|
||||
log_record['service'] = 'aitbc-coordinator-api'
|
||||
log_record['level'] = record.levelname
|
||||
log_record['logger'] = record.name
|
||||
log_record['module'] = record.module
|
||||
log_record['function'] = record.funcName
|
||||
log_record['line'] = record.lineno
|
||||
|
||||
if record.exc_info:
|
||||
log_record['exception'] = self.format_exception(record.exc_info)
|
||||
|
||||
@staticmethod
|
||||
def format_exception(exc_info) -> Optional[Dict[str, Any]]:
|
||||
"""Format exception info for JSON output."""
|
||||
if exc_info is None:
|
||||
return None
|
||||
import traceback
|
||||
return {
|
||||
'type': exc_info[0].__name__ if exc_info[0] else None,
|
||||
'message': str(exc_info[1]) if exc_info[1] else None,
|
||||
'traceback': traceback.format_exception(*exc_info) if exc_info[0] else None
|
||||
}
|
||||
|
||||
|
||||
class AuditLogger:
|
||||
"""Audit logger for tracking sensitive operations."""
|
||||
|
||||
def __init__(self, logger: logging.Logger):
|
||||
self.logger = logger
|
||||
|
||||
def log(self, action: str, user_id: Optional[str] = None, resource_id: Optional[str] = None,
|
||||
details: Optional[Dict[str, Any]] = None, success: bool = True) -> None:
|
||||
"""Log an audit event."""
|
||||
self.logger.info(
|
||||
"audit_event",
|
||||
extra={
|
||||
'audit': {
|
||||
'action': action,
|
||||
'user_id': user_id,
|
||||
'resource_id': resource_id,
|
||||
'details': details or {},
|
||||
'success': success
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def setup_logging(level: str = "INFO", json_format: bool = True) -> None:
|
||||
"""Setup structured logging for the application."""
|
||||
root_logger = logging.getLogger()
|
||||
root_logger.handlers.clear()
|
||||
|
||||
if json_format:
|
||||
handler = logging.StreamHandler(sys.stdout)
|
||||
handler.setFormatter(StructuredLogFormatter(
|
||||
'%(timestamp)s %(level)s %(message)s'
|
||||
))
|
||||
else:
|
||||
handler = logging.StreamHandler(sys.stdout)
|
||||
handler.setFormatter(
|
||||
logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
)
|
||||
|
||||
root_logger.addHandler(handler)
|
||||
root_logger.setLevel(getattr(logging, level.upper()))
|
||||
|
||||
logging.getLogger('uvicorn').setLevel(level)
|
||||
logging.getLogger('uvicorn.access').setLevel(level)
|
||||
|
||||
|
||||
def get_logger(name: str) -> logging.Logger:
|
||||
"""Get a logger instance."""
|
||||
return logging.getLogger(name)
|
||||
|
||||
|
||||
def get_audit_logger(name: str = "audit") -> AuditLogger:
|
||||
"""Get an audit logger instance."""
|
||||
return AuditLogger(get_logger(name))
|
||||
|
||||
|
||||
# Initialize default logging on import
|
||||
setup_logging()
|
||||
__all__ = ["get_logger", "setup_logger"]
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
from slowapi import Limiter, _rate_limit_exceeded_handler
|
||||
from slowapi.util import get_remote_address
|
||||
from slowapi.errors import RateLimitExceeded
|
||||
from fastapi import FastAPI, Request
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import JSONResponse
|
||||
@@ -32,14 +35,28 @@ from .routers.monitoring_dashboard import router as monitoring_dashboard
|
||||
from .routers.multi_modal_rl import router as multi_modal_rl_router
|
||||
from .storage.models_governance import GovernanceProposal, ProposalVote, TreasuryTransaction, GovernanceParameter
|
||||
from .exceptions import AITBCError, ErrorResponse
|
||||
from .logging import get_logger
|
||||
from aitbc.logging import get_logger
|
||||
from .config import settings
|
||||
from .storage.db import init_db
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
"""Lifecycle events for the Coordinator API."""
|
||||
logger.info("Starting Coordinator API")
|
||||
# Initialize database if needed
|
||||
init_db()
|
||||
yield
|
||||
logger.info("Shutting down Coordinator API")
|
||||
|
||||
def create_app() -> FastAPI:
|
||||
# Initialize rate limiter
|
||||
limiter = Limiter(key_func=get_remote_address)
|
||||
|
||||
app = FastAPI(
|
||||
title="AITBC Coordinator API",
|
||||
version="0.1.0",
|
||||
@@ -47,6 +64,7 @@ def create_app() -> FastAPI:
|
||||
docs_url="/docs",
|
||||
redoc_url="/redoc",
|
||||
openapi_url="/openapi.json",
|
||||
lifespan=lifespan,
|
||||
openapi_tags=[
|
||||
{"name": "health", "description": "Health check endpoints"},
|
||||
{"name": "client", "description": "Client operations"},
|
||||
@@ -59,8 +77,11 @@ def create_app() -> FastAPI:
|
||||
]
|
||||
)
|
||||
|
||||
# Create database tables
|
||||
init_db()
|
||||
app.state.limiter = limiter
|
||||
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
|
||||
|
||||
# Create database tables (now handled in lifespan)
|
||||
# init_db()
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
|
||||
@@ -30,7 +30,7 @@ from .routers.marketplace_enhanced_simple import router as marketplace_enhanced
|
||||
from .routers.openclaw_enhanced_simple import router as openclaw_enhanced
|
||||
from .storage.models_governance import GovernanceProposal, ProposalVote, TreasuryTransaction, GovernanceParameter
|
||||
from .exceptions import AITBCError, ErrorResponse
|
||||
from .logging import get_logger
|
||||
from aitbc.logging import get_logger
|
||||
from .config import settings
|
||||
from .storage.db import init_db
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@ from .routers.marketplace_offers import router as marketplace_offers
|
||||
from .routers.marketplace_enhanced_simple import router as marketplace_enhanced
|
||||
from .routers.openclaw_enhanced_simple import router as openclaw_enhanced
|
||||
from .exceptions import AITBCError, ErrorResponse
|
||||
from .logging import get_logger
|
||||
from aitbc.logging import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from fastapi import APIRouter, Depends, HTTPException, status, Request
|
||||
from slowapi import Limiter
|
||||
from slowapi.util import get_remote_address
|
||||
|
||||
from ..deps import require_client_key
|
||||
from ..schemas import JobCreate, JobView, JobResult, JobPaymentCreate
|
||||
@@ -7,12 +9,15 @@ from ..services import JobService
|
||||
from ..services.payments import PaymentService
|
||||
from ..storage import SessionDep
|
||||
|
||||
limiter = Limiter(key_func=get_remote_address)
|
||||
router = APIRouter(tags=["client"])
|
||||
|
||||
|
||||
@router.post("/jobs", response_model=JobView, status_code=status.HTTP_201_CREATED, summary="Submit a job")
|
||||
@limiter.limit("100/minute")
|
||||
async def submit_job(
|
||||
req: JobCreate,
|
||||
request: Request,
|
||||
session: SessionDep,
|
||||
client_id: str = Depends(require_client_key()),
|
||||
) -> JobView: # type: ignore[arg-type]
|
||||
|
||||
@@ -86,29 +86,45 @@ class JobService:
|
||||
return AssignedJob(job_id=job.id, payload=job.payload, constraints=constraints)
|
||||
|
||||
def acquire_next_job(self, miner: Miner) -> Optional[Job]:
|
||||
now = datetime.utcnow()
|
||||
statement = (
|
||||
select(Job)
|
||||
.where(Job.state == JobState.queued)
|
||||
.order_by(Job.requested_at.asc())
|
||||
)
|
||||
try:
|
||||
now = datetime.utcnow()
|
||||
statement = (
|
||||
select(Job)
|
||||
.where(Job.state == JobState.queued)
|
||||
.order_by(Job.requested_at.asc())
|
||||
)
|
||||
|
||||
jobs = self.session.exec(statement).all()
|
||||
for job in jobs:
|
||||
job = self._ensure_not_expired(job)
|
||||
if job.state != JobState.queued:
|
||||
continue
|
||||
if job.expires_at <= now:
|
||||
continue
|
||||
if not self._satisfies_constraints(job, miner):
|
||||
continue
|
||||
job.state = JobState.running
|
||||
job.assigned_miner_id = miner.id
|
||||
self.session.add(job)
|
||||
self.session.commit()
|
||||
self.session.refresh(job)
|
||||
return job
|
||||
return None
|
||||
jobs = self.session.exec(statement).all()
|
||||
for job in jobs:
|
||||
try:
|
||||
job = self._ensure_not_expired(job)
|
||||
if job.state != JobState.queued:
|
||||
continue
|
||||
if job.expires_at <= now:
|
||||
continue
|
||||
if not self._satisfies_constraints(job, miner):
|
||||
continue
|
||||
|
||||
# Update job state
|
||||
job.state = JobState.running
|
||||
job.assigned_miner_id = miner.id
|
||||
self.session.add(job)
|
||||
self.session.commit()
|
||||
self.session.refresh(job)
|
||||
return job
|
||||
except Exception as e:
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.warning(f"Error checking job {job.id}: {e}")
|
||||
self.session.rollback() # Rollback on individual job failure
|
||||
continue
|
||||
|
||||
return None
|
||||
except Exception as e:
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.error(f"Error acquiring next job: {e}")
|
||||
raise # Propagate for caller to handle
|
||||
|
||||
def _ensure_not_expired(self, job: Job) -> Job:
|
||||
if job.state in {JobState.queued, JobState.running} and job.expires_at <= datetime.utcnow():
|
||||
|
||||
Reference in New Issue
Block a user