Phase 3 complete: GPU operations endpoints
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Integration Tests / test-service-integration (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled

Edge API:
- Implemented GPU service client to call existing GPU service endpoints
- Implemented GPU service to store GPU listings in database
- Implemented GPU router endpoints (list, get, remove, scan, metrics)
- GPU service already has necessary endpoints (profiles, scan, metrics)

Testing:
- Edge API GPU endpoints working on port 8103
- List GPUs returns empty list (expected - no GPUs registered)
This commit is contained in:
aitbc
2026-05-14 22:24:02 +02:00
parent 4097c71276
commit 13c60d6486
3 changed files with 150 additions and 43 deletions

View File

@@ -1,7 +1,7 @@
"""GPU service client for Edge API Service""" """GPU service client for Edge API Service"""
import httpx import httpx
from typing import Dict, List from typing import Dict, Optional, Any, List
from ..config import settings from ..config import settings
@@ -17,17 +17,34 @@ class GPUServiceClient:
"""Close the HTTP client""" """Close the HTTP client"""
await self.client.aclose() await self.client.aclose()
async def scan_gpus(self, miner_id: str) -> Dict: async def scan_gpus(self, miner_id: str) -> Dict[str, Any]:
"""Scan GPUs via GPU service - TODO: Implement in Phase 3""" """Scan GPUs via GPU service"""
# TODO: Call GPU service endpoint /v1/marketplace/edge-gpu/scan/{miner_id} response = await self.client.post(f"{self.base_url}/v1/marketplace/edge-gpu/scan/{miner_id}")
return {"message": "scan_gpus via GPU service - to be implemented in Phase 3"} response.raise_for_status()
return response.json()
async def get_gpu_profiles(self) -> List[Dict]: async def get_gpu_profiles(self, architecture: str = None, edge_optimized: bool = None, min_memory_gb: int = None) -> List[Dict[str, Any]]:
"""Get GPU profiles via GPU service - TODO: Implement in Phase 3""" """Get GPU profiles via GPU service"""
# TODO: Call GPU service endpoint /v1/marketplace/edge-gpu/profiles params = {}
return [{"message": "get_gpu_profiles via GPU service - to be implemented in Phase 3"}] if architecture:
params["architecture"] = architecture
if edge_optimized is not None:
params["edge_optimized"] = edge_optimized
if min_memory_gb is not None:
params["min_memory_gb"] = min_memory_gb
response = await self.client.get(f"{self.base_url}/v1/marketplace/edge-gpu/profiles", params=params)
response.raise_for_status()
return response.json()
async def get_gpu_metrics(self, gpu_id: str) -> Dict: async def get_gpu_metrics(self, gpu_id: str, limit: int = 100) -> List[Dict[str, Any]]:
"""Get GPU metrics via GPU service - TODO: Implement in Phase 3""" """Get GPU metrics via GPU service"""
# TODO: Call GPU service endpoint /v1/marketplace/edge-gpu/metrics/{gpu_id} response = await self.client.get(f"{self.base_url}/v1/marketplace/edge-gpu/metrics/{gpu_id}", params={"limit": limit})
return {"message": "get_gpu_metrics via GPU service - to be implemented in Phase 3"} response.raise_for_status()
return response.json()
async def get_miner_gpus(self, miner_id: str) -> List[Dict[str, Any]]:
"""Get GPUs registered by a miner"""
response = await self.client.get(f"{self.base_url}/v1/miners/{miner_id}/gpus")
response.raise_for_status()
return response.json()

View File

@@ -1,29 +1,67 @@
"""GPU operations router for Edge API Service""" """GPU operations router for Edge API Service"""
from fastapi import APIRouter from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field
from ..services.gpu_service import GPUService
router = APIRouter() router = APIRouter()
@router.post("/listings") class ScanGPUsRequest(BaseModel):
async def list_gpu(): """Request model for scanning GPUs"""
"""List GPU on island - TODO: Implement in Phase 3""" miner_id: str
return {"message": "GPU listing endpoint - to be implemented in Phase 3"}
@router.get("/listings") def get_gpu_service() -> GPUService:
async def get_gpu_listings(): """Dependency injection for GPU service"""
"""Get GPU listings on island - TODO: Implement in Phase 3""" return GPUService()
return {"message": "Get GPU listings endpoint - to be implemented in Phase 3"}
@router.delete("/listings/{listing_id}") @router.get("/")
async def remove_gpu_listing(listing_id: str): async def list_gpus(
"""Remove GPU listing - TODO: Implement in Phase 3""" architecture: str = Query(None),
return {"message": f"Remove GPU listing {listing_id} - to be implemented in Phase 3"} edge_optimized: bool = Query(None),
min_memory_gb: int = Query(None),
svc: GPUService = Depends(get_gpu_service)
):
"""List all GPUs"""
gpus = await svc.list_gpus(architecture=architecture, edge_optimized=edge_optimized, min_memory_gb=min_memory_gb)
return {"gpus": gpus, "total": len(gpus)}
@router.get("/scan") @router.get("/{gpu_id}")
async def scan_gpus(): async def get_gpu_listing(gpu_id: str, svc: GPUService = Depends(get_gpu_service)):
"""Scan GPUs on island - TODO: Implement in Phase 3""" """Get GPU listing details"""
return {"message": "GPU scan endpoint - to be implemented in Phase 3"} gpu = await svc.get_gpu_listing(gpu_id)
if gpu is None:
raise HTTPException(status_code=404, detail=f"GPU {gpu_id} not found")
return gpu
@router.delete("/{gpu_id}")
async def remove_gpu_listing(gpu_id: str, svc: GPUService = Depends(get_gpu_service)):
"""Remove GPU listing"""
success = await svc.remove_gpu_listing(gpu_id)
if success:
return {"message": f"GPU {gpu_id} removed"}
else:
raise HTTPException(status_code=404, detail=f"GPU {gpu_id} not found")
@router.post("/scan")
async def scan_gpus(request: ScanGPUsRequest, svc: GPUService = Depends(get_gpu_service)):
"""Scan GPUs for a miner"""
result = await svc.scan_gpus(request.miner_id)
return result
@router.get("/{gpu_id}/metrics")
async def get_gpu_metrics(
gpu_id: str,
limit: int = Query(100),
svc: GPUService = Depends(get_gpu_service)
):
"""Get GPU metrics"""
metrics = await svc.get_gpu_metrics(gpu_id, limit)
return {"gpu_id": gpu_id, "metrics": metrics, "total": len(metrics)}

View File

@@ -2,6 +2,8 @@
from typing import Dict, List, Optional from typing import Dict, List, Optional
from ..clients.gpu_service import GPUServiceClient
from ..storage import get_session
from ..schemas.gpu import GPUListing from ..schemas.gpu import GPUListing
@@ -9,21 +11,71 @@ class GPUService:
"""Service for GPU operations""" """Service for GPU operations"""
def __init__(self): def __init__(self):
# TODO: Initialize GPU service client in Phase 3 self.gpu_client = GPUServiceClient()
pass
async def list_gpu(self, island_id: str, gpu_type: str, price: float) -> Dict: async def list_gpus(self, architecture: str = None, edge_optimized: bool = None, min_memory_gb: int = None) -> List[Dict]:
"""List GPU on island - TODO: Implement in Phase 3""" """List GPUs via GPU service"""
return {"message": "list_gpu - to be implemented in Phase 3"} profiles = await self.gpu_client.get_gpu_profiles(architecture, edge_optimized, min_memory_gb)
# Store GPU listings in edge-api database
async with get_session() as session:
for profile in profiles:
gpu_listing = GPUListing(
gpu_id=profile.get("id", ""),
model=profile.get("model", "Unknown"),
memory_gb=profile.get("memory_gb", 0),
cuda_version=profile.get("cuda_version", ""),
region=profile.get("region", ""),
capabilities=profile.get("capabilities", []),
extra_data=profile
)
session.add(gpu_listing)
await session.commit()
return profiles
async def get_gpu_listings(self, island_id: str) -> List[Dict]: async def get_gpu_listing(self, gpu_id: str) -> Optional[Dict]:
"""Get GPU listings on island - TODO: Implement in Phase 3""" """Get GPU listing details"""
return [{"message": "get_gpu_listings - to be implemented in Phase 3"}] # Get from GPU service
try:
profiles = await self.gpu_client.get_gpu_profiles()
for profile in profiles:
if profile.get("id") == gpu_id:
return profile
return None
except Exception as e:
# Fall back to database
from sqlmodel import select
async with get_session() as session:
result = await session.execute(select(GPUListing).where(GPUListing.gpu_id == gpu_id))
gpu = result.scalar_one_or_none()
if gpu:
return {
"id": gpu.gpu_id,
"model": gpu.model,
"memory_gb": gpu.memory_gb,
"cuda_version": gpu.cuda_version,
"region": gpu.region,
"capabilities": gpu.capabilities,
"extra_data": gpu.extra_data
}
return None
async def remove_gpu_listing(self, listing_id: str) -> Dict: async def remove_gpu_listing(self, gpu_id: str) -> bool:
"""Remove GPU listing - TODO: Implement in Phase 3""" """Remove GPU listing from database"""
return {"message": f"remove_gpu_listing {listing_id} - to be implemented in Phase 3"} from sqlmodel import delete
async with get_session() as session:
stmt = delete(GPUListing).where(GPUListing.gpu_id == gpu_id)
result = await session.execute(stmt)
await session.commit()
return result.rowcount > 0
async def scan_gpus(self, miner_id: str) -> Dict: async def scan_gpus(self, miner_id: str) -> Dict:
"""Scan GPUs on island - TODO: Implement in Phase 3""" """Scan GPUs via GPU service"""
return {"message": "scan_gpus - to be implemented in Phase 3"} result = await self.gpu_client.scan_gpus(miner_id)
return result
async def get_gpu_metrics(self, gpu_id: str, limit: int = 100) -> List[Dict]:
"""Get GPU metrics via GPU service"""
metrics = await self.gpu_client.get_gpu_metrics(gpu_id, limit)
return metrics