Add async/await support and comprehensive logging to marketplace service with session management improvements
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Successful in 3s
Deploy to Testnet / deploy-testnet (push) Successful in 1m13s
Integration Tests / test-service-integration (push) Successful in 2m39s
Multi-Node Stress Testing / stress-test (push) Successful in 2s
Node Failover Simulation / failover-test (push) Successful in 3s
Python Tests / test-python (push) Failing after 1m10s
Security Scanning / security-scan (push) Has been cancelled

- Convert MarketplaceService methods to async (list_offers, get_offer, create_offer, list_bids, create_bid)
- Add await to all database operations (execute, commit, refresh)
- Change Session to AsyncSession in MarketplaceService constructor
- Add try-except blocks with detailed logging to all service methods
- Add logging for method entry, database queries, result counts, and errors
- Add try-except with logging to get_session generator
- Simpl
This commit is contained in:
aitbc
2026-05-08 19:17:11 +02:00
parent 4ee9705670
commit 130a295366
11 changed files with 840 additions and 46 deletions

View File

@@ -0,0 +1,6 @@
"""
AITBC Marketplace Service
Manages GPU marketplace operations
"""
__version__ = "0.1.0"

View File

@@ -0,0 +1,24 @@
"""
Marketplace Service domain models
"""
from .marketplace import MarketplaceOffer, MarketplaceBid
from .global_marketplace import (
MarketplaceStatus,
RegionStatus,
MarketplaceRegion,
GlobalMarketplaceConfig,
GlobalMarketplaceOffer,
GlobalMarketplaceTransaction,
)
__all__ = [
"MarketplaceOffer",
"MarketplaceBid",
"MarketplaceStatus",
"RegionStatus",
"MarketplaceRegion",
"GlobalMarketplaceConfig",
"GlobalMarketplaceOffer",
"GlobalMarketplaceTransaction",
]

View File

@@ -0,0 +1,170 @@
"""
Global Marketplace Domain Models
Domain models for global marketplace operations, multi-region support, and cross-chain integration
"""
from __future__ import annotations
from datetime import datetime, timezone
from enum import StrEnum
from typing import Any
from uuid import uuid4
from sqlmodel import JSON, Column, Field, SQLModel
class MarketplaceStatus(StrEnum):
"""Global marketplace offer status"""
ACTIVE = "active"
INACTIVE = "inactive"
PENDING = "pending"
COMPLETED = "completed"
CANCELLED = "cancelled"
EXPIRED = "expired"
class RegionStatus(StrEnum):
"""Global marketplace region status"""
ACTIVE = "active"
INACTIVE = "inactive"
MAINTENANCE = "maintenance"
DEPRECATED = "deprecated"
class MarketplaceRegion(SQLModel, table=True):
"""Global marketplace region configuration"""
__tablename__ = "marketplace_regions"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: f"region_{uuid4().hex[:8]}", primary_key=True)
region_code: str = Field(index=True, unique=True)
region_name: str = Field(index=True)
geographic_area: str = Field(default="global")
base_currency: str = Field(default="USD")
timezone: str = Field(default="UTC")
language: str = Field(default="en")
load_factor: float = Field(default=1.0, ge=0.1, le=10.0)
max_concurrent_requests: int = Field(default=1000)
priority_weight: float = Field(default=1.0, ge=0.1, le=10.0)
status: RegionStatus = Field(default=RegionStatus.ACTIVE)
health_score: float = Field(default=1.0, ge=0.0, le=1.0)
last_health_check: datetime | None = Field(default=None)
api_endpoint: str = Field(default="")
websocket_endpoint: str = Field(default="")
blockchain_rpc_endpoints: dict[str, str] = Field(default_factory=dict, sa_column=Column(JSON))
average_response_time: float = Field(default=0.0)
request_rate: float = Field(default=0.0)
error_rate: float = Field(default=0.0)
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class GlobalMarketplaceConfig(SQLModel, table=True):
"""Global marketplace configuration settings"""
__tablename__ = "global_marketplace_configs"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: f"config_{uuid4().hex[:8]}", primary_key=True)
config_key: str = Field(index=True, unique=True)
config_value: str = Field(default="")
config_type: str = Field(default="string")
description: str = Field(default="")
category: str = Field(default="general")
is_public: bool = Field(default=False)
is_encrypted: bool = Field(default=False)
min_value: float | None = Field(default=None)
max_value: float | None = Field(default=None)
allowed_values: list[str] = Field(default_factory=list, sa_column=Column(JSON))
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
last_modified_by: str | None = Field(default=None)
class GlobalMarketplaceOffer(SQLModel, table=True):
"""Global marketplace offer with multi-region support"""
__tablename__ = "global_marketplace_offers"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: f"offer_{uuid4().hex[:8]}", primary_key=True)
original_offer_id: str = Field(index=True)
agent_id: str = Field(index=True)
service_type: str = Field(index=True)
resource_specification: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
base_price: float = Field(default=0.0)
currency: str = Field(default="USD")
price_per_region: dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
dynamic_pricing_enabled: bool = Field(default=False)
total_capacity: int = Field(default=0)
available_capacity: int = Field(default=0)
regions_available: list[str] = Field(default_factory=list, sa_column=Column(JSON))
global_status: MarketplaceStatus = Field(default=MarketplaceStatus.ACTIVE)
region_statuses: dict[str, MarketplaceStatus] = Field(default_factory=dict, sa_column=Column(JSON))
global_rating: float = Field(default=0.0, ge=0.0, le=5.0)
total_transactions: int = Field(default=0)
success_rate: float = Field(default=0.0, ge=0.0, le=1.0)
supported_chains: list[int] = Field(default_factory=list, sa_column=Column(JSON))
cross_chain_pricing: dict[int, float] = Field(default_factory=dict, sa_column=Column(JSON))
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
expires_at: datetime | None = Field(default=None)
class GlobalMarketplaceTransaction(SQLModel, table=True):
"""Global marketplace transaction with cross-chain support"""
__tablename__ = "global_marketplace_transactions"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: f"tx_{uuid4().hex[:8]}", primary_key=True)
transaction_hash: str | None = Field(index=True)
buyer_id: str = Field(index=True)
seller_id: str = Field(index=True)
offer_id: str = Field(index=True)
service_type: str = Field(index=True)
quantity: int = Field(default=1)
unit_price: float = Field(default=0.0)
total_amount: float = Field(default=0.0)
currency: str = Field(default="USD")
source_chain: int | None = Field(default=None)
target_chain: int | None = Field(default=None)
bridge_transaction_id: str | None = Field(default=None)
cross_chain_fee: float = Field(default=0.0)
source_region: str = Field(default="global")
target_region: str = Field(default="global")
regional_fees: dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
status: str = Field(default="pending")
payment_status: str = Field(default="pending")
delivery_status: str = Field(default="pending")
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
confirmed_at: datetime | None = Field(default=None)
completed_at: datetime | None = Field(default=None)
transaction_data: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))

View File

@@ -0,0 +1,41 @@
from __future__ import annotations
from datetime import datetime, timezone
from uuid import uuid4
from sqlalchemy import JSON, Column
from sqlmodel import Field, SQLModel
class MarketplaceOffer(SQLModel, table=True):
__tablename__ = "marketplaceoffer"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: uuid4().hex, primary_key=True)
provider: str = Field(index=True)
capacity: int = Field(default=0, nullable=False)
price: float = Field(default=0.0, nullable=False)
sla: str = Field(default="")
status: str = Field(default="open", max_length=20)
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), nullable=False, index=True)
attributes: dict = Field(default_factory=dict, sa_column=Column(JSON, nullable=False))
# GPU-specific fields
gpu_model: str | None = Field(default=None, index=True)
gpu_memory_gb: int | None = Field(default=None)
gpu_count: int | None = Field(default=1)
cuda_version: str | None = Field(default=None)
price_per_hour: float | None = Field(default=None)
region: str | None = Field(default=None, index=True)
class MarketplaceBid(SQLModel, table=True):
__tablename__ = "marketplacebid"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: uuid4().hex, primary_key=True)
provider: str = Field(index=True)
capacity: int = Field(default=0, nullable=False)
price: float = Field(default=0.0, nullable=False)
notes: str | None = Field(default=None)
status: str = Field(default="pending", nullable=False)
submitted_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), nullable=False, index=True)

View File

@@ -0,0 +1,299 @@
"""
Marketplace Service main application
Manages GPU marketplace operations
"""
from contextlib import asynccontextmanager
from typing import AsyncIterator
from fastapi import FastAPI, Depends
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from aitbc import (
configure_logging,
get_logger,
RequestIDMiddleware,
PerformanceLoggingMiddleware,
RequestValidationMiddleware,
ErrorHandlerMiddleware,
)
from .storage import init_db, get_session
from .services.marketplace_service import MarketplaceService
# Configure structured logging
configure_logging(level="INFO")
logger = get_logger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
"""Lifecycle events for the Marketplace Service."""
logger.info("Starting Marketplace Service")
# Initialize database
await init_db()
yield
logger.info("Shutting down Marketplace Service")
app = FastAPI(
title="AITBC Marketplace Service",
description="Manages GPU marketplace operations",
version="0.1.0",
lifespan=lifespan,
)
# Add middleware
app.add_middleware(RequestIDMiddleware)
app.add_middleware(PerformanceLoggingMiddleware)
app.add_middleware(RequestValidationMiddleware, max_request_size=10*1024*1024)
# app.add_middleware(ErrorHandlerMiddleware) # Temporarily disabled for debugging
# Use get_session() directly as dependency - FastAPI handles @asynccontextmanager
get_session_dep = get_session
class HealthResponse(BaseModel):
"""Health check response"""
status: str
service: str
@app.get("/health")
async def health() -> HealthResponse:
"""Health check endpoint"""
return HealthResponse(status="healthy", service="marketplace-service")
@app.get("/ready")
async def ready() -> dict[str, str]:
"""Readiness check - verifies database connectivity"""
try:
async with get_session() as session:
# Test database connection
await session.execute("SELECT 1")
return {"status": "ready", "service": "marketplace-service"}
except Exception as e:
logger.error(f"Readiness check failed: {e}")
return JSONResponse(
status_code=503,
content={"status": "not_ready", "service": "marketplace-service", "error": str(e)},
)
@app.get("/live")
async def live() -> dict[str, str]:
"""Liveness check - verifies service is not stuck"""
return {"status": "alive", "service": "marketplace-service"}
@app.get("/marketplace/status")
async def marketplace_status() -> dict[str, str]:
"""Get marketplace status"""
return {
"status": "operational",
"service": "marketplace-service",
"message": "Marketplace service is running",
}
async def get_marketplace_service(session: AsyncSession = Depends(get_session)) -> MarketplaceService:
"""Get marketplace service instance"""
return MarketplaceService(session)
@app.get("/v1/marketplace/offers")
async def get_offers(
status: str | None = None,
region: str | None = None,
gpu_model: str | None = None,
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Get marketplace offers"""
try:
logger.info(f"GET /v1/marketplace/offers called with filters: status={status}, region={region}, gpu_model={gpu_model}")
result = await svc.list_offers(status=status, region=region, gpu_model=gpu_model)
logger.info(f"GET /v1/marketplace/offers returned {len(result)} offers")
return result
except Exception as e:
logger.error(f"Error in GET /v1/marketplace/offers: {type(e).__name__}: {str(e)}")
raise
@app.get("/v1/marketplace/offers/{offer_id}")
async def get_offer(
offer_id: str,
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Get a specific marketplace offer"""
try:
logger.info(f"GET /v1/marketplace/offers/{offer_id} called")
result = await svc.get_offer(offer_id)
logger.info(f"GET /v1/marketplace/offers/{offer_id} returned: {result is not None}")
return result
except Exception as e:
logger.error(f"Error in GET /v1/marketplace/offers/{offer_id}: {type(e).__name__}: {str(e)}")
raise
@app.post("/v1/marketplace/offers")
async def create_offer(
offer_data: dict,
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Create a new marketplace offer"""
try:
logger.info(f"POST /v1/marketplace/offers called with data keys: {offer_data.keys()}")
result = await svc.create_offer(offer_data)
logger.info(f"POST /v1/marketplace/offers created offer with id: {result.id}")
return result
except Exception as e:
logger.error(f"Error in POST /v1/marketplace/offers: {type(e).__name__}: {str(e)}")
raise
@app.get("/v1/marketplace/bids")
async def get_bids(
status: str | None = None,
provider: str | None = None,
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Get marketplace bids"""
try:
logger.info(f"GET /v1/marketplace/bids called with filters: status={status}, provider={provider}")
result = await svc.list_bids(status=status, provider=provider)
logger.info(f"GET /v1/marketplace/bids returned {len(result)} bids")
return result
except Exception as e:
logger.error(f"Error in GET /v1/marketplace/bids: {type(e).__name__}: {str(e)}")
raise
@app.post("/v1/marketplace/bids")
async def create_bid(
bid_data: dict,
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Create a new marketplace bid"""
try:
logger.info(f"POST /v1/marketplace/bids called with data keys: {bid_data.keys()}")
result = await svc.create_bid(bid_data)
logger.info(f"POST /v1/marketplace/bids created bid with id: {result.id}")
return result
except Exception as e:
logger.error(f"Error in POST /v1/marketplace/bids: {type(e).__name__}: {str(e)}")
raise
@app.get("/v1/marketplace/analytics")
async def get_analytics(
period_type: str = "daily",
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Get marketplace analytics"""
try:
logger.info(f"GET /v1/marketplace/analytics called with period_type={period_type}")
result = await svc.get_analytics(period_type=period_type)
logger.info(f"GET /v1/marketplace/analytics returned analytics data")
return result
except Exception as e:
logger.error(f"Error in GET /v1/marketplace/analytics: {type(e).__name__}: {str(e)}")
raise
@app.post("/v1/transactions")
async def submit_transaction(transaction_data: dict, session: AsyncSession = Depends(get_session_dep)):
"""Submit marketplace transaction"""
from .domain.marketplace import MarketplaceOffer, MarketplaceBid
# Validate transaction type
transaction_type = transaction_data.get('type')
action = transaction_data.get('action')
if transaction_type != 'marketplace':
return {"error": "Invalid transaction type for marketplace service"}, 400
try:
if action == 'offer':
offer = MarketplaceOffer(**transaction_data)
session.add(offer)
elif action == 'bid':
bid = MarketplaceBid(**transaction_data)
session.add(bid)
else:
return {"error": f"Invalid action: {action}. Only 'offer' and 'bid' are currently supported"}, 400
await session.commit()
return {"status": "success"}
except Exception as e:
await session.rollback()
logger.error(f"Transaction submission error: {e}")
return {"error": str(e)}, 500
@app.get("/v1/transactions")
async def get_transactions(
transaction_type: str | None = None,
action: str | None = None,
status: str | None = None,
island_id: str | None = None,
session: AsyncSession = Depends(get_session_dep),
):
"""Query marketplace transactions"""
from .domain.marketplace import MarketplaceOffer, MarketplaceBid
from sqlalchemy import select
try:
transactions = []
# Query offers
if action == 'offer' or not action:
result = await session.execute(select(MarketplaceOffer))
offers = result.scalars().all()
transactions.extend([{
"id": o.id,
"action": "offer",
"provider": o.provider,
"capacity": o.capacity,
"price": o.price,
"status": o.status,
"gpu_model": o.gpu_model,
"gpu_memory_gb": o.gpu_memory_gb,
"gpu_count": o.gpu_count,
"price_per_hour": o.price_per_hour,
"region": o.region,
"created_at": o.created_at.isoformat() if o.created_at else None
} for o in offers])
# Query bids
if action == 'bid' or not action:
result = await session.execute(select(MarketplaceBid))
bids = result.scalars().all()
transactions.extend([{
"id": b.id,
"action": "bid",
"provider": b.provider,
"capacity": b.capacity,
"price": b.price,
"status": b.status,
"submitted_at": b.submitted_at.isoformat() if b.submitted_at else None
} for b in bids])
# Apply filters
if status:
transactions = [t for t in transactions if t.get('status') == status]
if island_id:
transactions = [t for t in transactions if t.get('provider') == island_id]
return transactions
except Exception as e:
logger.error(f"Transaction query error: {e}")
return {"error": str(e)}, 500
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8102)

View File

@@ -0,0 +1,7 @@
"""
Marketplace Service services
"""
from .marketplace_service import MarketplaceService
__all__ = ["MarketplaceService"]

View File

@@ -0,0 +1,115 @@
"""
Marketplace service for managing marketplace operations
"""
from typing import Any
from sqlmodel import select
from sqlalchemy.ext.asyncio import AsyncSession
from aitbc import get_logger
from ..domain.marketplace import MarketplaceOffer, MarketplaceBid
logger = get_logger(__name__)
class MarketplaceService:
def __init__(self, session: AsyncSession):
self.session = session
async def list_offers(
self,
status: str | None = None,
region: str | None = None,
gpu_model: str | None = None,
) -> list[MarketplaceOffer]:
"""List marketplace offers"""
try:
logger.info(f"list_offers called with filters: status={status}, region={region}, gpu_model={gpu_model}")
stmt = select(MarketplaceOffer)
if status:
stmt = stmt.where(MarketplaceOffer.status == status)
if region:
stmt = stmt.where(MarketplaceOffer.region == region)
if gpu_model:
stmt = stmt.where(MarketplaceOffer.gpu_model == gpu_model)
logger.info("Executing database query for offers")
result = list((await self.session.execute(stmt)).all())
logger.info(f"Retrieved {len(result)} offers")
return result
except Exception as e:
logger.error(f"Error in list_offers: {type(e).__name__}: {str(e)}")
raise
async def get_offer(self, offer_id: str) -> MarketplaceOffer | None:
"""Get a specific marketplace offer"""
try:
logger.info(f"get_offer called with offer_id={offer_id}")
stmt = select(MarketplaceOffer).where(MarketplaceOffer.id == offer_id)
result = (await self.session.execute(stmt)).first()
offer = result[0] if result else None
logger.info(f"Retrieved offer: {offer_id}, found: {offer is not None}")
return offer
except Exception as e:
logger.error(f"Error in get_offer: {type(e).__name__}: {str(e)}")
raise
async def create_offer(self, offer_data: dict) -> MarketplaceOffer:
"""Create a new marketplace offer"""
try:
logger.info(f"create_offer called with data keys: {offer_data.keys()}")
offer = MarketplaceOffer(**offer_data)
self.session.add(offer)
await self.session.commit()
await self.session.refresh(offer)
logger.info(f"Created offer with id: {offer.id}")
return offer
except Exception as e:
logger.error(f"Error in create_offer: {type(e).__name__}: {str(e)}")
raise
async def list_bids(
self,
status: str | None = None,
provider: str | None = None,
) -> list[MarketplaceBid]:
"""List marketplace bids"""
try:
logger.info(f"list_bids called with filters: status={status}, provider={provider}")
stmt = select(MarketplaceBid)
if status:
stmt = stmt.where(MarketplaceBid.status == status)
if provider:
stmt = stmt.where(MarketplaceBid.provider == provider)
logger.info("Executing database query for bids")
result = list((await self.session.execute(stmt)).all())
logger.info(f"Retrieved {len(result)} bids")
return result
except Exception as e:
logger.error(f"Error in list_bids: {type(e).__name__}: {str(e)}")
raise
async def create_bid(self, bid_data: dict) -> MarketplaceBid:
"""Create a new marketplace bid"""
try:
logger.info(f"create_bid called with data keys: {bid_data.keys()}")
bid = MarketplaceBid(**bid_data)
self.session.add(bid)
await self.session.commit()
await self.session.refresh(bid)
logger.info(f"Created bid with id: {bid.id}")
return bid
except Exception as e:
logger.error(f"Error in create_bid: {type(e).__name__}: {str(e)}")
raise
async def get_analytics(self, period_type: str = "daily") -> dict[str, Any]:
"""Get marketplace analytics"""
# Placeholder for analytics logic
return {
"period_type": period_type,
"total_offers": 0,
"total_transactions": 0,
"total_volume": 0.0,
"average_price": 0.0,
}

View File

@@ -0,0 +1,54 @@
"""
Database session management for Marketplace service
"""
import os
from contextlib import asynccontextmanager
from typing import AsyncIterator
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlmodel import SQLModel
from aitbc import get_logger
logger = get_logger(__name__)
# Database URL from environment variable or default
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///./data/marketplace_service.db")
# Create async engine
engine = create_async_engine(DATABASE_URL, echo=False)
async def init_db() -> None:
"""Initialize database tables"""
try:
logger.info("Initializing database tables")
from .domain.marketplace import MarketplaceOffer, MarketplaceBid
from .domain.global_marketplace import (
MarketplaceRegion,
GlobalMarketplaceConfig,
GlobalMarketplaceOffer,
GlobalMarketplaceTransaction,
)
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
logger.info("Marketplace service database initialized")
except Exception as e:
logger.error(f"Error initializing database: {type(e).__name__}: {str(e)}")
raise
async def get_session() -> AsyncIterator[AsyncSession]:
"""Get database session"""
try:
logger.debug("Creating database session")
async with AsyncSession(engine) as session:
logger.debug("Database session created successfully")
yield session
logger.debug("Database session closed")
except Exception as e:
logger.error(f"Error in get_session: {type(e).__name__}: {str(e)}")
raise

View File

@@ -52,10 +52,8 @@ app.add_middleware(RequestValidationMiddleware, max_request_size=10*1024*1024)
app.add_middleware(ErrorHandlerMiddleware)
async def get_session_dep() -> AsyncIterator[AsyncSession]:
"""Get database session dependency"""
async with get_session() as session:
yield session
# Use get_session() directly as dependency - FastAPI handles async generators
get_session_dep = get_session
class HealthResponse(BaseModel):
@@ -115,7 +113,14 @@ async def get_offers(
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Get marketplace offers"""
return svc.list_offers(status=status, region=region, gpu_model=gpu_model)
try:
logger.info(f"GET /v1/marketplace/offers called with filters: status={status}, region={region}, gpu_model={gpu_model}")
result = await svc.list_offers(status=status, region=region, gpu_model=gpu_model)
logger.info(f"GET /v1/marketplace/offers returned {len(result)} offers")
return result
except Exception as e:
logger.error(f"Error in GET /v1/marketplace/offers: {type(e).__name__}: {str(e)}")
raise
@app.get("/v1/marketplace/offers/{offer_id}")
@@ -124,7 +129,14 @@ async def get_offer(
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Get a specific marketplace offer"""
return svc.get_offer(offer_id)
try:
logger.info(f"GET /v1/marketplace/offers/{offer_id} called")
result = await svc.get_offer(offer_id)
logger.info(f"GET /v1/marketplace/offers/{offer_id} returned: {result is not None}")
return result
except Exception as e:
logger.error(f"Error in GET /v1/marketplace/offers/{offer_id}: {type(e).__name__}: {str(e)}")
raise
@app.post("/v1/marketplace/offers")
@@ -133,7 +145,14 @@ async def create_offer(
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Create a new marketplace offer"""
return svc.create_offer(offer_data)
try:
logger.info(f"POST /v1/marketplace/offers called with data keys: {offer_data.keys()}")
result = await svc.create_offer(offer_data)
logger.info(f"POST /v1/marketplace/offers created offer with id: {result.id}")
return result
except Exception as e:
logger.error(f"Error in POST /v1/marketplace/offers: {type(e).__name__}: {str(e)}")
raise
@app.get("/v1/marketplace/bids")
@@ -143,7 +162,14 @@ async def get_bids(
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Get marketplace bids"""
return svc.list_bids(status=status, provider=provider)
try:
logger.info(f"GET /v1/marketplace/bids called with filters: status={status}, provider={provider}")
result = await svc.list_bids(status=status, provider=provider)
logger.info(f"GET /v1/marketplace/bids returned {len(result)} bids")
return result
except Exception as e:
logger.error(f"Error in GET /v1/marketplace/bids: {type(e).__name__}: {str(e)}")
raise
@app.post("/v1/marketplace/bids")
@@ -152,7 +178,14 @@ async def create_bid(
svc: MarketplaceService = Depends(get_marketplace_service),
):
"""Create a new marketplace bid"""
return svc.create_bid(bid_data)
try:
logger.info(f"POST /v1/marketplace/bids called with data keys: {bid_data.keys()}")
result = await svc.create_bid(bid_data)
logger.info(f"POST /v1/marketplace/bids created bid with id: {result.id}")
return result
except Exception as e:
logger.error(f"Error in POST /v1/marketplace/bids: {type(e).__name__}: {str(e)}")
raise
@app.get("/v1/marketplace/analytics")

View File

@@ -4,65 +4,104 @@ Marketplace service for managing marketplace operations
from typing import Any
from sqlmodel import Session, select
from sqlmodel import select
from sqlalchemy.ext.asyncio import AsyncSession
from aitbc import get_logger
from ..domain.marketplace import MarketplaceOffer, MarketplaceBid
logger = get_logger(__name__)
class MarketplaceService:
def __init__(self, session: Session):
def __init__(self, session: AsyncSession):
self.session = session
def list_offers(
async def list_offers(
self,
status: str | None = None,
region: str | None = None,
gpu_model: str | None = None,
) -> list[MarketplaceOffer]:
"""List marketplace offers"""
stmt = select(MarketplaceOffer)
if status:
stmt = stmt.where(MarketplaceOffer.status == status)
if region:
stmt = stmt.where(MarketplaceOffer.region == region)
if gpu_model:
stmt = stmt.where(MarketplaceOffer.gpu_model == gpu_model)
return list(self.session.execute(stmt).all())
try:
logger.info(f"list_offers called with filters: status={status}, region={region}, gpu_model={gpu_model}")
stmt = select(MarketplaceOffer)
if status:
stmt = stmt.where(MarketplaceOffer.status == status)
if region:
stmt = stmt.where(MarketplaceOffer.region == region)
if gpu_model:
stmt = stmt.where(MarketplaceOffer.gpu_model == gpu_model)
logger.info("Executing database query for offers")
result = list((await self.session.execute(stmt)).all())
logger.info(f"Retrieved {len(result)} offers")
return result
except Exception as e:
logger.error(f"Error in list_offers: {type(e).__name__}: {str(e)}")
raise
def get_offer(self, offer_id: str) -> MarketplaceOffer | None:
async def get_offer(self, offer_id: str) -> MarketplaceOffer | None:
"""Get a specific marketplace offer"""
stmt = select(MarketplaceOffer).where(MarketplaceOffer.id == offer_id)
result = self.session.execute(stmt).first()
return result[0] if result else None
try:
logger.info(f"get_offer called with offer_id={offer_id}")
stmt = select(MarketplaceOffer).where(MarketplaceOffer.id == offer_id)
result = (await self.session.execute(stmt)).first()
offer = result[0] if result else None
logger.info(f"Retrieved offer: {offer_id}, found: {offer is not None}")
return offer
except Exception as e:
logger.error(f"Error in get_offer: {type(e).__name__}: {str(e)}")
raise
def create_offer(self, offer_data: dict) -> MarketplaceOffer:
async def create_offer(self, offer_data: dict) -> MarketplaceOffer:
"""Create a new marketplace offer"""
offer = MarketplaceOffer(**offer_data)
self.session.add(offer)
self.session.commit()
self.session.refresh(offer)
return offer
try:
logger.info(f"create_offer called with data keys: {offer_data.keys()}")
offer = MarketplaceOffer(**offer_data)
self.session.add(offer)
await self.session.commit()
await self.session.refresh(offer)
logger.info(f"Created offer with id: {offer.id}")
return offer
except Exception as e:
logger.error(f"Error in create_offer: {type(e).__name__}: {str(e)}")
raise
def list_bids(
async def list_bids(
self,
status: str | None = None,
provider: str | None = None,
) -> list[MarketplaceBid]:
"""List marketplace bids"""
stmt = select(MarketplaceBid)
if status:
stmt = stmt.where(MarketplaceBid.status == status)
if provider:
stmt = stmt.where(MarketplaceBid.provider == provider)
return list(self.session.execute(stmt).all())
try:
logger.info(f"list_bids called with filters: status={status}, provider={provider}")
stmt = select(MarketplaceBid)
if status:
stmt = stmt.where(MarketplaceBid.status == status)
if provider:
stmt = stmt.where(MarketplaceBid.provider == provider)
logger.info("Executing database query for bids")
result = list((await self.session.execute(stmt)).all())
logger.info(f"Retrieved {len(result)} bids")
return result
except Exception as e:
logger.error(f"Error in list_bids: {type(e).__name__}: {str(e)}")
raise
def create_bid(self, bid_data: dict) -> MarketplaceBid:
async def create_bid(self, bid_data: dict) -> MarketplaceBid:
"""Create a new marketplace bid"""
bid = MarketplaceBid(**bid_data)
self.session.add(bid)
self.session.commit()
self.session.refresh(bid)
return bid
try:
logger.info(f"create_bid called with data keys: {bid_data.keys()}")
bid = MarketplaceBid(**bid_data)
self.session.add(bid)
await self.session.commit()
await self.session.refresh(bid)
logger.info(f"Created bid with id: {bid.id}")
return bid
except Exception as e:
logger.error(f"Error in create_bid: {type(e).__name__}: {str(e)}")
raise
async def get_analytics(self, period_type: str = "daily") -> dict[str, Any]:
"""Get marketplace analytics"""

View File

@@ -36,8 +36,14 @@ async def init_db() -> None:
logger.info("Marketplace service database initialized")
@asynccontextmanager
async def get_session() -> AsyncIterator[AsyncSession]:
"""Get database session"""
async with AsyncSession(engine) as session:
yield session
try:
logger.debug("Creating database session")
async with AsyncSession(engine) as session:
logger.debug("Database session created successfully")
yield session
logger.debug("Database session closed")
except Exception as e:
logger.error(f"Error in get_session: {type(e).__name__}: {str(e)}")
raise