diff --git a/apps/marketplace-service-debug/src/marketplace_service/__init__.py b/apps/marketplace-service-debug/src/marketplace_service/__init__.py new file mode 100644 index 00000000..aec04833 --- /dev/null +++ b/apps/marketplace-service-debug/src/marketplace_service/__init__.py @@ -0,0 +1,6 @@ +""" +AITBC Marketplace Service +Manages GPU marketplace operations +""" + +__version__ = "0.1.0" diff --git a/apps/marketplace-service-debug/src/marketplace_service/domain/__init__.py b/apps/marketplace-service-debug/src/marketplace_service/domain/__init__.py new file mode 100644 index 00000000..baafa9a1 --- /dev/null +++ b/apps/marketplace-service-debug/src/marketplace_service/domain/__init__.py @@ -0,0 +1,24 @@ +""" +Marketplace Service domain models +""" + +from .marketplace import MarketplaceOffer, MarketplaceBid +from .global_marketplace import ( + MarketplaceStatus, + RegionStatus, + MarketplaceRegion, + GlobalMarketplaceConfig, + GlobalMarketplaceOffer, + GlobalMarketplaceTransaction, +) + +__all__ = [ + "MarketplaceOffer", + "MarketplaceBid", + "MarketplaceStatus", + "RegionStatus", + "MarketplaceRegion", + "GlobalMarketplaceConfig", + "GlobalMarketplaceOffer", + "GlobalMarketplaceTransaction", +] diff --git a/apps/marketplace-service-debug/src/marketplace_service/domain/global_marketplace.py b/apps/marketplace-service-debug/src/marketplace_service/domain/global_marketplace.py new file mode 100644 index 00000000..88e80745 --- /dev/null +++ b/apps/marketplace-service-debug/src/marketplace_service/domain/global_marketplace.py @@ -0,0 +1,170 @@ +""" +Global Marketplace Domain Models +Domain models for global marketplace operations, multi-region support, and cross-chain integration +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from enum import StrEnum +from typing import Any +from uuid import uuid4 + +from sqlmodel import JSON, Column, Field, SQLModel + + +class MarketplaceStatus(StrEnum): + """Global marketplace offer status""" + + ACTIVE = "active" + INACTIVE = "inactive" + PENDING = "pending" + COMPLETED = "completed" + CANCELLED = "cancelled" + EXPIRED = "expired" + + +class RegionStatus(StrEnum): + """Global marketplace region status""" + + ACTIVE = "active" + INACTIVE = "inactive" + MAINTENANCE = "maintenance" + DEPRECATED = "deprecated" + + +class MarketplaceRegion(SQLModel, table=True): + """Global marketplace region configuration""" + + __tablename__ = "marketplace_regions" + __table_args__ = {"extend_existing": True} + + id: str = Field(default_factory=lambda: f"region_{uuid4().hex[:8]}", primary_key=True) + region_code: str = Field(index=True, unique=True) + region_name: str = Field(index=True) + geographic_area: str = Field(default="global") + + base_currency: str = Field(default="USD") + timezone: str = Field(default="UTC") + language: str = Field(default="en") + + load_factor: float = Field(default=1.0, ge=0.1, le=10.0) + max_concurrent_requests: int = Field(default=1000) + priority_weight: float = Field(default=1.0, ge=0.1, le=10.0) + + status: RegionStatus = Field(default=RegionStatus.ACTIVE) + health_score: float = Field(default=1.0, ge=0.0, le=1.0) + last_health_check: datetime | None = Field(default=None) + + api_endpoint: str = Field(default="") + websocket_endpoint: str = Field(default="") + blockchain_rpc_endpoints: dict[str, str] = Field(default_factory=dict, sa_column=Column(JSON)) + + average_response_time: float = Field(default=0.0) + request_rate: float = Field(default=0.0) + error_rate: float = Field(default=0.0) + + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + +class GlobalMarketplaceConfig(SQLModel, table=True): + """Global marketplace configuration settings""" + + __tablename__ = "global_marketplace_configs" + __table_args__ = {"extend_existing": True} + + id: str = Field(default_factory=lambda: f"config_{uuid4().hex[:8]}", primary_key=True) + config_key: str = Field(index=True, unique=True) + config_value: str = Field(default="") + config_type: str = Field(default="string") + + description: str = Field(default="") + category: str = Field(default="general") + is_public: bool = Field(default=False) + is_encrypted: bool = Field(default=False) + + min_value: float | None = Field(default=None) + max_value: float | None = Field(default=None) + allowed_values: list[str] = Field(default_factory=list, sa_column=Column(JSON)) + + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + last_modified_by: str | None = Field(default=None) + + +class GlobalMarketplaceOffer(SQLModel, table=True): + """Global marketplace offer with multi-region support""" + + __tablename__ = "global_marketplace_offers" + __table_args__ = {"extend_existing": True} + + id: str = Field(default_factory=lambda: f"offer_{uuid4().hex[:8]}", primary_key=True) + original_offer_id: str = Field(index=True) + + agent_id: str = Field(index=True) + service_type: str = Field(index=True) + resource_specification: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON)) + + base_price: float = Field(default=0.0) + currency: str = Field(default="USD") + price_per_region: dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON)) + dynamic_pricing_enabled: bool = Field(default=False) + + total_capacity: int = Field(default=0) + available_capacity: int = Field(default=0) + regions_available: list[str] = Field(default_factory=list, sa_column=Column(JSON)) + + global_status: MarketplaceStatus = Field(default=MarketplaceStatus.ACTIVE) + region_statuses: dict[str, MarketplaceStatus] = Field(default_factory=dict, sa_column=Column(JSON)) + + global_rating: float = Field(default=0.0, ge=0.0, le=5.0) + total_transactions: int = Field(default=0) + success_rate: float = Field(default=0.0, ge=0.0, le=1.0) + + supported_chains: list[int] = Field(default_factory=list, sa_column=Column(JSON)) + cross_chain_pricing: dict[int, float] = Field(default_factory=dict, sa_column=Column(JSON)) + + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + expires_at: datetime | None = Field(default=None) + + +class GlobalMarketplaceTransaction(SQLModel, table=True): + """Global marketplace transaction with cross-chain support""" + + __tablename__ = "global_marketplace_transactions" + __table_args__ = {"extend_existing": True} + + id: str = Field(default_factory=lambda: f"tx_{uuid4().hex[:8]}", primary_key=True) + transaction_hash: str | None = Field(index=True) + + buyer_id: str = Field(index=True) + seller_id: str = Field(index=True) + offer_id: str = Field(index=True) + + service_type: str = Field(index=True) + quantity: int = Field(default=1) + unit_price: float = Field(default=0.0) + total_amount: float = Field(default=0.0) + currency: str = Field(default="USD") + + source_chain: int | None = Field(default=None) + target_chain: int | None = Field(default=None) + bridge_transaction_id: str | None = Field(default=None) + cross_chain_fee: float = Field(default=0.0) + + source_region: str = Field(default="global") + target_region: str = Field(default="global") + regional_fees: dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON)) + + status: str = Field(default="pending") + payment_status: str = Field(default="pending") + delivery_status: str = Field(default="pending") + + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + confirmed_at: datetime | None = Field(default=None) + completed_at: datetime | None = Field(default=None) + + transaction_data: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON)) diff --git a/apps/marketplace-service-debug/src/marketplace_service/domain/marketplace.py b/apps/marketplace-service-debug/src/marketplace_service/domain/marketplace.py new file mode 100644 index 00000000..50e60d4b --- /dev/null +++ b/apps/marketplace-service-debug/src/marketplace_service/domain/marketplace.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from uuid import uuid4 + +from sqlalchemy import JSON, Column +from sqlmodel import Field, SQLModel + + +class MarketplaceOffer(SQLModel, table=True): + __tablename__ = "marketplaceoffer" + __table_args__ = {"extend_existing": True} + + id: str = Field(default_factory=lambda: uuid4().hex, primary_key=True) + provider: str = Field(index=True) + capacity: int = Field(default=0, nullable=False) + price: float = Field(default=0.0, nullable=False) + sla: str = Field(default="") + status: str = Field(default="open", max_length=20) + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), nullable=False, index=True) + attributes: dict = Field(default_factory=dict, sa_column=Column(JSON, nullable=False)) + # GPU-specific fields + gpu_model: str | None = Field(default=None, index=True) + gpu_memory_gb: int | None = Field(default=None) + gpu_count: int | None = Field(default=1) + cuda_version: str | None = Field(default=None) + price_per_hour: float | None = Field(default=None) + region: str | None = Field(default=None, index=True) + + +class MarketplaceBid(SQLModel, table=True): + __tablename__ = "marketplacebid" + __table_args__ = {"extend_existing": True} + + id: str = Field(default_factory=lambda: uuid4().hex, primary_key=True) + provider: str = Field(index=True) + capacity: int = Field(default=0, nullable=False) + price: float = Field(default=0.0, nullable=False) + notes: str | None = Field(default=None) + status: str = Field(default="pending", nullable=False) + submitted_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), nullable=False, index=True) diff --git a/apps/marketplace-service-debug/src/marketplace_service/main.py b/apps/marketplace-service-debug/src/marketplace_service/main.py new file mode 100644 index 00000000..6d67c279 --- /dev/null +++ b/apps/marketplace-service-debug/src/marketplace_service/main.py @@ -0,0 +1,299 @@ +""" +Marketplace Service main application +Manages GPU marketplace operations +""" + +from contextlib import asynccontextmanager +from typing import AsyncIterator + +from fastapi import FastAPI, Depends +from fastapi.responses import JSONResponse +from pydantic import BaseModel +from sqlalchemy.ext.asyncio import AsyncSession + +from aitbc import ( + configure_logging, + get_logger, + RequestIDMiddleware, + PerformanceLoggingMiddleware, + RequestValidationMiddleware, + ErrorHandlerMiddleware, +) + +from .storage import init_db, get_session +from .services.marketplace_service import MarketplaceService + +# Configure structured logging +configure_logging(level="INFO") +logger = get_logger(__name__) + + +@asynccontextmanager +async def lifespan(app: FastAPI) -> AsyncIterator[None]: + """Lifecycle events for the Marketplace Service.""" + logger.info("Starting Marketplace Service") + # Initialize database + await init_db() + yield + logger.info("Shutting down Marketplace Service") + + +app = FastAPI( + title="AITBC Marketplace Service", + description="Manages GPU marketplace operations", + version="0.1.0", + lifespan=lifespan, +) + +# Add middleware +app.add_middleware(RequestIDMiddleware) +app.add_middleware(PerformanceLoggingMiddleware) +app.add_middleware(RequestValidationMiddleware, max_request_size=10*1024*1024) +# app.add_middleware(ErrorHandlerMiddleware) # Temporarily disabled for debugging + + +# Use get_session() directly as dependency - FastAPI handles @asynccontextmanager +get_session_dep = get_session + + +class HealthResponse(BaseModel): + """Health check response""" + status: str + service: str + + +@app.get("/health") +async def health() -> HealthResponse: + """Health check endpoint""" + return HealthResponse(status="healthy", service="marketplace-service") + + +@app.get("/ready") +async def ready() -> dict[str, str]: + """Readiness check - verifies database connectivity""" + try: + async with get_session() as session: + # Test database connection + await session.execute("SELECT 1") + return {"status": "ready", "service": "marketplace-service"} + except Exception as e: + logger.error(f"Readiness check failed: {e}") + return JSONResponse( + status_code=503, + content={"status": "not_ready", "service": "marketplace-service", "error": str(e)}, + ) + + +@app.get("/live") +async def live() -> dict[str, str]: + """Liveness check - verifies service is not stuck""" + return {"status": "alive", "service": "marketplace-service"} + + +@app.get("/marketplace/status") +async def marketplace_status() -> dict[str, str]: + """Get marketplace status""" + return { + "status": "operational", + "service": "marketplace-service", + "message": "Marketplace service is running", + } + + +async def get_marketplace_service(session: AsyncSession = Depends(get_session)) -> MarketplaceService: + """Get marketplace service instance""" + return MarketplaceService(session) + + +@app.get("/v1/marketplace/offers") +async def get_offers( + status: str | None = None, + region: str | None = None, + gpu_model: str | None = None, + svc: MarketplaceService = Depends(get_marketplace_service), +): + """Get marketplace offers""" + try: + logger.info(f"GET /v1/marketplace/offers called with filters: status={status}, region={region}, gpu_model={gpu_model}") + result = await svc.list_offers(status=status, region=region, gpu_model=gpu_model) + logger.info(f"GET /v1/marketplace/offers returned {len(result)} offers") + return result + except Exception as e: + logger.error(f"Error in GET /v1/marketplace/offers: {type(e).__name__}: {str(e)}") + raise + + +@app.get("/v1/marketplace/offers/{offer_id}") +async def get_offer( + offer_id: str, + svc: MarketplaceService = Depends(get_marketplace_service), +): + """Get a specific marketplace offer""" + try: + logger.info(f"GET /v1/marketplace/offers/{offer_id} called") + result = await svc.get_offer(offer_id) + logger.info(f"GET /v1/marketplace/offers/{offer_id} returned: {result is not None}") + return result + except Exception as e: + logger.error(f"Error in GET /v1/marketplace/offers/{offer_id}: {type(e).__name__}: {str(e)}") + raise + + +@app.post("/v1/marketplace/offers") +async def create_offer( + offer_data: dict, + svc: MarketplaceService = Depends(get_marketplace_service), +): + """Create a new marketplace offer""" + try: + logger.info(f"POST /v1/marketplace/offers called with data keys: {offer_data.keys()}") + result = await svc.create_offer(offer_data) + logger.info(f"POST /v1/marketplace/offers created offer with id: {result.id}") + return result + except Exception as e: + logger.error(f"Error in POST /v1/marketplace/offers: {type(e).__name__}: {str(e)}") + raise + + +@app.get("/v1/marketplace/bids") +async def get_bids( + status: str | None = None, + provider: str | None = None, + svc: MarketplaceService = Depends(get_marketplace_service), +): + """Get marketplace bids""" + try: + logger.info(f"GET /v1/marketplace/bids called with filters: status={status}, provider={provider}") + result = await svc.list_bids(status=status, provider=provider) + logger.info(f"GET /v1/marketplace/bids returned {len(result)} bids") + return result + except Exception as e: + logger.error(f"Error in GET /v1/marketplace/bids: {type(e).__name__}: {str(e)}") + raise + + +@app.post("/v1/marketplace/bids") +async def create_bid( + bid_data: dict, + svc: MarketplaceService = Depends(get_marketplace_service), +): + """Create a new marketplace bid""" + try: + logger.info(f"POST /v1/marketplace/bids called with data keys: {bid_data.keys()}") + result = await svc.create_bid(bid_data) + logger.info(f"POST /v1/marketplace/bids created bid with id: {result.id}") + return result + except Exception as e: + logger.error(f"Error in POST /v1/marketplace/bids: {type(e).__name__}: {str(e)}") + raise + + +@app.get("/v1/marketplace/analytics") +async def get_analytics( + period_type: str = "daily", + svc: MarketplaceService = Depends(get_marketplace_service), +): + """Get marketplace analytics""" + try: + logger.info(f"GET /v1/marketplace/analytics called with period_type={period_type}") + result = await svc.get_analytics(period_type=period_type) + logger.info(f"GET /v1/marketplace/analytics returned analytics data") + return result + except Exception as e: + logger.error(f"Error in GET /v1/marketplace/analytics: {type(e).__name__}: {str(e)}") + raise + + +@app.post("/v1/transactions") +async def submit_transaction(transaction_data: dict, session: AsyncSession = Depends(get_session_dep)): + """Submit marketplace transaction""" + from .domain.marketplace import MarketplaceOffer, MarketplaceBid + + # Validate transaction type + transaction_type = transaction_data.get('type') + action = transaction_data.get('action') + + if transaction_type != 'marketplace': + return {"error": "Invalid transaction type for marketplace service"}, 400 + + try: + if action == 'offer': + offer = MarketplaceOffer(**transaction_data) + session.add(offer) + elif action == 'bid': + bid = MarketplaceBid(**transaction_data) + session.add(bid) + else: + return {"error": f"Invalid action: {action}. Only 'offer' and 'bid' are currently supported"}, 400 + + await session.commit() + return {"status": "success"} + except Exception as e: + await session.rollback() + logger.error(f"Transaction submission error: {e}") + return {"error": str(e)}, 500 + + +@app.get("/v1/transactions") +async def get_transactions( + transaction_type: str | None = None, + action: str | None = None, + status: str | None = None, + island_id: str | None = None, + session: AsyncSession = Depends(get_session_dep), +): + """Query marketplace transactions""" + from .domain.marketplace import MarketplaceOffer, MarketplaceBid + from sqlalchemy import select + + try: + transactions = [] + + # Query offers + if action == 'offer' or not action: + result = await session.execute(select(MarketplaceOffer)) + offers = result.scalars().all() + transactions.extend([{ + "id": o.id, + "action": "offer", + "provider": o.provider, + "capacity": o.capacity, + "price": o.price, + "status": o.status, + "gpu_model": o.gpu_model, + "gpu_memory_gb": o.gpu_memory_gb, + "gpu_count": o.gpu_count, + "price_per_hour": o.price_per_hour, + "region": o.region, + "created_at": o.created_at.isoformat() if o.created_at else None + } for o in offers]) + + # Query bids + if action == 'bid' or not action: + result = await session.execute(select(MarketplaceBid)) + bids = result.scalars().all() + transactions.extend([{ + "id": b.id, + "action": "bid", + "provider": b.provider, + "capacity": b.capacity, + "price": b.price, + "status": b.status, + "submitted_at": b.submitted_at.isoformat() if b.submitted_at else None + } for b in bids]) + + # Apply filters + if status: + transactions = [t for t in transactions if t.get('status') == status] + if island_id: + transactions = [t for t in transactions if t.get('provider') == island_id] + + return transactions + except Exception as e: + logger.error(f"Transaction query error: {e}") + return {"error": str(e)}, 500 + + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8102) diff --git a/apps/marketplace-service-debug/src/marketplace_service/services/__init__.py b/apps/marketplace-service-debug/src/marketplace_service/services/__init__.py new file mode 100644 index 00000000..5dfb55c6 --- /dev/null +++ b/apps/marketplace-service-debug/src/marketplace_service/services/__init__.py @@ -0,0 +1,7 @@ +""" +Marketplace Service services +""" + +from .marketplace_service import MarketplaceService + +__all__ = ["MarketplaceService"] diff --git a/apps/marketplace-service-debug/src/marketplace_service/services/marketplace_service.py b/apps/marketplace-service-debug/src/marketplace_service/services/marketplace_service.py new file mode 100644 index 00000000..01825f12 --- /dev/null +++ b/apps/marketplace-service-debug/src/marketplace_service/services/marketplace_service.py @@ -0,0 +1,115 @@ +""" +Marketplace service for managing marketplace operations +""" + +from typing import Any + +from sqlmodel import select +from sqlalchemy.ext.asyncio import AsyncSession + +from aitbc import get_logger +from ..domain.marketplace import MarketplaceOffer, MarketplaceBid + +logger = get_logger(__name__) + + +class MarketplaceService: + def __init__(self, session: AsyncSession): + self.session = session + + async def list_offers( + self, + status: str | None = None, + region: str | None = None, + gpu_model: str | None = None, + ) -> list[MarketplaceOffer]: + """List marketplace offers""" + try: + logger.info(f"list_offers called with filters: status={status}, region={region}, gpu_model={gpu_model}") + stmt = select(MarketplaceOffer) + if status: + stmt = stmt.where(MarketplaceOffer.status == status) + if region: + stmt = stmt.where(MarketplaceOffer.region == region) + if gpu_model: + stmt = stmt.where(MarketplaceOffer.gpu_model == gpu_model) + logger.info("Executing database query for offers") + result = list((await self.session.execute(stmt)).all()) + logger.info(f"Retrieved {len(result)} offers") + return result + except Exception as e: + logger.error(f"Error in list_offers: {type(e).__name__}: {str(e)}") + raise + + async def get_offer(self, offer_id: str) -> MarketplaceOffer | None: + """Get a specific marketplace offer""" + try: + logger.info(f"get_offer called with offer_id={offer_id}") + stmt = select(MarketplaceOffer).where(MarketplaceOffer.id == offer_id) + result = (await self.session.execute(stmt)).first() + offer = result[0] if result else None + logger.info(f"Retrieved offer: {offer_id}, found: {offer is not None}") + return offer + except Exception as e: + logger.error(f"Error in get_offer: {type(e).__name__}: {str(e)}") + raise + + async def create_offer(self, offer_data: dict) -> MarketplaceOffer: + """Create a new marketplace offer""" + try: + logger.info(f"create_offer called with data keys: {offer_data.keys()}") + offer = MarketplaceOffer(**offer_data) + self.session.add(offer) + await self.session.commit() + await self.session.refresh(offer) + logger.info(f"Created offer with id: {offer.id}") + return offer + except Exception as e: + logger.error(f"Error in create_offer: {type(e).__name__}: {str(e)}") + raise + + async def list_bids( + self, + status: str | None = None, + provider: str | None = None, + ) -> list[MarketplaceBid]: + """List marketplace bids""" + try: + logger.info(f"list_bids called with filters: status={status}, provider={provider}") + stmt = select(MarketplaceBid) + if status: + stmt = stmt.where(MarketplaceBid.status == status) + if provider: + stmt = stmt.where(MarketplaceBid.provider == provider) + logger.info("Executing database query for bids") + result = list((await self.session.execute(stmt)).all()) + logger.info(f"Retrieved {len(result)} bids") + return result + except Exception as e: + logger.error(f"Error in list_bids: {type(e).__name__}: {str(e)}") + raise + + async def create_bid(self, bid_data: dict) -> MarketplaceBid: + """Create a new marketplace bid""" + try: + logger.info(f"create_bid called with data keys: {bid_data.keys()}") + bid = MarketplaceBid(**bid_data) + self.session.add(bid) + await self.session.commit() + await self.session.refresh(bid) + logger.info(f"Created bid with id: {bid.id}") + return bid + except Exception as e: + logger.error(f"Error in create_bid: {type(e).__name__}: {str(e)}") + raise + + async def get_analytics(self, period_type: str = "daily") -> dict[str, Any]: + """Get marketplace analytics""" + # Placeholder for analytics logic + return { + "period_type": period_type, + "total_offers": 0, + "total_transactions": 0, + "total_volume": 0.0, + "average_price": 0.0, + } diff --git a/apps/marketplace-service-debug/src/marketplace_service/storage.py b/apps/marketplace-service-debug/src/marketplace_service/storage.py new file mode 100644 index 00000000..b18b6f3e --- /dev/null +++ b/apps/marketplace-service-debug/src/marketplace_service/storage.py @@ -0,0 +1,54 @@ +""" +Database session management for Marketplace service +""" + +import os +from contextlib import asynccontextmanager +from typing import AsyncIterator + +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine +from sqlmodel import SQLModel + +from aitbc import get_logger + +logger = get_logger(__name__) + +# Database URL from environment variable or default +DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///./data/marketplace_service.db") + +# Create async engine +engine = create_async_engine(DATABASE_URL, echo=False) + + +async def init_db() -> None: + """Initialize database tables""" + try: + logger.info("Initializing database tables") + from .domain.marketplace import MarketplaceOffer, MarketplaceBid + from .domain.global_marketplace import ( + MarketplaceRegion, + GlobalMarketplaceConfig, + GlobalMarketplaceOffer, + GlobalMarketplaceTransaction, + ) + + async with engine.begin() as conn: + await conn.run_sync(SQLModel.metadata.create_all) + + logger.info("Marketplace service database initialized") + except Exception as e: + logger.error(f"Error initializing database: {type(e).__name__}: {str(e)}") + raise + + +async def get_session() -> AsyncIterator[AsyncSession]: + """Get database session""" + try: + logger.debug("Creating database session") + async with AsyncSession(engine) as session: + logger.debug("Database session created successfully") + yield session + logger.debug("Database session closed") + except Exception as e: + logger.error(f"Error in get_session: {type(e).__name__}: {str(e)}") + raise diff --git a/apps/marketplace-service/src/marketplace_service/main.py b/apps/marketplace-service/src/marketplace_service/main.py index 223fd305..0fe6d8ab 100644 --- a/apps/marketplace-service/src/marketplace_service/main.py +++ b/apps/marketplace-service/src/marketplace_service/main.py @@ -52,10 +52,8 @@ app.add_middleware(RequestValidationMiddleware, max_request_size=10*1024*1024) app.add_middleware(ErrorHandlerMiddleware) -async def get_session_dep() -> AsyncIterator[AsyncSession]: - """Get database session dependency""" - async with get_session() as session: - yield session +# Use get_session() directly as dependency - FastAPI handles async generators +get_session_dep = get_session class HealthResponse(BaseModel): @@ -115,7 +113,14 @@ async def get_offers( svc: MarketplaceService = Depends(get_marketplace_service), ): """Get marketplace offers""" - return svc.list_offers(status=status, region=region, gpu_model=gpu_model) + try: + logger.info(f"GET /v1/marketplace/offers called with filters: status={status}, region={region}, gpu_model={gpu_model}") + result = await svc.list_offers(status=status, region=region, gpu_model=gpu_model) + logger.info(f"GET /v1/marketplace/offers returned {len(result)} offers") + return result + except Exception as e: + logger.error(f"Error in GET /v1/marketplace/offers: {type(e).__name__}: {str(e)}") + raise @app.get("/v1/marketplace/offers/{offer_id}") @@ -124,7 +129,14 @@ async def get_offer( svc: MarketplaceService = Depends(get_marketplace_service), ): """Get a specific marketplace offer""" - return svc.get_offer(offer_id) + try: + logger.info(f"GET /v1/marketplace/offers/{offer_id} called") + result = await svc.get_offer(offer_id) + logger.info(f"GET /v1/marketplace/offers/{offer_id} returned: {result is not None}") + return result + except Exception as e: + logger.error(f"Error in GET /v1/marketplace/offers/{offer_id}: {type(e).__name__}: {str(e)}") + raise @app.post("/v1/marketplace/offers") @@ -133,7 +145,14 @@ async def create_offer( svc: MarketplaceService = Depends(get_marketplace_service), ): """Create a new marketplace offer""" - return svc.create_offer(offer_data) + try: + logger.info(f"POST /v1/marketplace/offers called with data keys: {offer_data.keys()}") + result = await svc.create_offer(offer_data) + logger.info(f"POST /v1/marketplace/offers created offer with id: {result.id}") + return result + except Exception as e: + logger.error(f"Error in POST /v1/marketplace/offers: {type(e).__name__}: {str(e)}") + raise @app.get("/v1/marketplace/bids") @@ -143,7 +162,14 @@ async def get_bids( svc: MarketplaceService = Depends(get_marketplace_service), ): """Get marketplace bids""" - return svc.list_bids(status=status, provider=provider) + try: + logger.info(f"GET /v1/marketplace/bids called with filters: status={status}, provider={provider}") + result = await svc.list_bids(status=status, provider=provider) + logger.info(f"GET /v1/marketplace/bids returned {len(result)} bids") + return result + except Exception as e: + logger.error(f"Error in GET /v1/marketplace/bids: {type(e).__name__}: {str(e)}") + raise @app.post("/v1/marketplace/bids") @@ -152,7 +178,14 @@ async def create_bid( svc: MarketplaceService = Depends(get_marketplace_service), ): """Create a new marketplace bid""" - return svc.create_bid(bid_data) + try: + logger.info(f"POST /v1/marketplace/bids called with data keys: {bid_data.keys()}") + result = await svc.create_bid(bid_data) + logger.info(f"POST /v1/marketplace/bids created bid with id: {result.id}") + return result + except Exception as e: + logger.error(f"Error in POST /v1/marketplace/bids: {type(e).__name__}: {str(e)}") + raise @app.get("/v1/marketplace/analytics") diff --git a/apps/marketplace-service/src/marketplace_service/services/marketplace_service.py b/apps/marketplace-service/src/marketplace_service/services/marketplace_service.py index 57cfc2ee..01825f12 100644 --- a/apps/marketplace-service/src/marketplace_service/services/marketplace_service.py +++ b/apps/marketplace-service/src/marketplace_service/services/marketplace_service.py @@ -4,65 +4,104 @@ Marketplace service for managing marketplace operations from typing import Any -from sqlmodel import Session, select +from sqlmodel import select +from sqlalchemy.ext.asyncio import AsyncSession +from aitbc import get_logger from ..domain.marketplace import MarketplaceOffer, MarketplaceBid +logger = get_logger(__name__) + class MarketplaceService: - def __init__(self, session: Session): + def __init__(self, session: AsyncSession): self.session = session - def list_offers( + async def list_offers( self, status: str | None = None, region: str | None = None, gpu_model: str | None = None, ) -> list[MarketplaceOffer]: """List marketplace offers""" - stmt = select(MarketplaceOffer) - if status: - stmt = stmt.where(MarketplaceOffer.status == status) - if region: - stmt = stmt.where(MarketplaceOffer.region == region) - if gpu_model: - stmt = stmt.where(MarketplaceOffer.gpu_model == gpu_model) - return list(self.session.execute(stmt).all()) + try: + logger.info(f"list_offers called with filters: status={status}, region={region}, gpu_model={gpu_model}") + stmt = select(MarketplaceOffer) + if status: + stmt = stmt.where(MarketplaceOffer.status == status) + if region: + stmt = stmt.where(MarketplaceOffer.region == region) + if gpu_model: + stmt = stmt.where(MarketplaceOffer.gpu_model == gpu_model) + logger.info("Executing database query for offers") + result = list((await self.session.execute(stmt)).all()) + logger.info(f"Retrieved {len(result)} offers") + return result + except Exception as e: + logger.error(f"Error in list_offers: {type(e).__name__}: {str(e)}") + raise - def get_offer(self, offer_id: str) -> MarketplaceOffer | None: + async def get_offer(self, offer_id: str) -> MarketplaceOffer | None: """Get a specific marketplace offer""" - stmt = select(MarketplaceOffer).where(MarketplaceOffer.id == offer_id) - result = self.session.execute(stmt).first() - return result[0] if result else None + try: + logger.info(f"get_offer called with offer_id={offer_id}") + stmt = select(MarketplaceOffer).where(MarketplaceOffer.id == offer_id) + result = (await self.session.execute(stmt)).first() + offer = result[0] if result else None + logger.info(f"Retrieved offer: {offer_id}, found: {offer is not None}") + return offer + except Exception as e: + logger.error(f"Error in get_offer: {type(e).__name__}: {str(e)}") + raise - def create_offer(self, offer_data: dict) -> MarketplaceOffer: + async def create_offer(self, offer_data: dict) -> MarketplaceOffer: """Create a new marketplace offer""" - offer = MarketplaceOffer(**offer_data) - self.session.add(offer) - self.session.commit() - self.session.refresh(offer) - return offer + try: + logger.info(f"create_offer called with data keys: {offer_data.keys()}") + offer = MarketplaceOffer(**offer_data) + self.session.add(offer) + await self.session.commit() + await self.session.refresh(offer) + logger.info(f"Created offer with id: {offer.id}") + return offer + except Exception as e: + logger.error(f"Error in create_offer: {type(e).__name__}: {str(e)}") + raise - def list_bids( + async def list_bids( self, status: str | None = None, provider: str | None = None, ) -> list[MarketplaceBid]: """List marketplace bids""" - stmt = select(MarketplaceBid) - if status: - stmt = stmt.where(MarketplaceBid.status == status) - if provider: - stmt = stmt.where(MarketplaceBid.provider == provider) - return list(self.session.execute(stmt).all()) + try: + logger.info(f"list_bids called with filters: status={status}, provider={provider}") + stmt = select(MarketplaceBid) + if status: + stmt = stmt.where(MarketplaceBid.status == status) + if provider: + stmt = stmt.where(MarketplaceBid.provider == provider) + logger.info("Executing database query for bids") + result = list((await self.session.execute(stmt)).all()) + logger.info(f"Retrieved {len(result)} bids") + return result + except Exception as e: + logger.error(f"Error in list_bids: {type(e).__name__}: {str(e)}") + raise - def create_bid(self, bid_data: dict) -> MarketplaceBid: + async def create_bid(self, bid_data: dict) -> MarketplaceBid: """Create a new marketplace bid""" - bid = MarketplaceBid(**bid_data) - self.session.add(bid) - self.session.commit() - self.session.refresh(bid) - return bid + try: + logger.info(f"create_bid called with data keys: {bid_data.keys()}") + bid = MarketplaceBid(**bid_data) + self.session.add(bid) + await self.session.commit() + await self.session.refresh(bid) + logger.info(f"Created bid with id: {bid.id}") + return bid + except Exception as e: + logger.error(f"Error in create_bid: {type(e).__name__}: {str(e)}") + raise async def get_analytics(self, period_type: str = "daily") -> dict[str, Any]: """Get marketplace analytics""" diff --git a/apps/marketplace-service/src/marketplace_service/storage.py b/apps/marketplace-service/src/marketplace_service/storage.py index a6a62b6b..c22309a4 100644 --- a/apps/marketplace-service/src/marketplace_service/storage.py +++ b/apps/marketplace-service/src/marketplace_service/storage.py @@ -36,8 +36,14 @@ async def init_db() -> None: logger.info("Marketplace service database initialized") -@asynccontextmanager async def get_session() -> AsyncIterator[AsyncSession]: """Get database session""" - async with AsyncSession(engine) as session: - yield session + try: + logger.debug("Creating database session") + async with AsyncSession(engine) as session: + logger.debug("Database session created successfully") + yield session + logger.debug("Database session closed") + except Exception as e: + logger.error(f"Error in get_session: {type(e).__name__}: {str(e)}") + raise