Files
aitbc/apps/coordinator-api/src/app/config.py
aitbc 55060730b2 Migrate coordinator-api to centralized aitbc package utilities
- Migrate 69 service files from logging to aitbc.get_logger
- Migrate explorer.py HTTP client from httpx.Client to AITBCHTTPClient
- Migrate config.py hardcoded paths to use DATA_DIR and LOG_DIR constants from aitbc.constants
- Remove duplicate LOG_DIR import in config.py
- All routers already using aitbc utilities
2026-04-25 06:45:04 +02:00

220 lines
7.7 KiB
Python
Executable File

"""
Unified configuration for AITBC Coordinator API
Provides environment-based adapter selection and consolidated settings.
"""
import os
from aitbc.constants import DATA_DIR, LOG_DIR
from pydantic import Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
class DatabaseConfig(BaseSettings):
    """Database settings: which adapter to use and how to reach it.

    Values are loaded by pydantic-settings according to ``model_config``
    (process environment plus the ``.env`` file, case-insensitive names).
    """

    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
        extra="allow",
    )

    adapter: str = "sqlite"  # sqlite, postgresql
    url: str | None = None
    pool_size: int = 10
    max_overflow: int = 20
    pool_pre_ping: bool = True

    @property
    def effective_url(self) -> str:
        """Return the connection URL to use.

        An explicitly configured ``url`` always wins. Otherwise a default is
        derived from ``adapter``: a SQLite file under ``DATA_DIR`` for
        "sqlite", or a localhost URL on port 5432 for any other adapter.
        """
        explicit_url = self.url
        if explicit_url:
            return explicit_url

        if self.adapter != "sqlite":
            # Default PostgreSQL-style connection string
            return f"{self.adapter}://localhost:5432/coordinator"

        # Default SQLite path - consistent with blockchain-node pattern
        return f"sqlite:///{DATA_DIR}/data/coordinator.db"
def _is_production_env() -> bool:
    """Return True when the APP_ENV process environment variable is "production".

    This reads ``os.environ`` directly (defaulting to "dev"); it does not
    consult the ``Settings.app_env`` field.
    """
    return os.getenv("APP_ENV", "dev") == "production"


def _check_secret(value: str | None, env_name: str) -> str | None:
    """Validate a signing secret shared by the HMAC and JWT validators.

    Args:
        value: The configured secret, or None/empty when unset.
        env_name: Name used in error messages (e.g. "HMAC_SECRET").

    Returns:
        ``value`` unchanged when it is acceptable.

    Raises:
        ValueError: If the secret is missing in production, is an unexpanded
            "$..." placeholder or the "your_secret_here" template value, or
            is shorter than 32 characters.
    """
    if not value:
        # A missing secret is tolerated everywhere except production.
        if _is_production_env():
            raise ValueError(f"{env_name} must be set to a secure value")
        return value
    if value.startswith("$") or value == "your_secret_here":
        raise ValueError(f"{env_name} must be set to a secure value")
    if len(value) < 32:
        raise ValueError(f"{env_name} must be at least 32 characters long")
    return value


class Settings(BaseSettings):
    """Unified application settings with environment-based configuration.

    Values are loaded by pydantic-settings according to ``model_config``
    (process environment plus the ``.env`` file, case-insensitive names,
    unknown keys allowed).
    """

    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
        extra="allow",
    )

    # Environment
    app_env: str = "dev"
    app_host: str = "127.0.0.1"
    app_port: int = 8011
    audit_log_dir: str = str(LOG_DIR / "audit")

    # Database
    database: DatabaseConfig = DatabaseConfig()

    # Database Connection Pooling
    db_pool_size: int = Field(default=20, description="Database connection pool size")
    db_max_overflow: int = Field(default=40, description="Maximum overflow connections")
    db_pool_recycle: int = Field(default=3600, description="Connection recycle time in seconds")
    db_pool_pre_ping: bool = Field(default=True, description="Test connections before using")
    db_echo: bool = Field(default=False, description="Enable SQL query logging")

    # API Keys
    client_api_keys: list[str] = []
    miner_api_keys: list[str] = []
    admin_api_keys: list[str] = []

    @field_validator("client_api_keys", "miner_api_keys", "admin_api_keys", mode="before")
    @classmethod
    def parse_api_keys(cls, v: str | list[str]) -> list[str]:
        """Coerce a raw API-key setting into a list of strings.

        A string is first tried as JSON; if that yields a list it is returned
        as-is. Any other string is split on commas, with surrounding
        whitespace and empty items dropped. Non-string input is passed
        through untouched for pydantic to validate.
        """
        # NOTE(review): pydantic-settings may itself try to JSON-decode
        # list-typed environment values before this validator runs; confirm
        # that comma-separated env values actually reach this fallback.
        import json

        if not isinstance(v, str):
            return v
        try:
            parsed = json.loads(v)
        except (json.JSONDecodeError, TypeError):
            parsed = None
        if isinstance(parsed, list):
            return parsed
        # Fall back to comma-separated
        return [k.strip() for k in v.split(",") if k.strip()]

    @field_validator("client_api_keys", "miner_api_keys", "admin_api_keys")
    @classmethod
    def validate_api_keys(cls, v: list[str]) -> list[str]:
        """Reject missing, placeholder, or too-short API keys.

        An empty list is accepted unless APP_ENV is "production".

        Raises:
            ValueError: If the list is empty in production, or any key is
                empty, starts with "$", equals "your_api_key_here", or is
                shorter than 16 characters.
        """
        # NOTE(review): pydantic skips field validators for default values
        # unless validate_default is enabled, so an unset key list presumably
        # never reaches this check - confirm against the pydantic docs.
        if not v:
            # Allow empty API keys in development/test environments
            if _is_production_env():
                raise ValueError("API keys cannot be empty in production")
            return v
        for key in v:
            if not key or key.startswith("$") or key == "your_api_key_here":
                raise ValueError("API keys must be set to valid values")
            if len(key) < 16:
                raise ValueError("API keys must be at least 16 characters long")
        return v

    # Security
    hmac_secret: str | None = None
    jwt_secret: str | None = None
    jwt_algorithm: str = "HS256"
    jwt_expiration_hours: int = 24

    @field_validator("hmac_secret")
    @classmethod
    def validate_hmac_secret(cls, v: str | None) -> str | None:
        """Validate ``hmac_secret``; unset is allowed outside production."""
        return _check_secret(v, "HMAC_SECRET")

    @field_validator("jwt_secret")
    @classmethod
    def validate_jwt_secret(cls, v: str | None) -> str | None:
        """Validate ``jwt_secret``; unset is allowed outside production."""
        return _check_secret(v, "JWT_SECRET")

    # CORS
    allow_origins: list[str] = [
        "http://localhost:8000",  # Coordinator API
        "http://localhost:8001",  # Exchange API
        "http://localhost:8002",  # Blockchain Node
        "http://localhost:8003",  # Blockchain RPC
        "http://localhost:8010",  # Multimodal GPU
        "http://localhost:8011",  # GPU Multimodal
        "http://localhost:8012",  # Modality Optimization
        "http://localhost:8013",  # Adaptive Learning
        "http://localhost:8014",  # Marketplace Enhanced
        "http://localhost:8015",  # OpenClaw Enhanced
        "http://localhost:8016",  # Web UI
    ]

    # Job Configuration
    job_ttl_seconds: int = 900
    heartbeat_interval_seconds: int = 10
    heartbeat_timeout_seconds: int = 30

    # Rate Limiting
    rate_limit_requests: int = 60
    rate_limit_window_seconds: int = 60

    # Configurable Rate Limits (per minute)
    rate_limit_jobs_submit: str = "100/minute"
    rate_limit_miner_register: str = "30/minute"
    rate_limit_miner_heartbeat: str = "60/minute"
    rate_limit_admin_stats: str = "20/minute"
    rate_limit_marketplace_list: str = "100/minute"
    rate_limit_marketplace_stats: str = "50/minute"
    rate_limit_marketplace_bid: str = "30/minute"
    rate_limit_exchange_payment: str = "20/minute"

    # Receipt Signing
    receipt_signing_key_hex: str | None = None
    receipt_attestation_key_hex: str | None = None

    # Logging
    log_level: str = "INFO"
    log_format: str = "json"  # json or text

    # Mempool
    mempool_backend: str = "database"  # database, memory

    # Blockchain RPC
    blockchain_rpc_url: str = "http://localhost:8082"

    # Test Configuration
    test_mode: bool = False
    test_database_url: str | None = None

    def validate_secrets(self) -> None:
        """Validate that all required secrets are provided.

        Only enforced when ``app_env`` is "production".

        Raises:
            ValueError: If ``jwt_secret`` is unset or still equal to the
                "change-me-in-production" placeholder.
        """
        if self.app_env == "production":
            if not self.jwt_secret:
                raise ValueError("JWT_SECRET environment variable is required in production")
            if self.jwt_secret == "change-me-in-production":
                raise ValueError("JWT_SECRET must be changed from default value")

    @property
    def database_url(self) -> str:
        """Get the database URL (backward compatibility).

        Returns ``test_database_url`` when test mode is on and one is set;
        otherwise defers to ``database.effective_url`` so the explicit URL,
        the adapter, and the default SQLite path are resolved in one place.
        """
        # Use test database if in test mode and test_database_url is set
        if self.test_mode and self.test_database_url:
            return self.test_database_url
        return self.database.effective_url

    @database_url.setter
    def database_url(self, value: str) -> None:
        """Allow setting database URL for tests.

        Raises:
            RuntimeError: If ``test_mode`` is not enabled.
        """
        if not self.test_mode:
            raise RuntimeError("Cannot set database_url outside of test mode")
        self.test_database_url = value
# Module-level settings instance, built once at import time.
settings = Settings()

# Enable test mode if environment variable is set
if os.getenv("TEST_MODE") == "true":
    settings.test_mode = True
    # Look the override up once and apply it only when it is non-empty.
    if test_db_url := os.getenv("TEST_DATABASE_URL"):
        settings.test_database_url = test_db_url

# Validate secrets on import
settings.validate_secrets()