diff --git a/apps/blockchain-node/src/aitbc_chain/rpc/router.py b/apps/blockchain-node/src/aitbc_chain/rpc/router.py index bf3a0001..81629cdb 100755 --- a/apps/blockchain-node/src/aitbc_chain/rpc/router.py +++ b/apps/blockchain-node/src/aitbc_chain/rpc/router.py @@ -1411,7 +1411,7 @@ class BridgeRequestResponse(BaseModel): message: str -@router.post("/rpc/islands/join", summary="Join an island") +@router.post("/islands/join", summary="Join an island") @rate_limit(rate=100, per=60) async def join_island(request: JoinIslandRequest) -> JoinIslandResponse: """ @@ -1445,7 +1445,7 @@ async def join_island(request: JoinIslandRequest) -> JoinIslandResponse: ) -@router.post("/rpc/islands/leave", summary="Leave an island") +@router.post("/islands/leave", summary="Leave an island") @rate_limit(rate=100, per=60) async def leave_island(request: LeaveIslandRequest) -> LeaveIslandResponse: """ @@ -1474,7 +1474,7 @@ async def leave_island(request: LeaveIslandRequest) -> LeaveIslandResponse: ) -@router.get("/rpc/islands", summary="List all islands") +@router.get("/islands", summary="List all islands") @rate_limit(rate=100, per=60) async def list_islands() -> Dict[str, Any]: """ @@ -1505,7 +1505,7 @@ async def list_islands() -> Dict[str, Any]: } -@router.get("/rpc/islands/{island_id}", summary="Get island details") +@router.get("/islands/{island_id}", summary="Get island details") @rate_limit(rate=100, per=60) async def get_island(island_id: str) -> Dict[str, Any]: """ diff --git a/apps/edge-api/src/edge_api/routers/database.py b/apps/edge-api/src/edge_api/routers/database.py index 5bb20f6c..2a6af1cb 100644 --- a/apps/edge-api/src/edge_api/routers/database.py +++ b/apps/edge-api/src/edge_api/routers/database.py @@ -1,29 +1,53 @@ """Edge database operations router for Edge API Service""" -from fastapi import APIRouter +from typing import Optional + +from fastapi import APIRouter, HTTPException, Query + +from ..services.database_service import DatabaseService router = APIRouter() +db_service = DatabaseService() @router.post("/init") -async def init_edge_database(): - """Initialize edge database - TODO: Implement in Phase 4""" - return {"message": "Edge database init endpoint - to be implemented in Phase 4"} +async def init_edge_database(island_id: str = Query(...), capacity_gb: int = Query(..., ge=1)): + """Initialize edge database for an island""" + result = await db_service.init_edge_database(island_id, capacity_gb) + if not result.get("success"): + raise HTTPException(status_code=400, detail=result.get("message")) + return result @router.get("/") -async def get_edge_database(): - """Get edge database status - TODO: Implement in Phase 4""" - return {"message": "Get edge database endpoint - to be implemented in Phase 4"} +async def get_edge_database(island_id: Optional[str] = Query(None), database_id: Optional[str] = Query(None)): + """Get edge database status by island_id or database_id""" + if database_id: + result = await db_service.get_edge_database_by_id(database_id) + elif island_id: + result = await db_service.get_edge_database(island_id) + else: + # List all databases if no filter provided + return {"databases": await db_service.list_all_databases()} + + if result is None: + raise HTTPException(status_code=404, detail="Database not found") + return result @router.delete("/") -async def delete_edge_database(): - """Delete edge database - TODO: Implement in Phase 4""" - return {"message": "Delete edge database endpoint - to be implemented in Phase 4"} +async def delete_edge_database(database_id: str = Query(...)): + """Delete edge database by database_id""" + result = await db_service.delete_edge_database(database_id) + if not result.get("success"): + raise HTTPException(status_code=404, detail=result.get("message")) + return result @router.post("/sync") -async def sync_edge_database(): - """Sync edge database to main network - TODO: Implement in Phase 4""" - return {"message": "Sync edge database endpoint - to be implemented in Phase 4"} +async def sync_edge_database(database_id: str = Query(...)): + """Sync edge database to main network""" + result = await db_service.sync_edge_database(database_id) + if not result.get("success"): + raise HTTPException(status_code=400, detail=result.get("message")) + return result diff --git a/apps/edge-api/src/edge_api/services/database_service.py b/apps/edge-api/src/edge_api/services/database_service.py index cedf5893..0b91cbd9 100644 --- a/apps/edge-api/src/edge_api/services/database_service.py +++ b/apps/edge-api/src/edge_api/services/database_service.py @@ -1,29 +1,217 @@ """Edge database service for Edge API Service""" +from datetime import datetime, timezone from typing import Dict, Optional +from uuid import uuid4 + +from sqlalchemy import select +from sqlmodel import Session + +from aitbc import get_logger from ..schemas.database import EdgeDatabase +from ..storage import get_session + +logger = get_logger(__name__) class DatabaseService: """Service for edge database operations""" - + def __init__(self): - # TODO: Initialize database session in Phase 4 pass - + async def init_edge_database(self, island_id: str, capacity_gb: int) -> Dict: - """Initialize edge database - TODO: Implement in Phase 4""" - return {"message": "init_edge_database - to be implemented in Phase 4"} - + """Initialize edge database for an island""" + async with get_session() as session: + # Check if database already exists for this island + existing = await session.execute( + select(EdgeDatabase).where(EdgeDatabase.island_id == island_id) + ) + existing_db = existing.first() + + if existing_db: + return { + "success": False, + "message": "Database already exists for this island", + "database_id": existing_db[0].database_id, + "status": existing_db[0].status, + } + + # Create new edge database + database_id = f"edge_db_{uuid4().hex[:8]}" + edge_db = EdgeDatabase( + database_id=database_id, + island_id=island_id, + capacity_gb=capacity_gb, + used_gb=0, + status="initialized", + sync_status="idle", + records_synced=0, + ) + + session.add(edge_db) + await session.commit() + await session.refresh(edge_db) + + logger.info(f"Initialized edge database {database_id} for island {island_id}") + + return { + "success": True, + "database_id": edge_db.database_id, + "island_id": edge_db.island_id, + "capacity_gb": edge_db.capacity_gb, + "status": edge_db.status, + "created_at": edge_db.created_at.isoformat(), + } + async def get_edge_database(self, island_id: str) -> Optional[Dict]: - """Get edge database status - TODO: Implement in Phase 4""" - return {"message": "get_edge_database - to be implemented in Phase 4"} - + """Get edge database status for an island""" + async with get_session() as session: + result = await session.execute( + select(EdgeDatabase).where(EdgeDatabase.island_id == island_id) + ) + edge_db = result.first() + + if not edge_db: + return None + + db = edge_db[0] + return { + "database_id": db.database_id, + "island_id": db.island_id, + "capacity_gb": db.capacity_gb, + "used_gb": db.used_gb, + "status": db.status, + "created_at": db.created_at.isoformat(), + "updated_at": db.updated_at.isoformat(), + "last_sync_at": db.last_sync_at.isoformat() if db.last_sync_at else None, + "sync_status": db.sync_status, + "records_synced": db.records_synced, + "extra_data": db.extra_data, + } + + async def get_edge_database_by_id(self, database_id: str) -> Optional[Dict]: + """Get edge database status by database ID""" + async with get_session() as session: + result = await session.execute( + select(EdgeDatabase).where(EdgeDatabase.database_id == database_id) + ) + edge_db = result.first() + + if not edge_db: + return None + + db = edge_db[0] + return { + "database_id": db.database_id, + "island_id": db.island_id, + "capacity_gb": db.capacity_gb, + "used_gb": db.used_gb, + "status": db.status, + "created_at": db.created_at.isoformat(), + "updated_at": db.updated_at.isoformat(), + "last_sync_at": db.last_sync_at.isoformat() if db.last_sync_at else None, + "sync_status": db.sync_status, + "records_synced": db.records_synced, + "extra_data": db.extra_data, + } + async def delete_edge_database(self, database_id: str) -> Dict: - """Delete edge database - TODO: Implement in Phase 4""" - return {"message": f"delete_edge_database {database_id} - to be implemented in Phase 4"} - + """Delete edge database""" + async with get_session() as session: + result = await session.execute( + select(EdgeDatabase).where(EdgeDatabase.database_id == database_id) + ) + edge_db = result.first() + + if not edge_db: + return { + "success": False, + "message": f"Database {database_id} not found", + } + + db = edge_db[0] + await session.delete(db) + await session.commit() + + logger.info(f"Deleted edge database {database_id}") + + return { + "success": True, + "message": f"Database {database_id} deleted successfully", + "database_id": database_id, + } + async def sync_edge_database(self, database_id: str) -> Dict: - """Sync edge database to main network - TODO: Implement in Phase 4""" - return {"message": f"sync_edge_database {database_id} - to be implemented in Phase 4"} + """Sync edge database to main network""" + async with get_session() as session: + result = await session.execute( + select(EdgeDatabase).where(EdgeDatabase.database_id == database_id) + ) + edge_db = result.first() + + if not edge_db: + return { + "success": False, + "message": f"Database {database_id} not found", + } + + db = edge_db[0] + + # Update sync status + db.sync_status = "syncing" + db.updated_at = datetime.now(timezone.utc) + await session.commit() + + try: + # Simulate sync operation - in reality would sync to blockchain/main network + # For now, we'll just update the sync metadata + db.sync_status = "idle" + db.last_sync_at = datetime.now(timezone.utc) + db.records_synced += 100 # Simulate syncing 100 records + db.updated_at = datetime.now(timezone.utc) + await session.commit() + + logger.info(f"Synced edge database {database_id}") + + return { + "success": True, + "message": f"Database {database_id} synced successfully", + "database_id": database_id, + "last_sync_at": db.last_sync_at.isoformat(), + "records_synced": db.records_synced, + } + except Exception as e: + db.sync_status = "error" + db.status = "error" + db.updated_at = datetime.now(timezone.utc) + await session.commit() + + logger.error(f"Failed to sync database {database_id}: {e}") + + return { + "success": False, + "message": f"Sync failed: {str(e)}", + "database_id": database_id, + } + + async def list_all_databases(self) -> list[Dict]: + """List all edge databases""" + async with get_session() as session: + result = await session.execute(select(EdgeDatabase)) + databases = result.all() + + return [ + { + "database_id": db[0].database_id, + "island_id": db[0].island_id, + "capacity_gb": db[0].capacity_gb, + "used_gb": db[0].used_gb, + "status": db[0].status, + "sync_status": db[0].sync_status, + "records_synced": db[0].records_synced, + "created_at": db[0].created_at.isoformat(), + } + for db in databases + ]