diff --git a/apps/coordinator-api/src/app/contexts/ipfs/routers/__init__.py b/apps/coordinator-api/src/app/contexts/ipfs/routers/__init__.py new file mode 100644 index 00000000..a8b4f65f --- /dev/null +++ b/apps/coordinator-api/src/app/contexts/ipfs/routers/__init__.py @@ -0,0 +1,5 @@ +"""IPFS routers for Coordinator API""" + +from .ipfs import router + +__all__ = ["router"] diff --git a/apps/coordinator-api/src/app/contexts/ipfs/routers/ipfs.py b/apps/coordinator-api/src/app/contexts/ipfs/routers/ipfs.py new file mode 100644 index 00000000..37ba2292 --- /dev/null +++ b/apps/coordinator-api/src/app/contexts/ipfs/routers/ipfs.py @@ -0,0 +1,262 @@ +"""IPFS storage router for Coordinator API""" + +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException, Query +from pydantic import BaseModel, Field + +from ...services.ipfs_storage_service import IPFSStorageService, IPFSUploadResult +from ...config import settings + +router = APIRouter() + + +# Pydantic models for requests/responses +class IPFSUploadRequest(BaseModel): + """Request model for IPFS upload""" + agent_id: str + memory_data: dict + memory_type: str = "experience" + tags: list[str] = Field(default_factory=list) + compress: bool = True + pin: bool = False + + +class IPFSRetrieveRequest(BaseModel): + """Request model for IPFS retrieve""" + cid: str + verify_integrity: bool = True + + +class IPFSBatchUploadRequest(BaseModel): + """Request model for batch IPFS upload""" + agent_id: str + memories: list[dict] + batch_size: int = Field(default=10, ge=1, le=50) + + +class IPFSCreateDealRequest(BaseModel): + """Request model for creating Filecoin deal""" + cid: str + duration: int = Field(default=180, ge=1) + + +class IPFSDeleteRequest(BaseModel): + """Request model for IPFS delete""" + cid: str + + +# Dependency injection for IPFS service +def get_ipfs_service(): + """Get IPFS storage service instance""" + # In production, this would be a singleton or dependency injection + # For now, create a new instance with config + config = { + "ipfs_url": settings.ipfs_url if hasattr(settings, 'ipfs_url') else "/ip4/127.0.0.1/tcp/5001", + "blockchain_enabled": False, + "compression_threshold": 1024, + "pin_threshold": 100, + } + service = IPFSStorageService(config) + return service + + +@router.post("/upload") +async def upload_memory(request: IPFSUploadRequest): + """Upload agent memory data to IPFS""" + try: + service = get_ipfs_service() + await service.initialize() + + result = await service.upload_memory( + agent_id=request.agent_id, + memory_data=request.memory_data, + memory_type=request.memory_type, + tags=request.tags, + compress=request.compress, + pin=request.pin, + ) + + return { + "success": True, + "cid": result.cid, + "size": result.size, + "compressed_size": result.compressed_size, + "upload_time": result.upload_time.isoformat(), + "pinned": result.pinned, + "filecoin_deal": result.filecoin_deal, + } + except Exception as e: + raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}") + + +@router.post("/retrieve") +async def retrieve_memory(request: IPFSRetrieveRequest): + """Retrieve memory data from IPFS by CID""" + try: + service = get_ipfs_service() + await service.initialize() + + memory_data, metadata = await service.retrieve_memory( + cid=request.cid, + verify_integrity=request.verify_integrity, + ) + + return { + "success": True, + "cid": request.cid, + "memory_data": memory_data, + "metadata": { + "agent_id": metadata.agent_id, + "memory_type": metadata.memory_type, + "timestamp": metadata.timestamp.isoformat(), + "version": metadata.version, + "tags": metadata.tags, + "compression_ratio": metadata.compression_ratio, + "integrity_hash": metadata.integrity_hash, + }, + } + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Retrieve failed: {str(e)}") + + +@router.post("/batch-upload") +async def batch_upload_memories(request: IPFSBatchUploadRequest): + """Upload multiple memories in batches to IPFS""" + try: + service = get_ipfs_service() + await service.initialize() + + # Convert memories to tuples for the service + memory_tuples = [(mem.get("data", {}), mem.get("type", "experience"), mem.get("tags", [])) for mem in request.memories] + + results = await service.batch_upload_memories( + agent_id=request.agent_id, + memories=memory_tuples, + batch_size=request.batch_size, + ) + + return { + "success": True, + "total_uploaded": len(results), + "results": [ + { + "cid": r.cid, + "size": r.size, + "compressed_size": r.compressed_size, + "pinned": r.pinned, + } + for r in results + ], + } + except Exception as e: + raise HTTPException(status_code=500, detail=f"Batch upload failed: {str(e)}") + + +@router.post("/create-deal") +async def create_filecoin_deal(request: IPFSCreateDealRequest): + """Create Filecoin storage deal for CID persistence""" + try: + service = get_ipfs_service() + await service.initialize() + + deal_id = await service.create_filecoin_deal( + cid=request.cid, + duration=request.duration, + ) + + if deal_id is None: + raise HTTPException(status_code=500, detail="Failed to create Filecoin deal") + + return { + "success": True, + "deal_id": deal_id, + "cid": request.cid, + "duration": request.duration, + } + except Exception as e: + raise HTTPException(status_code=500, detail=f"Deal creation failed: {str(e)}") + + +@router.get("/list/{agent_id}") +async def list_agent_memories( + agent_id: str, + limit: int = Query(default=100, ge=1, le=1000), +): + """List all memory CIDs for an agent""" + try: + service = get_ipfs_service() + await service.initialize() + + cids = await service.list_agent_memories(agent_id=agent_id, limit=limit) + + return { + "success": True, + "agent_id": agent_id, + "total": len(cids), + "cids": cids, + } + except Exception as e: + raise HTTPException(status_code=500, detail=f"List failed: {str(e)}") + + +@router.delete("/delete") +async def delete_memory(request: IPFSDeleteRequest): + """Delete/unpin memory from IPFS""" + try: + service = get_ipfs_service() + await service.initialize() + + success = await service.delete_memory(cid=request.cid) + + if not success: + raise HTTPException(status_code=404, detail=f"Failed to delete CID {request.cid}") + + return { + "success": True, + "message": f"Memory {request.cid} deleted successfully", + "cid": request.cid, + } + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"Delete failed: {str(e)}") + + +@router.get("/stats") +async def get_storage_stats(): + """Get IPFS storage statistics""" + try: + service = get_ipfs_service() + await service.initialize() + + stats = await service.get_storage_stats() + + return { + "success": True, + "stats": stats, + } + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to get stats: {str(e)}") + + +@router.get("/health") +async def health_check(): + """Health check for IPFS service""" + try: + service = get_ipfs_service() + await service.initialize() + + return { + "status": "healthy", + "service": "ipfs-storage", + "message": "IPFS service is operational", + } + except Exception as e: + return { + "status": "unhealthy", + "service": "ipfs-storage", + "message": f"IPFS service error: {str(e)}", + } diff --git a/apps/coordinator-api/src/app/main.py b/apps/coordinator-api/src/app/main.py index 1b750a2c..1a9f1db8 100755 --- a/apps/coordinator-api/src/app/main.py +++ b/apps/coordinator-api/src/app/main.py @@ -70,6 +70,7 @@ from .contexts.payments.routers import payments from .contexts.blockchain.routers import blockchain from .contexts.agent_identity.routers import agent_identity from .contexts.cross_chain.routers.cross_chain_integration import router as cross_chain +from .contexts.ipfs.routers import router as ipfs # Skip optional routers with missing dependencies try: @@ -369,7 +370,10 @@ def create_app() -> FastAPI: # Add blockchain router for CLI compatibility app.include_router(blockchain, prefix="/v1") - + + # Add IPFS storage router + app.include_router(ipfs, prefix="/v1/ipfs", tags=["ipfs"]) + # Add edge GPU router app.include_router(edge_gpu, prefix="/v1") diff --git a/apps/edge-api/src/edge_api/routers/database.py b/apps/edge-api/src/edge_api/routers/database.py index 2a6af1cb..5aa79cf4 100644 --- a/apps/edge-api/src/edge_api/routers/database.py +++ b/apps/edge-api/src/edge_api/routers/database.py @@ -1,53 +1,58 @@ """Edge database operations router for Edge API Service""" -from typing import Optional - -from fastapi import APIRouter, HTTPException, Query +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel, Field from ..services.database_service import DatabaseService router = APIRouter() -db_service = DatabaseService() + + +class InitDatabaseRequest(BaseModel): + """Request model for initializing a database""" + db_name: str + db_type: str = Field(default="postgresql") + config: dict = Field(default_factory=dict) + + +class SyncDatabaseRequest(BaseModel): + """Request model for syncing a database""" + source_url: str = Field(default=None) + + +def get_database_service() -> DatabaseService: + """Dependency injection for database service""" + return DatabaseService() @router.post("/init") -async def init_edge_database(island_id: str = Query(...), capacity_gb: int = Query(..., ge=1)): - """Initialize edge database for an island""" - result = await db_service.init_edge_database(island_id, capacity_gb) - if not result.get("success"): - raise HTTPException(status_code=400, detail=result.get("message")) +async def init_database(request: InitDatabaseRequest, svc: DatabaseService = Depends(get_database_service)): + """Initialize edge database""" + result = await svc.init_database(request.db_name, request.db_type, request.config) return result -@router.get("/") -async def get_edge_database(island_id: Optional[str] = Query(None), database_id: Optional[str] = Query(None)): - """Get edge database status by island_id or database_id""" - if database_id: - result = await db_service.get_edge_database_by_id(database_id) - elif island_id: - result = await db_service.get_edge_database(island_id) +@router.get("/{db_id}") +async def get_database(db_id: str, svc: DatabaseService = Depends(get_database_service)): + """Get database details""" + db = await svc.get_database(db_id) + if db is None: + raise HTTPException(status_code=404, detail=f"Database {db_id} not found") + return db + + +@router.delete("/{db_id}") +async def delete_database(db_id: str, svc: DatabaseService = Depends(get_database_service)): + """Delete database""" + success = await svc.delete_database(db_id) + if success: + return {"message": f"Database {db_id} deleted"} else: - # List all databases if no filter provided - return {"databases": await db_service.list_all_databases()} + raise HTTPException(status_code=404, detail=f"Database {db_id} not found") - if result is None: - raise HTTPException(status_code=404, detail="Database not found") - return result - - -@router.delete("/") -async def delete_edge_database(database_id: str = Query(...)): - """Delete edge database by database_id""" - result = await db_service.delete_edge_database(database_id) - if not result.get("success"): - raise HTTPException(status_code=404, detail=result.get("message")) - return result - - -@router.post("/sync") -async def sync_edge_database(database_id: str = Query(...)): - """Sync edge database to main network""" - result = await db_service.sync_edge_database(database_id) - if not result.get("success"): - raise HTTPException(status_code=400, detail=result.get("message")) + +@router.post("/{db_id}/sync") +async def sync_database(db_id: str, request: SyncDatabaseRequest, svc: DatabaseService = Depends(get_database_service)): + """Sync database from source""" + result = await svc.sync_database(db_id, request.source_url) return result diff --git a/apps/edge-api/src/edge_api/services/database_service.py b/apps/edge-api/src/edge_api/services/database_service.py index 0b91cbd9..f34e948e 100644 --- a/apps/edge-api/src/edge_api/services/database_service.py +++ b/apps/edge-api/src/edge_api/services/database_service.py @@ -1,217 +1,132 @@ -"""Edge database service for Edge API Service""" +"""Database service for Edge API Service""" +from typing import Dict, Optional, List from datetime import datetime, timezone -from typing import Dict, Optional from uuid import uuid4 -from sqlalchemy import select -from sqlmodel import Session - -from aitbc import get_logger - -from ..schemas.database import EdgeDatabase from ..storage import get_session - -logger = get_logger(__name__) +from ..schemas.database import EdgeDatabase +from sqlmodel import select, delete class DatabaseService: """Service for edge database operations""" - - def __init__(self): - pass - - async def init_edge_database(self, island_id: str, capacity_gb: int) -> Dict: - """Initialize edge database for an island""" + + async def init_database(self, database_id: str, island_id: str, capacity_gb: int) -> Dict: + """Initialize edge database""" async with get_session() as session: - # Check if database already exists for this island - existing = await session.execute( - select(EdgeDatabase).where(EdgeDatabase.island_id == island_id) - ) - existing_db = existing.first() - + # Check if database already exists + result = await session.execute(select(EdgeDatabase).where(EdgeDatabase.database_id == database_id)) + existing_db = result.scalar_one_or_none() + if existing_db: return { "success": False, - "message": "Database already exists for this island", - "database_id": existing_db[0].database_id, - "status": existing_db[0].status, + "message": f"Database {database_id} already exists", + "database": existing_db.database_id } - - # Create new edge database - database_id = f"edge_db_{uuid4().hex[:8]}" - edge_db = EdgeDatabase( + + # Create new database record + db = EdgeDatabase( database_id=database_id, island_id=island_id, capacity_gb=capacity_gb, used_gb=0, status="initialized", sync_status="idle", - records_synced=0, + records_synced=0 ) - - session.add(edge_db) + session.add(db) await session.commit() - await session.refresh(edge_db) - - logger.info(f"Initialized edge database {database_id} for island {island_id}") - + return { "success": True, - "database_id": edge_db.database_id, - "island_id": edge_db.island_id, - "capacity_gb": edge_db.capacity_gb, - "status": edge_db.status, - "created_at": edge_db.created_at.isoformat(), + "message": f"Database {database_id} initialized", + "database": database_id, + "id": db.id } - - async def get_edge_database(self, island_id: str) -> Optional[Dict]: - """Get edge database status for an island""" + + async def get_database(self, database_id: str) -> Optional[Dict]: + """Get database details""" async with get_session() as session: - result = await session.execute( - select(EdgeDatabase).where(EdgeDatabase.island_id == island_id) - ) - edge_db = result.first() - - if not edge_db: - return None - - db = edge_db[0] - return { - "database_id": db.database_id, - "island_id": db.island_id, - "capacity_gb": db.capacity_gb, - "used_gb": db.used_gb, - "status": db.status, - "created_at": db.created_at.isoformat(), - "updated_at": db.updated_at.isoformat(), - "last_sync_at": db.last_sync_at.isoformat() if db.last_sync_at else None, - "sync_status": db.sync_status, - "records_synced": db.records_synced, - "extra_data": db.extra_data, - } - - async def get_edge_database_by_id(self, database_id: str) -> Optional[Dict]: - """Get edge database status by database ID""" - async with get_session() as session: - result = await session.execute( - select(EdgeDatabase).where(EdgeDatabase.database_id == database_id) - ) - edge_db = result.first() - - if not edge_db: - return None - - db = edge_db[0] - return { - "database_id": db.database_id, - "island_id": db.island_id, - "capacity_gb": db.capacity_gb, - "used_gb": db.used_gb, - "status": db.status, - "created_at": db.created_at.isoformat(), - "updated_at": db.updated_at.isoformat(), - "last_sync_at": db.last_sync_at.isoformat() if db.last_sync_at else None, - "sync_status": db.sync_status, - "records_synced": db.records_synced, - "extra_data": db.extra_data, - } - - async def delete_edge_database(self, database_id: str) -> Dict: - """Delete edge database""" - async with get_session() as session: - result = await session.execute( - select(EdgeDatabase).where(EdgeDatabase.database_id == database_id) - ) - edge_db = result.first() - - if not edge_db: + result = await session.execute(select(EdgeDatabase).where(EdgeDatabase.database_id == database_id)) + db = result.scalar_one_or_none() + + if db: return { - "success": False, - "message": f"Database {database_id} not found", + "id": db.id, + "database_id": db.database_id, + "island_id": db.island_id, + "capacity_gb": db.capacity_gb, + "used_gb": db.used_gb, + "status": db.status, + "created_at": db.created_at.isoformat() if db.created_at else None, + "updated_at": db.updated_at.isoformat() if db.updated_at else None, + "last_sync_at": db.last_sync_at.isoformat() if db.last_sync_at else None, + "sync_status": db.sync_status, + "records_synced": db.records_synced, + "extra_data": db.extra_data } - - db = edge_db[0] - await session.delete(db) + return None + + async def delete_database(self, database_id: str) -> bool: + """Delete database""" + async with get_session() as session: + stmt = delete(EdgeDatabase).where(EdgeDatabase.database_id == database_id) + result = await session.execute(stmt) await session.commit() - - logger.info(f"Deleted edge database {database_id}") - - return { - "success": True, - "message": f"Database {database_id} deleted successfully", - "database_id": database_id, - } - - async def sync_edge_database(self, database_id: str) -> Dict: - """Sync edge database to main network""" + return result.rowcount > 0 + + async def sync_database(self, database_id: str) -> Dict: + """Sync database from source""" async with get_session() as session: - result = await session.execute( - select(EdgeDatabase).where(EdgeDatabase.database_id == database_id) - ) - edge_db = result.first() - - if not edge_db: + result = await session.execute(select(EdgeDatabase).where(EdgeDatabase.database_id == database_id)) + db = result.scalar_one_or_none() + + if not db: return { "success": False, - "message": f"Database {database_id} not found", + "message": f"Database {database_id} not found" } - - db = edge_db[0] - + # Update sync status db.sync_status = "syncing" db.updated_at = datetime.now(timezone.utc) await session.commit() - - try: - # Simulate sync operation - in reality would sync to blockchain/main network - # For now, we'll just update the sync metadata - db.sync_status = "idle" - db.last_sync_at = datetime.now(timezone.utc) - db.records_synced += 100 # Simulate syncing 100 records - db.updated_at = datetime.now(timezone.utc) - await session.commit() - - logger.info(f"Synced edge database {database_id}") - - return { - "success": True, - "message": f"Database {database_id} synced successfully", - "database_id": database_id, - "last_sync_at": db.last_sync_at.isoformat(), - "records_synced": db.records_synced, - } - except Exception as e: - db.sync_status = "error" - db.status = "error" - db.updated_at = datetime.now(timezone.utc) - await session.commit() - - logger.error(f"Failed to sync database {database_id}: {e}") - - return { - "success": False, - "message": f"Sync failed: {str(e)}", - "database_id": database_id, - } - - async def list_all_databases(self) -> list[Dict]: - """List all edge databases""" + + # Simulate sync process (in production, this would actually sync data) + db.sync_status = "idle" + db.last_sync_at = datetime.now(timezone.utc) + db.records_synced = db.records_synced + 100 # Simulated + db.updated_at = datetime.now(timezone.utc) + await session.commit() + + return { + "success": True, + "message": f"Database {database_id} synced", + "records_synced": db.records_synced + } + + async def list_databases(self, island_id: str = None) -> List[Dict]: + """List databases, optionally filtered by island_id""" async with get_session() as session: - result = await session.execute(select(EdgeDatabase)) - databases = result.all() - + if island_id: + result = await session.execute(select(EdgeDatabase).where(EdgeDatabase.island_id == island_id)) + else: + result = await session.execute(select(EdgeDatabase)) + databases = result.scalars().all() + return [ { - "database_id": db[0].database_id, - "island_id": db[0].island_id, - "capacity_gb": db[0].capacity_gb, - "used_gb": db[0].used_gb, - "status": db[0].status, - "sync_status": db[0].sync_status, - "records_synced": db[0].records_synced, - "created_at": db[0].created_at.isoformat(), + "id": db.id, + "database_id": db.database_id, + "island_id": db.island_id, + "capacity_gb": db.capacity_gb, + "used_gb": db.used_gb, + "status": db.status, + "sync_status": db.sync_status, + "records_synced": db.records_synced, + "created_at": db.created_at.isoformat() if db.created_at else None } for db in databases ]