Extract marketplace services to marketplace-service

- Created MarketplaceService with basic CRUD operations
- Created storage.py for database session management
- Updated main.py to include database initialization and marketplace endpoints:
  - GET /v1/marketplace/offers
  - GET /v1/marketplace/offers/{offer_id}
  - POST /v1/marketplace/offers
  - GET /v1/marketplace/bids
  - POST /v1/marketplace/bids
  - GET /v1/marketplace/analytics

This completes Phase 4.4c: Extract marketplace services
This commit is contained in:
aitbc
2026-04-30 11:29:27 +02:00
parent b7e5289e15
commit a5fb454100
4 changed files with 194 additions and 1 deletions

View File

@@ -6,8 +6,9 @@ Manages GPU marketplace operations
from contextlib import asynccontextmanager
from typing import AsyncIterator
from fastapi import FastAPI
from fastapi import FastAPI, Depends
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from aitbc import (
configure_logging,
@@ -18,6 +19,9 @@ from aitbc import (
ErrorHandlerMiddleware,
)
from .storage import init_db, get_session
from .services.marketplace_service import MarketplaceService
# Configure structured logging
configure_logging(level="INFO")
logger = get_logger(__name__)
@@ -27,6 +31,8 @@ logger = get_logger(__name__)
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
"""Lifecycle events for the Marketplace Service."""
logger.info("Starting Marketplace Service")
# Initialize database
await init_db()
yield
logger.info("Shutting down Marketplace Service")
@@ -67,6 +73,68 @@ async def marketplace_status() -> dict[str, str]:
}
async def get_marketplace_service(session: AsyncSession = Depends(get_session)) -> MarketplaceService:
"""Get marketplace service instance"""
return MarketplaceService(session)
@app.get("/v1/marketplace/offers")
async def get_offers(
status: str | None = None,
region: str | None = None,
gpu_model: str | None = None,
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Get marketplace offers"""
return svc.list_offers(status=status, region=region, gpu_model=gpu_model)
@app.get("/v1/marketplace/offers/{offer_id}")
async def get_offer(
offer_id: str,
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Get a specific marketplace offer"""
return svc.get_offer(offer_id)
@app.post("/v1/marketplace/offers")
async def create_offer(
offer_data: dict,
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Create a new marketplace offer"""
return svc.create_offer(offer_data)
@app.get("/v1/marketplace/bids")
async def get_bids(
status: str | None = None,
provider: str | None = None,
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Get marketplace bids"""
return svc.list_bids(status=status, provider=provider)
@app.post("/v1/marketplace/bids")
async def create_bid(
bid_data: dict,
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Create a new marketplace bid"""
return svc.create_bid(bid_data)
@app.get("/v1/marketplace/analytics")
async def get_analytics(
period_type: str = "daily",
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Get marketplace analytics"""
return await svc.get_analytics(period_type=period_type)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8102)

View File

@@ -0,0 +1,7 @@
"""
Marketplace Service services
"""
from .marketplace_service import MarketplaceService
__all__ = ["MarketplaceService"]

View File

@@ -0,0 +1,76 @@
"""
Marketplace service for managing marketplace operations
"""
from typing import Any
from sqlmodel import Session, select
from ..domain.marketplace import MarketplaceOffer, MarketplaceBid
class MarketplaceService:
def __init__(self, session: Session):
self.session = session
def list_offers(
self,
status: str | None = None,
region: str | None = None,
gpu_model: str | None = None,
) -> list[MarketplaceOffer]:
"""List marketplace offers"""
stmt = select(MarketplaceOffer)
if status:
stmt = stmt.where(MarketplaceOffer.status == status)
if region:
stmt = stmt.where(MarketplaceOffer.region == region)
if gpu_model:
stmt = stmt.where(MarketplaceOffer.gpu_model == gpu_model)
return list(self.session.execute(stmt).all())
def get_offer(self, offer_id: str) -> MarketplaceOffer | None:
"""Get a specific marketplace offer"""
stmt = select(MarketplaceOffer).where(MarketplaceOffer.id == offer_id)
result = self.session.execute(stmt).first()
return result[0] if result else None
def create_offer(self, offer_data: dict) -> MarketplaceOffer:
"""Create a new marketplace offer"""
offer = MarketplaceOffer(**offer_data)
self.session.add(offer)
self.session.commit()
self.session.refresh(offer)
return offer
def list_bids(
self,
status: str | None = None,
provider: str | None = None,
) -> list[MarketplaceBid]:
"""List marketplace bids"""
stmt = select(MarketplaceBid)
if status:
stmt = stmt.where(MarketplaceBid.status == status)
if provider:
stmt = stmt.where(MarketplaceBid.provider == provider)
return list(self.session.execute(stmt).all())
def create_bid(self, bid_data: dict) -> MarketplaceBid:
"""Create a new marketplace bid"""
bid = MarketplaceBid(**bid_data)
self.session.add(bid)
self.session.commit()
self.session.refresh(bid)
return bid
async def get_analytics(self, period_type: str = "daily") -> dict[str, Any]:
"""Get marketplace analytics"""
# Placeholder for analytics logic
return {
"period_type": period_type,
"total_offers": 0,
"total_transactions": 0,
"total_volume": 0.0,
"average_price": 0.0,
}

View File

@@ -0,0 +1,42 @@
"""
Database session management for Marketplace service
"""
from contextlib import asynccontextmanager
from typing import AsyncIterator
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlmodel import SQLModel
from aitbc import get_logger
logger = get_logger(__name__)
# Database URL from environment variable or default
DATABASE_URL = "postgresql+asyncpg://aitbc_marketplace:password@localhost:5432/aitbc_marketplace"
# Create async engine
engine = create_async_engine(DATABASE_URL, echo=False)
async def init_db() -> None:
"""Initialize database tables"""
from .domain.marketplace import MarketplaceOffer, MarketplaceBid
from .domain.global_marketplace import (
MarketplaceRegion,
GlobalMarketplaceConfig,
GlobalMarketplaceOffer,
GlobalMarketplaceTransaction,
)
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
logger.info("Marketplace service database initialized")
@asynccontextmanager
async def get_session() -> AsyncIterator[AsyncSession]:
"""Get database session"""
async with AsyncSession(engine) as session:
yield session