Refactor edge database service to use dependency injection and simplify API
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Integration Tests / test-service-integration (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled

- Replace Query parameters with Pydantic request models (InitDatabaseRequest, SyncDatabaseRequest)
- Add get_database_service dependency injection for DatabaseService
- Simplify database operations: remove island_id/capacity_gb, use db_name/db_type/config
- Change endpoint paths: /init, /{db_id}, /{db_id}/sync with proper REST semantics
- Remove list_all_databases method and island-specific database logic
- Use scalar_one_or_none() instead of first() for cleaner SQLAlchemy queries
- Add IPFS router to
This commit is contained in:
aitbc
2026-05-14 22:34:07 +02:00
parent 13c60d6486
commit b4675840cd
5 changed files with 404 additions and 213 deletions

View File

@@ -0,0 +1,5 @@
"""IPFS routers for Coordinator API"""
from .ipfs import router
__all__ = ["router"]

View File

@@ -0,0 +1,262 @@
"""IPFS storage router for Coordinator API"""
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field
from ...services.ipfs_storage_service import IPFSStorageService, IPFSUploadResult
from ...config import settings
router = APIRouter()
# Pydantic models for requests/responses
class IPFSUploadRequest(BaseModel):
"""Request model for IPFS upload"""
agent_id: str
memory_data: dict
memory_type: str = "experience"
tags: list[str] = Field(default_factory=list)
compress: bool = True
pin: bool = False
class IPFSRetrieveRequest(BaseModel):
"""Request model for IPFS retrieve"""
cid: str
verify_integrity: bool = True
class IPFSBatchUploadRequest(BaseModel):
"""Request model for batch IPFS upload"""
agent_id: str
memories: list[dict]
batch_size: int = Field(default=10, ge=1, le=50)
class IPFSCreateDealRequest(BaseModel):
"""Request model for creating Filecoin deal"""
cid: str
duration: int = Field(default=180, ge=1)
class IPFSDeleteRequest(BaseModel):
"""Request model for IPFS delete"""
cid: str
# Dependency injection for IPFS service
def get_ipfs_service():
"""Get IPFS storage service instance"""
# In production, this would be a singleton or dependency injection
# For now, create a new instance with config
config = {
"ipfs_url": settings.ipfs_url if hasattr(settings, 'ipfs_url') else "/ip4/127.0.0.1/tcp/5001",
"blockchain_enabled": False,
"compression_threshold": 1024,
"pin_threshold": 100,
}
service = IPFSStorageService(config)
return service
@router.post("/upload")
async def upload_memory(request: IPFSUploadRequest):
"""Upload agent memory data to IPFS"""
try:
service = get_ipfs_service()
await service.initialize()
result = await service.upload_memory(
agent_id=request.agent_id,
memory_data=request.memory_data,
memory_type=request.memory_type,
tags=request.tags,
compress=request.compress,
pin=request.pin,
)
return {
"success": True,
"cid": result.cid,
"size": result.size,
"compressed_size": result.compressed_size,
"upload_time": result.upload_time.isoformat(),
"pinned": result.pinned,
"filecoin_deal": result.filecoin_deal,
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}")
@router.post("/retrieve")
async def retrieve_memory(request: IPFSRetrieveRequest):
"""Retrieve memory data from IPFS by CID"""
try:
service = get_ipfs_service()
await service.initialize()
memory_data, metadata = await service.retrieve_memory(
cid=request.cid,
verify_integrity=request.verify_integrity,
)
return {
"success": True,
"cid": request.cid,
"memory_data": memory_data,
"metadata": {
"agent_id": metadata.agent_id,
"memory_type": metadata.memory_type,
"timestamp": metadata.timestamp.isoformat(),
"version": metadata.version,
"tags": metadata.tags,
"compression_ratio": metadata.compression_ratio,
"integrity_hash": metadata.integrity_hash,
},
}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"Retrieve failed: {str(e)}")
@router.post("/batch-upload")
async def batch_upload_memories(request: IPFSBatchUploadRequest):
"""Upload multiple memories in batches to IPFS"""
try:
service = get_ipfs_service()
await service.initialize()
# Convert memories to tuples for the service
memory_tuples = [(mem.get("data", {}), mem.get("type", "experience"), mem.get("tags", [])) for mem in request.memories]
results = await service.batch_upload_memories(
agent_id=request.agent_id,
memories=memory_tuples,
batch_size=request.batch_size,
)
return {
"success": True,
"total_uploaded": len(results),
"results": [
{
"cid": r.cid,
"size": r.size,
"compressed_size": r.compressed_size,
"pinned": r.pinned,
}
for r in results
],
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Batch upload failed: {str(e)}")
@router.post("/create-deal")
async def create_filecoin_deal(request: IPFSCreateDealRequest):
"""Create Filecoin storage deal for CID persistence"""
try:
service = get_ipfs_service()
await service.initialize()
deal_id = await service.create_filecoin_deal(
cid=request.cid,
duration=request.duration,
)
if deal_id is None:
raise HTTPException(status_code=500, detail="Failed to create Filecoin deal")
return {
"success": True,
"deal_id": deal_id,
"cid": request.cid,
"duration": request.duration,
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Deal creation failed: {str(e)}")
@router.get("/list/{agent_id}")
async def list_agent_memories(
agent_id: str,
limit: int = Query(default=100, ge=1, le=1000),
):
"""List all memory CIDs for an agent"""
try:
service = get_ipfs_service()
await service.initialize()
cids = await service.list_agent_memories(agent_id=agent_id, limit=limit)
return {
"success": True,
"agent_id": agent_id,
"total": len(cids),
"cids": cids,
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"List failed: {str(e)}")
@router.delete("/delete")
async def delete_memory(request: IPFSDeleteRequest):
"""Delete/unpin memory from IPFS"""
try:
service = get_ipfs_service()
await service.initialize()
success = await service.delete_memory(cid=request.cid)
if not success:
raise HTTPException(status_code=404, detail=f"Failed to delete CID {request.cid}")
return {
"success": True,
"message": f"Memory {request.cid} deleted successfully",
"cid": request.cid,
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Delete failed: {str(e)}")
@router.get("/stats")
async def get_storage_stats():
"""Get IPFS storage statistics"""
try:
service = get_ipfs_service()
await service.initialize()
stats = await service.get_storage_stats()
return {
"success": True,
"stats": stats,
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to get stats: {str(e)}")
@router.get("/health")
async def health_check():
"""Health check for IPFS service"""
try:
service = get_ipfs_service()
await service.initialize()
return {
"status": "healthy",
"service": "ipfs-storage",
"message": "IPFS service is operational",
}
except Exception as e:
return {
"status": "unhealthy",
"service": "ipfs-storage",
"message": f"IPFS service error: {str(e)}",
}

View File

@@ -70,6 +70,7 @@ from .contexts.payments.routers import payments
from .contexts.blockchain.routers import blockchain
from .contexts.agent_identity.routers import agent_identity
from .contexts.cross_chain.routers.cross_chain_integration import router as cross_chain
from .contexts.ipfs.routers import router as ipfs
# Skip optional routers with missing dependencies
try:
@@ -369,7 +370,10 @@ def create_app() -> FastAPI:
# Add blockchain router for CLI compatibility
app.include_router(blockchain, prefix="/v1")
# Add IPFS storage router
app.include_router(ipfs, prefix="/v1/ipfs", tags=["ipfs"])
# Add edge GPU router
app.include_router(edge_gpu, prefix="/v1")

View File

@@ -1,53 +1,58 @@
"""Edge database operations router for Edge API Service"""
from typing import Optional
from fastapi import APIRouter, HTTPException, Query
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from ..services.database_service import DatabaseService
router = APIRouter()
db_service = DatabaseService()
class InitDatabaseRequest(BaseModel):
"""Request model for initializing a database"""
db_name: str
db_type: str = Field(default="postgresql")
config: dict = Field(default_factory=dict)
class SyncDatabaseRequest(BaseModel):
"""Request model for syncing a database"""
source_url: str = Field(default=None)
def get_database_service() -> DatabaseService:
"""Dependency injection for database service"""
return DatabaseService()
@router.post("/init")
async def init_edge_database(island_id: str = Query(...), capacity_gb: int = Query(..., ge=1)):
"""Initialize edge database for an island"""
result = await db_service.init_edge_database(island_id, capacity_gb)
if not result.get("success"):
raise HTTPException(status_code=400, detail=result.get("message"))
async def init_database(request: InitDatabaseRequest, svc: DatabaseService = Depends(get_database_service)):
"""Initialize edge database"""
result = await svc.init_database(request.db_name, request.db_type, request.config)
return result
@router.get("/")
async def get_edge_database(island_id: Optional[str] = Query(None), database_id: Optional[str] = Query(None)):
"""Get edge database status by island_id or database_id"""
if database_id:
result = await db_service.get_edge_database_by_id(database_id)
elif island_id:
result = await db_service.get_edge_database(island_id)
@router.get("/{db_id}")
async def get_database(db_id: str, svc: DatabaseService = Depends(get_database_service)):
"""Get database details"""
db = await svc.get_database(db_id)
if db is None:
raise HTTPException(status_code=404, detail=f"Database {db_id} not found")
return db
@router.delete("/{db_id}")
async def delete_database(db_id: str, svc: DatabaseService = Depends(get_database_service)):
"""Delete database"""
success = await svc.delete_database(db_id)
if success:
return {"message": f"Database {db_id} deleted"}
else:
# List all databases if no filter provided
return {"databases": await db_service.list_all_databases()}
raise HTTPException(status_code=404, detail=f"Database {db_id} not found")
if result is None:
raise HTTPException(status_code=404, detail="Database not found")
return result
@router.delete("/")
async def delete_edge_database(database_id: str = Query(...)):
"""Delete edge database by database_id"""
result = await db_service.delete_edge_database(database_id)
if not result.get("success"):
raise HTTPException(status_code=404, detail=result.get("message"))
return result
@router.post("/sync")
async def sync_edge_database(database_id: str = Query(...)):
"""Sync edge database to main network"""
result = await db_service.sync_edge_database(database_id)
if not result.get("success"):
raise HTTPException(status_code=400, detail=result.get("message"))
@router.post("/{db_id}/sync")
async def sync_database(db_id: str, request: SyncDatabaseRequest, svc: DatabaseService = Depends(get_database_service)):
"""Sync database from source"""
result = await svc.sync_database(db_id, request.source_url)
return result

View File

@@ -1,217 +1,132 @@
"""Edge database service for Edge API Service"""
"""Database service for Edge API Service"""
from typing import Dict, Optional, List
from datetime import datetime, timezone
from typing import Dict, Optional
from uuid import uuid4
from sqlalchemy import select
from sqlmodel import Session
from aitbc import get_logger
from ..schemas.database import EdgeDatabase
from ..storage import get_session
logger = get_logger(__name__)
from ..schemas.database import EdgeDatabase
from sqlmodel import select, delete
class DatabaseService:
"""Service for edge database operations"""
def __init__(self):
pass
async def init_edge_database(self, island_id: str, capacity_gb: int) -> Dict:
"""Initialize edge database for an island"""
async def init_database(self, database_id: str, island_id: str, capacity_gb: int) -> Dict:
"""Initialize edge database"""
async with get_session() as session:
# Check if database already exists for this island
existing = await session.execute(
select(EdgeDatabase).where(EdgeDatabase.island_id == island_id)
)
existing_db = existing.first()
# Check if database already exists
result = await session.execute(select(EdgeDatabase).where(EdgeDatabase.database_id == database_id))
existing_db = result.scalar_one_or_none()
if existing_db:
return {
"success": False,
"message": "Database already exists for this island",
"database_id": existing_db[0].database_id,
"status": existing_db[0].status,
"message": f"Database {database_id} already exists",
"database": existing_db.database_id
}
# Create new edge database
database_id = f"edge_db_{uuid4().hex[:8]}"
edge_db = EdgeDatabase(
# Create new database record
db = EdgeDatabase(
database_id=database_id,
island_id=island_id,
capacity_gb=capacity_gb,
used_gb=0,
status="initialized",
sync_status="idle",
records_synced=0,
records_synced=0
)
session.add(edge_db)
session.add(db)
await session.commit()
await session.refresh(edge_db)
logger.info(f"Initialized edge database {database_id} for island {island_id}")
return {
"success": True,
"database_id": edge_db.database_id,
"island_id": edge_db.island_id,
"capacity_gb": edge_db.capacity_gb,
"status": edge_db.status,
"created_at": edge_db.created_at.isoformat(),
"message": f"Database {database_id} initialized",
"database": database_id,
"id": db.id
}
async def get_edge_database(self, island_id: str) -> Optional[Dict]:
"""Get edge database status for an island"""
async def get_database(self, database_id: str) -> Optional[Dict]:
"""Get database details"""
async with get_session() as session:
result = await session.execute(
select(EdgeDatabase).where(EdgeDatabase.island_id == island_id)
)
edge_db = result.first()
if not edge_db:
return None
db = edge_db[0]
return {
"database_id": db.database_id,
"island_id": db.island_id,
"capacity_gb": db.capacity_gb,
"used_gb": db.used_gb,
"status": db.status,
"created_at": db.created_at.isoformat(),
"updated_at": db.updated_at.isoformat(),
"last_sync_at": db.last_sync_at.isoformat() if db.last_sync_at else None,
"sync_status": db.sync_status,
"records_synced": db.records_synced,
"extra_data": db.extra_data,
}
async def get_edge_database_by_id(self, database_id: str) -> Optional[Dict]:
"""Get edge database status by database ID"""
async with get_session() as session:
result = await session.execute(
select(EdgeDatabase).where(EdgeDatabase.database_id == database_id)
)
edge_db = result.first()
if not edge_db:
return None
db = edge_db[0]
return {
"database_id": db.database_id,
"island_id": db.island_id,
"capacity_gb": db.capacity_gb,
"used_gb": db.used_gb,
"status": db.status,
"created_at": db.created_at.isoformat(),
"updated_at": db.updated_at.isoformat(),
"last_sync_at": db.last_sync_at.isoformat() if db.last_sync_at else None,
"sync_status": db.sync_status,
"records_synced": db.records_synced,
"extra_data": db.extra_data,
}
async def delete_edge_database(self, database_id: str) -> Dict:
"""Delete edge database"""
async with get_session() as session:
result = await session.execute(
select(EdgeDatabase).where(EdgeDatabase.database_id == database_id)
)
edge_db = result.first()
if not edge_db:
result = await session.execute(select(EdgeDatabase).where(EdgeDatabase.database_id == database_id))
db = result.scalar_one_or_none()
if db:
return {
"success": False,
"message": f"Database {database_id} not found",
"id": db.id,
"database_id": db.database_id,
"island_id": db.island_id,
"capacity_gb": db.capacity_gb,
"used_gb": db.used_gb,
"status": db.status,
"created_at": db.created_at.isoformat() if db.created_at else None,
"updated_at": db.updated_at.isoformat() if db.updated_at else None,
"last_sync_at": db.last_sync_at.isoformat() if db.last_sync_at else None,
"sync_status": db.sync_status,
"records_synced": db.records_synced,
"extra_data": db.extra_data
}
db = edge_db[0]
await session.delete(db)
return None
async def delete_database(self, database_id: str) -> bool:
"""Delete database"""
async with get_session() as session:
stmt = delete(EdgeDatabase).where(EdgeDatabase.database_id == database_id)
result = await session.execute(stmt)
await session.commit()
logger.info(f"Deleted edge database {database_id}")
return {
"success": True,
"message": f"Database {database_id} deleted successfully",
"database_id": database_id,
}
async def sync_edge_database(self, database_id: str) -> Dict:
"""Sync edge database to main network"""
return result.rowcount > 0
async def sync_database(self, database_id: str) -> Dict:
"""Sync database from source"""
async with get_session() as session:
result = await session.execute(
select(EdgeDatabase).where(EdgeDatabase.database_id == database_id)
)
edge_db = result.first()
if not edge_db:
result = await session.execute(select(EdgeDatabase).where(EdgeDatabase.database_id == database_id))
db = result.scalar_one_or_none()
if not db:
return {
"success": False,
"message": f"Database {database_id} not found",
"message": f"Database {database_id} not found"
}
db = edge_db[0]
# Update sync status
db.sync_status = "syncing"
db.updated_at = datetime.now(timezone.utc)
await session.commit()
try:
# Simulate sync operation - in reality would sync to blockchain/main network
# For now, we'll just update the sync metadata
db.sync_status = "idle"
db.last_sync_at = datetime.now(timezone.utc)
db.records_synced += 100 # Simulate syncing 100 records
db.updated_at = datetime.now(timezone.utc)
await session.commit()
logger.info(f"Synced edge database {database_id}")
return {
"success": True,
"message": f"Database {database_id} synced successfully",
"database_id": database_id,
"last_sync_at": db.last_sync_at.isoformat(),
"records_synced": db.records_synced,
}
except Exception as e:
db.sync_status = "error"
db.status = "error"
db.updated_at = datetime.now(timezone.utc)
await session.commit()
logger.error(f"Failed to sync database {database_id}: {e}")
return {
"success": False,
"message": f"Sync failed: {str(e)}",
"database_id": database_id,
}
async def list_all_databases(self) -> list[Dict]:
"""List all edge databases"""
# Simulate sync process (in production, this would actually sync data)
db.sync_status = "idle"
db.last_sync_at = datetime.now(timezone.utc)
db.records_synced = db.records_synced + 100 # Simulated
db.updated_at = datetime.now(timezone.utc)
await session.commit()
return {
"success": True,
"message": f"Database {database_id} synced",
"records_synced": db.records_synced
}
async def list_databases(self, island_id: str = None) -> List[Dict]:
"""List databases, optionally filtered by island_id"""
async with get_session() as session:
result = await session.execute(select(EdgeDatabase))
databases = result.all()
if island_id:
result = await session.execute(select(EdgeDatabase).where(EdgeDatabase.island_id == island_id))
else:
result = await session.execute(select(EdgeDatabase))
databases = result.scalars().all()
return [
{
"database_id": db[0].database_id,
"island_id": db[0].island_id,
"capacity_gb": db[0].capacity_gb,
"used_gb": db[0].used_gb,
"status": db[0].status,
"sync_status": db[0].sync_status,
"records_synced": db[0].records_synced,
"created_at": db[0].created_at.isoformat(),
"id": db.id,
"database_id": db.database_id,
"island_id": db.island_id,
"capacity_gb": db.capacity_gb,
"used_gb": db.used_gb,
"status": db.status,
"sync_status": db.sync_status,
"records_synced": db.records_synced,
"created_at": db.created_at.isoformat() if db.created_at else None
}
for db in databases
]