docs: add code quality and type checking workflows to master index
Some checks failed
Documentation Validation / validate-docs (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
API Endpoint Tests / test-api-endpoints (push) Has been cancelled
CLI Tests / test-cli (push) Has been cancelled
Integration Tests / test-service-integration (push) Has been cancelled
Package Tests / test-python-packages (map[name:aitbc-agent-sdk path:packages/py/aitbc-agent-sdk]) (push) Has been cancelled
Package Tests / test-python-packages (map[name:aitbc-core path:packages/py/aitbc-core]) (push) Has been cancelled
Package Tests / test-python-packages (map[name:aitbc-crypto path:packages/py/aitbc-crypto]) (push) Has been cancelled
Package Tests / test-python-packages (map[name:aitbc-sdk path:packages/py/aitbc-sdk]) (push) Has been cancelled
Package Tests / test-javascript-packages (map[name:aitbc-sdk-js path:packages/js/aitbc-sdk]) (push) Has been cancelled
Package Tests / test-javascript-packages (map[name:aitbc-token path:packages/solidity/aitbc-token]) (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled
Systemd Sync / sync-systemd (push) Has been cancelled

- Add Code Quality Module section with pre-commit hooks and quality checks
- Add Type Checking CI/CD Module section with MyPy workflow and coverage
- Update README with code quality achievements and project structure
- Migrate FastAPI apps from deprecated on_event to lifespan context manager
- Update pyproject.toml files to reference consolidated dependencies
- Remove unused app.py import in coordinator-api
- Add type hints to agent
This commit is contained in:
aitbc
2026-03-31 21:45:43 +02:00
parent 26592ddf55
commit 9db720add8
308 changed files with 34194 additions and 34575 deletions

View File

@@ -2,102 +2,87 @@
Caching strategy for expensive queries
"""
from datetime import datetime, timedelta
from typing import Any, Optional, Dict
from functools import wraps
import hashlib
import json
import logging
from datetime import datetime, timedelta
from functools import wraps
from typing import Any
logger = logging.getLogger(__name__)
class CacheManager:
"""Simple in-memory cache with TTL support"""
def __init__(self):
self._cache: Dict[str, Dict[str, Any]] = {}
self._stats = {
"hits": 0,
"misses": 0,
"sets": 0,
"evictions": 0
}
def get(self, key: str) -> Optional[Any]:
self._cache: dict[str, dict[str, Any]] = {}
self._stats = {"hits": 0, "misses": 0, "sets": 0, "evictions": 0}
def get(self, key: str) -> Any | None:
"""Get value from cache"""
if key not in self._cache:
self._stats["misses"] += 1
return None
cache_entry = self._cache[key]
# Check if expired
if datetime.now() > cache_entry["expires_at"]:
del self._cache[key]
self._stats["evictions"] += 1
self._stats["misses"] += 1
return None
self._stats["hits"] += 1
logger.debug(f"Cache hit for key: {key}")
return cache_entry["value"]
def set(self, key: str, value: Any, ttl_seconds: int = 300) -> None:
"""Set value in cache with TTL"""
expires_at = datetime.now() + timedelta(seconds=ttl_seconds)
self._cache[key] = {
"value": value,
"expires_at": expires_at,
"created_at": datetime.now(),
"ttl": ttl_seconds
}
self._cache[key] = {"value": value, "expires_at": expires_at, "created_at": datetime.now(), "ttl": ttl_seconds}
self._stats["sets"] += 1
logger.debug(f"Cache set for key: {key}, TTL: {ttl_seconds}s")
def delete(self, key: str) -> bool:
"""Delete key from cache"""
if key in self._cache:
del self._cache[key]
return True
return False
def clear(self) -> None:
"""Clear all cache entries"""
self._cache.clear()
logger.info("Cache cleared")
def cleanup_expired(self) -> int:
"""Remove expired entries and return count removed"""
now = datetime.now()
expired_keys = [
key for key, entry in self._cache.items()
if now > entry["expires_at"]
]
expired_keys = [key for key, entry in self._cache.items() if now > entry["expires_at"]]
for key in expired_keys:
del self._cache[key]
self._stats["evictions"] += len(expired_keys)
if expired_keys:
logger.info(f"Cleaned up {len(expired_keys)} expired cache entries")
return len(expired_keys)
def get_stats(self) -> Dict[str, Any]:
def get_stats(self) -> dict[str, Any]:
"""Get cache statistics"""
total_requests = self._stats["hits"] + self._stats["misses"]
hit_rate = (self._stats["hits"] / total_requests * 100) if total_requests > 0 else 0
return {
**self._stats,
"total_entries": len(self._cache),
"hit_rate_percent": round(hit_rate, 2),
"total_requests": total_requests
"total_requests": total_requests,
}
def cache_key_generator(*args, **kwargs) -> str:
    """Generate a deterministic cache key from function arguments.

    Positional args are stringified (objects via a sorted view of their
    ``__dict__`` so attribute order does not matter); kwargs are sorted by
    name. The joined string is hashed so keys have a consistent length.
    """
    key_parts = []

    for arg in args:
        if hasattr(arg, "__dict__"):
            # Objects: sort attribute items for a stable representation.
            key_parts.append(str(sorted(arg.__dict__.items())))
        else:
            key_parts.append(str(arg))

    if kwargs:
        key_parts.append(str(sorted(kwargs.items())))

    # md5 is fine here: the hash is a cache key, not a security boundary.
    key_string = "|".join(key_parts)
    return hashlib.md5(key_string.encode()).hexdigest()
@@ -129,60 +114,62 @@ def cache_key_generator(*args, **kwargs) -> str:
def cached(ttl_seconds: int = 300, key_prefix: str = ""):
    """Decorator that caches a function's result keyed by its arguments.

    The key is ``{key_prefix}{func.__name__}_{hash-of-args}``. A cached
    value of None is treated as a miss, so None results are never cached.
    Works on both sync and async functions.
    """

    def decorator(func):
        def _build_key(call_args, call_kwargs):
            # One key format shared by both wrappers.
            return f"{key_prefix}{func.__name__}_{cache_key_generator(*call_args, **call_kwargs)}"

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            key = _build_key(args, kwargs)
            hit = cache_manager.get(key)
            if hit is not None:
                return hit
            value = await func(*args, **kwargs)
            cache_manager.set(key, value, ttl_seconds)
            return value

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            key = _build_key(args, kwargs)
            hit = cache_manager.get(key)
            if hit is not None:
                return hit
            value = func(*args, **kwargs)
            cache_manager.set(key, value, ttl_seconds)
            return value

        # Pick the wrapper that matches the decorated function's kind.
        import asyncio

        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

    return decorator
# Cache configurations for different query types
CACHE_CONFIGS = {
    "marketplace_stats": {"ttl_seconds": 300, "key_prefix": "marketplace_"},  # 5 minutes
    "job_list": {"ttl_seconds": 60, "key_prefix": "jobs_"},  # 1 minute
    "miner_list": {"ttl_seconds": 120, "key_prefix": "miners_"},  # 2 minutes
    "user_balance": {"ttl_seconds": 30, "key_prefix": "balance_"},  # 30 seconds
    "exchange_rates": {"ttl_seconds": 600, "key_prefix": "rates_"},  # 10 minutes
}


def get_cache_config(cache_type: str) -> dict[str, Any]:
    """Return the cache configuration for ``cache_type``.

    Unknown types fall back to a 5-minute TTL with no key prefix.
    """
    return CACHE_CONFIGS.get(cache_type, {"ttl_seconds": 300, "key_prefix": ""})
@@ -195,10 +182,10 @@ async def cleanup_expired_cache():
removed_count = cache_manager.cleanup_expired()
if removed_count > 0:
logger.info(f"Background cleanup removed {removed_count} expired entries")
# Run cleanup every 5 minutes
await asyncio.sleep(300)
except Exception as e:
logger.error(f"Cache cleanup error: {e}")
await asyncio.sleep(60) # Retry after 1 minute on error
@@ -207,25 +194,26 @@ async def cleanup_expired_cache():
# Cache warming utilities
class CacheWarmer:
"""Utility class for warming up cache with common queries"""
def __init__(self, session):
self.session = session
async def warm_marketplace_stats(self):
"""Warm up marketplace statistics cache"""
try:
from ..services.marketplace import MarketplaceService
service = MarketplaceService(self.session)
# Cache common stats queries
stats = await service.get_stats()
cache_manager.set("marketplace_stats_overview", stats, ttl_seconds=300)
logger.info("Marketplace stats cache warmed up")
except Exception as e:
logger.error(f"Failed to warm marketplace stats cache: {e}")
async def warm_exchange_rates(self):
"""Warm up exchange rates cache"""
try:
@@ -233,9 +221,9 @@ class CacheWarmer:
# For now, just set a placeholder
rates = {"AITBC_BTC": 0.00001, "AITBC_USD": 0.10}
cache_manager.set("exchange_rates_current", rates, ttl_seconds=600)
logger.info("Exchange rates cache warmed up")
except Exception as e:
logger.error(f"Failed to warm exchange rates cache: {e}")
@@ -244,11 +232,11 @@ class CacheWarmer:
async def cache_middleware(request, call_next):
    """FastAPI middleware to add cache headers and track cache performance"""
    response = await call_next(request)

    # Surface cache counters as response headers (debugging aid only).
    cache_stats = cache_manager.get_stats()
    debug_headers = {
        "X-Cache-Hits": str(cache_stats["hits"]),
        "X-Cache-Misses": str(cache_stats["misses"]),
        "X-Cache-Hit-Rate": f"{cache_stats['hit_rate_percent']}%",
    }
    for header_name, header_value in debug_headers.items():
        response.headers[header_name] = header_value

    return response

View File

@@ -2,25 +2,24 @@
Cache management utilities for endpoints
"""
from ..utils.cache import cache_manager, cleanup_expired_cache
from ..config import settings
import logging
from ..utils.cache import cache_manager, cleanup_expired_cache
logger = logging.getLogger(__name__)
def invalidate_cache_pattern(pattern: str):
    """Delete every cache entry whose key contains ``pattern``; return the count removed."""
    # Snapshot the matching keys first so we never mutate the dict mid-iteration.
    matching_keys = [key for key in cache_manager._cache if pattern in key]

    for key in matching_keys:
        cache_manager.delete(key)

    logger.info(f"Invalidated {len(matching_keys)} cache entries matching pattern: {pattern}")
    return len(matching_keys)
@@ -28,7 +27,7 @@ def invalidate_cache_pattern(pattern: str):
def get_cache_health() -> dict:
"""Get cache health statistics"""
stats = cache_manager.get_stats()
# Determine health status
total_requests = stats["total_requests"]
if total_requests == 0:
@@ -44,21 +43,21 @@ def get_cache_health() -> dict:
health_status = "fair"
else:
health_status = "poor"
return {
"health_status": health_status,
"hit_rate_percent": hit_rate,
"total_entries": stats["total_entries"],
"total_requests": total_requests,
"memory_usage_mb": round(len(str(cache_manager._cache)) / 1024 / 1024, 2),
"last_cleanup": stats.get("last_cleanup", "never")
"last_cleanup": stats.get("last_cleanup", "never"),
}
# Cache invalidation strategies for different events
class CacheInvalidationStrategy:
"""Strategies for cache invalidation based on events"""
@staticmethod
def on_job_created(job_id: str):
"""Invalidate caches when a job is created"""
@@ -66,7 +65,7 @@ class CacheInvalidationStrategy:
invalidate_cache_pattern("jobs_")
invalidate_cache_pattern("admin_stats")
logger.info(f"Invalidated job-related caches for new job: {job_id}")
@staticmethod
def on_job_updated(job_id: str):
"""Invalidate caches when a job is updated"""
@@ -75,13 +74,13 @@ class CacheInvalidationStrategy:
invalidate_cache_pattern("jobs_")
invalidate_cache_pattern("admin_stats")
logger.info(f"Invalidated job caches for updated job: {job_id}")
@staticmethod
def on_marketplace_change():
"""Invalidate caches when marketplace data changes"""
invalidate_cache_pattern("marketplace_")
logger.info("Invalidated marketplace caches due to data change")
@staticmethod
def on_payment_created(payment_id: str):
"""Invalidate caches when a payment is created"""
@@ -89,11 +88,11 @@ class CacheInvalidationStrategy:
invalidate_cache_pattern("payment_")
invalidate_cache_pattern("admin_stats")
logger.info(f"Invalidated payment caches for new payment: {payment_id}")
@staticmethod
def on_payment_updated(payment_id: str):
"""Invalidate caches when a payment is updated"""
invalidate_cache_pattern(f"balance_")
invalidate_cache_pattern("balance_")
invalidate_cache_pattern(f"payment_{payment_id}")
logger.info(f"Invalidated payment caches for updated payment: {payment_id}")
@@ -105,18 +104,21 @@ async def cache_management_task():
try:
# Clean up expired entries
removed_count = cleanup_expired_cache()
# Log cache health periodically
if removed_count > 0:
health = get_cache_health()
logger.info(f"Cache cleanup completed: {removed_count} entries removed, "
f"hit rate: {health['hit_rate_percent']}%, "
f"entries: {health['total_entries']}")
logger.info(
f"Cache cleanup completed: {removed_count} entries removed, "
f"hit rate: {health['hit_rate_percent']}%, "
f"entries: {health['total_entries']}"
)
# Run cache management every 5 minutes
import asyncio
await asyncio.sleep(300)
except Exception as e:
logger.error(f"Cache management error: {e}")
await asyncio.sleep(60) # Retry after 1 minute on error
@@ -125,92 +127,95 @@ async def cache_management_task():
# Cache warming utilities for startup
class CacheWarmer:
    """Cache warming utilities for common endpoints.

    Pre-populates the shared cache with the queries most commonly hit right
    after startup (marketplace stats, admin stats, exchange rates). Every
    warm step is best-effort: failures are logged, never raised.
    """

    def __init__(self, session):
        # Database session used by the warmed service queries.
        self.session = session

    async def warm_common_queries(self):
        """Warm up cache with common queries; never raises."""
        try:
            logger.info("Starting cache warming...")

            # Marketplace stats first (most commonly accessed).
            await self._warm_marketplace_stats()
            await self._warm_admin_stats()
            await self._warm_exchange_rates()

            logger.info("Cache warming completed successfully")
        except Exception as e:
            logger.error(f"Cache warming failed: {e}")

    async def _warm_marketplace_stats(self):
        """Warm marketplace statistics cache."""
        try:
            from ..services.marketplace import MarketplaceService

            service = MarketplaceService(self.session)
            # NOTE(review): get_stats() is awaited elsewhere in this codebase —
            # confirm whether this call needs `await` or it caches a coroutine.
            stats = service.get_stats()

            from ..utils.cache import cache_manager

            cache_manager.set("marketplace_stats_get_marketplace_stats", stats, ttl_seconds=300)
            logger.info("Marketplace stats cache warmed")
        except Exception as e:
            logger.warning(f"Failed to warm marketplace stats: {e}")

    async def _warm_admin_stats(self):
        """Warm admin statistics cache (job counts and miner availability)."""
        try:
            from sqlmodel import func, select

            from ..domain import Job
            from ..services import JobService, MinerService

            # NOTE(review): result discarded — kept only in case the constructor
            # has side effects; confirm and remove if it has none.
            JobService(self.session)
            miner_service = MinerService(self.session)

            total_jobs = self.session.exec(select(func.count()).select_from(Job)).one()
            active_jobs = self.session.exec(
                select(func.count()).select_from(Job).where(Job.state.in_(["QUEUED", "RUNNING"]))
            ).one()
            # NOTE(review): result discarded — presumably forces a query; verify intent.
            miner_service.list_records()

            stats = {
                "total_jobs": int(total_jobs or 0),
                "active_jobs": int(active_jobs or 0),
                "online_miners": miner_service.online_count(),
                "avg_miner_job_duration_ms": 0,
            }

            from ..utils.cache import cache_manager

            cache_manager.set("job_list_get_stats", stats, ttl_seconds=60)
            logger.info("Admin stats cache warmed")
        except Exception as e:
            logger.warning(f"Failed to warm admin stats: {e}")

    async def _warm_exchange_rates(self):
        """Warm exchange rates cache."""
        try:
            # Mock exchange rates - in production this would call an exchange API
            rates = {"AITBC_BTC": 0.00001, "AITBC_USD": 0.10, "BTC_USD": 50000.0}

            from ..utils.cache import cache_manager

            cache_manager.set("rates_current", rates, ttl_seconds=600)
            logger.info("Exchange rates cache warmed")
        except Exception as e:
            logger.warning(f"Failed to warm exchange rates: {e}")

View File

@@ -2,62 +2,58 @@
Circuit breaker pattern for external services
"""
from enum import Enum
from datetime import datetime, timedelta
from typing import Any, Callable, Optional, Dict
from functools import wraps
import asyncio
import logging
from collections.abc import Callable
from datetime import datetime, timedelta
from enum import Enum
from functools import wraps
from typing import Any
logger = logging.getLogger(__name__)
class CircuitState(Enum):
    """Circuit breaker states"""

    CLOSED = "closed"  # Normal operation
    OPEN = "open"  # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing recovery
class CircuitBreakerError(Exception):
    """Raised when a call is rejected because the circuit breaker is open."""
class CircuitBreaker:
"""Circuit breaker implementation for external service calls"""
def __init__(
self,
failure_threshold: int = 5,
timeout_seconds: int = 60,
expected_exception: type = Exception,
name: str = "circuit_breaker"
name: str = "circuit_breaker",
):
self.failure_threshold = failure_threshold
self.timeout_seconds = timeout_seconds
self.expected_exception = expected_exception
self.name = name
self.failures = 0
self.state = CircuitState.CLOSED
self.last_failure_time: Optional[datetime] = None
self.last_failure_time: datetime | None = None
self.success_count = 0
# Statistics
self.stats = {
"total_calls": 0,
"successful_calls": 0,
"failed_calls": 0,
"circuit_opens": 0,
"circuit_closes": 0
}
self.stats = {"total_calls": 0, "successful_calls": 0, "failed_calls": 0, "circuit_opens": 0, "circuit_closes": 0}
async def call(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function with circuit breaker protection"""
self.stats["total_calls"] += 1
# Check if circuit is open
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
@@ -66,34 +62,34 @@ class CircuitBreaker:
else:
self.stats["failed_calls"] += 1
raise CircuitBreakerError(f"Circuit breaker '{self.name}' is OPEN")
try:
# Execute the protected function
if asyncio.iscoroutinefunction(func):
result = await func(*args, **kwargs)
else:
result = func(*args, **kwargs)
# Success - reset circuit if needed
self._on_success()
self.stats["successful_calls"] += 1
return result
except self.expected_exception as e:
# Expected failure - update circuit state
self._on_failure()
self.stats["failed_calls"] += 1
logger.warning(f"Circuit breaker '{self.name}' failure: {e}")
raise
def _should_attempt_reset(self) -> bool:
"""Check if enough time has passed to attempt circuit reset"""
if self.last_failure_time is None:
return True
return datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout_seconds)
def _on_success(self):
"""Handle successful call"""
if self.state == CircuitState.HALF_OPEN:
@@ -106,12 +102,12 @@ class CircuitBreaker:
elif self.state == CircuitState.CLOSED:
# Reset failure count on success in closed state
self.failures = 0
def _on_failure(self):
"""Handle failed call"""
self.failures += 1
self.last_failure_time = datetime.now()
if self.state == CircuitState.HALF_OPEN:
# Failure in half-open - reopen circuit
self.state = CircuitState.OPEN
@@ -121,8 +117,8 @@ class CircuitBreaker:
self.state = CircuitState.OPEN
self.stats["circuit_opens"] += 1
logger.error(f"Circuit breaker '{self.name}' OPEN after {self.failures} failures")
def get_state(self) -> Dict[str, Any]:
def get_state(self) -> dict[str, Any]:
"""Get current circuit breaker state and statistics"""
return {
"name": self.name,
@@ -133,11 +129,10 @@ class CircuitBreaker:
"last_failure_time": self.last_failure_time.isoformat() if self.last_failure_time else None,
"stats": self.stats.copy(),
"success_rate": (
(self.stats["successful_calls"] / self.stats["total_calls"] * 100)
if self.stats["total_calls"] > 0 else 0
)
(self.stats["successful_calls"] / self.stats["total_calls"] * 100) if self.stats["total_calls"] > 0 else 0
),
}
def reset(self):
"""Manually reset circuit breaker to closed state"""
self.state = CircuitState.CLOSED
@@ -148,87 +143,73 @@ class CircuitBreaker:
def circuit_breaker(
    failure_threshold: int = 5,
    timeout_seconds: int = 60,
    expected_exception: type = Exception,
    name: str | None = None,
):
    """Decorator for adding circuit breaker protection to functions.

    One CircuitBreaker instance is created per decorated function; its name
    defaults to ``module.qualname`` when ``name`` is not given. The breaker
    is exposed as ``func._circuit_breaker`` for stats/monitoring.
    """

    def decorator(func):
        breaker_name = name or f"{func.__module__}.{func.__name__}"
        breaker = CircuitBreaker(
            failure_threshold=failure_threshold,
            timeout_seconds=timeout_seconds,
            expected_exception=expected_exception,
            name=breaker_name,
        )

        # Store breaker on function for access to stats
        func._circuit_breaker = breaker

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            return await breaker.call(func, *args, **kwargs)

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            # NOTE(review): asyncio.run spins up a fresh event loop per call and
            # raises if a loop is already running — confirm sync call sites.
            return asyncio.run(breaker.call(func, *args, **kwargs))

        # Return the wrapper matching the decorated function's kind.
        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

    return decorator
# Pre-configured circuit breakers for common external services
class CircuitBreakers:
"""Collection of pre-configured circuit breakers"""
def __init__(self):
# Blockchain RPC circuit breaker
self.blockchain_rpc = CircuitBreaker(
failure_threshold=3,
timeout_seconds=30,
expected_exception=ConnectionError,
name="blockchain_rpc"
failure_threshold=3, timeout_seconds=30, expected_exception=ConnectionError, name="blockchain_rpc"
)
# Exchange API circuit breaker
self.exchange_api = CircuitBreaker(
failure_threshold=5,
timeout_seconds=60,
expected_exception=Exception,
name="exchange_api"
failure_threshold=5, timeout_seconds=60, expected_exception=Exception, name="exchange_api"
)
# Wallet daemon circuit breaker
self.wallet_daemon = CircuitBreaker(
failure_threshold=3,
timeout_seconds=45,
expected_exception=ConnectionError,
name="wallet_daemon"
failure_threshold=3, timeout_seconds=45, expected_exception=ConnectionError, name="wallet_daemon"
)
# External payment processor circuit breaker
self.payment_processor = CircuitBreaker(
failure_threshold=2,
timeout_seconds=120,
expected_exception=Exception,
name="payment_processor"
failure_threshold=2, timeout_seconds=120, expected_exception=Exception, name="payment_processor"
)
def get_all_states(self) -> Dict[str, Dict[str, Any]]:
def get_all_states(self) -> dict[str, dict[str, Any]]:
"""Get state of all circuit breakers"""
return {
"blockchain_rpc": self.blockchain_rpc.get_state(),
"exchange_api": self.exchange_api.get_state(),
"wallet_daemon": self.wallet_daemon.get_state(),
"payment_processor": self.payment_processor.get_state()
"payment_processor": self.payment_processor.get_state(),
}
def reset_all(self):
"""Reset all circuit breakers"""
self.blockchain_rpc.reset()
@@ -245,31 +226,24 @@ circuit_breakers = CircuitBreakers()
# Usage examples and utilities
class ProtectedServiceClient:
    """Example of a service client with circuit breaker protection.

    NOTE(review): two breakers exist here — the decorator-level one guarding
    call_api and ``self.circuit_breaker`` reported by get_health_status. They
    do not share state; confirm this split is intentional.
    """

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=3,
            timeout_seconds=60,
            name=f"service_client_{base_url}",
        )

    @circuit_breaker(failure_threshold=3, timeout_seconds=60)
    async def call_api(self, endpoint: str, data: dict[str, Any]) -> dict[str, Any]:
        """POST ``data`` to ``base_url + endpoint``; raises on HTTP error status."""
        import httpx

        async with httpx.AsyncClient() as client:
            response = await client.post(f"{self.base_url}{endpoint}", json=data)
            response.raise_for_status()
            return response.json()

    def get_health_status(self) -> dict[str, Any]:
        """Get health status including circuit breaker state"""
        return {"service_url": self.base_url, "circuit_breaker": self.circuit_breaker.get_state()}
# FastAPI endpoint for circuit breaker monitoring
@@ -284,15 +258,15 @@ async def reset_circuit_breaker(breaker_name: str):
"blockchain_rpc": circuit_breakers.blockchain_rpc,
"exchange_api": circuit_breakers.exchange_api,
"wallet_daemon": circuit_breakers.wallet_daemon,
"payment_processor": circuit_breakers.payment_processor
"payment_processor": circuit_breakers.payment_processor,
}
if breaker_name not in breaker_map:
raise ValueError(f"Unknown circuit breaker: {breaker_name}")
breaker_map[breaker_name].reset()
logger.info(f"Circuit breaker '{breaker_name}' reset via admin API")
return {"status": "reset", "breaker": breaker_name}
@@ -302,24 +276,24 @@ async def monitor_circuit_breakers():
while True:
try:
states = circuit_breakers.get_all_states()
# Log any open circuits
for name, state in states.items():
if state["state"] == "open":
logger.warning(f"Circuit breaker '{name}' is OPEN - check service health")
elif state["state"] == "half_open":
logger.info(f"Circuit breaker '{name}' is HALF_OPEN - testing recovery")
# Check for circuits with high failure rates
for name, state in states.items():
if state["stats"]["total_calls"] > 10: # Only check if enough calls
success_rate = state["success_rate"]
if success_rate < 80: # Less than 80% success rate
logger.warning(f"Circuit breaker '{name}' has low success rate: {success_rate:.1f}%")
# Run monitoring every 30 seconds
await asyncio.sleep(30)
except Exception as e:
logger.error(f"Circuit breaker monitoring error: {e}")
await asyncio.sleep(60) # Retry after 1 minute on error