feat: implement CLI blockchain features and pool hub enhancements
Some checks failed
API Endpoint Tests / test-api-endpoints (push) Successful in 11s
CLI Tests / test-cli (push) Failing after 7s
Documentation Validation / validate-docs (push) Successful in 8s
Documentation Validation / validate-policies-strict (push) Successful in 3s
Integration Tests / test-service-integration (push) Successful in 38s
Python Tests / test-python (push) Successful in 11s
Security Scanning / security-scan (push) Successful in 29s
Multi-Node Blockchain Health Monitoring / health-check (push) Successful in 1s

CLI Blockchain Features:
- Added block operations: import, export, import-chain, blocks-range
- Added messaging system commands (deploy, state, topics, create-topic, messages, post, vote, search, reputation, moderate)
- Added network force-sync operation
- Replaced marketplace handlers with actual RPC calls
- Replaced AI handlers with actual RPC calls
- Added account operations (account get)
- Added transaction query operations
- Added mempool query operations
- Created keystore_auth.py for authentication
- Removed extended features interception
- All handlers use keystore credentials for authenticated endpoints

Pool Hub Enhancements:
- Added SLA monitoring and capacity tables
- Added billing integration service
- Added SLA collector service
- Added SLA router endpoints
- Updated pool hub models and settings
- Added integration tests for billing and SLA
- Updated documentation with SLA monitoring guide
This commit is contained in:
aitbc
2026-04-22 15:59:00 +02:00
parent 51920a15d7
commit e22d864944
28 changed files with 4783 additions and 358 deletions

View File

@@ -8,6 +8,7 @@ from ..database import close_engine, create_engine
from ..redis_cache import close_redis, create_redis
from ..settings import settings
from .routers import health_router, match_router, metrics_router, services, ui, validation
from .routers.sla import router as sla_router
@asynccontextmanager
@@ -28,6 +29,7 @@ app.include_router(metrics_router)
app.include_router(services, prefix="/v1")
app.include_router(ui)
app.include_router(validation, prefix="/v1")
app.include_router(sla_router)
def create_app() -> FastAPI:

View File

@@ -0,0 +1,357 @@
"""
SLA and Billing API Endpoints for Pool-Hub
Provides endpoints for SLA metrics, capacity planning, and billing integration.
"""
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from decimal import Decimal
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from ..database import get_db
from ..services.sla_collector import SLACollector
from ..services.billing_integration import BillingIntegration
from ..models import SLAMetric, SLAViolation, CapacitySnapshot
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/sla", tags=["SLA"])
# ---------------------------------------------------------------------------
# Request/response schemas for the SLA router (pydantic models).
# ---------------------------------------------------------------------------

class SLAMetricResponse(BaseModel):
    """One recorded SLA sample for a miner.

    NOTE(review): the ORM model stores this field as ``meta_data`` while this
    schema declares ``metadata`` — confirm an alias or explicit mapping
    populates it when serializing from ORM rows.
    """

    id: str
    miner_id: str
    metric_type: str
    metric_value: float
    threshold: float
    is_violation: bool
    timestamp: datetime
    metadata: Dict[str, str]

    class Config:
        # Allow construction directly from ORM rows.
        from_attributes = True


class SLAViolationResponse(BaseModel):
    """A threshold breach; ``resolved_at`` is None while the violation is open."""

    id: str
    miner_id: str
    violation_type: str
    severity: str
    metric_value: float
    threshold: float
    created_at: datetime
    resolved_at: Optional[datetime]

    class Config:
        from_attributes = True


class CapacitySnapshotResponse(BaseModel):
    """Point-in-time capacity/scaling snapshot across all miners."""

    id: str
    total_miners: int
    active_miners: int
    total_parallel_capacity: int
    total_queue_length: int
    capacity_utilization_pct: float
    forecast_capacity: int
    recommended_scaling: str
    scaling_reason: str
    timestamp: datetime

    class Config:
        from_attributes = True


class UsageSyncRequest(BaseModel):
    """Payload for POST /sla/billing/sync."""

    # When omitted, every miner is synced.
    miner_id: Optional[str] = None
    # Lookback window in hours, capped at one week.
    hours_back: int = Field(default=24, ge=1, le=168)


class UsageRecordRequest(BaseModel):
    """Payload for POST /sla/billing/usage/record."""

    tenant_id: str
    resource_type: str
    quantity: Decimal
    # Falls back to server-side pricing when omitted.
    unit_price: Optional[Decimal] = None
    job_id: Optional[str] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)


class InvoiceGenerationRequest(BaseModel):
    """Payload for POST /sla/billing/invoice/generate."""

    tenant_id: str
    period_start: datetime
    period_end: datetime
# Dependency injection

def get_sla_collector(db: Session = Depends(get_db)) -> SLACollector:
    """FastAPI dependency: an SLACollector bound to the request's DB session."""
    return SLACollector(db)


def get_billing_integration(db: Session = Depends(get_db)) -> BillingIntegration:
    """FastAPI dependency: a BillingIntegration bound to the request's DB session."""
    return BillingIntegration(db)
# SLA Metrics Endpoints
@router.get("/metrics/{miner_id}", response_model=List[SLAMetricResponse])
async def get_miner_sla_metrics(
    miner_id: str,
    hours: int = Query(default=24, ge=1, le=168),
    sla_collector: SLACollector = Depends(get_sla_collector),
):
    """Return the SLA metrics recorded for one miner in the last ``hours`` hours."""
    try:
        # Delegate straight to the collector; FastAPI serializes the rows.
        return await sla_collector.get_sla_metrics(miner_id=miner_id, hours=hours)
    except Exception as e:
        # Any collector failure becomes a 500 with the error text as detail.
        logger.error(f"Error getting SLA metrics for miner {miner_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/metrics", response_model=List[SLAMetricResponse])
async def get_all_sla_metrics(
    hours: int = Query(default=24, ge=1, le=168),
    sla_collector: SLACollector = Depends(get_sla_collector),
):
    """Return SLA metrics for every miner over the last ``hours`` hours."""
    try:
        # miner_id=None means "no miner filter" in the collector.
        return await sla_collector.get_sla_metrics(miner_id=None, hours=hours)
    except Exception as e:
        logger.error(f"Error getting SLA metrics: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/violations", response_model=List[SLAViolationResponse])
async def get_sla_violations(
    miner_id: Optional[str] = Query(default=None),
    resolved: bool = Query(default=False),
    sla_collector: SLACollector = Depends(get_sla_collector),
):
    """List SLA violations, optionally filtered to one miner.

    Args:
        miner_id: Restrict results to this miner when given.
        resolved: When True return resolved violations, otherwise open ones.
    """
    try:
        # Use the shared get_sla_collector dependency instead of constructing
        # SLACollector(db) by hand, for consistency with the other endpoints.
        return await sla_collector.get_sla_violations(
            miner_id=miner_id, resolved=resolved
        )
    except Exception as e:
        logger.error(f"Error getting SLA violations: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/metrics/collect")
async def collect_sla_metrics(
    sla_collector: SLACollector = Depends(get_sla_collector),
):
    """Kick off an immediate SLA metrics collection pass over all miners."""
    try:
        return await sla_collector.collect_all_miner_metrics()
    except Exception as e:
        logger.error(f"Error collecting SLA metrics: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# Capacity Planning Endpoints
@router.get("/capacity/snapshots", response_model=List[CapacitySnapshotResponse])
async def get_capacity_snapshots(
    hours: int = Query(default=24, ge=1, le=168),
    db: Session = Depends(get_db),
):
    """Return capacity snapshots taken in the last ``hours`` hours, newest first."""
    try:
        since = datetime.utcnow() - timedelta(hours=hours)
        return (
            db.query(CapacitySnapshot)
            .filter(CapacitySnapshot.timestamp >= since)
            .order_by(CapacitySnapshot.timestamp.desc())
            .all()
        )
    except Exception as e:
        logger.error(f"Error getting capacity snapshots: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/capacity/forecast")
async def get_capacity_forecast(
    hours_ahead: int = Query(default=168, ge=1, le=8760),
    billing_integration: BillingIntegration = Depends(get_billing_integration),
):
    """Get capacity forecast from coordinator-api.

    Currently a stub: a canned forecast is returned instead of calling
    coordinator-api's capacity planning endpoint.
    """
    try:
        # This would call coordinator-api's capacity planning endpoint
        # For now, return a placeholder response
        forecast = {
            "forecast_horizon_hours": hours_ahead,
            "current_capacity": 1000,
            "projected_capacity": 1500,
            "recommended_scaling": "+50%",
            "confidence": 0.85,
            "source": "coordinator_api",
        }
        return forecast
    except Exception as e:
        logger.error(f"Error getting capacity forecast: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/capacity/recommendations")
async def get_scaling_recommendations(
    billing_integration: BillingIntegration = Depends(get_billing_integration),
):
    """Get auto-scaling recommendations.

    Currently a stub: a canned recommendation set is returned instead of
    querying coordinator-api's capacity planning endpoint.
    """
    try:
        # This would call coordinator-api's capacity planning endpoint
        # For now, return a placeholder response
        recommendation = {
            "action": "add_miners",
            "quantity": 2,
            "reason": "Projected capacity shortage in 2 weeks",
            "priority": "medium",
        }
        return {
            "current_state": "healthy",
            "recommendations": [recommendation],
            "source": "coordinator_api",
        }
    except Exception as e:
        logger.error(f"Error getting scaling recommendations: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/capacity/alerts/configure")
async def configure_capacity_alerts(
    alert_config: Dict[str, Any],
    db: Session = Depends(get_db),
):
    """Accept a capacity-alert configuration (echoed back; not yet persisted)."""
    try:
        # Store alert configuration (would be persisted to database)
        ack = {
            "status": "configured",
            "alert_config": alert_config,
            "timestamp": datetime.utcnow().isoformat(),
        }
        return ack
    except Exception as e:
        logger.error(f"Error configuring capacity alerts: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# Billing Integration Endpoints
@router.get("/billing/usage")
async def get_billing_usage(
    tenant_id: Optional[str] = Query(default=None),
    hours: int = Query(default=24, ge=1, le=168),
    billing_integration: BillingIntegration = Depends(get_billing_integration),
):
    """Proxy billing usage metrics from coordinator-api, optionally per tenant."""
    try:
        return await billing_integration.get_billing_metrics(
            tenant_id=tenant_id, hours=hours
        )
    except Exception as e:
        logger.error(f"Error getting billing usage: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/billing/sync")
async def sync_billing_usage(
    request: UsageSyncRequest,
    billing_integration: BillingIntegration = Depends(get_billing_integration),
):
    """Trigger a billing sync with coordinator-api.

    Syncs one miner when ``request.miner_id`` is set, otherwise all miners.
    """
    try:
        if request.miner_id:
            # Single-miner sync over the trailing hours_back window.
            window_end = datetime.utcnow()
            window_start = window_end - timedelta(hours=request.hours_back)
            return await billing_integration.sync_miner_usage(
                miner_id=request.miner_id,
                start_date=window_start,
                end_date=window_end,
            )
        # Full-fleet sync.
        return await billing_integration.sync_all_miners_usage(
            hours_back=request.hours_back
        )
    except Exception as e:
        logger.error(f"Error syncing billing usage: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/billing/usage/record")
async def record_usage(
    request: UsageRecordRequest,
    billing_integration: BillingIntegration = Depends(get_billing_integration),
):
    """Forward a single usage event to coordinator-api billing."""
    try:
        return await billing_integration.record_usage(
            tenant_id=request.tenant_id,
            resource_type=request.resource_type,
            quantity=request.quantity,
            unit_price=request.unit_price,
            job_id=request.job_id,
            metadata=request.metadata,
        )
    except Exception as e:
        logger.error(f"Error recording usage: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/billing/invoice/generate")
async def generate_invoice(
    request: InvoiceGenerationRequest,
    billing_integration: BillingIntegration = Depends(get_billing_integration),
):
    """Ask coordinator-api to generate an invoice for the given billing period."""
    try:
        return await billing_integration.trigger_invoice_generation(
            tenant_id=request.tenant_id,
            period_start=request.period_start,
            period_end=request.period_end,
        )
    except Exception as e:
        logger.error(f"Error generating invoice: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# Health and Status Endpoints
@router.get("/status")
async def get_sla_status(db: Session = Depends(get_db)):
    """Summarize overall SLA health from open violations and recent metrics.

    Status is "critical" if any open violation is critical, "degraded" if any
    is high severity, otherwise "healthy".
    """
    try:
        sla_collector = SLACollector(db)
        active_violations = await sla_collector.get_sla_violations(resolved=False)
        recent_metrics = await sla_collector.get_sla_metrics(hours=1)
        # Rank health by the worst open-violation severity present.
        severities = {v.severity for v in active_violations}
        if "critical" in severities:
            status = "critical"
        elif "high" in severities:
            status = "degraded"
        else:
            status = "healthy"
        return {
            "status": status,
            "active_violations": len(active_violations),
            "recent_metrics_count": len(recent_metrics),
            "timestamp": datetime.utcnow().isoformat(),
        }
    except Exception as e:
        logger.error(f"Error getting SLA status: {e}")
        raise HTTPException(status_code=500, detail=str(e))

View File

@@ -11,10 +11,11 @@ from sqlalchemy import (
Float,
ForeignKey,
Integer,
JSON,
String,
Text,
)
from sqlalchemy.dialects.postgresql import JSONB, UUID as PGUUID
from sqlalchemy.dialects.postgresql import UUID as PGUUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from uuid import uuid4
@@ -50,8 +51,8 @@ class Miner(Base):
ram_gb: Mapped[float] = mapped_column(Float)
max_parallel: Mapped[int] = mapped_column(Integer)
base_price: Mapped[float] = mapped_column(Float)
tags: Mapped[Dict[str, str]] = mapped_column(JSONB, default=dict)
capabilities: Mapped[List[str]] = mapped_column(JSONB, default=list)
tags: Mapped[Dict[str, str]] = mapped_column(JSON, default=dict)
capabilities: Mapped[List[str]] = mapped_column(JSON, default=list)
trust_score: Mapped[float] = mapped_column(Float, default=0.5)
region: Mapped[Optional[str]] = mapped_column(String(64))
@@ -74,6 +75,8 @@ class MinerStatus(Base):
avg_latency_ms: Mapped[Optional[int]] = mapped_column(Integer)
temp_c: Mapped[Optional[int]] = mapped_column(Integer)
mem_free_gb: Mapped[Optional[float]] = mapped_column(Float)
uptime_pct: Mapped[Optional[float]] = mapped_column(Float) # SLA metric
last_heartbeat_at: Mapped[Optional[dt.datetime]] = mapped_column(DateTime(timezone=True))
updated_at: Mapped[dt.datetime] = mapped_column(
DateTime(timezone=True), default=dt.datetime.utcnow, onupdate=dt.datetime.utcnow
)
@@ -88,8 +91,8 @@ class MatchRequest(Base):
PGUUID(as_uuid=True), primary_key=True, default=uuid4
)
job_id: Mapped[str] = mapped_column(String(64), nullable=False)
requirements: Mapped[Dict[str, object]] = mapped_column(JSONB, nullable=False)
hints: Mapped[Dict[str, object]] = mapped_column(JSONB, default=dict)
requirements: Mapped[Dict[str, object]] = mapped_column(JSON, nullable=False)
hints: Mapped[Dict[str, object]] = mapped_column(JSON, default=dict)
top_k: Mapped[int] = mapped_column(Integer, default=1)
created_at: Mapped[dt.datetime] = mapped_column(
DateTime(timezone=True), default=dt.datetime.utcnow
@@ -156,9 +159,9 @@ class ServiceConfig(Base):
)
service_type: Mapped[str] = mapped_column(String(32), nullable=False)
enabled: Mapped[bool] = mapped_column(Boolean, default=False)
config: Mapped[Dict[str, Any]] = mapped_column(JSONB, default=dict)
pricing: Mapped[Dict[str, Any]] = mapped_column(JSONB, default=dict)
capabilities: Mapped[List[str]] = mapped_column(JSONB, default=list)
config: Mapped[Dict[str, Any]] = mapped_column(JSON, default=dict)
pricing: Mapped[Dict[str, Any]] = mapped_column(JSON, default=dict)
capabilities: Mapped[List[str]] = mapped_column(JSON, default=list)
max_concurrent: Mapped[int] = mapped_column(Integer, default=1)
created_at: Mapped[dt.datetime] = mapped_column(
DateTime(timezone=True), default=dt.datetime.utcnow
@@ -171,3 +174,73 @@ class ServiceConfig(Base):
__table_args__ = ({"schema": None},)
miner: Mapped[Miner] = relationship(backref="service_configs")
class SLAMetric(Base):
    """SLA metrics tracking for miners.

    One row per collected sample; ``is_violation`` is precomputed against the
    threshold in force when the sample was taken.
    """

    __tablename__ = "sla_metrics"

    # Surrogate primary key.
    id: Mapped[PGUUID] = mapped_column(
        PGUUID(as_uuid=True), primary_key=True, default=uuid4
    )
    # Owning miner; samples are deleted with the miner.
    miner_id: Mapped[str] = mapped_column(
        ForeignKey("miners.miner_id", ondelete="CASCADE"), nullable=False
    )
    metric_type: Mapped[str] = mapped_column(String(32), nullable=False)  # uptime, response_time, completion_rate, capacity
    metric_value: Mapped[float] = mapped_column(Float, nullable=False)
    threshold: Mapped[float] = mapped_column(Float, nullable=False)
    is_violation: Mapped[bool] = mapped_column(Boolean, default=False)
    timestamp: Mapped[dt.datetime] = mapped_column(
        DateTime(timezone=True), default=dt.datetime.utcnow
    )
    # Named meta_data because SQLAlchemy reserves .metadata on declarative
    # models; API schemas expose this field as "metadata".
    meta_data: Mapped[Dict[str, str]] = mapped_column(JSON, default=dict)

    miner: Mapped[Miner] = relationship(backref="sla_metrics")
class SLAViolation(Base):
    """SLA violation tracking.

    A row is opened when a metric breaches its threshold; ``resolved_at``
    stays NULL while the violation is open.
    """

    __tablename__ = "sla_violations"

    id: Mapped[PGUUID] = mapped_column(
        PGUUID(as_uuid=True), primary_key=True, default=uuid4
    )
    # Owning miner; violations are deleted with the miner.
    miner_id: Mapped[str] = mapped_column(
        ForeignKey("miners.miner_id", ondelete="CASCADE"), nullable=False
    )
    violation_type: Mapped[str] = mapped_column(String(32), nullable=False)
    severity: Mapped[str] = mapped_column(String(16), nullable=False)  # critical, high, medium, low
    metric_value: Mapped[float] = mapped_column(Float, nullable=False)
    threshold: Mapped[float] = mapped_column(Float, nullable=False)
    # Breach duration in milliseconds, when known.
    violation_duration_ms: Mapped[Optional[int]] = mapped_column(Integer)
    resolved_at: Mapped[Optional[dt.datetime]] = mapped_column(DateTime(timezone=True))
    created_at: Mapped[dt.datetime] = mapped_column(
        DateTime(timezone=True), default=dt.datetime.utcnow
    )
    # Named meta_data because SQLAlchemy reserves .metadata on declarative models.
    meta_data: Mapped[Dict[str, str]] = mapped_column(JSON, default=dict)

    miner: Mapped[Miner] = relationship(backref="sla_violations")
class CapacitySnapshot(Base):
    """Capacity planning snapshots.

    A point-in-time, fleet-wide roll-up of miner capacity plus the scaling
    recommendation derived from it.
    """

    __tablename__ = "capacity_snapshots"

    id: Mapped[PGUUID] = mapped_column(
        PGUUID(as_uuid=True), primary_key=True, default=uuid4
    )
    total_miners: Mapped[int] = mapped_column(Integer, nullable=False)
    # Miners currently able to accept work.
    active_miners: Mapped[int] = mapped_column(Integer, nullable=False)
    total_parallel_capacity: Mapped[int] = mapped_column(Integer, nullable=False)
    total_queue_length: Mapped[int] = mapped_column(Integer, nullable=False)
    capacity_utilization_pct: Mapped[float] = mapped_column(Float, nullable=False)
    # Forecasted capacity and the resulting scaling advice.
    forecast_capacity: Mapped[int] = mapped_column(Integer, nullable=False)
    recommended_scaling: Mapped[str] = mapped_column(String(32), nullable=False)
    scaling_reason: Mapped[str] = mapped_column(Text)
    timestamp: Mapped[dt.datetime] = mapped_column(
        DateTime(timezone=True), default=dt.datetime.utcnow
    )
    # Named meta_data because SQLAlchemy reserves .metadata on declarative models.
    meta_data: Mapped[Dict[str, Any]] = mapped_column(JSON, default=dict)

View File

@@ -0,0 +1,325 @@
"""
Billing Integration Service for Pool-Hub
Integrates pool-hub usage data with coordinator-api's billing system.
"""
import asyncio
import logging
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Dict, List, Optional, Any
import httpx
from sqlalchemy import and_, func, select
from sqlalchemy.orm import Session
from ..models import Miner, ServiceConfig, MatchRequest, MatchResult, Feedback
from ..settings import settings
logger = logging.getLogger(__name__)
class BillingIntegration:
    """Service for integrating pool-hub with coordinator-api billing.

    Aggregates usage from pool-hub tables (match requests/results), converts
    it into billing events, and ships them to coordinator-api over HTTP.
    """

    def __init__(self, db: Session):
        """Bind to a database session and read coordinator config from settings."""
        self.db = db
        # getattr fallbacks keep local development working without config.
        self.coordinator_billing_url = getattr(
            settings, "coordinator_billing_url", "http://localhost:8011"
        )
        self.coordinator_api_key = getattr(
            settings, "coordinator_api_key", None
        )
        self.logger = logging.getLogger(__name__)
        # Resource type mappings (pool-hub name -> coordinator-api name).
        self.resource_type_mapping = {
            "gpu_hours": "gpu_hours",
            "storage_gb": "storage_gb",
            "api_calls": "api_calls",
            "compute_hours": "compute_hours",
        }
        # Pricing configuration (fallback if coordinator-api pricing not available)
        self.fallback_pricing = {
            "gpu_hours": {"unit_price": Decimal("0.50")},
            "storage_gb": {"unit_price": Decimal("0.02")},
            "api_calls": {"unit_price": Decimal("0.0001")},
            "compute_hours": {"unit_price": Decimal("0.30")},
        }

    async def record_usage(
        self,
        tenant_id: str,
        resource_type: str,
        quantity: Decimal,
        unit_price: Optional[Decimal] = None,
        job_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Record one usage event to the coordinator-api billing system.

        Args:
            tenant_id: Billing tenant the usage belongs to.
            resource_type: Resource key, e.g. "gpu_hours" or "api_calls".
            quantity: Amount consumed.
            unit_price: Price per unit; when falsy, the fallback price for the
                resource type is used (an explicit Decimal("0") also triggers
                the fallback).
            job_id: Optional job correlation id added to the event.
            metadata: Extra tags copied into the billing event.

        Returns:
            The coordinator-api response, or {"status": "failed", ...} when
            sending the event raised.
        """
        if not unit_price:
            pricing_config = self.fallback_pricing.get(resource_type, {})
            unit_price = pricing_config.get("unit_price", Decimal("0"))
        total_cost = unit_price * quantity
        # Decimals are converted to float for JSON serialization.
        billing_event = {
            "tenant_id": tenant_id,
            "event_type": "usage",
            "resource_type": resource_type,
            "quantity": float(quantity),
            "unit_price": float(unit_price),
            "total_amount": float(total_cost),
            "currency": "USD",
            "timestamp": datetime.utcnow().isoformat(),
            "metadata": metadata or {},
        }
        if job_id:
            billing_event["job_id"] = job_id
        try:
            response = await self._send_billing_event(billing_event)
            self.logger.info(
                f"Recorded usage: tenant={tenant_id}, resource={resource_type}, "
                f"quantity={quantity}, cost={total_cost}"
            )
            return response
        except Exception as e:
            self.logger.error(f"Failed to record usage: {e}")
            # Queue for retry in production
            return {"status": "failed", "error": str(e)}

    async def sync_miner_usage(
        self, miner_id: str, start_date: datetime, end_date: datetime
    ) -> Dict[str, Any]:
        """Sync one miner's usage for a period to coordinator-api billing.

        Raises:
            ValueError: If the miner does not exist.
        """
        stmt = select(Miner).where(Miner.miner_id == miner_id)
        miner = self.db.execute(stmt).scalar_one_or_none()
        if not miner:
            raise ValueError(f"Miner not found: {miner_id}")
        # Map miner to tenant (simplified - in production, use proper mapping)
        tenant_id = miner_id  # For now, use miner_id as tenant_id
        usage_data = await self._collect_miner_usage(miner_id, start_date, end_date)
        # Send each non-zero usage record to coordinator-api.
        results = []
        for resource_type, quantity in usage_data.items():
            if quantity > 0:
                result = await self.record_usage(
                    tenant_id=tenant_id,
                    resource_type=resource_type,
                    quantity=Decimal(str(quantity)),
                    metadata={"miner_id": miner_id, "sync_type": "miner_usage"},
                )
                results.append(result)
        return {
            "miner_id": miner_id,
            "tenant_id": tenant_id,
            "period": {"start": start_date.isoformat(), "end": end_date.isoformat()},
            "usage_records": len(results),
            "results": results,
        }

    async def sync_all_miners_usage(
        self, hours_back: int = 24
    ) -> Dict[str, Any]:
        """Sync usage for every registered miner over the trailing window."""
        end_date = datetime.utcnow()
        start_date = end_date - timedelta(hours=hours_back)
        stmt = select(Miner)
        miners = self.db.execute(stmt).scalars().all()
        results = {
            "sync_period": {"start": start_date.isoformat(), "end": end_date.isoformat()},
            "miners_processed": 0,
            "miners_failed": 0,
            "total_usage_records": 0,
            "details": [],
        }
        for miner in miners:
            try:
                result = await self.sync_miner_usage(miner.miner_id, start_date, end_date)
                results["details"].append(result)
                results["miners_processed"] += 1
                results["total_usage_records"] += result["usage_records"]
            except Exception as e:
                # One bad miner must not abort the whole sync pass.
                self.logger.error(f"Failed to sync usage for miner {miner.miner_id}: {e}")
                results["miners_failed"] += 1
        self.logger.info(
            f"Usage sync complete: processed={results['miners_processed']}, "
            f"failed={results['miners_failed']}, records={results['total_usage_records']}"
        )
        return results

    async def _collect_miner_usage(
        self, miner_id: str, start_date: datetime, end_date: datetime
    ) -> Dict[str, float]:
        """Aggregate a miner's usage from pool-hub tables for one period."""
        usage_data = {
            "gpu_hours": 0.0,
            "api_calls": 0.0,
            "compute_hours": 0.0,
        }
        # Count match requests as API calls
        stmt = select(func.count(MatchRequest.id)).where(
            and_(
                MatchRequest.created_at >= start_date,
                MatchRequest.created_at <= end_date,
            )
        )
        # Filter by miner_id if match requests have that field
        # For now, count all requests (simplified)
        api_calls = self.db.execute(stmt).scalar() or 0
        usage_data["api_calls"] = float(api_calls)
        # Calculate compute hours from match results
        stmt = (
            select(MatchResult)
            .where(
                and_(
                    MatchResult.miner_id == miner_id,
                    MatchResult.created_at >= start_date,
                    MatchResult.created_at <= end_date,
                )
            )
            # Fix: SQLAlchemy's SQL NULL check is .is_not() (or legacy
            # .isnot()); the previous .isnot_(None) does not exist and raised
            # AttributeError at query-build time.
            .where(MatchResult.eta_ms.is_not(None))
        )
        results = self.db.execute(stmt).scalars().all()
        # Estimate compute hours from response times (simplified)
        # In production, use actual job duration
        total_compute_time_ms = sum(r.eta_ms for r in results if r.eta_ms)
        compute_hours = (total_compute_time_ms / 1000 / 3600) if results else 0.0
        usage_data["compute_hours"] = compute_hours
        # Estimate GPU hours from miner capacity and compute hours
        # In production, use actual GPU utilization data
        gpu_hours = compute_hours * 1.5  # Estimate 1.5 GPUs per job on average
        usage_data["gpu_hours"] = gpu_hours
        return usage_data

    async def _send_billing_event(self, billing_event: Dict[str, Any]) -> Dict[str, Any]:
        """POST one billing event to coordinator-api; raises on HTTP errors."""
        url = f"{self.coordinator_billing_url}/api/billing/usage"
        headers = {"Content-Type": "application/json"}
        if self.coordinator_api_key:
            headers["Authorization"] = f"Bearer {self.coordinator_api_key}"
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(url, json=billing_event, headers=headers)
            response.raise_for_status()
            return response.json()

    async def get_billing_metrics(
        self, tenant_id: Optional[str] = None, hours: int = 24
    ) -> Dict[str, Any]:
        """Fetch billing metrics from coordinator-api, optionally per tenant."""
        url = f"{self.coordinator_billing_url}/api/billing/metrics"
        params = {"hours": hours}
        if tenant_id:
            params["tenant_id"] = tenant_id
        headers = {}
        if self.coordinator_api_key:
            headers["Authorization"] = f"Bearer {self.coordinator_api_key}"
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(url, params=params, headers=headers)
            response.raise_for_status()
            return response.json()

    async def trigger_invoice_generation(
        self, tenant_id: str, period_start: datetime, period_end: datetime
    ) -> Dict[str, Any]:
        """Ask coordinator-api to generate an invoice for a billing period."""
        url = f"{self.coordinator_billing_url}/api/billing/invoice"
        payload = {
            "tenant_id": tenant_id,
            "period_start": period_start.isoformat(),
            "period_end": period_end.isoformat(),
        }
        headers = {"Content-Type": "application/json"}
        if self.coordinator_api_key:
            headers["Authorization"] = f"Bearer {self.coordinator_api_key}"
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(url, json=payload, headers=headers)
            response.raise_for_status()
            return response.json()
class BillingIntegrationScheduler:
    """Scheduler for automated billing synchronization.

    Runs ``BillingIntegration.sync_all_miners_usage`` in a background asyncio
    task at a fixed interval until ``stop`` is called.
    """

    # Forward reference (string annotation) so this class also loads when
    # BillingIntegration is imported lazily.
    def __init__(self, billing_integration: "BillingIntegration"):
        self.billing_integration = billing_integration
        self.logger = logging.getLogger(__name__)
        self.running = False
        # Keep a reference to the background task: a bare asyncio.create_task
        # result may be garbage-collected mid-flight if nothing holds it, and
        # stop() needs it to cancel the loop.
        self._task: Optional["asyncio.Task"] = None

    async def start(self, sync_interval_hours: int = 1):
        """Start the billing synchronization scheduler (idempotent)."""
        if self.running:
            return
        self.running = True
        self.logger.info("Billing Integration scheduler started")
        # Start sync loop, retaining the task handle.
        self._task = asyncio.create_task(self._sync_loop(sync_interval_hours))

    async def stop(self):
        """Stop the scheduler and cancel the in-flight sync task, if any."""
        self.running = False
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        self.logger.info("Billing Integration scheduler stopped")

    async def _sync_loop(self, interval_hours: int):
        """Background task that syncs usage data periodically."""
        while self.running:
            try:
                await self.billing_integration.sync_all_miners_usage(
                    hours_back=interval_hours
                )
                # Wait for next sync interval
                await asyncio.sleep(interval_hours * 3600)
            except Exception as e:
                # CancelledError is BaseException, so cancellation still
                # propagates and ends the loop.
                self.logger.error(f"Error in billing sync loop: {e}")
                await asyncio.sleep(300)  # Retry in 5 minutes

View File

@@ -0,0 +1,405 @@
"""
SLA Metrics Collection Service for Pool-Hub
Collects and tracks SLA metrics for miners including uptime, response time, job completion rate, and capacity availability.
"""
import asyncio
import logging
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Dict, List, Optional, Any
from sqlalchemy import and_, desc, func, select
from sqlalchemy.orm import Session
from ..models import (
Miner,
MinerStatus,
SLAMetric,
SLAViolation,
Feedback,
MatchRequest,
MatchResult,
CapacitySnapshot,
)
logger = logging.getLogger(__name__)
class SLACollector:
"""Service for collecting and tracking SLA metrics for miners"""
def __init__(self, db: Session):
    """Bind the collector to a database session.

    Args:
        db: SQLAlchemy session used for all reads and writes.
    """
    self.db = db
    # Default SLA thresholds per metric type; values are percentages except
    # response_time_ms, which is in milliseconds.
    self.sla_thresholds = {
        "uptime_pct": 95.0,
        "response_time_ms": 1000.0,
        "completion_rate_pct": 90.0,
        "capacity_availability_pct": 80.0,
    }
async def record_sla_metric(
    self,
    miner_id: str,
    metric_type: str,
    metric_value: float,
    metadata: Optional[Dict[str, str]] = None,
) -> SLAMetric:
    """Record an SLA metric sample for a miner.

    Looks up the configured threshold for ``metric_type``, persists the
    sample, and opens a violation record when the threshold is breached.

    Args:
        miner_id: Miner the sample belongs to.
        metric_type: One of the keys of ``self.sla_thresholds``.
        metric_value: Observed value.
        metadata: Optional free-form tags stored with the sample.

    Returns:
        The persisted SLAMetric row.
    """
    # Unknown metric types fall back to a permissive threshold of 100.0.
    threshold = self.sla_thresholds.get(metric_type, 100.0)
    is_violation = self._check_violation(metric_type, metric_value, threshold)
    # Create SLA metric record
    sla_metric = SLAMetric(
        miner_id=miner_id,
        metric_type=metric_type,
        metric_value=metric_value,
        threshold=threshold,
        is_violation=is_violation,
        timestamp=datetime.utcnow(),
        meta_data=metadata or {},
    )
    self.db.add(sla_metric)
    # Fix: self.db is a synchronous Session (see __init__ type hint and the
    # un-awaited db calls elsewhere in this class), so commit() must not be
    # awaited — `await self.db.commit()` would raise TypeError at runtime.
    self.db.commit()
    # Create violation record if threshold breached
    if is_violation:
        await self._record_violation(
            miner_id, metric_type, metric_value, threshold, metadata
        )
    logger.info(
        f"Recorded SLA metric: miner={miner_id}, type={metric_type}, "
        f"value={metric_value}, violation={is_violation}"
    )
    return sla_metric
async def collect_miner_uptime(self, miner_id: str) -> float:
    """Calculate miner uptime percentage based on heartbeat recency.

    Heuristic: a heartbeat within the last 5 minutes scales uptime linearly
    from 100% (just now) down to 0% (5 minutes ago); anything older, or no
    heartbeat at all, counts as 0%.
    """
    stmt = select(MinerStatus).where(MinerStatus.miner_id == miner_id)
    # Fix: synchronous Session — execute() must not be awaited (the rest of
    # this class calls self.db.execute without await).
    miner_status = self.db.execute(stmt).scalar_one_or_none()
    if not miner_status:
        return 0.0
    if miner_status.last_heartbeat_at:
        # NOTE(review): last_heartbeat_at is a timezone-aware column while
        # datetime.utcnow() is naive — confirm the driver returns naive
        # datetimes here, otherwise this subtraction raises TypeError.
        time_since_heartbeat = (
            datetime.utcnow() - miner_status.last_heartbeat_at
        ).total_seconds()
        # Consider miner down if no heartbeat for 5 minutes
        if time_since_heartbeat > 300:
            uptime_pct = 0.0
        else:
            uptime_pct = 100.0 - (time_since_heartbeat / 300.0) * 100.0
            uptime_pct = max(0.0, min(100.0, uptime_pct))
    else:
        uptime_pct = 0.0
    # Persist the derived uptime on the status row for quick reads.
    miner_status.uptime_pct = uptime_pct
    self.db.commit()
    # Record SLA metric
    await self.record_sla_metric(
        miner_id, "uptime_pct", uptime_pct, {"method": "heartbeat_based"}
    )
    return uptime_pct
async def collect_response_time(self, miner_id: str) -> Optional[float]:
    """Calculate the average response time (ms) over a miner's recent matches.

    Returns:
        The mean of up to the last 100 non-null ``eta_ms`` values, or None
        when no usable samples exist.
    """
    stmt = (
        select(MatchResult)
        .where(MatchResult.miner_id == miner_id)
        .order_by(desc(MatchResult.created_at))
        .limit(100)
    )
    # Fix: synchronous Session — execute() must not be awaited.
    results = self.db.execute(stmt).scalars().all()
    if not results:
        return None
    # Calculate average response time (eta_ms)
    response_times = [r.eta_ms for r in results if r.eta_ms is not None]
    if not response_times:
        return None
    avg_response_time = sum(response_times) / len(response_times)
    # Record SLA metric
    await self.record_sla_metric(
        miner_id,
        "response_time_ms",
        avg_response_time,
        {"method": "match_results", "sample_size": len(response_times)},
    )
    return avg_response_time
async def collect_completion_rate(self, miner_id: str) -> Optional[float]:
    """Calculate a miner's job completion rate from the last week of feedback.

    Returns:
        Percentage of the last (up to) 100 feedback rows whose outcome is
        "success", or None when there is no feedback.
    """
    stmt = (
        select(Feedback)
        .where(Feedback.miner_id == miner_id)
        .where(Feedback.created_at >= datetime.utcnow() - timedelta(days=7))
        .order_by(Feedback.created_at.desc())
        .limit(100)
    )
    # Fix: synchronous Session — execute() must not be awaited.
    feedback_records = self.db.execute(stmt).scalars().all()
    if not feedback_records:
        return None
    # Calculate completion rate (successful outcomes)
    successful = sum(1 for f in feedback_records if f.outcome == "success")
    completion_rate = (successful / len(feedback_records)) * 100.0
    # Record SLA metric
    await self.record_sla_metric(
        miner_id,
        "completion_rate_pct",
        completion_rate,
        {"method": "feedback", "sample_size": len(feedback_records)},
    )
    return completion_rate
async def collect_capacity_availability(self) -> Dict[str, Any]:
    """Collect fleet-wide capacity availability and persist a snapshot.

    Returns:
        Dict with total_miners, active_miners, and
        capacity_availability_pct (share of non-busy miners).
    """
    stmt = select(MinerStatus)
    # Fix: synchronous Session — execute() must not be awaited.
    miner_statuses = self.db.execute(stmt).scalars().all()
    if not miner_statuses:
        return {
            "total_miners": 0,
            "active_miners": 0,
            "capacity_availability_pct": 0.0,
        }
    total_miners = len(miner_statuses)
    active_miners = sum(1 for ms in miner_statuses if not ms.busy)
    capacity_availability_pct = (active_miners / total_miners) * 100.0
    # Hoisted out of the snapshot constructor for clarity (single query).
    miners = self.db.execute(select(Miner)).scalars().all()
    # Record capacity snapshot
    snapshot = CapacitySnapshot(
        total_miners=total_miners,
        active_miners=active_miners,
        total_parallel_capacity=sum(m.max_parallel for m in miners),
        total_queue_length=sum(ms.queue_len for ms in miner_statuses),
        capacity_utilization_pct=100.0 - capacity_availability_pct,
        forecast_capacity=total_miners,  # Would be calculated from forecasting
        recommended_scaling="stable",
        scaling_reason="Capacity within normal range",
        timestamp=datetime.utcnow(),
        meta_data={"method": "real_time_collection"},
    )
    self.db.add(snapshot)
    self.db.commit()
    logger.info(
        f"Capacity snapshot: total={total_miners}, active={active_miners}, "
        f"availability={capacity_availability_pct:.2f}%"
    )
    return {
        "total_miners": total_miners,
        "active_miners": active_miners,
        "capacity_availability_pct": capacity_availability_pct,
    }
async def collect_all_miner_metrics(self) -> Dict[str, Any]:
    """Run one full SLA collection cycle over every registered miner.

    For each miner, gathers uptime, response-time and completion-rate
    metrics; then records a fleet capacity snapshot and counts unresolved
    violations raised within the last hour.

    Returns:
        Dict with ``miners_processed``, ``metrics_collected`` (per-miner
        dicts), ``capacity`` and ``violations_detected`` keys.
    """
    # BUG FIX: self.db is an async session — execute() must be awaited
    # (every other call site in this class awaits it).
    stmt = select(Miner)
    miners = (await self.db.execute(stmt)).scalars().all()
    results: Dict[str, Any] = {
        "miners_processed": 0,
        "metrics_collected": [],
        "violations_detected": 0,
    }
    for miner in miners:
        try:
            # Collect each metric type; one miner failing must not abort
            # the whole cycle.
            uptime = await self.collect_miner_uptime(miner.miner_id)
            response_time = await self.collect_response_time(miner.miner_id)
            completion_rate = await self.collect_completion_rate(miner.miner_id)
            results["metrics_collected"].append(
                {
                    "miner_id": miner.miner_id,
                    "uptime_pct": uptime,
                    "response_time_ms": response_time,
                    "completion_rate_pct": completion_rate,
                }
            )
            results["miners_processed"] += 1
        except Exception as e:
            logger.error(f"Failed to collect metrics for miner {miner.miner_id}: {e}")
    # Collect capacity metrics
    capacity = await self.collect_capacity_availability()
    results["capacity"] = capacity
    # Count violations in this collection cycle (unresolved, last hour)
    stmt = (
        select(func.count(SLAViolation.id))
        .where(SLAViolation.resolved_at.is_(None))
        .where(SLAViolation.created_at >= datetime.utcnow() - timedelta(hours=1))
    )
    # BUG FIX: awaited here as well (was a synchronous execute call).
    results["violations_detected"] = (await self.db.execute(stmt)).scalar() or 0
    logger.info(
        f"SLA collection complete: processed={results['miners_processed']}, "
        f"violations={results['violations_detected']}"
    )
    return results
async def get_sla_metrics(
    self, miner_id: Optional[str] = None, hours: int = 24
) -> List[SLAMetric]:
    """Return SLA metrics recorded within the trailing *hours* window.

    Args:
        miner_id: When given, restrict results to that miner.
        hours: Look-back window size in hours (default one day).

    Returns:
        Matching ``SLAMetric`` rows, newest first.
    """
    window_start = datetime.utcnow() - timedelta(hours=hours)
    query = (
        select(SLAMetric)
        .where(SLAMetric.timestamp >= window_start)
        .order_by(desc(SLAMetric.timestamp))
    )
    if miner_id:
        query = query.where(SLAMetric.miner_id == miner_id)
    result = await self.db.execute(query)
    return result.scalars().all()
async def get_sla_violations(
    self, miner_id: Optional[str] = None, resolved: bool = False
) -> List[SLAViolation]:
    """Return SLA violations for one miner or for all miners.

    Args:
        miner_id: When given, restrict results to that miner.
        resolved: When True return resolved violations, otherwise open ones.

    Returns:
        Matching ``SLAViolation`` rows, newest first.
    """
    stmt = select(SLAViolation)
    if miner_id:
        stmt = stmt.where(SLAViolation.miner_id == miner_id)
    if resolved:
        # BUG FIX: SQLAlchemy's operator is `is_not` (there is no `isnot_`),
        # so this branch previously raised AttributeError at runtime.
        stmt = stmt.where(SLAViolation.resolved_at.is_not(None))
    else:
        stmt = stmt.where(SLAViolation.resolved_at.is_(None))
    stmt = stmt.order_by(desc(SLAViolation.created_at))
    return (await self.db.execute(stmt)).scalars().all()
def _check_violation(self, metric_type: str, value: float, threshold: float) -> bool:
"""Check if a metric value violates its SLA threshold"""
if metric_type in ["uptime_pct", "completion_rate_pct", "capacity_availability_pct"]:
# Higher is better - violation if below threshold
return value < threshold
elif metric_type in ["response_time_ms"]:
# Lower is better - violation if above threshold
return value > threshold
return False
async def _record_violation(
    self,
    miner_id: str,
    metric_type: str,
    metric_value: float,
    threshold: float,
    metadata: Optional[Dict[str, str]] = None,
) -> SLAViolation:
    """Persist an SLA violation row and return the stored record.

    Severity escalates to ``critical`` when the metric is far past the
    threshold (below 80% of it for percentage metrics, above double it for
    response time); otherwise those metric types are ``high`` and anything
    else defaults to ``medium``.
    """
    severity = "medium"
    if metric_type in ("uptime_pct", "completion_rate_pct"):
        severity = "critical" if metric_value < threshold * 0.8 else "high"
    elif metric_type == "response_time_ms":
        severity = "critical" if metric_value > threshold * 2 else "high"

    record = SLAViolation(
        miner_id=miner_id,
        violation_type=metric_type,
        severity=severity,
        metric_value=metric_value,
        threshold=threshold,
        violation_duration_ms=None,  # Will be updated when resolved
        created_at=datetime.utcnow(),
        meta_data=metadata or {},
    )
    self.db.add(record)
    await self.db.commit()

    logger.warning(
        f"SLA violation recorded: miner={miner_id}, type={metric_type}, "
        f"severity={severity}, value={metric_value}, threshold={threshold}"
    )
    return record
class SLACollectorScheduler:
    """Scheduler for automated SLA metric collection.

    Wraps an ``SLACollector`` and periodically invokes its
    ``collect_all_miner_metrics`` coroutine from a background asyncio task.
    """

    def __init__(self, sla_collector: "SLACollector"):
        # Collector whose collect_all_miner_metrics() runs each cycle.
        self.sla_collector = sla_collector
        self.logger = logging.getLogger(__name__)
        # True while the background collection loop should keep running.
        self.running = False
        # BUG FIX: keep a reference to the background task. The result of a
        # bare asyncio.create_task() may be garbage-collected mid-flight,
        # silently killing the collection loop (documented asyncio pitfall).
        self._task: Optional["asyncio.Task"] = None

    async def start(self, collection_interval_seconds: int = 300):
        """Start the SLA collection scheduler (no-op if already running).

        Args:
            collection_interval_seconds: Delay between collection cycles.
        """
        if self.running:
            return
        self.running = True
        self.logger.info("SLA Collector scheduler started")
        # Start collection loop, retaining the task so it is not GC'd.
        self._task = asyncio.create_task(
            self._collection_loop(collection_interval_seconds)
        )

    async def stop(self):
        """Stop the SLA collection scheduler.

        Cancels the background task so stop takes effect immediately instead
        of after the current sleep interval expires.
        """
        self.running = False
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        self.logger.info("SLA Collector scheduler stopped")

    async def _collection_loop(self, interval_seconds: int):
        """Background task that collects SLA metrics periodically."""
        while self.running:
            try:
                await self.sla_collector.collect_all_miner_metrics()
                # Wait for next collection interval
                await asyncio.sleep(interval_seconds)
            except asyncio.CancelledError:
                # Propagate cancellation requested by stop() cleanly.
                raise
            except Exception as e:
                self.logger.error(f"Error in SLA collection loop: {e}")
                await asyncio.sleep(60)  # Retry in 1 minute

View File

@@ -32,9 +32,11 @@ class Settings(BaseSettings):
postgres_dsn: str = Field(default="postgresql+asyncpg://poolhub:poolhub@127.0.0.1:5432/aitbc")
postgres_pool_min: int = Field(default=1)
postgres_pool_max: int = Field(default=10)
test_postgres_dsn: str = Field(default="postgresql+asyncpg://poolhub:poolhub@127.0.0.1:5432/aitbc_test")
redis_url: str = Field(default="redis://127.0.0.1:6379/4")
redis_max_connections: int = Field(default=32)
test_redis_url: str = Field(default="redis://127.0.0.1:6379/4")
session_ttl_seconds: int = Field(default=60)
heartbeat_grace_seconds: int = Field(default=120)
@@ -45,6 +47,30 @@ class Settings(BaseSettings):
prometheus_namespace: str = Field(default="poolhub")
# Coordinator-API Billing Integration
coordinator_billing_url: str = Field(default="http://localhost:8011")
coordinator_api_key: str | None = Field(default=None)
# SLA Configuration
sla_thresholds: Dict[str, float] = Field(
default_factory=lambda: {
"uptime_pct": 95.0,
"response_time_ms": 1000.0,
"completion_rate_pct": 90.0,
"capacity_availability_pct": 80.0,
}
)
# Capacity Planning Configuration
capacity_forecast_hours: int = Field(default=168)
capacity_alert_threshold_pct: float = Field(default=80.0)
# Billing Sync Configuration
billing_sync_interval_hours: int = Field(default=1)
# SLA Collection Configuration
sla_collection_interval_seconds: int = Field(default=300)
def asgi_kwargs(self) -> Dict[str, Any]:
return {
"title": self.app_name,