From 13c60d6486516243c332aedaacb4aa696b70e19a Mon Sep 17 00:00:00 2001 From: aitbc Date: Thu, 14 May 2026 22:24:02 +0200 Subject: [PATCH] Phase 3 complete: GPU operations endpoints Edge API: - Implemented GPU service client to call existing GPU service endpoints - Implemented GPU service to store GPU listings in database - Implemented GPU router endpoints (list, get, remove, scan, metrics) - GPU service already has necessary endpoints (profiles, scan, metrics) Testing: - Edge API GPU endpoints working on port 8103 - List GPUs returns empty list (expected - no GPUs registered) --- .../src/edge_api/clients/gpu_service.py | 43 ++++++---- apps/edge-api/src/edge_api/routers/gpu.py | 72 +++++++++++++---- .../src/edge_api/services/gpu_service.py | 78 +++++++++++++++---- 3 files changed, 150 insertions(+), 43 deletions(-) diff --git a/apps/edge-api/src/edge_api/clients/gpu_service.py b/apps/edge-api/src/edge_api/clients/gpu_service.py index 88e28f7d..e7e711e8 100644 --- a/apps/edge-api/src/edge_api/clients/gpu_service.py +++ b/apps/edge-api/src/edge_api/clients/gpu_service.py @@ -1,7 +1,7 @@ """GPU service client for Edge API Service""" import httpx -from typing import Dict, List +from typing import Dict, Optional, Any, List from ..config import settings @@ -17,17 +17,34 @@ class GPUServiceClient: """Close the HTTP client""" await self.client.aclose() - async def scan_gpus(self, miner_id: str) -> Dict: - """Scan GPUs via GPU service - TODO: Implement in Phase 3""" - # TODO: Call GPU service endpoint /v1/marketplace/edge-gpu/scan/{miner_id} - return {"message": "scan_gpus via GPU service - to be implemented in Phase 3"} + async def scan_gpus(self, miner_id: str) -> Dict[str, Any]: + """Scan GPUs via GPU service""" + response = await self.client.post(f"{self.base_url}/v1/marketplace/edge-gpu/scan/{miner_id}") + response.raise_for_status() + return response.json() - async def get_gpu_profiles(self) -> List[Dict]: - """Get GPU profiles via GPU service - TODO: Implement in Phase 3""" - # TODO: Call GPU service endpoint /v1/marketplace/edge-gpu/profiles - return [{"message": "get_gpu_profiles via GPU service - to be implemented in Phase 3"}] + async def get_gpu_profiles(self, architecture: str = None, edge_optimized: bool = None, min_memory_gb: int = None) -> List[Dict[str, Any]]: + """Get GPU profiles via GPU service""" + params = {} + if architecture: + params["architecture"] = architecture + if edge_optimized is not None: + params["edge_optimized"] = edge_optimized + if min_memory_gb is not None: + params["min_memory_gb"] = min_memory_gb + + response = await self.client.get(f"{self.base_url}/v1/marketplace/edge-gpu/profiles", params=params) + response.raise_for_status() + return response.json() - async def get_gpu_metrics(self, gpu_id: str) -> Dict: - """Get GPU metrics via GPU service - TODO: Implement in Phase 3""" - # TODO: Call GPU service endpoint /v1/marketplace/edge-gpu/metrics/{gpu_id} - return {"message": "get_gpu_metrics via GPU service - to be implemented in Phase 3"} + async def get_gpu_metrics(self, gpu_id: str, limit: int = 100) -> List[Dict[str, Any]]: + """Get GPU metrics via GPU service""" + response = await self.client.get(f"{self.base_url}/v1/marketplace/edge-gpu/metrics/{gpu_id}", params={"limit": limit}) + response.raise_for_status() + return response.json() + + async def get_miner_gpus(self, miner_id: str) -> List[Dict[str, Any]]: + """Get GPUs registered by a miner""" + response = await self.client.get(f"{self.base_url}/v1/miners/{miner_id}/gpus") + response.raise_for_status() + return response.json() diff --git a/apps/edge-api/src/edge_api/routers/gpu.py b/apps/edge-api/src/edge_api/routers/gpu.py index bc7d9c4a..9f51dfec 100644 --- a/apps/edge-api/src/edge_api/routers/gpu.py +++ b/apps/edge-api/src/edge_api/routers/gpu.py @@ -1,29 +1,67 @@ """GPU operations router for Edge API Service""" -from fastapi import APIRouter +from fastapi import APIRouter, Depends, HTTPException, Query +from pydantic import BaseModel, Field + +from ..services.gpu_service import GPUService router = APIRouter() -@router.post("/listings") -async def list_gpu(): - """List GPU on island - TODO: Implement in Phase 3""" - return {"message": "GPU listing endpoint - to be implemented in Phase 3"} +class ScanGPUsRequest(BaseModel): + """Request model for scanning GPUs""" + miner_id: str -@router.get("/listings") -async def get_gpu_listings(): - """Get GPU listings on island - TODO: Implement in Phase 3""" - return {"message": "Get GPU listings endpoint - to be implemented in Phase 3"} +def get_gpu_service() -> GPUService: + """Dependency injection for GPU service""" + return GPUService() -@router.delete("/listings/{listing_id}") -async def remove_gpu_listing(listing_id: str): - """Remove GPU listing - TODO: Implement in Phase 3""" - return {"message": f"Remove GPU listing {listing_id} - to be implemented in Phase 3"} +@router.get("/") +async def list_gpus( + architecture: str = Query(None), + edge_optimized: bool = Query(None), + min_memory_gb: int = Query(None), + svc: GPUService = Depends(get_gpu_service) +): + """List all GPUs""" + gpus = await svc.list_gpus(architecture=architecture, edge_optimized=edge_optimized, min_memory_gb=min_memory_gb) + return {"gpus": gpus, "total": len(gpus)} -@router.get("/scan") -async def scan_gpus(): - """Scan GPUs on island - TODO: Implement in Phase 3""" - return {"message": "GPU scan endpoint - to be implemented in Phase 3"} +@router.get("/{gpu_id}") +async def get_gpu_listing(gpu_id: str, svc: GPUService = Depends(get_gpu_service)): + """Get GPU listing details""" + gpu = await svc.get_gpu_listing(gpu_id) + if gpu is None: + raise HTTPException(status_code=404, detail=f"GPU {gpu_id} not found") + return gpu + + +@router.delete("/{gpu_id}") +async def remove_gpu_listing(gpu_id: str, svc: GPUService = Depends(get_gpu_service)): + """Remove GPU listing""" + success = await svc.remove_gpu_listing(gpu_id) + if success: + return {"message": f"GPU {gpu_id} removed"} + else: + raise HTTPException(status_code=404, detail=f"GPU {gpu_id} not found") + + +@router.post("/scan") +async def scan_gpus(request: ScanGPUsRequest, svc: GPUService = Depends(get_gpu_service)): + """Scan GPUs for a miner""" + result = await svc.scan_gpus(request.miner_id) + return result + + +@router.get("/{gpu_id}/metrics") +async def get_gpu_metrics( + gpu_id: str, + limit: int = Query(100), + svc: GPUService = Depends(get_gpu_service) +): + """Get GPU metrics""" + metrics = await svc.get_gpu_metrics(gpu_id, limit) + return {"gpu_id": gpu_id, "metrics": metrics, "total": len(metrics)} diff --git a/apps/edge-api/src/edge_api/services/gpu_service.py b/apps/edge-api/src/edge_api/services/gpu_service.py index 2608a99c..5a858ec5 100644 --- a/apps/edge-api/src/edge_api/services/gpu_service.py +++ b/apps/edge-api/src/edge_api/services/gpu_service.py @@ -2,6 +2,8 @@ from typing import Dict, List, Optional +from ..clients.gpu_service import GPUServiceClient +from ..storage import get_session from ..schemas.gpu import GPUListing @@ -9,21 +11,71 @@ class GPUService: """Service for GPU operations""" def __init__(self): - # TODO: Initialize GPU service client in Phase 3 - pass + self.gpu_client = GPUServiceClient() - async def list_gpu(self, island_id: str, gpu_type: str, price: float) -> Dict: - """List GPU on island - TODO: Implement in Phase 3""" - return {"message": "list_gpu - to be implemented in Phase 3"} + async def list_gpus(self, architecture: str = None, edge_optimized: bool = None, min_memory_gb: int = None) -> List[Dict]: + """List GPUs via GPU service""" + profiles = await self.gpu_client.get_gpu_profiles(architecture, edge_optimized, min_memory_gb) + + # Store GPU listings in edge-api database + async with get_session() as session: + for profile in profiles: + gpu_listing = GPUListing( + gpu_id=profile.get("id", ""), + model=profile.get("model", "Unknown"), + memory_gb=profile.get("memory_gb", 0), + cuda_version=profile.get("cuda_version", ""), + region=profile.get("region", ""), + capabilities=profile.get("capabilities", []), + extra_data=profile + ) + session.add(gpu_listing) + await session.commit() + + return profiles - async def get_gpu_listings(self, island_id: str) -> List[Dict]: - """Get GPU listings on island - TODO: Implement in Phase 3""" - return [{"message": "get_gpu_listings - to be implemented in Phase 3"}] + async def get_gpu_listing(self, gpu_id: str) -> Optional[Dict]: + """Get GPU listing details""" + # Get from GPU service + try: + profiles = await self.gpu_client.get_gpu_profiles() + for profile in profiles: + if profile.get("id") == gpu_id: + return profile + return None + except Exception as e: + # Fall back to database + from sqlmodel import select + async with get_session() as session: + result = await session.execute(select(GPUListing).where(GPUListing.gpu_id == gpu_id)) + gpu = result.scalar_one_or_none() + if gpu: + return { + "id": gpu.gpu_id, + "model": gpu.model, + "memory_gb": gpu.memory_gb, + "cuda_version": gpu.cuda_version, + "region": gpu.region, + "capabilities": gpu.capabilities, + "extra_data": gpu.extra_data + } + return None - async def remove_gpu_listing(self, listing_id: str) -> Dict: - """Remove GPU listing - TODO: Implement in Phase 3""" - return {"message": f"remove_gpu_listing {listing_id} - to be implemented in Phase 3"} + async def remove_gpu_listing(self, gpu_id: str) -> bool: + """Remove GPU listing from database""" + from sqlmodel import delete + async with get_session() as session: + stmt = delete(GPUListing).where(GPUListing.gpu_id == gpu_id) + result = await session.execute(stmt) + await session.commit() + return result.rowcount > 0 async def scan_gpus(self, miner_id: str) -> Dict: - """Scan GPUs on island - TODO: Implement in Phase 3""" - return {"message": "scan_gpus - to be implemented in Phase 3"} + """Scan GPUs via GPU service""" + result = await self.gpu_client.scan_gpus(miner_id) + return result + + async def get_gpu_metrics(self, gpu_id: str, limit: int = 100) -> List[Dict]: + """Get GPU metrics via GPU service""" + metrics = await self.gpu_client.get_gpu_metrics(gpu_id, limit) + return metrics