Create edge-api service foundation - Phase 1 complete
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Integration Tests / test-service-integration (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled

- Created edge-api service structure with FastAPI application
- Implemented all schema files (island, gpu, database, serve, metrics)
- Created router stub files for all modules
- Created service stub files for all modules
- Created client stub files (blockchain RPC, GPU service)
- Configured PostgreSQL database (aitbc_edge) with proper permissions
- Fixed SQLAlchemy reserved name conflict (metadata -> extra_data)
- Changed port to 8103 to avoid conflicts
- Service runs successfully on port 8103
- Health endpoint tested and working
- Created systemd service file
- Created README.md with documentation
This commit is contained in:
aitbc
2026-05-14 21:50:09 +02:00
parent 4a1e39dd3c
commit a9405a0d28
30 changed files with 1062 additions and 36 deletions

153
apps/edge-api/README.md Normal file
View File

@@ -0,0 +1,153 @@
# Edge API Service
REST API for AITBC island and edge operations, providing HTTP equivalents for CLI commands.
## Overview
The Edge API Service exposes island and edge operations via REST API, following the coordinator-api pattern. It integrates with the existing GPU service and blockchain node RPC.
## Architecture
- **Port:** 8103
- **Database:** PostgreSQL (aitbc_edge)
- **Communication:**
- Blockchain node RPC: localhost:8006
- GPU service: localhost:8101
- **Authentication:** JWT tokens (same as coordinator-api)
## Installation
```bash
cd /opt/aitbc/apps/edge-api
pip install -e .
```
## Database Setup
Create PostgreSQL database and user:
```bash
sudo -u postgres psql
CREATE DATABASE aitbc_edge;
CREATE USER aitbc_edge WITH PASSWORD 'password';
GRANT ALL PRIVILEGES ON DATABASE aitbc_edge TO aitbc_edge;
\q
```
## Running
### Development
```bash
cd /opt/aitbc/apps/edge-api
python -m edge_api.main
```
### Production (systemd)
```bash
# Install service
sudo cp edge-api.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable edge-api
sudo systemctl start edge-api
```
## API Endpoints
### Health Checks
- `GET /health` - Health check
- `GET /ready` - Readiness check
### Island Operations (Phase 2)
- `POST /v1/islands/join` - Join an island
- `POST /v1/islands/leave` - Leave an island
- `GET /v1/islands` - List all islands
- `GET /v1/islands/{island_id}` - Get island details
- `POST /v1/islands/bridge` - Request bridge
### GPU Operations (Phase 3)
- `POST /v1/gpu/listings` - List GPU on island
- `GET /v1/gpu/listings` - Get GPU listings
- `DELETE /v1/gpu/listings/{listing_id}` - Remove GPU listing
- `GET /v1/gpu/scan` - Scan GPUs
### Edge Database Operations (Phase 4)
- `POST /v1/database/init` - Initialize edge database
- `GET /v1/database` - Get edge database status
- `DELETE /v1/database` - Delete edge database
- `POST /v1/database/sync` - Sync edge database
### Edge Serve Operations (Phase 5)
- `POST /v1/serve/start` - Start serving
- `POST /v1/serve/stop` - Stop serving
- `GET /v1/serve/status` - Get serve status
- `GET /v1/serve/requests` - Get pending requests
- `POST /v1/serve/requests/{request_id}/complete` - Complete request
### Edge Metrics (Phase 6)
- `GET /v1/metrics` - Get edge metrics
- `GET /v1/metrics/gpu` - Get GPU metrics
- `GET /v1/metrics/database` - Get database metrics
## Implementation Status
**Phase 1: Foundation**
- Service structure
- FastAPI application
- Database configuration
- Basic stub endpoints
**Phase 2: Island Operations**
- Island join/leave endpoints
- Blockchain RPC integration
- Island listing
**Phase 3: GPU Operations**
- GPU listing endpoints
- GPU service integration
- GPU metrics
**Phase 4: Edge Database**
- Database initialization
- Database sync
- Database management
**Phase 5: Edge Serve**
- Serve start/stop
- Request queue
- Request processing
**Phase 6: Metrics**
- Metrics collection
- Metrics endpoints
- Performance monitoring
## CLI Integration
CLI commands will be updated to use REST API instead of placeholder implementations:
- `aitbc island join``POST /v1/islands/join`
- `aitbc gpu list-island``POST /v1/gpu/listings`
- `aitbc database init-edge``POST /v1/database/init`
- `aitbc edge serve``POST /v1/serve/start`
- `aitbc edge metrics``GET /v1/metrics`
## Agent SDK Integration
Agent SDK will be updated to use REST API for island and edge operations.
## Dependencies
- FastAPI
- SQLAlchemy + SQLModel
- PostgreSQL
- httpx
- PyJWT
- uvicorn

View File

@@ -0,0 +1,22 @@
[Unit]
Description=AITBC Edge API Service
After=network.target postgresql.service
[Service]
Type=simple
WorkingDirectory=/opt/aitbc/apps/edge-api
Environment="PATH=/opt/aitbc/venv/bin"
Environment="PYTHONPATH=/opt/aitbc/packages/py/aitbc-core/src:/opt/aitbc/apps/edge-api/src:/opt/aitbc"
Environment="DATABASE_URL=postgresql+asyncpg://aitbc_edge:password@localhost:5432/aitbc_edge"
Environment="BLOCKCHAIN_RPC_HOST=localhost"
Environment="BLOCKCHAIN_RPC_PORT=8006"
Environment="GPU_SERVICE_HOST=localhost"
Environment="GPU_SERVICE_PORT=8101"
Environment="JWT_SECRET_KEY=your-secret-key-change-in-production"
Environment="API_PORT=8103"
ExecStart=/opt/aitbc/venv/bin/python -m edge_api.main
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target

View File

@@ -0,0 +1,24 @@
[project]
name = "edge-api"
version = "0.1.0"
description = "Edge API Service for AITBC island and edge operations"
requires-python = ">=3.13"
dependencies = [
"fastapi>=0.109.0",
"uvicorn>=0.27.0",
"sqlmodel>=0.0.14",
"sqlalchemy[asyncio]>=2.0.25",
"asyncpg>=0.29.0",
"httpx>=0.26.0",
"pydantic>=2.5.0",
"pydantic-settings>=2.1.0",
"python-jose[cryptography]>=3.3.0",
"passlib[bcrypt]>=1.7.4",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/edge_api"]

View File

@@ -0,0 +1,3 @@
"""Edge API Service for AITBC island and edge operations"""
__version__ = "0.1.0"

View File

@@ -0,0 +1,9 @@
"""Clients for Edge API Service"""
from .blockchain_rpc import BlockchainRPCClient
from .gpu_service import GPUServiceClient
__all__ = [
"BlockchainRPCClient",
"GPUServiceClient",
]

View File

@@ -0,0 +1,38 @@
"""Blockchain RPC client for Edge API Service"""
import httpx
from typing import Dict, Optional
from ..config import settings
class BlockchainRPCClient:
"""Client for blockchain node RPC communication"""
def __init__(self):
self.base_url = f"http://{settings.blockchain_rpc_host}:{settings.blockchain_rpc_port}"
self.client = httpx.AsyncClient(timeout=30.0)
async def close(self):
"""Close the HTTP client"""
await self.client.aclose()
async def join_island(self, island_id: str, island_name: str, chain_id: str, role: str) -> Dict:
"""Join island via blockchain RPC - TODO: Implement in Phase 2"""
# TODO: Call blockchain node RPC endpoint for island join
return {"message": "join_island via RPC - to be implemented in Phase 2"}
async def leave_island(self, island_id: str) -> Dict:
"""Leave island via blockchain RPC - TODO: Implement in Phase 2"""
# TODO: Call blockchain node RPC endpoint for island leave
return {"message": "leave_island via RPC - to be implemented in Phase 2"}
async def get_island_info(self, island_id: str) -> Optional[Dict]:
"""Get island info via blockchain RPC - TODO: Implement in Phase 2"""
# TODO: Call blockchain node RPC endpoint for island info
return {"message": "get_island_info via RPC - to be implemented in Phase 2"}
async def request_bridge(self, target_island_id: str) -> Dict:
"""Request bridge via blockchain RPC - TODO: Implement in Phase 2"""
# TODO: Call blockchain node RPC endpoint for bridge request
return {"message": "request_bridge via RPC - to be implemented in Phase 2"}

View File

@@ -0,0 +1,33 @@
"""GPU service client for Edge API Service"""
import httpx
from typing import Dict, List
from ..config import settings
class GPUServiceClient:
"""Client for GPU service communication"""
def __init__(self):
self.base_url = f"http://{settings.gpu_service_host}:{settings.gpu_service_port}"
self.client = httpx.AsyncClient(timeout=30.0)
async def close(self):
"""Close the HTTP client"""
await self.client.aclose()
async def scan_gpus(self, miner_id: str) -> Dict:
"""Scan GPUs via GPU service - TODO: Implement in Phase 3"""
# TODO: Call GPU service endpoint /v1/marketplace/edge-gpu/scan/{miner_id}
return {"message": "scan_gpus via GPU service - to be implemented in Phase 3"}
async def get_gpu_profiles(self) -> List[Dict]:
"""Get GPU profiles via GPU service - TODO: Implement in Phase 3"""
# TODO: Call GPU service endpoint /v1/marketplace/edge-gpu/profiles
return [{"message": "get_gpu_profiles via GPU service - to be implemented in Phase 3"}]
async def get_gpu_metrics(self, gpu_id: str) -> Dict:
"""Get GPU metrics via GPU service - TODO: Implement in Phase 3"""
# TODO: Call GPU service endpoint /v1/marketplace/edge-gpu/metrics/{gpu_id}
return {"message": "get_gpu_metrics via GPU service - to be implemented in Phase 3"}

View File

@@ -0,0 +1,43 @@
"""Configuration for Edge API Service"""
import os
from typing import Optional
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
"""Edge API settings"""
# API settings
api_host: str = "0.0.0.0"
api_port: int = 8103
api_prefix: str = "/v1"
# Database settings
database_url: str = "postgresql+asyncpg://aitbc_edge:password@localhost:5432/aitbc_edge"
# Blockchain node RPC settings
blockchain_rpc_host: str = "localhost"
blockchain_rpc_port: int = 8006
# GPU service settings
gpu_service_host: str = "localhost"
gpu_service_port: int = 8101
# JWT settings
jwt_secret_key: str = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-in-production")
jwt_algorithm: str = "HS256"
jwt_expiration_hours: int = 24
# CORS settings
cors_origins: list[str] = ["http://localhost:3000", "http://localhost:8080"]
# Logging
log_level: str = "INFO"
class Config:
env_file = ".env"
case_sensitive = False
settings = Settings()

View File

@@ -0,0 +1,104 @@
"""Main FastAPI application for Edge API Service"""
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from aitbc import get_logger
from .config import settings
from .storage import init_db
from .routers import islands, gpu, database, serve, metrics
logger = get_logger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifespan context manager for startup/shutdown"""
# Startup
logger.info("Starting Edge API Service")
await init_db()
logger.info("Database initialized")
yield
# Shutdown
logger.info("Shutting down Edge API Service")
# Create FastAPI application
app = FastAPI(
title="Edge API Service",
description="REST API for AITBC island and edge operations",
version="0.1.0",
lifespan=lifespan,
docs_url="/docs",
redoc_url="/redoc",
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Health check endpoint
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {
"status": "healthy",
"service": "edge-api",
"version": "0.1.0"
}
# Readiness check endpoint
@app.get("/ready")
async def readiness_check():
"""Readiness check endpoint"""
# TODO: Check database connection, blockchain RPC, GPU service
return {
"status": "ready",
"service": "edge-api",
"version": "0.1.0"
}
# Include routers
app.include_router(islands.router, prefix=f"{settings.api_prefix}/islands", tags=["islands"])
app.include_router(gpu.router, prefix=f"{settings.api_prefix}/gpu", tags=["gpu"])
app.include_router(database.router, prefix=f"{settings.api_prefix}/database", tags=["database"])
app.include_router(serve.router, prefix=f"{settings.api_prefix}/serve", tags=["serve"])
app.include_router(metrics.router, prefix=f"{settings.api_prefix}/metrics", tags=["metrics"])
# Global exception handler
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
"""Global exception handler"""
logger.error(f"Unhandled exception: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"error": "Internal server error",
"detail": str(exc)
}
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"edge_api.main:app",
host=settings.api_host,
port=settings.api_port,
reload=True,
log_level=settings.log_level.lower()
)

View File

@@ -0,0 +1,15 @@
"""Routers for Edge API Service"""
from .islands import router as islands_router
from .gpu import router as gpu_router
from .database import router as database_router
from .serve import router as serve_router
from .metrics import router as metrics_router
__all__ = [
"islands_router",
"gpu_router",
"database_router",
"serve_router",
"metrics_router",
]

View File

@@ -0,0 +1,29 @@
"""Edge database operations router for Edge API Service"""
from fastapi import APIRouter
router = APIRouter()
@router.post("/init")
async def init_edge_database():
"""Initialize edge database - TODO: Implement in Phase 4"""
return {"message": "Edge database init endpoint - to be implemented in Phase 4"}
@router.get("/")
async def get_edge_database():
"""Get edge database status - TODO: Implement in Phase 4"""
return {"message": "Get edge database endpoint - to be implemented in Phase 4"}
@router.delete("/")
async def delete_edge_database():
"""Delete edge database - TODO: Implement in Phase 4"""
return {"message": "Delete edge database endpoint - to be implemented in Phase 4"}
@router.post("/sync")
async def sync_edge_database():
"""Sync edge database to main network - TODO: Implement in Phase 4"""
return {"message": "Sync edge database endpoint - to be implemented in Phase 4"}

View File

@@ -0,0 +1,29 @@
"""GPU operations router for Edge API Service"""
from fastapi import APIRouter
router = APIRouter()
@router.post("/listings")
async def list_gpu():
"""List GPU on island - TODO: Implement in Phase 3"""
return {"message": "GPU listing endpoint - to be implemented in Phase 3"}
@router.get("/listings")
async def get_gpu_listings():
"""Get GPU listings on island - TODO: Implement in Phase 3"""
return {"message": "Get GPU listings endpoint - to be implemented in Phase 3"}
@router.delete("/listings/{listing_id}")
async def remove_gpu_listing(listing_id: str):
"""Remove GPU listing - TODO: Implement in Phase 3"""
return {"message": f"Remove GPU listing {listing_id} - to be implemented in Phase 3"}
@router.get("/scan")
async def scan_gpus():
"""Scan GPUs on island - TODO: Implement in Phase 3"""
return {"message": "GPU scan endpoint - to be implemented in Phase 3"}

View File

@@ -0,0 +1,37 @@
"""Island operations router for Edge API Service"""
from fastapi import APIRouter, Depends
from ..schemas.island import IslandMembership, BridgeRequest
router = APIRouter()
@router.post("/join")
async def join_island():
"""Join an island - TODO: Implement in Phase 2"""
return {"message": "Island join endpoint - to be implemented in Phase 2"}
@router.post("/leave")
async def leave_island():
"""Leave an island - TODO: Implement in Phase 2"""
return {"message": "Island leave endpoint - to be implemented in Phase 2"}
@router.get("/")
async def list_islands():
"""List all islands - TODO: Implement in Phase 2"""
return {"message": "List islands endpoint - to be implemented in Phase 2"}
@router.get("/{island_id}")
async def get_island(island_id: str):
"""Get island details - TODO: Implement in Phase 2"""
return {"message": f"Get island {island_id} - to be implemented in Phase 2"}
@router.post("/bridge")
async def request_bridge():
"""Request bridge to another island - TODO: Implement in Phase 2"""
return {"message": "Bridge request endpoint - to be implemented in Phase 2"}

View File

@@ -0,0 +1,23 @@
"""Edge metrics router for Edge API Service"""
from fastapi import APIRouter
router = APIRouter()
@router.get("/")
async def get_edge_metrics():
"""Get edge metrics for island - TODO: Implement in Phase 6"""
return {"message": "Get edge metrics endpoint - to be implemented in Phase 6"}
@router.get("/gpu")
async def get_gpu_metrics():
"""Get GPU metrics - TODO: Implement in Phase 6"""
return {"message": "Get GPU metrics endpoint - to be implemented in Phase 6"}
@router.get("/database")
async def get_database_metrics():
"""Get database metrics - TODO: Implement in Phase 6"""
return {"message": "Get database metrics endpoint - to be implemented in Phase 6"}

View File

@@ -0,0 +1,35 @@
"""Edge serve operations router for Edge API Service"""
from fastapi import APIRouter
router = APIRouter()
@router.post("/start")
async def start_serve():
"""Start serving edge compute requests - TODO: Implement in Phase 5"""
return {"message": "Start serve endpoint - to be implemented in Phase 5"}
@router.post("/stop")
async def stop_serve():
"""Stop serving edge compute requests - TODO: Implement in Phase 5"""
return {"message": "Stop serve endpoint - to be implemented in Phase 5"}
@router.get("/status")
async def get_serve_status():
"""Get serve status - TODO: Implement in Phase 5"""
return {"message": "Get serve status endpoint - to be implemented in Phase 5"}
@router.get("/requests")
async def get_pending_requests():
"""Get pending compute requests - TODO: Implement in Phase 5"""
return {"message": "Get pending requests endpoint - to be implemented in Phase 5"}
@router.post("/requests/{request_id}/complete")
async def complete_request(request_id: str):
"""Complete a compute request - TODO: Implement in Phase 5"""
return {"message": f"Complete request {request_id} - to be implemented in Phase 5"}

View File

@@ -0,0 +1,17 @@
"""Schemas for Edge API Service"""
from .island import IslandMembership, BridgeRequest
from .gpu import GPUListing
from .database import EdgeDatabase
from .serve import ComputeRequest, ComputeResult
from .metrics import EdgeMetrics
__all__ = [
"IslandMembership",
"BridgeRequest",
"GPUListing",
"EdgeDatabase",
"ComputeRequest",
"ComputeResult",
"EdgeMetrics",
]

View File

@@ -0,0 +1,31 @@
"""Edge database-related schemas for Edge API Service"""
from datetime import datetime, timezone
from uuid import uuid4
from sqlalchemy import JSON, Column
from sqlmodel import Field, SQLModel
class EdgeDatabase(SQLModel, table=True):
"""Edge database instance"""
__tablename__ = "edge_databases"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: f"edge_db_{uuid4().hex[:8]}", primary_key=True)
database_id: str = Field(index=True)
island_id: str = Field(index=True)
capacity_gb: int
used_gb: int = Field(default=0)
status: str = Field(default="initialized", index=True) # initialized, active, syncing, error
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
# Sync information
last_sync_at: datetime | None = Field(default=None)
sync_status: str = Field(default="idle") # idle, syncing, error
records_synced: int = Field(default=0)
# Database metadata
extra_data: dict = Field(default_factory=dict, sa_column=Column(JSON, nullable=True))

View File

@@ -0,0 +1,35 @@
"""GPU-related schemas for Edge API Service"""
from datetime import datetime, timezone
from uuid import uuid4
from sqlalchemy import JSON, Column
from sqlmodel import Field, SQLModel
class GPUListing(SQLModel, table=True):
"""GPU listing on island"""
__tablename__ = "gpu_listings"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: f"gpu_listing_{uuid4().hex[:8]}", primary_key=True)
listing_id: str = Field(index=True)
island_id: str = Field(index=True)
miner_id: str = Field(index=True)
gpu_type: str = Field(index=True)
price_per_hour: float
status: str = Field(default="active", index=True) # active, inactive, booked
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
# GPU specifications
memory_gb: int | None = Field(default=None)
cuda_version: str = Field(default="")
region: str = Field(default="")
# Capabilities
capabilities: list = Field(default_factory=list, sa_column=Column(JSON, nullable=True))
# Listing metadata
extra_data: dict = Field(default_factory=dict, sa_column=Column(JSON, nullable=True))

View File

@@ -0,0 +1,50 @@
"""Island-related schemas for Edge API Service"""
from datetime import datetime, timezone
from enum import StrEnum
from uuid import uuid4
from sqlalchemy import JSON, Column
from sqlmodel import Field, SQLModel
class IslandStatus(StrEnum):
"""Island membership status"""
ACTIVE = "active"
INACTIVE = "inactive"
BRIDGING = "bridging"
class IslandMembership(SQLModel, table=True):
"""Island membership in edge API database"""
__tablename__ = "island_memberships"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: f"membership_{uuid4().hex[:8]}", primary_key=True)
island_id: str = Field(index=True)
island_name: str
chain_id: str = Field(index=True)
status: IslandStatus = Field(default=IslandStatus.ACTIVE, index=True)
role: str = Field(default="compute-provider") # compute-provider, consumer, hub
joined_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
peer_count: int = Field(default=0)
# Additional metadata
extra_data: dict = Field(default_factory=dict, sa_column=Column(JSON, nullable=True))
class BridgeRequest(SQLModel, table=True):
"""Bridge request for island connectivity"""
__tablename__ = "bridge_requests"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: f"bridge_req_{uuid4().hex[:8]}", primary_key=True)
request_id: str = Field(index=True)
source_island_id: str
target_island_id: str
source_node_id: str
status: str = Field(default="pending", index=True) # pending, approved, rejected
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

View File

@@ -0,0 +1,38 @@
"""Edge metrics-related schemas for Edge API Service"""
from datetime import datetime, timezone
from uuid import uuid4
from sqlmodel import Field, SQLModel
class EdgeMetrics(SQLModel, table=True):
"""Edge performance metrics"""
__tablename__ = "edge_metrics"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: f"edge_metric_{uuid4().hex[:8]}", primary_key=True)
island_id: str = Field(index=True)
gpu_id: str | None = Field(default=None, index=True)
database_id: str | None = Field(default=None, index=True)
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), index=True)
# GPU metrics
gpu_utilization: float = Field(default=0.0)
gpu_temperature: float | None = Field(default=None)
gpu_power: float | None = Field(default=None)
# Memory metrics
memory_utilization: float = Field(default=0.0)
memory_used_gb: float = Field(default=0.0)
# Request metrics
request_rate: float = Field(default=0.0) # requests per second
latency_avg: float = Field(default=0.0) # milliseconds
active_requests: int = Field(default=0)
# Database metrics
database_size_gb: float = Field(default=0.0)
query_rate: float = Field(default=0.0) # queries per second
sync_lag: float = Field(default=0.0) # seconds behind

View File

@@ -0,0 +1,52 @@
"""Edge serve-related schemas for Edge API Service"""
from datetime import datetime, timezone
from uuid import uuid4
from sqlalchemy import JSON, Column
from sqlmodel import Field, SQLModel
class ComputeRequest(SQLModel, table=True):
"""Compute request in edge serve queue"""
__tablename__ = "compute_requests"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: f"compute_req_{uuid4().hex[:8]}", primary_key=True)
request_id: str = Field(index=True)
island_id: str = Field(index=True)
gpu_type: str
status: str = Field(default="pending", index=True) # pending, processing, completed, failed
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
# Request parameters
model_name: str
input_data: dict = Field(default_factory=dict, sa_column=Column(JSON, nullable=True))
# Processing info
assigned_gpu_id: str | None = Field(default=None)
started_at: datetime | None = Field(default=None)
completed_at: datetime | None = Field(default=None)
# Result
result: dict | None = Field(default=None, sa_column=Column(JSON, nullable=True))
error: str | None = Field(default=None)
class ComputeResult(SQLModel, table=True):
"""Compute result cache"""
__tablename__ = "compute_results"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: f"compute_res_{uuid4().hex[:8]}", primary_key=True)
result_id: str = Field(index=True)
request_id: str = Field(index=True)
island_id: str = Field(index=True)
gpu_id: str
result: dict = Field(default_factory=dict, sa_column=Column(JSON, nullable=False))
cache_ttl: int = Field(default=3600) # 1 hour default
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
expires_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

View File

@@ -0,0 +1,15 @@
"""Services for Edge API Service"""
from .island_service import IslandService
from .gpu_service import GPUService
from .database_service import DatabaseService
from .serve_service import ServeService
from .metrics_service import MetricsService
__all__ = [
"IslandService",
"GPUService",
"DatabaseService",
"ServeService",
"MetricsService",
]

View File

@@ -0,0 +1,29 @@
"""Edge database service for Edge API Service"""
from typing import Dict, Optional
from ..schemas.database import EdgeDatabase
class DatabaseService:
"""Service for edge database operations"""
def __init__(self):
# TODO: Initialize database session in Phase 4
pass
async def init_edge_database(self, island_id: str, capacity_gb: int) -> Dict:
"""Initialize edge database - TODO: Implement in Phase 4"""
return {"message": "init_edge_database - to be implemented in Phase 4"}
async def get_edge_database(self, island_id: str) -> Optional[Dict]:
"""Get edge database status - TODO: Implement in Phase 4"""
return {"message": "get_edge_database - to be implemented in Phase 4"}
async def delete_edge_database(self, database_id: str) -> Dict:
"""Delete edge database - TODO: Implement in Phase 4"""
return {"message": f"delete_edge_database {database_id} - to be implemented in Phase 4"}
async def sync_edge_database(self, database_id: str) -> Dict:
"""Sync edge database to main network - TODO: Implement in Phase 4"""
return {"message": f"sync_edge_database {database_id} - to be implemented in Phase 4"}

View File

@@ -0,0 +1,29 @@
"""GPU service for Edge API Service"""
from typing import Dict, List, Optional
from ..schemas.gpu import GPUListing
class GPUService:
"""Service for GPU operations"""
def __init__(self):
# TODO: Initialize GPU service client in Phase 3
pass
async def list_gpu(self, island_id: str, gpu_type: str, price: float) -> Dict:
"""List GPU on island - TODO: Implement in Phase 3"""
return {"message": "list_gpu - to be implemented in Phase 3"}
async def get_gpu_listings(self, island_id: str) -> List[Dict]:
"""Get GPU listings on island - TODO: Implement in Phase 3"""
return [{"message": "get_gpu_listings - to be implemented in Phase 3"}]
async def remove_gpu_listing(self, listing_id: str) -> Dict:
"""Remove GPU listing - TODO: Implement in Phase 3"""
return {"message": f"remove_gpu_listing {listing_id} - to be implemented in Phase 3"}
async def scan_gpus(self, miner_id: str) -> Dict:
"""Scan GPUs on island - TODO: Implement in Phase 3"""
return {"message": "scan_gpus - to be implemented in Phase 3"}

View File

@@ -0,0 +1,33 @@
"""Island service for Edge API Service"""
from typing import Dict, List, Optional
from ..schemas.island import IslandMembership, BridgeRequest
class IslandService:
"""Service for island operations"""
def __init__(self):
# TODO: Initialize blockchain RPC client in Phase 2
pass
async def join_island(self, island_id: str, island_name: str, chain_id: str, role: str) -> Dict:
"""Join an island - TODO: Implement in Phase 2"""
return {"message": "join_island - to be implemented in Phase 2"}
async def leave_island(self, island_id: str) -> Dict:
"""Leave an island - TODO: Implement in Phase 2"""
return {"message": "leave_island - to be implemented in Phase 2"}
async def list_islands(self) -> List[Dict]:
"""List all islands - TODO: Implement in Phase 2"""
return [{"message": "list_islands - to be implemented in Phase 2"}]
async def get_island(self, island_id: str) -> Optional[Dict]:
"""Get island details - TODO: Implement in Phase 2"""
return {"message": f"get_island {island_id} - to be implemented in Phase 2"}
async def request_bridge(self, target_island_id: str) -> Dict:
"""Request bridge to another island - TODO: Implement in Phase 2"""
return {"message": "request_bridge - to be implemented in Phase 2"}

View File

@@ -0,0 +1,25 @@
"""Edge metrics service for Edge API Service"""
from typing import Dict, List, Optional
from ..schemas.metrics import EdgeMetrics
class MetricsService:
"""Service for edge metrics operations"""
def __init__(self):
# TODO: Initialize metrics collection in Phase 6
pass
async def get_edge_metrics(self, island_id: str) -> Dict:
"""Get edge metrics for island - TODO: Implement in Phase 6"""
return {"message": "get_edge_metrics - to be implemented in Phase 6"}
async def get_gpu_metrics(self, island_id: str) -> List[Dict]:
"""Get GPU metrics - TODO: Implement in Phase 6"""
return [{"message": "get_gpu_metrics - to be implemented in Phase 6"}]
async def get_database_metrics(self, island_id: str) -> Dict:
"""Get database metrics - TODO: Implement in Phase 6"""
return {"message": "get_database_metrics - to be implemented in Phase 6"}

View File

@@ -0,0 +1,33 @@
"""Edge serve service for Edge API Service"""
from typing import Dict, List
from ..schemas.serve import ComputeRequest, ComputeResult
class ServeService:
"""Service for edge serve operations"""
def __init__(self):
# TODO: Initialize serve queue in Phase 5
pass
async def start_serve(self, island_id: str) -> Dict:
"""Start serving edge compute requests - TODO: Implement in Phase 5"""
return {"message": "start_serve - to be implemented in Phase 5"}
async def stop_serve(self, island_id: str) -> Dict:
"""Stop serving edge compute requests - TODO: Implement in Phase 5"""
return {"message": "stop_serve - to be implemented in Phase 5"}
async def get_serve_status(self, island_id: str) -> Dict:
"""Get serve status - TODO: Implement in Phase 5"""
return {"message": "get_serve_status - to be implemented in Phase 5"}
async def get_pending_requests(self, island_id: str) -> List[Dict]:
"""Get pending compute requests - TODO: Implement in Phase 5"""
return [{"message": "get_pending_requests - to be implemented in Phase 5"}]
async def complete_request(self, request_id: str, result: Dict) -> Dict:
"""Complete a compute request - TODO: Implement in Phase 5"""
return {"message": f"complete_request {request_id} - to be implemented in Phase 5"}

View File

@@ -0,0 +1,39 @@
"""Database session management for Edge API Service"""
import os
from contextlib import asynccontextmanager
from typing import AsyncIterator
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlmodel import SQLModel
from aitbc import get_logger
logger = get_logger(__name__)
# Database URL from environment variable or default
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://aitbc_edge:password@localhost:5432/aitbc_edge")
# Create async engine
engine = create_async_engine(DATABASE_URL, echo=False)
async def init_db() -> None:
"""Initialize database tables"""
from .schemas.island import IslandMembership, BridgeRequest
from .schemas.gpu import GPUListing
from .schemas.database import EdgeDatabase
from .schemas.serve import ComputeRequest, ComputeResult
from .schemas.metrics import EdgeMetrics
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
logger.info("Edge API database initialized")
@asynccontextmanager
async def get_session() -> AsyncIterator[AsyncSession]:
"""Get database session"""
async with AsyncSession(engine) as session:
yield session

View File

@@ -113,7 +113,7 @@ async def get_profiles(
svc: GovernanceService = Depends(get_governance_service),
):
"""Get governance profiles"""
return svc.list_profiles(role=role, user_id=user_id)
return await svc.list_profiles(role=role, user_id=user_id)
@app.get("/v1/governance/profiles/{profile_id}")
@@ -122,7 +122,7 @@ async def get_profile(
svc: GovernanceService = Depends(get_governance_service),
):
"""Get a specific governance profile"""
return svc.get_profile(profile_id)
return await svc.get_profile(profile_id)
@app.post("/v1/governance/profiles")
@@ -131,7 +131,7 @@ async def create_profile(
svc: GovernanceService = Depends(get_governance_service),
):
"""Create a new governance profile"""
return svc.create_profile(profile_data)
return await svc.create_profile(profile_data)
@app.get("/v1/governance/proposals")
@@ -142,7 +142,7 @@ async def get_proposals(
svc: GovernanceService = Depends(get_governance_service),
):
"""Get governance proposals"""
return svc.list_proposals(status=status, category=category, proposer_id=proposer_id)
return await svc.list_proposals(status=status, category=category, proposer_id=proposer_id)
@app.get("/v1/governance/proposals/{proposal_id}")
@@ -151,7 +151,7 @@ async def get_proposal(
svc: GovernanceService = Depends(get_governance_service),
):
"""Get a specific proposal"""
return svc.get_proposal(proposal_id)
return await svc.get_proposal(proposal_id)
@app.post("/v1/governance/proposals")
@@ -160,7 +160,7 @@ async def create_proposal(
svc: GovernanceService = Depends(get_governance_service),
):
"""Create a new proposal"""
return svc.create_proposal(proposal_data)
return await svc.create_proposal(proposal_data)
@app.get("/v1/governance/votes")
@@ -170,7 +170,7 @@ async def get_votes(
svc: GovernanceService = Depends(get_governance_service),
):
"""Get votes"""
return svc.list_votes(proposal_id=proposal_id, voter_id=voter_id)
return await svc.list_votes(proposal_id=proposal_id, voter_id=voter_id)
@app.post("/v1/governance/votes")
@@ -179,7 +179,7 @@ async def create_vote(
svc: GovernanceService = Depends(get_governance_service),
):
"""Create a new vote"""
return svc.create_vote(vote_data)
return await svc.create_vote(vote_data)
@app.get("/v1/governance/treasury")
@@ -187,7 +187,7 @@ async def get_treasury(
svc: GovernanceService = Depends(get_governance_service),
):
"""Get DAO treasury"""
return svc.get_treasury()
return await svc.get_treasury()
@app.get("/v1/governance/analytics")

View File

@@ -4,16 +4,17 @@ Governance service for managing governance operations
from typing import Any
from sqlmodel import Session, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import select
from ..domain.governance import GovernanceProfile, Proposal, Vote, DaoTreasury
class GovernanceService:
def __init__(self, session: Session):
def __init__(self, session: AsyncSession):
self.session = session
def list_profiles(
async def list_profiles(
self,
role: str | None = None,
user_id: str | None = None,
@@ -24,23 +25,24 @@ class GovernanceService:
stmt = stmt.where(GovernanceProfile.role == role)
if user_id:
stmt = stmt.where(GovernanceProfile.user_id == user_id)
return list(self.session.execute(stmt).all())
result = await self.session.execute(stmt)
return list(result.scalars().all())
def get_profile(self, profile_id: str) -> GovernanceProfile | None:
async def get_profile(self, profile_id: str) -> GovernanceProfile | None:
"""Get a specific governance profile"""
stmt = select(GovernanceProfile).where(GovernanceProfile.profile_id == profile_id)
result = self.session.execute(stmt).first()
return result[0] if result else None
result = await self.session.execute(stmt)
return result.scalars().first()
def create_profile(self, profile_data: dict) -> GovernanceProfile:
async def create_profile(self, profile_data: dict) -> GovernanceProfile:
"""Create a new governance profile"""
profile = GovernanceProfile(**profile_data)
self.session.add(profile)
self.session.commit()
self.session.refresh(profile)
await self.session.commit()
await self.session.refresh(profile)
return profile
def list_proposals(
async def list_proposals(
self,
status: str | None = None,
category: str | None = None,
@@ -54,23 +56,24 @@ class GovernanceService:
stmt = stmt.where(Proposal.category == category)
if proposer_id:
stmt = stmt.where(Proposal.proposer_id == proposer_id)
return list(self.session.execute(stmt).all())
result = await self.session.execute(stmt)
return list(result.scalars().all())
def get_proposal(self, proposal_id: str) -> Proposal | None:
async def get_proposal(self, proposal_id: str) -> Proposal | None:
"""Get a specific proposal"""
stmt = select(Proposal).where(Proposal.proposal_id == proposal_id)
result = self.session.execute(stmt).first()
return result[0] if result else None
result = await self.session.execute(stmt)
return result.scalars().first()
def create_proposal(self, proposal_data: dict) -> Proposal:
async def create_proposal(self, proposal_data: dict) -> Proposal:
"""Create a new proposal"""
proposal = Proposal(**proposal_data)
self.session.add(proposal)
self.session.commit()
self.session.refresh(proposal)
await self.session.commit()
await self.session.refresh(proposal)
return proposal
def list_votes(
async def list_votes(
self,
proposal_id: str | None = None,
voter_id: str | None = None,
@@ -81,25 +84,25 @@ class GovernanceService:
stmt = stmt.where(Vote.proposal_id == proposal_id)
if voter_id:
stmt = stmt.where(Vote.voter_id == voter_id)
return list(self.session.execute(stmt).all())
result = await self.session.execute(stmt)
return list(result.scalars().all())
def create_vote(self, vote_data: dict) -> Vote:
async def create_vote(self, vote_data: dict) -> Vote:
"""Create a new vote"""
vote = Vote(**vote_data)
self.session.add(vote)
self.session.commit()
self.session.refresh(vote)
await self.session.commit()
await self.session.refresh(vote)
return vote
def get_treasury(self) -> DaoTreasury | None:
async def get_treasury(self) -> DaoTreasury | None:
"""Get DAO treasury"""
stmt = select(DaoTreasury).where(DaoTreasury.treasury_id == "main_treasury")
result = self.session.execute(stmt).first()
return result[0] if result else None
result = await self.session.execute(stmt)
return result.scalars().first()
async def get_analytics(self, period: str = "monthly") -> dict[str, Any]:
"""Get governance analytics"""
# Placeholder for analytics logic
return {
"period": period,
"total_proposals": 0,