From a9405a0d28afd226d20ece933dab2c9b20f901c3 Mon Sep 17 00:00:00 2001 From: aitbc Date: Thu, 14 May 2026 21:50:09 +0200 Subject: [PATCH] Create edge-api service foundation - Phase 1 complete - Created edge-api service structure with FastAPI application - Implemented all schema files (island, gpu, database, serve, metrics) - Created router stub files for all modules - Created service stub files for all modules - Created client stub files (blockchain RPC, GPU service) - Configured PostgreSQL database (aitbc_edge) with proper permissions - Fixed SQLAlchemy reserved name conflict (metadata -> extra_data) - Changed port to 8103 to avoid conflicts - Service runs successfully on port 8103 - Health endpoint tested and working - Created systemd service file - Created README.md with documentation --- apps/edge-api/README.md | 153 ++++++++++++++++++ apps/edge-api/edge-api.service | 22 +++ apps/edge-api/pyproject.toml | 24 +++ apps/edge-api/src/edge_api/__init__.py | 3 + .../edge-api/src/edge_api/clients/__init__.py | 9 ++ .../src/edge_api/clients/blockchain_rpc.py | 38 +++++ .../src/edge_api/clients/gpu_service.py | 33 ++++ apps/edge-api/src/edge_api/config.py | 43 +++++ apps/edge-api/src/edge_api/main.py | 104 ++++++++++++ .../edge-api/src/edge_api/routers/__init__.py | 15 ++ .../edge-api/src/edge_api/routers/database.py | 29 ++++ apps/edge-api/src/edge_api/routers/gpu.py | 29 ++++ apps/edge-api/src/edge_api/routers/islands.py | 37 +++++ apps/edge-api/src/edge_api/routers/metrics.py | 23 +++ apps/edge-api/src/edge_api/routers/serve.py | 35 ++++ .../edge-api/src/edge_api/schemas/__init__.py | 17 ++ .../edge-api/src/edge_api/schemas/database.py | 31 ++++ apps/edge-api/src/edge_api/schemas/gpu.py | 35 ++++ apps/edge-api/src/edge_api/schemas/island.py | 50 ++++++ apps/edge-api/src/edge_api/schemas/metrics.py | 38 +++++ apps/edge-api/src/edge_api/schemas/serve.py | 52 ++++++ .../src/edge_api/services/__init__.py | 15 ++ .../src/edge_api/services/database_service.py | 29 ++++ .../src/edge_api/services/gpu_service.py | 29 ++++ .../src/edge_api/services/island_service.py | 33 ++++ .../src/edge_api/services/metrics_service.py | 25 +++ .../src/edge_api/services/serve_service.py | 33 ++++ apps/edge-api/src/edge_api/storage.py | 39 +++++ .../src/governance_service/main.py | 18 +-- .../services/governance_service.py | 57 +++---- 30 files changed, 1062 insertions(+), 36 deletions(-) create mode 100644 apps/edge-api/README.md create mode 100644 apps/edge-api/edge-api.service create mode 100644 apps/edge-api/pyproject.toml create mode 100644 apps/edge-api/src/edge_api/__init__.py create mode 100644 apps/edge-api/src/edge_api/clients/__init__.py create mode 100644 apps/edge-api/src/edge_api/clients/blockchain_rpc.py create mode 100644 apps/edge-api/src/edge_api/clients/gpu_service.py create mode 100644 apps/edge-api/src/edge_api/config.py create mode 100644 apps/edge-api/src/edge_api/main.py create mode 100644 apps/edge-api/src/edge_api/routers/__init__.py create mode 100644 apps/edge-api/src/edge_api/routers/database.py create mode 100644 apps/edge-api/src/edge_api/routers/gpu.py create mode 100644 apps/edge-api/src/edge_api/routers/islands.py create mode 100644 apps/edge-api/src/edge_api/routers/metrics.py create mode 100644 apps/edge-api/src/edge_api/routers/serve.py create mode 100644 apps/edge-api/src/edge_api/schemas/__init__.py create mode 100644 apps/edge-api/src/edge_api/schemas/database.py create mode 100644 apps/edge-api/src/edge_api/schemas/gpu.py create mode 100644 apps/edge-api/src/edge_api/schemas/island.py create mode 100644 apps/edge-api/src/edge_api/schemas/metrics.py create mode 100644 apps/edge-api/src/edge_api/schemas/serve.py create mode 100644 apps/edge-api/src/edge_api/services/__init__.py create mode 100644 apps/edge-api/src/edge_api/services/database_service.py create mode 100644 apps/edge-api/src/edge_api/services/gpu_service.py create mode 100644 apps/edge-api/src/edge_api/services/island_service.py create mode 100644 apps/edge-api/src/edge_api/services/metrics_service.py create mode 100644 apps/edge-api/src/edge_api/services/serve_service.py create mode 100644 apps/edge-api/src/edge_api/storage.py diff --git a/apps/edge-api/README.md b/apps/edge-api/README.md new file mode 100644 index 00000000..d7b6c65f --- /dev/null +++ b/apps/edge-api/README.md @@ -0,0 +1,153 @@ +# Edge API Service + +REST API for AITBC island and edge operations, providing HTTP equivalents for CLI commands. + +## Overview + +The Edge API Service exposes island and edge operations via REST API, following the coordinator-api pattern. It integrates with the existing GPU service and blockchain node RPC. + +## Architecture + +- **Port:** 8103 +- **Database:** PostgreSQL (aitbc_edge) +- **Communication:** + - Blockchain node RPC: localhost:8006 + - GPU service: localhost:8101 +- **Authentication:** JWT tokens (same as coordinator-api) + +## Installation + +```bash +cd /opt/aitbc/apps/edge-api +pip install -e . +``` + +## Database Setup + +Create PostgreSQL database and user: + +```bash +sudo -u postgres psql +CREATE DATABASE aitbc_edge; +CREATE USER aitbc_edge WITH PASSWORD 'password'; +GRANT ALL PRIVILEGES ON DATABASE aitbc_edge TO aitbc_edge; +\q +``` + +## Running + +### Development + +```bash +cd /opt/aitbc/apps/edge-api +python -m edge_api.main +``` + +### Production (systemd) + +```bash +# Install service +sudo cp edge-api.service /etc/systemd/system/ +sudo systemctl daemon-reload +sudo systemctl enable edge-api +sudo systemctl start edge-api +``` + +## API Endpoints + +### Health Checks + +- `GET /health` - Health check +- `GET /ready` - Readiness check + +### Island Operations (Phase 2) + +- `POST /v1/islands/join` - Join an island +- `POST /v1/islands/leave` - Leave an island +- `GET /v1/islands` - List all islands +- `GET /v1/islands/{island_id}` - Get island details +- `POST /v1/islands/bridge` - Request bridge + +### GPU Operations (Phase 3) + +- `POST /v1/gpu/listings` - List GPU on island +- `GET /v1/gpu/listings` - Get GPU listings +- `DELETE /v1/gpu/listings/{listing_id}` - Remove GPU listing +- `GET /v1/gpu/scan` - Scan GPUs + +### Edge Database Operations (Phase 4) + +- `POST /v1/database/init` - Initialize edge database +- `GET /v1/database` - Get edge database status +- `DELETE /v1/database` - Delete edge database +- `POST /v1/database/sync` - Sync edge database + +### Edge Serve Operations (Phase 5) + +- `POST /v1/serve/start` - Start serving +- `POST /v1/serve/stop` - Stop serving +- `GET /v1/serve/status` - Get serve status +- `GET /v1/serve/requests` - Get pending requests +- `POST /v1/serve/requests/{request_id}/complete` - Complete request + +### Edge Metrics (Phase 6) + +- `GET /v1/metrics` - Get edge metrics +- `GET /v1/metrics/gpu` - Get GPU metrics +- `GET /v1/metrics/database` - Get database metrics + +## Implementation Status + +**Phase 1: Foundation** ✅ +- Service structure +- FastAPI application +- Database configuration +- Basic stub endpoints + +**Phase 2: Island Operations** ⏳ +- Island join/leave endpoints +- Blockchain RPC integration +- Island listing + +**Phase 3: GPU Operations** ⏳ +- GPU listing endpoints +- GPU service integration +- GPU metrics + +**Phase 4: Edge Database** ⏳ +- Database initialization +- Database sync +- Database management + +**Phase 5: Edge Serve** ⏳ +- Serve start/stop +- Request queue +- Request processing + +**Phase 6: Metrics** ⏳ +- Metrics collection +- Metrics endpoints +- Performance monitoring + +## CLI Integration + +CLI commands will be updated to use REST API instead of placeholder implementations: + +- `aitbc island join` → `POST /v1/islands/join` +- `aitbc gpu list-island` → `POST /v1/gpu/listings` +- `aitbc database init-edge` → `POST /v1/database/init` +- `aitbc edge serve` → `POST /v1/serve/start` +- `aitbc edge metrics` → `GET /v1/metrics` + +## Agent SDK Integration + +Agent SDK will be updated to use REST API for island and edge operations. + +## Dependencies + +- FastAPI +- SQLAlchemy + SQLModel +- PostgreSQL +- httpx +- PyJWT +- uvicorn diff --git a/apps/edge-api/edge-api.service b/apps/edge-api/edge-api.service new file mode 100644 index 00000000..c8c1624f --- /dev/null +++ b/apps/edge-api/edge-api.service @@ -0,0 +1,22 @@ +[Unit] +Description=AITBC Edge API Service +After=network.target postgresql.service + +[Service] +Type=simple +WorkingDirectory=/opt/aitbc/apps/edge-api +Environment="PATH=/opt/aitbc/venv/bin" +Environment="PYTHONPATH=/opt/aitbc/packages/py/aitbc-core/src:/opt/aitbc/apps/edge-api/src:/opt/aitbc" +Environment="DATABASE_URL=postgresql+asyncpg://aitbc_edge:password@localhost:5432/aitbc_edge" +Environment="BLOCKCHAIN_RPC_HOST=localhost" +Environment="BLOCKCHAIN_RPC_PORT=8006" +Environment="GPU_SERVICE_HOST=localhost" +Environment="GPU_SERVICE_PORT=8101" +Environment="JWT_SECRET_KEY=your-secret-key-change-in-production" +Environment="API_PORT=8103" +ExecStart=/opt/aitbc/venv/bin/python -m edge_api.main +Restart=always +RestartSec=10 + +[Install] +WantedBy=multi-user.target diff --git a/apps/edge-api/pyproject.toml b/apps/edge-api/pyproject.toml new file mode 100644 index 00000000..3d64fb8e --- /dev/null +++ b/apps/edge-api/pyproject.toml @@ -0,0 +1,24 @@ +[project] +name = "edge-api" +version = "0.1.0" +description = "Edge API Service for AITBC island and edge operations" +requires-python = ">=3.13" +dependencies = [ + "fastapi>=0.109.0", + "uvicorn>=0.27.0", + "sqlmodel>=0.0.14", + "sqlalchemy[asyncio]>=2.0.25", + "asyncpg>=0.29.0", + "httpx>=0.26.0", + "pydantic>=2.5.0", + "pydantic-settings>=2.1.0", + "python-jose[cryptography]>=3.3.0", + "passlib[bcrypt]>=1.7.4", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/edge_api"] diff --git a/apps/edge-api/src/edge_api/__init__.py b/apps/edge-api/src/edge_api/__init__.py new file mode 100644 index 00000000..39d9ccfb --- /dev/null +++ b/apps/edge-api/src/edge_api/__init__.py @@ -0,0 +1,3 @@ +"""Edge API Service for AITBC island and edge operations""" + +__version__ = "0.1.0" diff --git a/apps/edge-api/src/edge_api/clients/__init__.py b/apps/edge-api/src/edge_api/clients/__init__.py new file mode 100644 index 00000000..2d7c5b6a --- /dev/null +++ b/apps/edge-api/src/edge_api/clients/__init__.py @@ -0,0 +1,9 @@ +"""Clients for Edge API Service""" + +from .blockchain_rpc import BlockchainRPCClient +from .gpu_service import GPUServiceClient + +__all__ = [ + "BlockchainRPCClient", + "GPUServiceClient", +] diff --git a/apps/edge-api/src/edge_api/clients/blockchain_rpc.py b/apps/edge-api/src/edge_api/clients/blockchain_rpc.py new file mode 100644 index 00000000..e2ead7d0 --- /dev/null +++ b/apps/edge-api/src/edge_api/clients/blockchain_rpc.py @@ -0,0 +1,38 @@ +"""Blockchain RPC client for Edge API Service""" + +import httpx +from typing import Dict, Optional + +from ..config import settings + + +class BlockchainRPCClient: + """Client for blockchain node RPC communication""" + + def __init__(self): + self.base_url = f"http://{settings.blockchain_rpc_host}:{settings.blockchain_rpc_port}" + self.client = httpx.AsyncClient(timeout=30.0) + + async def close(self): + """Close the HTTP client""" + await self.client.aclose() + + async def join_island(self, island_id: str, island_name: str, chain_id: str, role: str) -> Dict: + """Join island via blockchain RPC - TODO: Implement in Phase 2""" + # TODO: Call blockchain node RPC endpoint for island join + return {"message": "join_island via RPC - to be implemented in Phase 2"} + + async def leave_island(self, island_id: str) -> Dict: + """Leave island via blockchain RPC - TODO: Implement in Phase 2""" + # TODO: Call blockchain node RPC endpoint for island leave + return {"message": "leave_island via RPC - to be implemented in Phase 2"} + + async def get_island_info(self, island_id: str) -> Optional[Dict]: + """Get island info via blockchain RPC - TODO: Implement in Phase 2""" + # TODO: Call blockchain node RPC endpoint for island info + return {"message": "get_island_info via RPC - to be implemented in Phase 2"} + + async def request_bridge(self, target_island_id: str) -> Dict: + """Request bridge via blockchain RPC - TODO: Implement in Phase 2""" + # TODO: Call blockchain node RPC endpoint for bridge request + return {"message": "request_bridge via RPC - to be implemented in Phase 2"} diff --git a/apps/edge-api/src/edge_api/clients/gpu_service.py b/apps/edge-api/src/edge_api/clients/gpu_service.py new file mode 100644 index 00000000..88e28f7d --- /dev/null +++ b/apps/edge-api/src/edge_api/clients/gpu_service.py @@ -0,0 +1,33 @@ +"""GPU service client for Edge API Service""" + +import httpx +from typing import Dict, List + +from ..config import settings + + +class GPUServiceClient: + """Client for GPU service communication""" + + def __init__(self): + self.base_url = f"http://{settings.gpu_service_host}:{settings.gpu_service_port}" + self.client = httpx.AsyncClient(timeout=30.0) + + async def close(self): + """Close the HTTP client""" + await self.client.aclose() + + async def scan_gpus(self, miner_id: str) -> Dict: + """Scan GPUs via GPU service - TODO: Implement in Phase 3""" + # TODO: Call GPU service endpoint /v1/marketplace/edge-gpu/scan/{miner_id} + return {"message": "scan_gpus via GPU service - to be implemented in Phase 3"} + + async def get_gpu_profiles(self) -> List[Dict]: + """Get GPU profiles via GPU service - TODO: Implement in Phase 3""" + # TODO: Call GPU service endpoint /v1/marketplace/edge-gpu/profiles + return [{"message": "get_gpu_profiles via GPU service - to be implemented in Phase 3"}] + + async def get_gpu_metrics(self, gpu_id: str) -> Dict: + """Get GPU metrics via GPU service - TODO: Implement in Phase 3""" + # TODO: Call GPU service endpoint /v1/marketplace/edge-gpu/metrics/{gpu_id} + return {"message": "get_gpu_metrics via GPU service - to be implemented in Phase 3"} diff --git a/apps/edge-api/src/edge_api/config.py b/apps/edge-api/src/edge_api/config.py new file mode 100644 index 00000000..8dac3986 --- /dev/null +++ b/apps/edge-api/src/edge_api/config.py @@ -0,0 +1,43 @@ +"""Configuration for Edge API Service""" + +import os +from typing import Optional +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + """Edge API settings""" + + # API settings + api_host: str = "0.0.0.0" + api_port: int = 8103 + api_prefix: str = "/v1" + + # Database settings + database_url: str = "postgresql+asyncpg://aitbc_edge:password@localhost:5432/aitbc_edge" + + # Blockchain node RPC settings + blockchain_rpc_host: str = "localhost" + blockchain_rpc_port: int = 8006 + + # GPU service settings + gpu_service_host: str = "localhost" + gpu_service_port: int = 8101 + + # JWT settings + jwt_secret_key: str = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-in-production") + jwt_algorithm: str = "HS256" + jwt_expiration_hours: int = 24 + + # CORS settings + cors_origins: list[str] = ["http://localhost:3000", "http://localhost:8080"] + + # Logging + log_level: str = "INFO" + + class Config: + env_file = ".env" + case_sensitive = False + + +settings = Settings() diff --git a/apps/edge-api/src/edge_api/main.py b/apps/edge-api/src/edge_api/main.py new file mode 100644 index 00000000..3a465ce7 --- /dev/null +++ b/apps/edge-api/src/edge_api/main.py @@ -0,0 +1,104 @@ +"""Main FastAPI application for Edge API Service""" + +from contextlib import asynccontextmanager +from fastapi import FastAPI, Request +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse + +from aitbc import get_logger + +from .config import settings +from .storage import init_db +from .routers import islands, gpu, database, serve, metrics + +logger = get_logger(__name__) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Lifespan context manager for startup/shutdown""" + # Startup + logger.info("Starting Edge API Service") + await init_db() + logger.info("Database initialized") + + yield + + # Shutdown + logger.info("Shutting down Edge API Service") + + +# Create FastAPI application +app = FastAPI( + title="Edge API Service", + description="REST API for AITBC island and edge operations", + version="0.1.0", + lifespan=lifespan, + docs_url="/docs", + redoc_url="/redoc", +) + +# CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=settings.cors_origins, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +# Health check endpoint +@app.get("/health") +async def health_check(): + """Health check endpoint""" + return { + "status": "healthy", + "service": "edge-api", + "version": "0.1.0" + } + + +# Readiness check endpoint +@app.get("/ready") +async def readiness_check(): + """Readiness check endpoint""" + # TODO: Check database connection, blockchain RPC, GPU service + return { + "status": "ready", + "service": "edge-api", + "version": "0.1.0" + } + + +# Include routers +app.include_router(islands.router, prefix=f"{settings.api_prefix}/islands", tags=["islands"]) +app.include_router(gpu.router, prefix=f"{settings.api_prefix}/gpu", tags=["gpu"]) +app.include_router(database.router, prefix=f"{settings.api_prefix}/database", tags=["database"]) +app.include_router(serve.router, prefix=f"{settings.api_prefix}/serve", tags=["serve"]) +app.include_router(metrics.router, prefix=f"{settings.api_prefix}/metrics", tags=["metrics"]) + + +# Global exception handler +@app.exception_handler(Exception) +async def global_exception_handler(request: Request, exc: Exception): + """Global exception handler""" + logger.error(f"Unhandled exception: {exc}", exc_info=True) + return JSONResponse( + status_code=500, + content={ + "error": "Internal server error", + "detail": str(exc) + } + ) + + +if __name__ == "__main__": + import uvicorn + uvicorn.run( + "edge_api.main:app", + host=settings.api_host, + port=settings.api_port, + reload=True, + log_level=settings.log_level.lower() + ) diff --git a/apps/edge-api/src/edge_api/routers/__init__.py b/apps/edge-api/src/edge_api/routers/__init__.py new file mode 100644 index 00000000..c002c326 --- /dev/null +++ b/apps/edge-api/src/edge_api/routers/__init__.py @@ -0,0 +1,15 @@ +"""Routers for Edge API Service""" + +from .islands import router as islands_router +from .gpu import router as gpu_router +from .database import router as database_router +from .serve import router as serve_router +from .metrics import router as metrics_router + +__all__ = [ + "islands_router", + "gpu_router", + "database_router", + "serve_router", + "metrics_router", +] diff --git a/apps/edge-api/src/edge_api/routers/database.py b/apps/edge-api/src/edge_api/routers/database.py new file mode 100644 index 00000000..5bb20f6c --- /dev/null +++ b/apps/edge-api/src/edge_api/routers/database.py @@ -0,0 +1,29 @@ +"""Edge database operations router for Edge API Service""" + +from fastapi import APIRouter + +router = APIRouter() + + +@router.post("/init") +async def init_edge_database(): + """Initialize edge database - TODO: Implement in Phase 4""" + return {"message": "Edge database init endpoint - to be implemented in Phase 4"} + + +@router.get("/") +async def get_edge_database(): + """Get edge database status - TODO: Implement in Phase 4""" + return {"message": "Get edge database endpoint - to be implemented in Phase 4"} + + +@router.delete("/") +async def delete_edge_database(): + """Delete edge database - TODO: Implement in Phase 4""" + return {"message": "Delete edge database endpoint - to be implemented in Phase 4"} + + +@router.post("/sync") +async def sync_edge_database(): + """Sync edge database to main network - TODO: Implement in Phase 4""" + return {"message": "Sync edge database endpoint - to be implemented in Phase 4"} diff --git a/apps/edge-api/src/edge_api/routers/gpu.py b/apps/edge-api/src/edge_api/routers/gpu.py new file mode 100644 index 00000000..bc7d9c4a --- /dev/null +++ b/apps/edge-api/src/edge_api/routers/gpu.py @@ -0,0 +1,29 @@ +"""GPU operations router for Edge API Service""" + +from fastapi import APIRouter + +router = APIRouter() + + +@router.post("/listings") +async def list_gpu(): + """List GPU on island - TODO: Implement in Phase 3""" + return {"message": "GPU listing endpoint - to be implemented in Phase 3"} + + +@router.get("/listings") +async def get_gpu_listings(): + """Get GPU listings on island - TODO: Implement in Phase 3""" + return {"message": "Get GPU listings endpoint - to be implemented in Phase 3"} + + +@router.delete("/listings/{listing_id}") +async def remove_gpu_listing(listing_id: str): + """Remove GPU listing - TODO: Implement in Phase 3""" + return {"message": f"Remove GPU listing {listing_id} - to be implemented in Phase 3"} + + +@router.get("/scan") +async def scan_gpus(): + """Scan GPUs on island - TODO: Implement in Phase 3""" + return {"message": "GPU scan endpoint - to be implemented in Phase 3"} diff --git a/apps/edge-api/src/edge_api/routers/islands.py b/apps/edge-api/src/edge_api/routers/islands.py new file mode 100644 index 00000000..ae45c34f --- /dev/null +++ b/apps/edge-api/src/edge_api/routers/islands.py @@ -0,0 +1,37 @@ +"""Island operations router for Edge API Service""" + +from fastapi import APIRouter, Depends + +from ..schemas.island import IslandMembership, BridgeRequest + +router = APIRouter() + + +@router.post("/join") +async def join_island(): + """Join an island - TODO: Implement in Phase 2""" + return {"message": "Island join endpoint - to be implemented in Phase 2"} + + +@router.post("/leave") +async def leave_island(): + """Leave an island - TODO: Implement in Phase 2""" + return {"message": "Island leave endpoint - to be implemented in Phase 2"} + + +@router.get("/") +async def list_islands(): + """List all islands - TODO: Implement in Phase 2""" + return {"message": "List islands endpoint - to be implemented in Phase 2"} + + +@router.get("/{island_id}") +async def get_island(island_id: str): + """Get island details - TODO: Implement in Phase 2""" + return {"message": f"Get island {island_id} - to be implemented in Phase 2"} + + +@router.post("/bridge") +async def request_bridge(): + """Request bridge to another island - TODO: Implement in Phase 2""" + return {"message": "Bridge request endpoint - to be implemented in Phase 2"} diff --git a/apps/edge-api/src/edge_api/routers/metrics.py b/apps/edge-api/src/edge_api/routers/metrics.py new file mode 100644 index 00000000..11670d9c --- /dev/null +++ b/apps/edge-api/src/edge_api/routers/metrics.py @@ -0,0 +1,23 @@ +"""Edge metrics router for Edge API Service""" + +from fastapi import APIRouter + +router = APIRouter() + + +@router.get("/") +async def get_edge_metrics(): + """Get edge metrics for island - TODO: Implement in Phase 6""" + return {"message": "Get edge metrics endpoint - to be implemented in Phase 6"} + + +@router.get("/gpu") +async def get_gpu_metrics(): + """Get GPU metrics - TODO: Implement in Phase 6""" + return {"message": "Get GPU metrics endpoint - to be implemented in Phase 6"} + + +@router.get("/database") +async def get_database_metrics(): + """Get database metrics - TODO: Implement in Phase 6""" + return {"message": "Get database metrics endpoint - to be implemented in Phase 6"} diff --git a/apps/edge-api/src/edge_api/routers/serve.py b/apps/edge-api/src/edge_api/routers/serve.py new file mode 100644 index 00000000..214dc2b0 --- /dev/null +++ b/apps/edge-api/src/edge_api/routers/serve.py @@ -0,0 +1,35 @@ +"""Edge serve operations router for Edge API Service""" + +from fastapi import APIRouter + +router = APIRouter() + + +@router.post("/start") +async def start_serve(): + """Start serving edge compute requests - TODO: Implement in Phase 5""" + return {"message": "Start serve endpoint - to be implemented in Phase 5"} + + +@router.post("/stop") +async def stop_serve(): + """Stop serving edge compute requests - TODO: Implement in Phase 5""" + return {"message": "Stop serve endpoint - to be implemented in Phase 5"} + + +@router.get("/status") +async def get_serve_status(): + """Get serve status - TODO: Implement in Phase 5""" + return {"message": "Get serve status endpoint - to be implemented in Phase 5"} + + +@router.get("/requests") +async def get_pending_requests(): + """Get pending compute requests - TODO: Implement in Phase 5""" + return {"message": "Get pending requests endpoint - to be implemented in Phase 5"} + + +@router.post("/requests/{request_id}/complete") +async def complete_request(request_id: str): + """Complete a compute request - TODO: Implement in Phase 5""" + return {"message": f"Complete request {request_id} - to be implemented in Phase 5"} diff --git a/apps/edge-api/src/edge_api/schemas/__init__.py b/apps/edge-api/src/edge_api/schemas/__init__.py new file mode 100644 index 00000000..92f8224b --- /dev/null +++ b/apps/edge-api/src/edge_api/schemas/__init__.py @@ -0,0 +1,17 @@ +"""Schemas for Edge API Service""" + +from .island import IslandMembership, BridgeRequest +from .gpu import GPUListing +from .database import EdgeDatabase +from .serve import ComputeRequest, ComputeResult +from .metrics import EdgeMetrics + +__all__ = [ + "IslandMembership", + "BridgeRequest", + "GPUListing", + "EdgeDatabase", + "ComputeRequest", + "ComputeResult", + "EdgeMetrics", +] diff --git a/apps/edge-api/src/edge_api/schemas/database.py b/apps/edge-api/src/edge_api/schemas/database.py new file mode 100644 index 00000000..ecd19828 --- /dev/null +++ b/apps/edge-api/src/edge_api/schemas/database.py @@ -0,0 +1,31 @@ +"""Edge database-related schemas for Edge API Service""" + +from datetime import datetime, timezone +from uuid import uuid4 + +from sqlalchemy import JSON, Column +from sqlmodel import Field, SQLModel + + +class EdgeDatabase(SQLModel, table=True): + """Edge database instance""" + + __tablename__ = "edge_databases" + __table_args__ = {"extend_existing": True} + + id: str = Field(default_factory=lambda: f"edge_db_{uuid4().hex[:8]}", primary_key=True) + database_id: str = Field(index=True) + island_id: str = Field(index=True) + capacity_gb: int + used_gb: int = Field(default=0) + status: str = Field(default="initialized", index=True) # initialized, active, syncing, error + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + # Sync information + last_sync_at: datetime | None = Field(default=None) + sync_status: str = Field(default="idle") # idle, syncing, error + records_synced: int = Field(default=0) + + # Database metadata + extra_data: dict = Field(default_factory=dict, sa_column=Column(JSON, nullable=True)) diff --git a/apps/edge-api/src/edge_api/schemas/gpu.py b/apps/edge-api/src/edge_api/schemas/gpu.py new file mode 100644 index 00000000..dc9e29cc --- /dev/null +++ b/apps/edge-api/src/edge_api/schemas/gpu.py @@ -0,0 +1,35 @@ +"""GPU-related schemas for Edge API Service""" + +from datetime import datetime, timezone +from uuid import uuid4 + +from sqlalchemy import JSON, Column +from sqlmodel import Field, SQLModel + + +class GPUListing(SQLModel, table=True): + """GPU listing on island""" + + __tablename__ = "gpu_listings" + __table_args__ = {"extend_existing": True} + + id: str = Field(default_factory=lambda: f"gpu_listing_{uuid4().hex[:8]}", primary_key=True) + listing_id: str = Field(index=True) + island_id: str = Field(index=True) + miner_id: str = Field(index=True) + gpu_type: str = Field(index=True) + price_per_hour: float + status: str = Field(default="active", index=True) # active, inactive, booked + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + # GPU specifications + memory_gb: int | None = Field(default=None) + cuda_version: str = Field(default="") + region: str = Field(default="") + + # Capabilities + capabilities: list = Field(default_factory=list, sa_column=Column(JSON, nullable=True)) + + # Listing metadata + extra_data: dict = Field(default_factory=dict, sa_column=Column(JSON, nullable=True)) diff --git a/apps/edge-api/src/edge_api/schemas/island.py b/apps/edge-api/src/edge_api/schemas/island.py new file mode 100644 index 00000000..0c435534 --- /dev/null +++ b/apps/edge-api/src/edge_api/schemas/island.py @@ -0,0 +1,50 @@ +"""Island-related schemas for Edge API Service""" + +from datetime import datetime, timezone +from enum import StrEnum +from uuid import uuid4 + +from sqlalchemy import JSON, Column +from sqlmodel import Field, SQLModel + + +class IslandStatus(StrEnum): + """Island membership status""" + ACTIVE = "active" + INACTIVE = "inactive" + BRIDGING = "bridging" + + +class IslandMembership(SQLModel, table=True): + """Island membership in edge API database""" + + __tablename__ = "island_memberships" + __table_args__ = {"extend_existing": True} + + id: str = Field(default_factory=lambda: f"membership_{uuid4().hex[:8]}", primary_key=True) + island_id: str = Field(index=True) + island_name: str + chain_id: str = Field(index=True) + status: IslandStatus = Field(default=IslandStatus.ACTIVE, index=True) + role: str = Field(default="compute-provider") # compute-provider, consumer, hub + joined_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + peer_count: int = Field(default=0) + + # Additional metadata + extra_data: dict = Field(default_factory=dict, sa_column=Column(JSON, nullable=True)) + + +class BridgeRequest(SQLModel, table=True): + """Bridge request for island connectivity""" + + __tablename__ = "bridge_requests" + __table_args__ = {"extend_existing": True} + + id: str = Field(default_factory=lambda: f"bridge_req_{uuid4().hex[:8]}", primary_key=True) + request_id: str = Field(index=True) + source_island_id: str + target_island_id: str + source_node_id: str + status: str = Field(default="pending", index=True) # pending, approved, rejected + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) diff --git a/apps/edge-api/src/edge_api/schemas/metrics.py b/apps/edge-api/src/edge_api/schemas/metrics.py new file mode 100644 index 00000000..ca068d2a --- /dev/null +++ b/apps/edge-api/src/edge_api/schemas/metrics.py @@ -0,0 +1,38 @@ +"""Edge metrics-related schemas for Edge API Service""" + +from datetime import datetime, timezone +from uuid import uuid4 + +from sqlmodel import Field, SQLModel + + +class EdgeMetrics(SQLModel, table=True): + """Edge performance metrics""" + + __tablename__ = "edge_metrics" + __table_args__ = {"extend_existing": True} + + id: str = Field(default_factory=lambda: f"edge_metric_{uuid4().hex[:8]}", primary_key=True) + island_id: str = Field(index=True) + gpu_id: str | None = Field(default=None, index=True) + database_id: str | None = Field(default=None, index=True) + timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), index=True) + + # GPU metrics + gpu_utilization: float = Field(default=0.0) + gpu_temperature: float | None = Field(default=None) + gpu_power: float | None = Field(default=None) + + # Memory metrics + memory_utilization: float = Field(default=0.0) + memory_used_gb: float = Field(default=0.0) + + # Request metrics + request_rate: float = Field(default=0.0) # requests per second + latency_avg: float = Field(default=0.0) # milliseconds + active_requests: int = Field(default=0) + + # Database metrics + database_size_gb: float = Field(default=0.0) + query_rate: float = Field(default=0.0) # queries per second + sync_lag: float = Field(default=0.0) # seconds behind diff --git a/apps/edge-api/src/edge_api/schemas/serve.py b/apps/edge-api/src/edge_api/schemas/serve.py new file mode 100644 index 00000000..52e0a9a1 --- /dev/null +++ b/apps/edge-api/src/edge_api/schemas/serve.py @@ -0,0 +1,52 @@ +"""Edge serve-related schemas for Edge API Service""" + +from datetime import datetime, timezone +from uuid import uuid4 + +from sqlalchemy import JSON, Column +from sqlmodel import Field, SQLModel + + +class ComputeRequest(SQLModel, table=True): + """Compute request in edge serve queue""" + + __tablename__ = "compute_requests" + __table_args__ = {"extend_existing": True} + + id: str = Field(default_factory=lambda: f"compute_req_{uuid4().hex[:8]}", primary_key=True) + request_id: str = Field(index=True) + island_id: str = Field(index=True) + gpu_type: str + status: str = Field(default="pending", index=True) # pending, processing, completed, failed + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + # Request parameters + model_name: str + input_data: dict = Field(default_factory=dict, sa_column=Column(JSON, nullable=True)) + + # Processing info + assigned_gpu_id: str | None = Field(default=None) + started_at: datetime | None = Field(default=None) + completed_at: datetime | None = Field(default=None) + + # Result + result: dict | None = Field(default=None, sa_column=Column(JSON, nullable=True)) + error: str | None = Field(default=None) + + +class ComputeResult(SQLModel, table=True): + """Compute result cache""" + + __tablename__ = "compute_results" + __table_args__ = {"extend_existing": True} + + id: str = Field(default_factory=lambda: f"compute_res_{uuid4().hex[:8]}", primary_key=True) + result_id: str = Field(index=True) + request_id: str = Field(index=True) + island_id: str = Field(index=True) + gpu_id: str + result: dict = Field(default_factory=dict, sa_column=Column(JSON, nullable=False)) + cache_ttl: int = Field(default=3600) # 1 hour default + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + expires_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) diff --git a/apps/edge-api/src/edge_api/services/__init__.py b/apps/edge-api/src/edge_api/services/__init__.py new file mode 100644 index 00000000..8c2a60a5 --- /dev/null +++ b/apps/edge-api/src/edge_api/services/__init__.py @@ -0,0 +1,15 @@ +"""Services for Edge API Service""" + +from .island_service import IslandService +from .gpu_service import GPUService +from .database_service import DatabaseService +from .serve_service import ServeService +from .metrics_service import MetricsService + +__all__ = [ + "IslandService", + "GPUService", + "DatabaseService", + "ServeService", + "MetricsService", +] diff --git a/apps/edge-api/src/edge_api/services/database_service.py b/apps/edge-api/src/edge_api/services/database_service.py new file mode 100644 index 00000000..cedf5893 --- /dev/null +++ b/apps/edge-api/src/edge_api/services/database_service.py @@ -0,0 +1,29 @@ +"""Edge database service for Edge API Service""" + +from typing import Dict, Optional + +from ..schemas.database import EdgeDatabase + + +class DatabaseService: + """Service for edge database operations""" + + def __init__(self): + # TODO: Initialize database session in Phase 4 + pass + + async def init_edge_database(self, island_id: str, capacity_gb: int) -> Dict: + """Initialize edge database - TODO: Implement in Phase 4""" + return {"message": "init_edge_database - to be implemented in Phase 4"} + + async def get_edge_database(self, island_id: str) -> Optional[Dict]: + """Get edge database status - TODO: Implement in Phase 4""" + return {"message": "get_edge_database - to be implemented in Phase 4"} + + async def delete_edge_database(self, database_id: str) -> Dict: + """Delete edge database - TODO: Implement in Phase 4""" + return {"message": f"delete_edge_database {database_id} - to be implemented in Phase 4"} + + async def sync_edge_database(self, database_id: str) -> Dict: + """Sync edge database to main network - TODO: Implement in Phase 4""" + return {"message": f"sync_edge_database {database_id} - to be implemented in Phase 4"} diff --git a/apps/edge-api/src/edge_api/services/gpu_service.py b/apps/edge-api/src/edge_api/services/gpu_service.py new file mode 100644 index 00000000..2608a99c --- /dev/null +++ b/apps/edge-api/src/edge_api/services/gpu_service.py @@ -0,0 +1,29 @@ +"""GPU service for Edge API Service""" + +from typing import Dict, List, Optional + +from ..schemas.gpu import GPUListing + + +class GPUService: + """Service for GPU operations""" + + def __init__(self): + # TODO: Initialize GPU service client in Phase 3 + pass + + async def list_gpu(self, island_id: str, gpu_type: str, price: float) -> Dict: + """List GPU on island - TODO: Implement in Phase 3""" + return {"message": "list_gpu - to be implemented in Phase 3"} + + async def get_gpu_listings(self, island_id: str) -> List[Dict]: + """Get GPU listings on island - TODO: Implement in Phase 3""" + return [{"message": "get_gpu_listings - to be implemented in Phase 3"}] + + async def remove_gpu_listing(self, listing_id: str) -> Dict: + """Remove GPU listing - TODO: Implement in Phase 3""" + return {"message": f"remove_gpu_listing {listing_id} - to be implemented in Phase 3"} + + async def scan_gpus(self, miner_id: str) -> Dict: + """Scan GPUs on island - TODO: Implement in Phase 3""" + return {"message": "scan_gpus - to be implemented in Phase 3"} diff --git a/apps/edge-api/src/edge_api/services/island_service.py b/apps/edge-api/src/edge_api/services/island_service.py new file mode 100644 index 00000000..c0400523 --- /dev/null +++ b/apps/edge-api/src/edge_api/services/island_service.py @@ -0,0 +1,33 @@ +"""Island service for Edge API Service""" + +from typing import Dict, List, Optional + +from ..schemas.island import IslandMembership, BridgeRequest + + +class IslandService: + """Service for island operations""" + + def __init__(self): + # TODO: Initialize blockchain RPC client in Phase 2 + pass + + async def join_island(self, island_id: str, island_name: str, chain_id: str, role: str) -> Dict: + """Join an island - TODO: Implement in Phase 2""" + return {"message": "join_island - to be implemented in Phase 2"} + + async def leave_island(self, island_id: str) -> Dict: + """Leave an island - TODO: Implement in Phase 2""" + return {"message": "leave_island - to be implemented in Phase 2"} + + async def list_islands(self) -> List[Dict]: + """List all islands - TODO: Implement in Phase 2""" + return [{"message": "list_islands - to be implemented in Phase 2"}] + + async def get_island(self, island_id: str) -> Optional[Dict]: + """Get island details - TODO: Implement in Phase 2""" + return {"message": f"get_island {island_id} - to be implemented in Phase 2"} + + async def request_bridge(self, target_island_id: str) -> Dict: + """Request bridge to another island - TODO: Implement in Phase 2""" + return {"message": "request_bridge - to be implemented in Phase 2"} diff --git a/apps/edge-api/src/edge_api/services/metrics_service.py b/apps/edge-api/src/edge_api/services/metrics_service.py new file mode 100644 index 00000000..4f853c38 --- /dev/null +++ b/apps/edge-api/src/edge_api/services/metrics_service.py @@ -0,0 +1,25 @@ +"""Edge metrics service for Edge API Service""" + +from typing import Dict, List, Optional + +from ..schemas.metrics import EdgeMetrics + + +class MetricsService: + """Service for edge metrics operations""" + + def __init__(self): + # TODO: Initialize metrics collection in Phase 6 + pass + + async def get_edge_metrics(self, island_id: str) -> Dict: + """Get edge metrics for island - TODO: Implement in Phase 6""" + return {"message": "get_edge_metrics - to be implemented in Phase 6"} + + async def get_gpu_metrics(self, island_id: str) -> List[Dict]: + """Get GPU metrics - TODO: Implement in Phase 6""" + return [{"message": "get_gpu_metrics - to be implemented in Phase 6"}] + + async def get_database_metrics(self, island_id: str) -> Dict: + """Get database metrics - TODO: Implement in Phase 6""" + return {"message": "get_database_metrics - to be implemented in Phase 6"} diff --git a/apps/edge-api/src/edge_api/services/serve_service.py b/apps/edge-api/src/edge_api/services/serve_service.py new file mode 100644 index 00000000..c8e8ac04 --- /dev/null +++ b/apps/edge-api/src/edge_api/services/serve_service.py @@ -0,0 +1,33 @@ +"""Edge serve service for Edge API Service""" + +from typing import Dict, List + +from ..schemas.serve import ComputeRequest, ComputeResult + + +class ServeService: + """Service for edge serve operations""" + + def __init__(self): + # TODO: Initialize serve queue in Phase 5 + pass + + async def start_serve(self, island_id: str) -> Dict: + """Start serving edge compute requests - TODO: Implement in Phase 5""" + return {"message": "start_serve - to be implemented in Phase 5"} + + async def stop_serve(self, island_id: str) -> Dict: + """Stop serving edge compute requests - TODO: Implement in Phase 5""" + return {"message": "stop_serve - to be implemented in Phase 5"} + + async def get_serve_status(self, island_id: str) -> Dict: + """Get serve status - TODO: Implement in Phase 5""" + return {"message": "get_serve_status - to be implemented in Phase 5"} + + async def get_pending_requests(self, island_id: str) -> List[Dict]: + """Get pending compute requests - TODO: Implement in Phase 5""" + return [{"message": "get_pending_requests - to be implemented in Phase 5"}] + + async def complete_request(self, request_id: str, result: Dict) -> Dict: + """Complete a compute request - TODO: Implement in Phase 5""" + return {"message": f"complete_request {request_id} - to be implemented in Phase 5"} diff --git a/apps/edge-api/src/edge_api/storage.py b/apps/edge-api/src/edge_api/storage.py new file mode 100644 index 00000000..b8c4e42c --- /dev/null +++ b/apps/edge-api/src/edge_api/storage.py @@ -0,0 +1,39 @@ +"""Database session management for Edge API Service""" + +import os +from contextlib import asynccontextmanager +from typing import AsyncIterator + +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine +from sqlmodel import SQLModel + +from aitbc import get_logger + +logger = get_logger(__name__) + +# Database URL from environment variable or default +DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://aitbc_edge:password@localhost:5432/aitbc_edge") + +# Create async engine +engine = create_async_engine(DATABASE_URL, echo=False) + + +async def init_db() -> None: + """Initialize database tables""" + from .schemas.island import IslandMembership, BridgeRequest + from .schemas.gpu import GPUListing + from .schemas.database import EdgeDatabase + from .schemas.serve import ComputeRequest, ComputeResult + from .schemas.metrics import EdgeMetrics + + async with engine.begin() as conn: + await conn.run_sync(SQLModel.metadata.create_all) + + logger.info("Edge API database initialized") + + +@asynccontextmanager +async def get_session() -> AsyncIterator[AsyncSession]: + """Get database session""" + async with AsyncSession(engine) as session: + yield session diff --git a/apps/governance-service/src/governance_service/main.py b/apps/governance-service/src/governance_service/main.py index a5218ba3..d7f6d373 100644 --- a/apps/governance-service/src/governance_service/main.py +++ b/apps/governance-service/src/governance_service/main.py @@ -113,7 +113,7 @@ async def get_profiles( svc: GovernanceService = Depends(get_governance_service), ): """Get governance profiles""" - return svc.list_profiles(role=role, user_id=user_id) + return await svc.list_profiles(role=role, user_id=user_id) @app.get("/v1/governance/profiles/{profile_id}") @@ -122,7 +122,7 @@ async def get_profile( svc: GovernanceService = Depends(get_governance_service), ): """Get a specific governance profile""" - return svc.get_profile(profile_id) + return await svc.get_profile(profile_id) @app.post("/v1/governance/profiles") @@ -131,7 +131,7 @@ async def create_profile( svc: GovernanceService = Depends(get_governance_service), ): """Create a new governance profile""" - return svc.create_profile(profile_data) + return await svc.create_profile(profile_data) @app.get("/v1/governance/proposals") @@ -142,7 +142,7 @@ async def get_proposals( svc: GovernanceService = Depends(get_governance_service), ): """Get governance proposals""" - return svc.list_proposals(status=status, category=category, proposer_id=proposer_id) + return await svc.list_proposals(status=status, category=category, proposer_id=proposer_id) @app.get("/v1/governance/proposals/{proposal_id}") @@ -151,7 +151,7 @@ async def get_proposal( svc: GovernanceService = Depends(get_governance_service), ): """Get a specific proposal""" - return svc.get_proposal(proposal_id) + return await svc.get_proposal(proposal_id) @app.post("/v1/governance/proposals") @@ -160,7 +160,7 @@ async def create_proposal( svc: GovernanceService = Depends(get_governance_service), ): """Create a new proposal""" - return svc.create_proposal(proposal_data) + return await svc.create_proposal(proposal_data) @app.get("/v1/governance/votes") @@ -170,7 +170,7 @@ async def get_votes( svc: GovernanceService = Depends(get_governance_service), ): """Get votes""" - return svc.list_votes(proposal_id=proposal_id, voter_id=voter_id) + return await svc.list_votes(proposal_id=proposal_id, voter_id=voter_id) @app.post("/v1/governance/votes") @@ -179,7 +179,7 @@ async def create_vote( svc: GovernanceService = Depends(get_governance_service), ): """Create a new vote""" - return svc.create_vote(vote_data) + return await svc.create_vote(vote_data) @app.get("/v1/governance/treasury") @@ -187,7 +187,7 @@ async def get_treasury( svc: GovernanceService = Depends(get_governance_service), ): """Get DAO treasury""" - return svc.get_treasury() + return await svc.get_treasury() @app.get("/v1/governance/analytics") diff --git a/apps/governance-service/src/governance_service/services/governance_service.py b/apps/governance-service/src/governance_service/services/governance_service.py index 56e21c93..741ea222 100644 --- a/apps/governance-service/src/governance_service/services/governance_service.py +++ b/apps/governance-service/src/governance_service/services/governance_service.py @@ -4,16 +4,17 @@ Governance service for managing governance operations from typing import Any -from sqlmodel import Session, select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlmodel import select from ..domain.governance import GovernanceProfile, Proposal, Vote, DaoTreasury class GovernanceService: - def __init__(self, session: Session): + def __init__(self, session: AsyncSession): self.session = session - def list_profiles( + async def list_profiles( self, role: str | None = None, user_id: str | None = None, @@ -24,23 +25,24 @@ class GovernanceService: stmt = stmt.where(GovernanceProfile.role == role) if user_id: stmt = stmt.where(GovernanceProfile.user_id == user_id) - return list(self.session.execute(stmt).all()) + result = await self.session.execute(stmt) + return list(result.scalars().all()) - def get_profile(self, profile_id: str) -> GovernanceProfile | None: + async def get_profile(self, profile_id: str) -> GovernanceProfile | None: """Get a specific governance profile""" stmt = select(GovernanceProfile).where(GovernanceProfile.profile_id == profile_id) - result = self.session.execute(stmt).first() - return result[0] if result else None + result = await self.session.execute(stmt) + return result.scalars().first() - def create_profile(self, profile_data: dict) -> GovernanceProfile: + async def create_profile(self, profile_data: dict) -> GovernanceProfile: """Create a new governance profile""" profile = GovernanceProfile(**profile_data) self.session.add(profile) - self.session.commit() - self.session.refresh(profile) + await self.session.commit() + await self.session.refresh(profile) return profile - def list_proposals( + async def list_proposals( self, status: str | None = None, category: str | None = None, @@ -54,23 +56,24 @@ class GovernanceService: stmt = stmt.where(Proposal.category == category) if proposer_id: stmt = stmt.where(Proposal.proposer_id == proposer_id) - return list(self.session.execute(stmt).all()) + result = await self.session.execute(stmt) + return list(result.scalars().all()) - def get_proposal(self, proposal_id: str) -> Proposal | None: + async def get_proposal(self, proposal_id: str) -> Proposal | None: """Get a specific proposal""" stmt = select(Proposal).where(Proposal.proposal_id == proposal_id) - result = self.session.execute(stmt).first() - return result[0] if result else None + result = await self.session.execute(stmt) + return result.scalars().first() - def create_proposal(self, proposal_data: dict) -> Proposal: + async def create_proposal(self, proposal_data: dict) -> Proposal: """Create a new proposal""" proposal = Proposal(**proposal_data) self.session.add(proposal) - self.session.commit() - self.session.refresh(proposal) + await self.session.commit() + await self.session.refresh(proposal) return proposal - def list_votes( + async def list_votes( self, proposal_id: str | None = None, voter_id: str | None = None, @@ -81,25 +84,25 @@ class GovernanceService: stmt = stmt.where(Vote.proposal_id == proposal_id) if voter_id: stmt = stmt.where(Vote.voter_id == voter_id) - return list(self.session.execute(stmt).all()) + result = await self.session.execute(stmt) + return list(result.scalars().all()) - def create_vote(self, vote_data: dict) -> Vote: + async def create_vote(self, vote_data: dict) -> Vote: """Create a new vote""" vote = Vote(**vote_data) self.session.add(vote) - self.session.commit() - self.session.refresh(vote) + await self.session.commit() + await self.session.refresh(vote) return vote - def get_treasury(self) -> DaoTreasury | None: + async def get_treasury(self) -> DaoTreasury | None: """Get DAO treasury""" stmt = select(DaoTreasury).where(DaoTreasury.treasury_id == "main_treasury") - result = self.session.execute(stmt).first() - return result[0] if result else None + result = await self.session.execute(stmt) + return result.scalars().first() async def get_analytics(self, period: str = "monthly") -> dict[str, Any]: """Get governance analytics""" - # Placeholder for analytics logic return { "period": period, "total_proposals": 0,