chore: standardize configuration, logging, and error handling across blockchain node and coordinator API

- Add infrastructure.md and workflow files to .gitignore to prevent sensitive info leaks
- Change blockchain node mempool backend default from memory to database for persistence
- Refactor blockchain node logger with StructuredLogFormatter and AuditLogger (consistent with coordinator)
- Add structured logging fields: service, module, function, line number
- Unify coordinator config with a dedicated DatabaseConfig section (adapter selection, pooling) under unified Settings
This commit is contained in:
oib
2026-02-13 22:39:43 +01:00
parent 0cbd2b507c
commit 06e48ef34b
196 changed files with 4660 additions and 20090 deletions

View File

@@ -32,7 +32,7 @@ class ChainSettings(BaseSettings):
min_fee: int = 0 # Minimum fee to accept into mempool
# Mempool settings
mempool_backend: str = "memory" # "memory" or "database"
mempool_backend: str = "database" # "database" or "memory" (database recommended for persistence)
mempool_max_size: int = 10_000
mempool_eviction_interval: int = 60 # seconds

View File

@@ -1,13 +1,16 @@
from __future__ import annotations
import logging
import sys
from datetime import datetime
from typing import Any, Optional
import json
class JsonFormatter(logging.Formatter):
class StructuredLogFormatter(logging.Formatter):
"""Custom JSON formatter for structured logging - consistent with coordinator API."""
RESERVED = {
"name",
"msg",
@@ -34,8 +37,12 @@ class JsonFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str: # type: ignore[override]
payload: dict[str, Any] = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"service": "aitbc-blockchain-node",
"level": record.levelname,
"logger": record.name,
"module": record.module,
"function": record.funcName,
"line": record.lineno,
"message": record.getMessage(),
}
@@ -45,27 +52,66 @@ class JsonFormatter(logging.Formatter):
payload[key] = value
if record.exc_info:
payload["exc_info"] = self.formatException(record.exc_info)
payload["exception"] = self.formatException(record.exc_info)
if record.stack_info:
payload["stack"] = record.stack_info
return json.dumps(payload, default=str)
def configure_logging(level: Optional[str] = None) -> None:
class AuditLogger:
    """Audit logger for tracking sensitive operations - consistent with coordinator API."""

    def __init__(self, logger: logging.Logger):
        # Underlying logger that receives the structured audit records.
        self.logger = logger

    def log(self, action: str, user_id: Optional[str] = None, resource_id: Optional[str] = None,
            details: Optional[dict] = None, success: bool = True) -> None:
        """Log an audit event."""
        audit_payload = {
            "action": action,
            "user_id": user_id,
            "resource_id": resource_id,
            "details": {} if details is None else details,
            "success": success,
        }
        # The formatter picks non-reserved `extra` attributes up into the JSON payload.
        self.logger.info("audit_event", extra={"audit": audit_payload})
def configure_logging(level: Optional[str] = None, json_format: bool = True) -> None:
    """Configure structured logging for the blockchain node.

    Args:
        level: Log level name (case-insensitive); falls back to INFO when
            missing or unrecognized.
        json_format: Emit JSON lines via StructuredLogFormatter when True,
            otherwise a plain human-readable format.
    """
    log_level = getattr(logging, (level or "INFO").upper(), logging.INFO)
    root = logging.getLogger()
    # Reconfigure idempotently: replace any previously installed handlers
    # instead of returning early, so json_format/level changes take effect.
    root.handlers.clear()
    handler = logging.StreamHandler(sys.stdout)
    if json_format:
        handler.setFormatter(StructuredLogFormatter())
    else:
        handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
    root.addHandler(handler)
    root.setLevel(log_level)
    # Keep uvicorn's loggers aligned with the application level.
    logging.getLogger('uvicorn').setLevel(log_level)
    logging.getLogger('uvicorn.access').setLevel(log_level)
def get_logger(name: str) -> logging.Logger:
    """Return a named logger, configuring root logging on first use."""
    root = logging.getLogger()
    if not root.handlers:
        configure_logging()
    return logging.getLogger(name)
def get_audit_logger(name: str = "audit") -> AuditLogger:
    """Get an audit logger instance."""
    base_logger = get_logger(name)
    return AuditLogger(base_logger)

View File

@@ -1,55 +1,125 @@
"""
Unified configuration for AITBC Coordinator API
Provides environment-based adapter selection and consolidated settings.
"""
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import List, Optional
from pathlib import Path
import os
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", case_sensitive=False)
app_env: str = "dev"
app_host: str = "127.0.0.1"
app_port: int = 8011
# Use absolute path to avoid database duplicates in different working directories
class DatabaseConfig(BaseSettings):
    """Database configuration with adapter selection.

    Selects between SQLite (default, file under <project_root>/data) and
    PostgreSQL. An explicit ``url`` always overrides adapter-based defaults.
    """

    # Unified with Settings: pydantic-v2 style config instead of the
    # deprecated inner `class Config`.
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", case_sensitive=False)

    adapter: str = "sqlite"  # "sqlite" or "postgresql"
    url: Optional[str] = None  # explicit connection URL override
    pool_size: int = 10
    max_overflow: int = 20
    pool_pre_ping: bool = True

    @property
    def effective_url(self) -> str:
        """Get the effective database URL."""
        if self.url:
            return self.url
        if self.adapter == "sqlite":
            # Absolute path avoids duplicate databases when the working
            # directory varies between processes.
            db_path = self._find_project_root() / "data" / "coordinator.db"
            db_path.parent.mkdir(parents=True, exist_ok=True)
            return f"sqlite:///{db_path}"
        if self.adapter == "postgresql":
            return "postgresql://localhost:5432/aitbc_coordinator"
        # Unknown adapter: fall back to an in-memory SQLite database.
        return "sqlite:///:memory:"

    @staticmethod
    def _find_project_root() -> Path:
        """Find project root by walking upward until a .git directory is found."""
        current = Path(__file__).resolve()
        while current.parent != current:
            if (current / ".git").exists():
                return current
            current = current.parent
        # Fallback to a fixed ancestor if .git was not found.
        return Path(__file__).resolve().parents[3]
class Settings(BaseSettings):
    """Unified application settings with environment-based configuration.

    Values load from the environment / .env file; database settings are
    nested under ``database`` (DatabaseConfig).
    """

    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
        extra="allow"
    )

    # Environment
    app_env: str = "dev"
    app_host: str = "127.0.0.1"
    app_port: int = 8011

    # Database
    database: DatabaseConfig = DatabaseConfig()

    # API Keys
    client_api_keys: List[str] = []
    miner_api_keys: List[str] = []
    admin_api_keys: List[str] = []

    # Security
    hmac_secret: Optional[str] = None
    jwt_secret: Optional[str] = None
    jwt_algorithm: str = "HS256"
    jwt_expiration_hours: int = 24

    # CORS (duplicate "http://localhost:8080" entry removed)
    allow_origins: List[str] = [
        "http://localhost:3000",
        "http://localhost:8080",
        "http://localhost:8000",
        "http://localhost:8011"
    ]

    # Job Configuration
    job_ttl_seconds: int = 900
    heartbeat_interval_seconds: int = 10
    heartbeat_timeout_seconds: int = 30

    # Rate Limiting
    rate_limit_requests: int = 60
    rate_limit_window_seconds: int = 60

    # Receipt Signing
    receipt_signing_key_hex: Optional[str] = None
    receipt_attestation_key_hex: Optional[str] = None

    # Logging
    log_level: str = "INFO"
    log_format: str = "json"  # json or text

    # Mempool
    mempool_backend: str = "database"  # database, memory

    def validate_secrets(self) -> None:
        """Validate that all required secrets are provided.

        Raises:
            ValueError: in production when jwt_secret is missing or still set
                to the known placeholder value.
        """
        if self.app_env == "production":
            if not self.jwt_secret:
                raise ValueError("JWT_SECRET environment variable is required in production")
            if self.jwt_secret == "change-me-in-production":
                raise ValueError("JWT_SECRET must be changed from default value")

    @property
    def database_url(self) -> str:
        """Get the database URL (backward compatibility)."""
        return self.database.effective_url
settings = Settings()
# Validate secrets on import
settings.validate_secrets()

View File

@@ -1,10 +1,19 @@
from typing import Callable, Annotated
"""
Dependency injection module for AITBC Coordinator API
Provides unified dependency injection using storage.SessionDep.
"""
from typing import Callable
from fastapi import Depends, Header, HTTPException
from .config import settings
from .storage import SessionDep
class APIKeyValidator:
"""Validator for API key authentication."""
def __init__(self, allowed_keys: list[str]):
self.allowed_keys = {key.strip() for key in allowed_keys if key}
@@ -15,12 +24,22 @@ class APIKeyValidator:
def require_client_key() -> Callable[[str | None], str]:
    """Dependency for client API key authentication."""
    validator = APIKeyValidator(settings.client_api_keys)
    return validator
def require_miner_key() -> Callable[[str | None], str]:
    """Dependency for miner API key authentication."""
    validator = APIKeyValidator(settings.miner_api_keys)
    return validator
def require_admin_key() -> Callable[[str | None], str]:
    """Dependency for admin API key authentication."""
    validator = APIKeyValidator(settings.admin_api_keys)
    return validator
# Legacy aliases for backward compatibility
def get_session():
    """Legacy alias - use SessionDep instead."""
    # Imported lazily to avoid a circular import at module load time.
    from .storage import get_session as storage_get_session
    return storage_get_session()

View File

@@ -1,83 +1,249 @@
"""
Exception classes and error response schemas for AITBC coordinator
Provides structured error responses for consistent API error handling.
"""
from datetime import datetime
from typing import Any, Dict, Optional, List
from pydantic import BaseModel, Field
class ErrorDetail(BaseModel):
    """A single field-level error entry used inside ErrorResponse details."""

    field: Optional[str] = Field(None, description="Field that caused the error")
    message: str = Field(..., description="Error message")
    code: Optional[str] = Field(None, description="Error code for programmatic handling")
class ErrorResponse(BaseModel):
    """Standardized error response envelope returned by all API error handlers."""

    error: Dict[str, Any] = Field(..., description="Error information")
    # UTC timestamp generated when the response object is built.
    timestamp: str = Field(default_factory=lambda: datetime.utcnow().isoformat() + "Z")
    request_id: Optional[str] = Field(None, description="Request ID for tracing")

    class Config:
        # Example payload surfaced in the OpenAPI schema.
        json_schema_extra = {
            "example": {
                "error": {
                    "code": "VALIDATION_ERROR",
                    "message": "Invalid input data",
                    "status": 422,
                    "details": [
                        {"field": "email", "message": "Invalid email format", "code": "invalid_format"}
                    ]
                },
                "timestamp": "2026-02-13T21:00:00Z",
                "request_id": "req_abc123"
            }
        }
class AITBCError(Exception):
    """Base exception for all AITBC errors"""

    # Removed stray `pass` left over from the pre-refactor empty body.
    # Subclasses override these to control the API error payload.
    error_code: str = "INTERNAL_ERROR"
    status_code: int = 500

    def to_response(self, request_id: Optional[str] = None) -> "ErrorResponse":
        """Convert exception to standardized error response."""
        return ErrorResponse(
            error={
                "code": self.error_code,
                "message": str(self),
                "status": self.status_code,
                "details": []
            },
            request_id=request_id
        )
class AuthenticationError(AITBCError):
    """Raised when authentication fails"""

    # Removed stray `pass` left over from the pre-refactor empty body.
    error_code: str = "AUTHENTICATION_ERROR"
    status_code: int = 401

    def __init__(self, message: str = "Authentication failed"):
        super().__init__(message)
class AuthorizationError(AITBCError):
    """Raised when authorization fails (authenticated but not permitted)."""

    error_code: str = "AUTHORIZATION_ERROR"
    status_code: int = 403

    def __init__(self, message: str = "Not authorized to perform this action"):
        super().__init__(message)
class RateLimitError(AITBCError):
    """Raised when rate limit is exceeded"""

    # Removed the leftover old __init__ signature line that was interleaved
    # above the class attributes.
    error_code: str = "RATE_LIMIT_EXCEEDED"
    status_code: int = 429

    def __init__(self, message: str = "Rate limit exceeded", retry_after: int = 60):
        super().__init__(message)
        # Seconds the client should wait before retrying; surfaced in details.
        self.retry_after = retry_after

    def to_response(self, request_id: Optional[str] = None) -> "ErrorResponse":
        """Convert to a response that carries the retry_after hint."""
        return ErrorResponse(
            error={
                "code": self.error_code,
                "message": str(self),
                "status": self.status_code,
                "details": [{"retry_after": self.retry_after}]
            },
            request_id=request_id
        )
class APIError(AITBCError):
    """Raised when API request fails"""

    error_code: str = "API_ERROR"
    status_code: int = 500

    def __init__(self, message: str, status_code: Optional[int] = None, response: Optional[dict] = None):
        super().__init__(message)
        # Removed the duplicated plain assignment (leftover old line) that
        # would clobber the class default with None.
        self.status_code = status_code or self.status_code
        self.response = response
class ConfigurationError(AITBCError):
    """Raised when configuration is invalid"""

    # Removed stray `pass` left over from the pre-refactor empty body.
    error_code: str = "CONFIGURATION_ERROR"
    status_code: int = 500

    def __init__(self, message: str = "Invalid configuration"):
        super().__init__(message)
class ConnectorError(AITBCError):
    """Raised when connector operation fails"""

    # Removed stray `pass` left over from the pre-refactor empty body.
    error_code: str = "CONNECTOR_ERROR"
    status_code: int = 502

    def __init__(self, message: str = "Connector operation failed"):
        super().__init__(message)
class PaymentError(ConnectorError):
    """Raised when payment operation fails"""

    # Removed stray `pass` left over from the pre-refactor empty body.
    error_code: str = "PAYMENT_ERROR"
    status_code: int = 402

    def __init__(self, message: str = "Payment operation failed"):
        super().__init__(message)
class ValidationError(AITBCError):
    """Raised when data validation fails"""

    # Removed stray `pass` left over from the pre-refactor empty body.
    error_code: str = "VALIDATION_ERROR"
    status_code: int = 422

    def __init__(self, message: str = "Validation failed", details: Optional[List["ErrorDetail"]] = None):
        super().__init__(message)
        # Field-level errors; `or []` also normalizes an explicit None.
        self.details = details or []

    def to_response(self, request_id: Optional[str] = None) -> "ErrorResponse":
        """Convert to a response carrying one entry per field error."""
        return ErrorResponse(
            error={
                "code": self.error_code,
                "message": str(self),
                "status": self.status_code,
                "details": [{"field": d.field, "message": d.message, "code": d.code} for d in self.details]
            },
            request_id=request_id
        )
class WebhookError(AITBCError):
    """Raised when webhook processing fails"""

    # Removed stray `pass` left over from the pre-refactor empty body.
    error_code: str = "WEBHOOK_ERROR"
    status_code: int = 500

    def __init__(self, message: str = "Webhook processing failed"):
        super().__init__(message)
class ERPError(ConnectorError):
    """Raised when ERP operation fails"""

    # Removed stray `pass` left over from the pre-refactor empty body.
    error_code: str = "ERP_ERROR"
    status_code: int = 502

    def __init__(self, message: str = "ERP operation failed"):
        super().__init__(message)
class SyncError(ConnectorError):
    """Raised when synchronization fails"""

    # Removed stray `pass` left over from the pre-refactor empty body.
    error_code: str = "SYNC_ERROR"
    status_code: int = 500

    def __init__(self, message: str = "Synchronization failed"):
        super().__init__(message)
class TimeoutError(AITBCError):
    """Raised when operation times out"""

    # Removed stray `pass` left over from the pre-refactor empty body.
    # NOTE(review): this name shadows builtins.TimeoutError inside this
    # module and for anyone importing it; renaming would break callers.
    error_code: str = "TIMEOUT_ERROR"
    status_code: int = 504

    def __init__(self, message: str = "Operation timed out"):
        super().__init__(message)
class TenantError(ConnectorError):
    """Raised when tenant operation fails"""

    # Removed stray `pass` left over from the pre-refactor empty body.
    error_code: str = "TENANT_ERROR"
    status_code: int = 400

    def __init__(self, message: str = "Tenant operation failed"):
        super().__init__(message)
class QuotaExceededError(ConnectorError):
    """Raised when resource quota is exceeded"""

    # Removed stray `pass` left over from the pre-refactor empty body.
    error_code: str = "QUOTA_EXCEEDED"
    status_code: int = 429

    def __init__(self, message: str = "Quota exceeded", limit: Optional[int] = None):
        super().__init__(message)
        # The quota limit that was exceeded, when known.
        self.limit = limit

    def to_response(self, request_id: Optional[str] = None) -> "ErrorResponse":
        """Convert to a response that includes the limit when one was given."""
        # `is not None` so a legitimate limit of 0 is still reported.
        details = [{"limit": self.limit}] if self.limit is not None else []
        return ErrorResponse(
            error={
                "code": self.error_code,
                "message": str(self),
                "status": self.status_code,
                "details": details
            },
            request_id=request_id
        )
class BillingError(ConnectorError):
    """Raised when billing operation fails"""

    # Removed stray `pass` left over from the pre-refactor empty body.
    error_code: str = "BILLING_ERROR"
    status_code: int = 402

    def __init__(self, message: str = "Billing operation failed"):
        super().__init__(message)
class NotFoundError(AITBCError):
    """Raised when a resource is not found"""

    error_code: str = "NOT_FOUND"
    status_code: int = 404

    def __init__(self, message: str = "Resource not found"):
        super().__init__(message)
class ConflictError(AITBCError):
    """Raised when there's a conflict (e.g., duplicate resource)"""

    error_code: str = "CONFLICT"
    status_code: int = 409

    def __init__(self, message: str = "Resource conflict"):
        super().__init__(message)

View File

@@ -1,19 +1,90 @@
"""
Logging configuration for the AITBC Coordinator API
Provides structured JSON logging for better observability and log parsing.
"""
import logging
import sys
from typing import Any, Dict
import json
from datetime import datetime
from typing import Any, Dict, Optional
from pythonjsonlogger import jsonlogger
def setup_logging(level: str = "INFO") -> None:
class StructuredLogFormatter(jsonlogger.JsonFormatter):
    """Custom JSON formatter for structured logging."""

    def add_fields(self, log_record: Dict[str, Any], record: logging.LogRecord, message_dict: Dict[str, Any]) -> None:
        # Let the base class populate its defaults first, then stamp our
        # service-wide structured fields on top.
        super().add_fields(log_record, record, message_dict)
        log_record.update({
            'timestamp': datetime.utcnow().isoformat() + 'Z',
            'service': 'aitbc-coordinator-api',
            'level': record.levelname,
            'logger': record.name,
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
        })
        if record.exc_info:
            log_record['exception'] = self.format_exception(record.exc_info)

    @staticmethod
    def format_exception(exc_info) -> Optional[Dict[str, Any]]:
        """Format exception info for JSON output."""
        if exc_info is None:
            return None
        import traceback
        exc_type, exc_value, _tb = exc_info
        return {
            'type': exc_type.__name__ if exc_type else None,
            'message': str(exc_value) if exc_value else None,
            'traceback': traceback.format_exception(*exc_info) if exc_type else None,
        }
class AuditLogger:
    """Audit logger for tracking sensitive operations."""

    def __init__(self, logger: logging.Logger):
        # Underlying logger that receives the structured audit records.
        self.logger = logger

    def log(self, action: str, user_id: Optional[str] = None, resource_id: Optional[str] = None,
            details: Optional[Dict[str, Any]] = None, success: bool = True) -> None:
        """Log an audit event."""
        audit_payload = {
            'action': action,
            'user_id': user_id,
            'resource_id': resource_id,
            'details': {} if details is None else details,
            'success': success,
        }
        self.logger.info("audit_event", extra={'audit': audit_payload})
def setup_logging(level: str = "INFO", json_format: bool = True) -> None:
    """Setup structured logging for the application.

    Args:
        level: Log level name, case-insensitive (e.g. "info", "DEBUG").
        json_format: Emit structured JSON when True, plain text otherwise.
    """
    log_level = getattr(logging, level.upper())
    root_logger = logging.getLogger()
    # Replace (not append to) any existing handlers so repeat calls are idempotent.
    root_logger.handlers.clear()
    handler = logging.StreamHandler(sys.stdout)
    if json_format:
        handler.setFormatter(StructuredLogFormatter(
            '%(timestamp)s %(level)s %(message)s'
        ))
    else:
        handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
    root_logger.addHandler(handler)
    root_logger.setLevel(log_level)
    # Use the resolved numeric level: Logger.setLevel(raw_string) raises
    # ValueError for lowercase names like "info".
    logging.getLogger('uvicorn').setLevel(log_level)
    logging.getLogger('uvicorn.access').setLevel(log_level)
def get_logger(name: str) -> logging.Logger:
    """Return the logger registered under the given name."""
    return logging.getLogger(name)
def get_audit_logger(name: str = "audit") -> AuditLogger:
    """Get an audit logger instance."""
    base_logger = get_logger(name)
    return AuditLogger(base_logger)
# Initialize default logging on import
setup_logging()

View File

@@ -1,5 +1,7 @@
from fastapi import FastAPI
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from prometheus_client import make_asgi_app
from .config import settings
@@ -21,6 +23,11 @@ from .routers import (
from .routers.governance import router as governance
from .routers.partners import router as partners
from .storage.models_governance import GovernanceProposal, ProposalVote, TreasuryTransaction, GovernanceParameter
from .exceptions import AITBCError, ErrorResponse
from .logging import get_logger
logger = get_logger(__name__)
def create_app() -> FastAPI:
@@ -28,6 +35,19 @@ def create_app() -> FastAPI:
title="AITBC Coordinator API",
version="0.1.0",
description="Stage 1 coordinator service handling job orchestration between clients and miners.",
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json",
openapi_tags=[
{"name": "health", "description": "Health check endpoints"},
{"name": "client", "description": "Client operations"},
{"name": "miner", "description": "Miner operations"},
{"name": "admin", "description": "Admin operations"},
{"name": "marketplace", "description": "GPU Marketplace"},
{"name": "exchange", "description": "Exchange operations"},
{"name": "governance", "description": "Governance operations"},
{"name": "zk", "description": "Zero-Knowledge proofs"},
]
)
# Create database tables
@@ -60,10 +80,66 @@ def create_app() -> FastAPI:
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
@app.exception_handler(AITBCError)
async def aitbc_error_handler(request: Request, exc: AITBCError) -> JSONResponse:
"""Handle AITBC exceptions with structured error responses."""
request_id = request.headers.get("X-Request-ID")
response = exc.to_response(request_id)
return JSONResponse(
status_code=response.error["status"],
content=response.model_dump()
)
@app.exception_handler(RequestValidationError)
async def validation_error_handler(request: Request, exc: RequestValidationError) -> JSONResponse:
"""Handle FastAPI validation errors with structured error responses."""
request_id = request.headers.get("X-Request-ID")
details = []
for error in exc.errors():
details.append({
"field": ".".join(str(loc) for loc in error["loc"]),
"message": error["msg"],
"code": error["type"]
})
error_response = ErrorResponse(
error={
"code": "VALIDATION_ERROR",
"message": "Request validation failed",
"status": 422,
"details": details
},
request_id=request_id
)
return JSONResponse(
status_code=422,
content=error_response.model_dump()
)
@app.get("/v1/health", tags=["health"], summary="Service healthcheck")
async def health() -> dict[str, str]:
return {"status": "ok", "env": settings.app_env}
@app.get("/health/live", tags=["health"], summary="Liveness probe")
async def liveness() -> dict[str, str]:
return {"status": "alive"}
@app.get("/health/ready", tags=["health"], summary="Readiness probe")
async def readiness() -> dict[str, str]:
# Check database connectivity
try:
from .storage import get_engine
engine = get_engine()
with engine.connect() as conn:
conn.execute("SELECT 1")
return {"status": "ready", "database": "connected"}
except Exception as e:
logger.error("Readiness check failed", extra={"error": str(e)})
return JSONResponse(
status_code=503,
content={"status": "not ready", "error": str(e)}
)
return app

View File

@@ -1,3 +1,9 @@
"""
Database storage module for AITBC Coordinator API
Provides unified database session management with connection pooling.
"""
from __future__ import annotations
from contextlib import contextmanager
@@ -5,6 +11,7 @@ from typing import Annotated, Generator
from fastapi import Depends
from sqlalchemy.engine import Engine
from sqlalchemy.pool import QueuePool
from sqlmodel import Session, SQLModel, create_engine
from ..config import settings
@@ -15,27 +22,48 @@ _engine: Engine | None = None
def get_engine() -> Engine:
    """Get or create the database engine with connection pooling.

    The engine is created lazily and cached in the module-level ``_engine``.
    QueuePool parameters are applied only for PostgreSQL; SQLite keeps its
    default pool (which may not accept QueuePool kwargs) and gets
    ``check_same_thread=False`` so FastAPI worker threads can share it.
    """
    global _engine
    if _engine is None:
        db_config = settings.database
        # effective_url has side effects (creates the data directory for
        # SQLite), so resolve it once rather than per keyword argument.
        url = db_config.effective_url
        connect_args = {"check_same_thread": False} if "sqlite" in url else {}
        pool_kwargs: dict = {}
        if "postgresql" in url:
            pool_kwargs = {
                "poolclass": QueuePool,
                "pool_size": db_config.pool_size,
                "max_overflow": db_config.max_overflow,
                "pool_pre_ping": db_config.pool_pre_ping,
            }
        _engine = create_engine(url, echo=False, connect_args=connect_args, **pool_kwargs)
    return _engine
def init_db() -> None:
    """Initialize database tables."""
    SQLModel.metadata.create_all(get_engine())
@contextmanager
def session_scope() -> Generator[Session, None, None]:
    """Context manager for database sessions.

    Commits on success, rolls back on any exception (which is re-raised),
    and always closes the session. Removed the leftover old
    ``with Session(engine) as session:`` line that was interleaved with
    this explicit try/finally version.
    """
    engine = get_engine()
    session = Session(engine)
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
def get_session() -> Generator[Session, None, None]:
    """Get a database session (for FastAPI dependency)."""
    with session_scope() as db_session:
        yield db_session