diff --git a/apps/marketplace-service/src/marketplace_service/main.py b/apps/marketplace-service/src/marketplace_service/main.py index 33987dc7..587acc4d 100644 --- a/apps/marketplace-service/src/marketplace_service/main.py +++ b/apps/marketplace-service/src/marketplace_service/main.py @@ -6,8 +6,9 @@ Manages GPU marketplace operations from contextlib import asynccontextmanager from typing import AsyncIterator -from fastapi import FastAPI +from fastapi import FastAPI, Depends from pydantic import BaseModel +from sqlalchemy.ext.asyncio import AsyncSession from aitbc import ( configure_logging, @@ -18,6 +19,9 @@ from aitbc import ( ErrorHandlerMiddleware, ) +from .storage import init_db, get_session +from .services.marketplace_service import MarketplaceService + # Configure structured logging configure_logging(level="INFO") logger = get_logger(__name__) @@ -27,6 +31,8 @@ logger = get_logger(__name__) async def lifespan(app: FastAPI) -> AsyncIterator[None]: """Lifecycle events for the Marketplace Service.""" logger.info("Starting Marketplace Service") + # Initialize database + await init_db() yield logger.info("Shutting down Marketplace Service") @@ -67,6 +73,68 @@ async def marketplace_status() -> dict[str, str]: } +async def get_marketplace_service(session: AsyncSession = Depends(get_session)) -> MarketplaceService: + """Get marketplace service instance""" + return MarketplaceService(session) + + +@app.get("/v1/marketplace/offers") +async def get_offers( + status: str | None = None, + region: str | None = None, + gpu_model: str | None = None, + svc: MarketplaceService = Depends(get_marketplace_service), +): + """Get marketplace offers""" + return svc.list_offers(status=status, region=region, gpu_model=gpu_model) + + +@app.get("/v1/marketplace/offers/{offer_id}") +async def get_offer( + offer_id: str, + svc: MarketplaceService = Depends(get_marketplace_service), +): + """Get a specific marketplace offer""" + return svc.get_offer(offer_id) + + +@app.post("/v1/marketplace/offers") +async def create_offer( + offer_data: dict, + svc: MarketplaceService = Depends(get_marketplace_service), +): + """Create a new marketplace offer""" + return svc.create_offer(offer_data) + + +@app.get("/v1/marketplace/bids") +async def get_bids( + status: str | None = None, + provider: str | None = None, + svc: MarketplaceService = Depends(get_marketplace_service), +): + """Get marketplace bids""" + return svc.list_bids(status=status, provider=provider) + + +@app.post("/v1/marketplace/bids") +async def create_bid( + bid_data: dict, + svc: MarketplaceService = Depends(get_marketplace_service), +): + """Create a new marketplace bid""" + return svc.create_bid(bid_data) + + +@app.get("/v1/marketplace/analytics") +async def get_analytics( + period_type: str = "daily", + svc: MarketplaceService = Depends(get_marketplace_service), +): + """Get marketplace analytics""" + return await svc.get_analytics(period_type=period_type) + + if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8102) diff --git a/apps/marketplace-service/src/marketplace_service/services/__init__.py b/apps/marketplace-service/src/marketplace_service/services/__init__.py new file mode 100644 index 00000000..5dfb55c6 --- /dev/null +++ b/apps/marketplace-service/src/marketplace_service/services/__init__.py @@ -0,0 +1,7 @@ +""" +Marketplace Service services +""" + +from .marketplace_service import MarketplaceService + +__all__ = ["MarketplaceService"] diff --git a/apps/marketplace-service/src/marketplace_service/services/marketplace_service.py b/apps/marketplace-service/src/marketplace_service/services/marketplace_service.py new file mode 100644 index 00000000..57cfc2ee --- /dev/null +++ b/apps/marketplace-service/src/marketplace_service/services/marketplace_service.py @@ -0,0 +1,76 @@ +""" +Marketplace service for managing marketplace operations +""" + +from typing import Any + +from sqlmodel import Session, select + +from ..domain.marketplace import MarketplaceOffer, MarketplaceBid + + +class MarketplaceService: + def __init__(self, session: Session): + self.session = session + + def list_offers( + self, + status: str | None = None, + region: str | None = None, + gpu_model: str | None = None, + ) -> list[MarketplaceOffer]: + """List marketplace offers""" + stmt = select(MarketplaceOffer) + if status: + stmt = stmt.where(MarketplaceOffer.status == status) + if region: + stmt = stmt.where(MarketplaceOffer.region == region) + if gpu_model: + stmt = stmt.where(MarketplaceOffer.gpu_model == gpu_model) + return list(self.session.execute(stmt).all()) + + def get_offer(self, offer_id: str) -> MarketplaceOffer | None: + """Get a specific marketplace offer""" + stmt = select(MarketplaceOffer).where(MarketplaceOffer.id == offer_id) + result = self.session.execute(stmt).first() + return result[0] if result else None + + def create_offer(self, offer_data: dict) -> MarketplaceOffer: + """Create a new marketplace offer""" + offer = MarketplaceOffer(**offer_data) + self.session.add(offer) + self.session.commit() + self.session.refresh(offer) + return offer + + def list_bids( + self, + status: str | None = None, + provider: str | None = None, + ) -> list[MarketplaceBid]: + """List marketplace bids""" + stmt = select(MarketplaceBid) + if status: + stmt = stmt.where(MarketplaceBid.status == status) + if provider: + stmt = stmt.where(MarketplaceBid.provider == provider) + return list(self.session.execute(stmt).all()) + + def create_bid(self, bid_data: dict) -> MarketplaceBid: + """Create a new marketplace bid""" + bid = MarketplaceBid(**bid_data) + self.session.add(bid) + self.session.commit() + self.session.refresh(bid) + return bid + + async def get_analytics(self, period_type: str = "daily") -> dict[str, Any]: + """Get marketplace analytics""" + # Placeholder for analytics logic + return { + "period_type": period_type, + "total_offers": 0, + "total_transactions": 0, + "total_volume": 0.0, + "average_price": 0.0, + } diff --git a/apps/marketplace-service/src/marketplace_service/storage.py b/apps/marketplace-service/src/marketplace_service/storage.py new file mode 100644 index 00000000..36bf3cc3 --- /dev/null +++ b/apps/marketplace-service/src/marketplace_service/storage.py @@ -0,0 +1,42 @@ +""" +Database session management for Marketplace service +""" + +from contextlib import asynccontextmanager +from typing import AsyncIterator + +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine +from sqlmodel import SQLModel + +from aitbc import get_logger + +logger = get_logger(__name__) + +# Database URL from environment variable or default +DATABASE_URL = "postgresql+asyncpg://aitbc_marketplace:password@localhost:5432/aitbc_marketplace" + +# Create async engine +engine = create_async_engine(DATABASE_URL, echo=False) + + +async def init_db() -> None: + """Initialize database tables""" + from .domain.marketplace import MarketplaceOffer, MarketplaceBid + from .domain.global_marketplace import ( + MarketplaceRegion, + GlobalMarketplaceConfig, + GlobalMarketplaceOffer, + GlobalMarketplaceTransaction, + ) + + async with engine.begin() as conn: + await conn.run_sync(SQLModel.metadata.create_all) + + logger.info("Marketplace service database initialized") + + +@asynccontextmanager +async def get_session() -> AsyncIterator[AsyncSession]: + """Get database session""" + async with AsyncSession(engine) as session: + yield session