Add async/await support and comprehensive logging to marketplace service with session management improvements
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Successful in 3s
Deploy to Testnet / deploy-testnet (push) Successful in 1m13s
Integration Tests / test-service-integration (push) Successful in 2m39s
Multi-Node Stress Testing / stress-test (push) Successful in 2s
Node Failover Simulation / failover-test (push) Successful in 3s
Python Tests / test-python (push) Failing after 1m10s
Security Scanning / security-scan (push) Has been cancelled
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Successful in 3s
Deploy to Testnet / deploy-testnet (push) Successful in 1m13s
Integration Tests / test-service-integration (push) Successful in 2m39s
Multi-Node Stress Testing / stress-test (push) Successful in 2s
Node Failover Simulation / failover-test (push) Successful in 3s
Python Tests / test-python (push) Failing after 1m10s
Security Scanning / security-scan (push) Has been cancelled
- Convert MarketplaceService methods to async (list_offers, get_offer, create_offer, list_bids, create_bid) - Add await to all database operations (execute, commit, refresh) - Change Session to AsyncSession in MarketplaceService constructor - Add try-except blocks with detailed logging to all service methods - Add logging for method entry, database queries, result counts, and errors - Add try-except with logging to get_session generator - Simpl
This commit is contained in:
@@ -0,0 +1,6 @@
|
||||
"""
|
||||
AITBC Marketplace Service
|
||||
Manages GPU marketplace operations
|
||||
"""
|
||||
|
||||
__version__ = "0.1.0"
|
||||
@@ -0,0 +1,24 @@
|
||||
"""
|
||||
Marketplace Service domain models
|
||||
"""
|
||||
|
||||
from .marketplace import MarketplaceOffer, MarketplaceBid
|
||||
from .global_marketplace import (
|
||||
MarketplaceStatus,
|
||||
RegionStatus,
|
||||
MarketplaceRegion,
|
||||
GlobalMarketplaceConfig,
|
||||
GlobalMarketplaceOffer,
|
||||
GlobalMarketplaceTransaction,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"MarketplaceOffer",
|
||||
"MarketplaceBid",
|
||||
"MarketplaceStatus",
|
||||
"RegionStatus",
|
||||
"MarketplaceRegion",
|
||||
"GlobalMarketplaceConfig",
|
||||
"GlobalMarketplaceOffer",
|
||||
"GlobalMarketplaceTransaction",
|
||||
]
|
||||
@@ -0,0 +1,170 @@
|
||||
"""
|
||||
Global Marketplace Domain Models
|
||||
Domain models for global marketplace operations, multi-region support, and cross-chain integration
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from enum import StrEnum
|
||||
from typing import Any
|
||||
from uuid import uuid4
|
||||
|
||||
from sqlmodel import JSON, Column, Field, SQLModel
|
||||
|
||||
|
||||
class MarketplaceStatus(StrEnum):
|
||||
"""Global marketplace offer status"""
|
||||
|
||||
ACTIVE = "active"
|
||||
INACTIVE = "inactive"
|
||||
PENDING = "pending"
|
||||
COMPLETED = "completed"
|
||||
CANCELLED = "cancelled"
|
||||
EXPIRED = "expired"
|
||||
|
||||
|
||||
class RegionStatus(StrEnum):
|
||||
"""Global marketplace region status"""
|
||||
|
||||
ACTIVE = "active"
|
||||
INACTIVE = "inactive"
|
||||
MAINTENANCE = "maintenance"
|
||||
DEPRECATED = "deprecated"
|
||||
|
||||
|
||||
class MarketplaceRegion(SQLModel, table=True):
|
||||
"""Global marketplace region configuration"""
|
||||
|
||||
__tablename__ = "marketplace_regions"
|
||||
__table_args__ = {"extend_existing": True}
|
||||
|
||||
id: str = Field(default_factory=lambda: f"region_{uuid4().hex[:8]}", primary_key=True)
|
||||
region_code: str = Field(index=True, unique=True)
|
||||
region_name: str = Field(index=True)
|
||||
geographic_area: str = Field(default="global")
|
||||
|
||||
base_currency: str = Field(default="USD")
|
||||
timezone: str = Field(default="UTC")
|
||||
language: str = Field(default="en")
|
||||
|
||||
load_factor: float = Field(default=1.0, ge=0.1, le=10.0)
|
||||
max_concurrent_requests: int = Field(default=1000)
|
||||
priority_weight: float = Field(default=1.0, ge=0.1, le=10.0)
|
||||
|
||||
status: RegionStatus = Field(default=RegionStatus.ACTIVE)
|
||||
health_score: float = Field(default=1.0, ge=0.0, le=1.0)
|
||||
last_health_check: datetime | None = Field(default=None)
|
||||
|
||||
api_endpoint: str = Field(default="")
|
||||
websocket_endpoint: str = Field(default="")
|
||||
blockchain_rpc_endpoints: dict[str, str] = Field(default_factory=dict, sa_column=Column(JSON))
|
||||
|
||||
average_response_time: float = Field(default=0.0)
|
||||
request_rate: float = Field(default=0.0)
|
||||
error_rate: float = Field(default=0.0)
|
||||
|
||||
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
|
||||
|
||||
class GlobalMarketplaceConfig(SQLModel, table=True):
|
||||
"""Global marketplace configuration settings"""
|
||||
|
||||
__tablename__ = "global_marketplace_configs"
|
||||
__table_args__ = {"extend_existing": True}
|
||||
|
||||
id: str = Field(default_factory=lambda: f"config_{uuid4().hex[:8]}", primary_key=True)
|
||||
config_key: str = Field(index=True, unique=True)
|
||||
config_value: str = Field(default="")
|
||||
config_type: str = Field(default="string")
|
||||
|
||||
description: str = Field(default="")
|
||||
category: str = Field(default="general")
|
||||
is_public: bool = Field(default=False)
|
||||
is_encrypted: bool = Field(default=False)
|
||||
|
||||
min_value: float | None = Field(default=None)
|
||||
max_value: float | None = Field(default=None)
|
||||
allowed_values: list[str] = Field(default_factory=list, sa_column=Column(JSON))
|
||||
|
||||
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
last_modified_by: str | None = Field(default=None)
|
||||
|
||||
|
||||
class GlobalMarketplaceOffer(SQLModel, table=True):
|
||||
"""Global marketplace offer with multi-region support"""
|
||||
|
||||
__tablename__ = "global_marketplace_offers"
|
||||
__table_args__ = {"extend_existing": True}
|
||||
|
||||
id: str = Field(default_factory=lambda: f"offer_{uuid4().hex[:8]}", primary_key=True)
|
||||
original_offer_id: str = Field(index=True)
|
||||
|
||||
agent_id: str = Field(index=True)
|
||||
service_type: str = Field(index=True)
|
||||
resource_specification: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
|
||||
|
||||
base_price: float = Field(default=0.0)
|
||||
currency: str = Field(default="USD")
|
||||
price_per_region: dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
|
||||
dynamic_pricing_enabled: bool = Field(default=False)
|
||||
|
||||
total_capacity: int = Field(default=0)
|
||||
available_capacity: int = Field(default=0)
|
||||
regions_available: list[str] = Field(default_factory=list, sa_column=Column(JSON))
|
||||
|
||||
global_status: MarketplaceStatus = Field(default=MarketplaceStatus.ACTIVE)
|
||||
region_statuses: dict[str, MarketplaceStatus] = Field(default_factory=dict, sa_column=Column(JSON))
|
||||
|
||||
global_rating: float = Field(default=0.0, ge=0.0, le=5.0)
|
||||
total_transactions: int = Field(default=0)
|
||||
success_rate: float = Field(default=0.0, ge=0.0, le=1.0)
|
||||
|
||||
supported_chains: list[int] = Field(default_factory=list, sa_column=Column(JSON))
|
||||
cross_chain_pricing: dict[int, float] = Field(default_factory=dict, sa_column=Column(JSON))
|
||||
|
||||
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
expires_at: datetime | None = Field(default=None)
|
||||
|
||||
|
||||
class GlobalMarketplaceTransaction(SQLModel, table=True):
|
||||
"""Global marketplace transaction with cross-chain support"""
|
||||
|
||||
__tablename__ = "global_marketplace_transactions"
|
||||
__table_args__ = {"extend_existing": True}
|
||||
|
||||
id: str = Field(default_factory=lambda: f"tx_{uuid4().hex[:8]}", primary_key=True)
|
||||
transaction_hash: str | None = Field(index=True)
|
||||
|
||||
buyer_id: str = Field(index=True)
|
||||
seller_id: str = Field(index=True)
|
||||
offer_id: str = Field(index=True)
|
||||
|
||||
service_type: str = Field(index=True)
|
||||
quantity: int = Field(default=1)
|
||||
unit_price: float = Field(default=0.0)
|
||||
total_amount: float = Field(default=0.0)
|
||||
currency: str = Field(default="USD")
|
||||
|
||||
source_chain: int | None = Field(default=None)
|
||||
target_chain: int | None = Field(default=None)
|
||||
bridge_transaction_id: str | None = Field(default=None)
|
||||
cross_chain_fee: float = Field(default=0.0)
|
||||
|
||||
source_region: str = Field(default="global")
|
||||
target_region: str = Field(default="global")
|
||||
regional_fees: dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
|
||||
|
||||
status: str = Field(default="pending")
|
||||
payment_status: str = Field(default="pending")
|
||||
delivery_status: str = Field(default="pending")
|
||||
|
||||
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
confirmed_at: datetime | None = Field(default=None)
|
||||
completed_at: datetime | None = Field(default=None)
|
||||
|
||||
transaction_data: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
|
||||
@@ -0,0 +1,41 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from uuid import uuid4
|
||||
|
||||
from sqlalchemy import JSON, Column
|
||||
from sqlmodel import Field, SQLModel
|
||||
|
||||
|
||||
class MarketplaceOffer(SQLModel, table=True):
|
||||
__tablename__ = "marketplaceoffer"
|
||||
__table_args__ = {"extend_existing": True}
|
||||
|
||||
id: str = Field(default_factory=lambda: uuid4().hex, primary_key=True)
|
||||
provider: str = Field(index=True)
|
||||
capacity: int = Field(default=0, nullable=False)
|
||||
price: float = Field(default=0.0, nullable=False)
|
||||
sla: str = Field(default="")
|
||||
status: str = Field(default="open", max_length=20)
|
||||
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), nullable=False, index=True)
|
||||
attributes: dict = Field(default_factory=dict, sa_column=Column(JSON, nullable=False))
|
||||
# GPU-specific fields
|
||||
gpu_model: str | None = Field(default=None, index=True)
|
||||
gpu_memory_gb: int | None = Field(default=None)
|
||||
gpu_count: int | None = Field(default=1)
|
||||
cuda_version: str | None = Field(default=None)
|
||||
price_per_hour: float | None = Field(default=None)
|
||||
region: str | None = Field(default=None, index=True)
|
||||
|
||||
|
||||
class MarketplaceBid(SQLModel, table=True):
|
||||
__tablename__ = "marketplacebid"
|
||||
__table_args__ = {"extend_existing": True}
|
||||
|
||||
id: str = Field(default_factory=lambda: uuid4().hex, primary_key=True)
|
||||
provider: str = Field(index=True)
|
||||
capacity: int = Field(default=0, nullable=False)
|
||||
price: float = Field(default=0.0, nullable=False)
|
||||
notes: str | None = Field(default=None)
|
||||
status: str = Field(default="pending", nullable=False)
|
||||
submitted_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), nullable=False, index=True)
|
||||
299
apps/marketplace-service-debug/src/marketplace_service/main.py
Normal file
299
apps/marketplace-service-debug/src/marketplace_service/main.py
Normal file
@@ -0,0 +1,299 @@
|
||||
"""
|
||||
Marketplace Service main application
|
||||
Manages GPU marketplace operations
|
||||
"""
|
||||
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import AsyncIterator
|
||||
|
||||
from fastapi import FastAPI, Depends
|
||||
from fastapi.responses import JSONResponse
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from aitbc import (
|
||||
configure_logging,
|
||||
get_logger,
|
||||
RequestIDMiddleware,
|
||||
PerformanceLoggingMiddleware,
|
||||
RequestValidationMiddleware,
|
||||
ErrorHandlerMiddleware,
|
||||
)
|
||||
|
||||
from .storage import init_db, get_session
|
||||
from .services.marketplace_service import MarketplaceService
|
||||
|
||||
# Configure structured logging
|
||||
configure_logging(level="INFO")
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
|
||||
"""Lifecycle events for the Marketplace Service."""
|
||||
logger.info("Starting Marketplace Service")
|
||||
# Initialize database
|
||||
await init_db()
|
||||
yield
|
||||
logger.info("Shutting down Marketplace Service")
|
||||
|
||||
|
||||
app = FastAPI(
|
||||
title="AITBC Marketplace Service",
|
||||
description="Manages GPU marketplace operations",
|
||||
version="0.1.0",
|
||||
lifespan=lifespan,
|
||||
)
|
||||
|
||||
# Add middleware
|
||||
app.add_middleware(RequestIDMiddleware)
|
||||
app.add_middleware(PerformanceLoggingMiddleware)
|
||||
app.add_middleware(RequestValidationMiddleware, max_request_size=10*1024*1024)
|
||||
# app.add_middleware(ErrorHandlerMiddleware) # Temporarily disabled for debugging
|
||||
|
||||
|
||||
# Use get_session() directly as dependency - FastAPI handles @asynccontextmanager
|
||||
get_session_dep = get_session
|
||||
|
||||
|
||||
class HealthResponse(BaseModel):
|
||||
"""Health check response"""
|
||||
status: str
|
||||
service: str
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
async def health() -> HealthResponse:
|
||||
"""Health check endpoint"""
|
||||
return HealthResponse(status="healthy", service="marketplace-service")
|
||||
|
||||
|
||||
@app.get("/ready")
|
||||
async def ready() -> dict[str, str]:
|
||||
"""Readiness check - verifies database connectivity"""
|
||||
try:
|
||||
async with get_session() as session:
|
||||
# Test database connection
|
||||
await session.execute("SELECT 1")
|
||||
return {"status": "ready", "service": "marketplace-service"}
|
||||
except Exception as e:
|
||||
logger.error(f"Readiness check failed: {e}")
|
||||
return JSONResponse(
|
||||
status_code=503,
|
||||
content={"status": "not_ready", "service": "marketplace-service", "error": str(e)},
|
||||
)
|
||||
|
||||
|
||||
@app.get("/live")
|
||||
async def live() -> dict[str, str]:
|
||||
"""Liveness check - verifies service is not stuck"""
|
||||
return {"status": "alive", "service": "marketplace-service"}
|
||||
|
||||
|
||||
@app.get("/marketplace/status")
|
||||
async def marketplace_status() -> dict[str, str]:
|
||||
"""Get marketplace status"""
|
||||
return {
|
||||
"status": "operational",
|
||||
"service": "marketplace-service",
|
||||
"message": "Marketplace service is running",
|
||||
}
|
||||
|
||||
|
||||
async def get_marketplace_service(session: AsyncSession = Depends(get_session)) -> MarketplaceService:
|
||||
"""Get marketplace service instance"""
|
||||
return MarketplaceService(session)
|
||||
|
||||
|
||||
@app.get("/v1/marketplace/offers")
|
||||
async def get_offers(
|
||||
status: str | None = None,
|
||||
region: str | None = None,
|
||||
gpu_model: str | None = None,
|
||||
svc: MarketplaceService = Depends(get_marketplace_service),
|
||||
):
|
||||
"""Get marketplace offers"""
|
||||
try:
|
||||
logger.info(f"GET /v1/marketplace/offers called with filters: status={status}, region={region}, gpu_model={gpu_model}")
|
||||
result = await svc.list_offers(status=status, region=region, gpu_model=gpu_model)
|
||||
logger.info(f"GET /v1/marketplace/offers returned {len(result)} offers")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Error in GET /v1/marketplace/offers: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
@app.get("/v1/marketplace/offers/{offer_id}")
|
||||
async def get_offer(
|
||||
offer_id: str,
|
||||
svc: MarketplaceService = Depends(get_marketplace_service),
|
||||
):
|
||||
"""Get a specific marketplace offer"""
|
||||
try:
|
||||
logger.info(f"GET /v1/marketplace/offers/{offer_id} called")
|
||||
result = await svc.get_offer(offer_id)
|
||||
logger.info(f"GET /v1/marketplace/offers/{offer_id} returned: {result is not None}")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Error in GET /v1/marketplace/offers/{offer_id}: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
@app.post("/v1/marketplace/offers")
|
||||
async def create_offer(
|
||||
offer_data: dict,
|
||||
svc: MarketplaceService = Depends(get_marketplace_service),
|
||||
):
|
||||
"""Create a new marketplace offer"""
|
||||
try:
|
||||
logger.info(f"POST /v1/marketplace/offers called with data keys: {offer_data.keys()}")
|
||||
result = await svc.create_offer(offer_data)
|
||||
logger.info(f"POST /v1/marketplace/offers created offer with id: {result.id}")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Error in POST /v1/marketplace/offers: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
@app.get("/v1/marketplace/bids")
|
||||
async def get_bids(
|
||||
status: str | None = None,
|
||||
provider: str | None = None,
|
||||
svc: MarketplaceService = Depends(get_marketplace_service),
|
||||
):
|
||||
"""Get marketplace bids"""
|
||||
try:
|
||||
logger.info(f"GET /v1/marketplace/bids called with filters: status={status}, provider={provider}")
|
||||
result = await svc.list_bids(status=status, provider=provider)
|
||||
logger.info(f"GET /v1/marketplace/bids returned {len(result)} bids")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Error in GET /v1/marketplace/bids: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
@app.post("/v1/marketplace/bids")
|
||||
async def create_bid(
|
||||
bid_data: dict,
|
||||
svc: MarketplaceService = Depends(get_marketplace_service),
|
||||
):
|
||||
"""Create a new marketplace bid"""
|
||||
try:
|
||||
logger.info(f"POST /v1/marketplace/bids called with data keys: {bid_data.keys()}")
|
||||
result = await svc.create_bid(bid_data)
|
||||
logger.info(f"POST /v1/marketplace/bids created bid with id: {result.id}")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Error in POST /v1/marketplace/bids: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
@app.get("/v1/marketplace/analytics")
|
||||
async def get_analytics(
|
||||
period_type: str = "daily",
|
||||
svc: MarketplaceService = Depends(get_marketplace_service),
|
||||
):
|
||||
"""Get marketplace analytics"""
|
||||
try:
|
||||
logger.info(f"GET /v1/marketplace/analytics called with period_type={period_type}")
|
||||
result = await svc.get_analytics(period_type=period_type)
|
||||
logger.info(f"GET /v1/marketplace/analytics returned analytics data")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Error in GET /v1/marketplace/analytics: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
@app.post("/v1/transactions")
|
||||
async def submit_transaction(transaction_data: dict, session: AsyncSession = Depends(get_session_dep)):
|
||||
"""Submit marketplace transaction"""
|
||||
from .domain.marketplace import MarketplaceOffer, MarketplaceBid
|
||||
|
||||
# Validate transaction type
|
||||
transaction_type = transaction_data.get('type')
|
||||
action = transaction_data.get('action')
|
||||
|
||||
if transaction_type != 'marketplace':
|
||||
return {"error": "Invalid transaction type for marketplace service"}, 400
|
||||
|
||||
try:
|
||||
if action == 'offer':
|
||||
offer = MarketplaceOffer(**transaction_data)
|
||||
session.add(offer)
|
||||
elif action == 'bid':
|
||||
bid = MarketplaceBid(**transaction_data)
|
||||
session.add(bid)
|
||||
else:
|
||||
return {"error": f"Invalid action: {action}. Only 'offer' and 'bid' are currently supported"}, 400
|
||||
|
||||
await session.commit()
|
||||
return {"status": "success"}
|
||||
except Exception as e:
|
||||
await session.rollback()
|
||||
logger.error(f"Transaction submission error: {e}")
|
||||
return {"error": str(e)}, 500
|
||||
|
||||
|
||||
@app.get("/v1/transactions")
|
||||
async def get_transactions(
|
||||
transaction_type: str | None = None,
|
||||
action: str | None = None,
|
||||
status: str | None = None,
|
||||
island_id: str | None = None,
|
||||
session: AsyncSession = Depends(get_session_dep),
|
||||
):
|
||||
"""Query marketplace transactions"""
|
||||
from .domain.marketplace import MarketplaceOffer, MarketplaceBid
|
||||
from sqlalchemy import select
|
||||
|
||||
try:
|
||||
transactions = []
|
||||
|
||||
# Query offers
|
||||
if action == 'offer' or not action:
|
||||
result = await session.execute(select(MarketplaceOffer))
|
||||
offers = result.scalars().all()
|
||||
transactions.extend([{
|
||||
"id": o.id,
|
||||
"action": "offer",
|
||||
"provider": o.provider,
|
||||
"capacity": o.capacity,
|
||||
"price": o.price,
|
||||
"status": o.status,
|
||||
"gpu_model": o.gpu_model,
|
||||
"gpu_memory_gb": o.gpu_memory_gb,
|
||||
"gpu_count": o.gpu_count,
|
||||
"price_per_hour": o.price_per_hour,
|
||||
"region": o.region,
|
||||
"created_at": o.created_at.isoformat() if o.created_at else None
|
||||
} for o in offers])
|
||||
|
||||
# Query bids
|
||||
if action == 'bid' or not action:
|
||||
result = await session.execute(select(MarketplaceBid))
|
||||
bids = result.scalars().all()
|
||||
transactions.extend([{
|
||||
"id": b.id,
|
||||
"action": "bid",
|
||||
"provider": b.provider,
|
||||
"capacity": b.capacity,
|
||||
"price": b.price,
|
||||
"status": b.status,
|
||||
"submitted_at": b.submitted_at.isoformat() if b.submitted_at else None
|
||||
} for b in bids])
|
||||
|
||||
# Apply filters
|
||||
if status:
|
||||
transactions = [t for t in transactions if t.get('status') == status]
|
||||
if island_id:
|
||||
transactions = [t for t in transactions if t.get('provider') == island_id]
|
||||
|
||||
return transactions
|
||||
except Exception as e:
|
||||
logger.error(f"Transaction query error: {e}")
|
||||
return {"error": str(e)}, 500
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="0.0.0.0", port=8102)
|
||||
@@ -0,0 +1,7 @@
|
||||
"""
|
||||
Marketplace Service services
|
||||
"""
|
||||
|
||||
from .marketplace_service import MarketplaceService
|
||||
|
||||
__all__ = ["MarketplaceService"]
|
||||
@@ -0,0 +1,115 @@
|
||||
"""
|
||||
Marketplace service for managing marketplace operations
|
||||
"""
|
||||
|
||||
from typing import Any
|
||||
|
||||
from sqlmodel import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from aitbc import get_logger
|
||||
from ..domain.marketplace import MarketplaceOffer, MarketplaceBid
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class MarketplaceService:
|
||||
def __init__(self, session: AsyncSession):
|
||||
self.session = session
|
||||
|
||||
async def list_offers(
|
||||
self,
|
||||
status: str | None = None,
|
||||
region: str | None = None,
|
||||
gpu_model: str | None = None,
|
||||
) -> list[MarketplaceOffer]:
|
||||
"""List marketplace offers"""
|
||||
try:
|
||||
logger.info(f"list_offers called with filters: status={status}, region={region}, gpu_model={gpu_model}")
|
||||
stmt = select(MarketplaceOffer)
|
||||
if status:
|
||||
stmt = stmt.where(MarketplaceOffer.status == status)
|
||||
if region:
|
||||
stmt = stmt.where(MarketplaceOffer.region == region)
|
||||
if gpu_model:
|
||||
stmt = stmt.where(MarketplaceOffer.gpu_model == gpu_model)
|
||||
logger.info("Executing database query for offers")
|
||||
result = list((await self.session.execute(stmt)).all())
|
||||
logger.info(f"Retrieved {len(result)} offers")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Error in list_offers: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
async def get_offer(self, offer_id: str) -> MarketplaceOffer | None:
|
||||
"""Get a specific marketplace offer"""
|
||||
try:
|
||||
logger.info(f"get_offer called with offer_id={offer_id}")
|
||||
stmt = select(MarketplaceOffer).where(MarketplaceOffer.id == offer_id)
|
||||
result = (await self.session.execute(stmt)).first()
|
||||
offer = result[0] if result else None
|
||||
logger.info(f"Retrieved offer: {offer_id}, found: {offer is not None}")
|
||||
return offer
|
||||
except Exception as e:
|
||||
logger.error(f"Error in get_offer: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
async def create_offer(self, offer_data: dict) -> MarketplaceOffer:
|
||||
"""Create a new marketplace offer"""
|
||||
try:
|
||||
logger.info(f"create_offer called with data keys: {offer_data.keys()}")
|
||||
offer = MarketplaceOffer(**offer_data)
|
||||
self.session.add(offer)
|
||||
await self.session.commit()
|
||||
await self.session.refresh(offer)
|
||||
logger.info(f"Created offer with id: {offer.id}")
|
||||
return offer
|
||||
except Exception as e:
|
||||
logger.error(f"Error in create_offer: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
async def list_bids(
|
||||
self,
|
||||
status: str | None = None,
|
||||
provider: str | None = None,
|
||||
) -> list[MarketplaceBid]:
|
||||
"""List marketplace bids"""
|
||||
try:
|
||||
logger.info(f"list_bids called with filters: status={status}, provider={provider}")
|
||||
stmt = select(MarketplaceBid)
|
||||
if status:
|
||||
stmt = stmt.where(MarketplaceBid.status == status)
|
||||
if provider:
|
||||
stmt = stmt.where(MarketplaceBid.provider == provider)
|
||||
logger.info("Executing database query for bids")
|
||||
result = list((await self.session.execute(stmt)).all())
|
||||
logger.info(f"Retrieved {len(result)} bids")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Error in list_bids: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
async def create_bid(self, bid_data: dict) -> MarketplaceBid:
|
||||
"""Create a new marketplace bid"""
|
||||
try:
|
||||
logger.info(f"create_bid called with data keys: {bid_data.keys()}")
|
||||
bid = MarketplaceBid(**bid_data)
|
||||
self.session.add(bid)
|
||||
await self.session.commit()
|
||||
await self.session.refresh(bid)
|
||||
logger.info(f"Created bid with id: {bid.id}")
|
||||
return bid
|
||||
except Exception as e:
|
||||
logger.error(f"Error in create_bid: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
async def get_analytics(self, period_type: str = "daily") -> dict[str, Any]:
|
||||
"""Get marketplace analytics"""
|
||||
# Placeholder for analytics logic
|
||||
return {
|
||||
"period_type": period_type,
|
||||
"total_offers": 0,
|
||||
"total_transactions": 0,
|
||||
"total_volume": 0.0,
|
||||
"average_price": 0.0,
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
"""
|
||||
Database session management for Marketplace service
|
||||
"""
|
||||
|
||||
import os
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import AsyncIterator
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
|
||||
from sqlmodel import SQLModel
|
||||
|
||||
from aitbc import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
# Database URL from environment variable or default
|
||||
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///./data/marketplace_service.db")
|
||||
|
||||
# Create async engine
|
||||
engine = create_async_engine(DATABASE_URL, echo=False)
|
||||
|
||||
|
||||
async def init_db() -> None:
|
||||
"""Initialize database tables"""
|
||||
try:
|
||||
logger.info("Initializing database tables")
|
||||
from .domain.marketplace import MarketplaceOffer, MarketplaceBid
|
||||
from .domain.global_marketplace import (
|
||||
MarketplaceRegion,
|
||||
GlobalMarketplaceConfig,
|
||||
GlobalMarketplaceOffer,
|
||||
GlobalMarketplaceTransaction,
|
||||
)
|
||||
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(SQLModel.metadata.create_all)
|
||||
|
||||
logger.info("Marketplace service database initialized")
|
||||
except Exception as e:
|
||||
logger.error(f"Error initializing database: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
async def get_session() -> AsyncIterator[AsyncSession]:
|
||||
"""Get database session"""
|
||||
try:
|
||||
logger.debug("Creating database session")
|
||||
async with AsyncSession(engine) as session:
|
||||
logger.debug("Database session created successfully")
|
||||
yield session
|
||||
logger.debug("Database session closed")
|
||||
except Exception as e:
|
||||
logger.error(f"Error in get_session: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
@@ -52,10 +52,8 @@ app.add_middleware(RequestValidationMiddleware, max_request_size=10*1024*1024)
|
||||
app.add_middleware(ErrorHandlerMiddleware)
|
||||
|
||||
|
||||
async def get_session_dep() -> AsyncIterator[AsyncSession]:
|
||||
"""Get database session dependency"""
|
||||
async with get_session() as session:
|
||||
yield session
|
||||
# Use get_session() directly as dependency - FastAPI handles async generators
|
||||
get_session_dep = get_session
|
||||
|
||||
|
||||
class HealthResponse(BaseModel):
|
||||
@@ -115,7 +113,14 @@ async def get_offers(
|
||||
svc: MarketplaceService = Depends(get_marketplace_service),
|
||||
):
|
||||
"""Get marketplace offers"""
|
||||
return svc.list_offers(status=status, region=region, gpu_model=gpu_model)
|
||||
try:
|
||||
logger.info(f"GET /v1/marketplace/offers called with filters: status={status}, region={region}, gpu_model={gpu_model}")
|
||||
result = await svc.list_offers(status=status, region=region, gpu_model=gpu_model)
|
||||
logger.info(f"GET /v1/marketplace/offers returned {len(result)} offers")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Error in GET /v1/marketplace/offers: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
@app.get("/v1/marketplace/offers/{offer_id}")
|
||||
@@ -124,7 +129,14 @@ async def get_offer(
|
||||
svc: MarketplaceService = Depends(get_marketplace_service),
|
||||
):
|
||||
"""Get a specific marketplace offer"""
|
||||
return svc.get_offer(offer_id)
|
||||
try:
|
||||
logger.info(f"GET /v1/marketplace/offers/{offer_id} called")
|
||||
result = await svc.get_offer(offer_id)
|
||||
logger.info(f"GET /v1/marketplace/offers/{offer_id} returned: {result is not None}")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Error in GET /v1/marketplace/offers/{offer_id}: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
@app.post("/v1/marketplace/offers")
|
||||
@@ -133,7 +145,14 @@ async def create_offer(
|
||||
svc: MarketplaceService = Depends(get_marketplace_service),
|
||||
):
|
||||
"""Create a new marketplace offer"""
|
||||
return svc.create_offer(offer_data)
|
||||
try:
|
||||
logger.info(f"POST /v1/marketplace/offers called with data keys: {offer_data.keys()}")
|
||||
result = await svc.create_offer(offer_data)
|
||||
logger.info(f"POST /v1/marketplace/offers created offer with id: {result.id}")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Error in POST /v1/marketplace/offers: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
@app.get("/v1/marketplace/bids")
|
||||
@@ -143,7 +162,14 @@ async def get_bids(
|
||||
svc: MarketplaceService = Depends(get_marketplace_service),
|
||||
):
|
||||
"""Get marketplace bids"""
|
||||
return svc.list_bids(status=status, provider=provider)
|
||||
try:
|
||||
logger.info(f"GET /v1/marketplace/bids called with filters: status={status}, provider={provider}")
|
||||
result = await svc.list_bids(status=status, provider=provider)
|
||||
logger.info(f"GET /v1/marketplace/bids returned {len(result)} bids")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Error in GET /v1/marketplace/bids: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
@app.post("/v1/marketplace/bids")
|
||||
@@ -152,7 +178,14 @@ async def create_bid(
|
||||
svc: MarketplaceService = Depends(get_marketplace_service),
|
||||
):
|
||||
"""Create a new marketplace bid"""
|
||||
return svc.create_bid(bid_data)
|
||||
try:
|
||||
logger.info(f"POST /v1/marketplace/bids called with data keys: {bid_data.keys()}")
|
||||
result = await svc.create_bid(bid_data)
|
||||
logger.info(f"POST /v1/marketplace/bids created bid with id: {result.id}")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Error in POST /v1/marketplace/bids: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
@app.get("/v1/marketplace/analytics")
|
||||
|
||||
@@ -4,65 +4,104 @@ Marketplace service for managing marketplace operations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from sqlmodel import Session, select
|
||||
from sqlmodel import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from aitbc import get_logger
|
||||
from ..domain.marketplace import MarketplaceOffer, MarketplaceBid
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class MarketplaceService:
|
||||
def __init__(self, session: Session):
|
||||
def __init__(self, session: AsyncSession):
|
||||
self.session = session
|
||||
|
||||
def list_offers(
|
||||
async def list_offers(
|
||||
self,
|
||||
status: str | None = None,
|
||||
region: str | None = None,
|
||||
gpu_model: str | None = None,
|
||||
) -> list[MarketplaceOffer]:
|
||||
"""List marketplace offers"""
|
||||
stmt = select(MarketplaceOffer)
|
||||
if status:
|
||||
stmt = stmt.where(MarketplaceOffer.status == status)
|
||||
if region:
|
||||
stmt = stmt.where(MarketplaceOffer.region == region)
|
||||
if gpu_model:
|
||||
stmt = stmt.where(MarketplaceOffer.gpu_model == gpu_model)
|
||||
return list(self.session.execute(stmt).all())
|
||||
try:
|
||||
logger.info(f"list_offers called with filters: status={status}, region={region}, gpu_model={gpu_model}")
|
||||
stmt = select(MarketplaceOffer)
|
||||
if status:
|
||||
stmt = stmt.where(MarketplaceOffer.status == status)
|
||||
if region:
|
||||
stmt = stmt.where(MarketplaceOffer.region == region)
|
||||
if gpu_model:
|
||||
stmt = stmt.where(MarketplaceOffer.gpu_model == gpu_model)
|
||||
logger.info("Executing database query for offers")
|
||||
result = list((await self.session.execute(stmt)).all())
|
||||
logger.info(f"Retrieved {len(result)} offers")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Error in list_offers: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
def get_offer(self, offer_id: str) -> MarketplaceOffer | None:
|
||||
async def get_offer(self, offer_id: str) -> MarketplaceOffer | None:
|
||||
"""Get a specific marketplace offer"""
|
||||
stmt = select(MarketplaceOffer).where(MarketplaceOffer.id == offer_id)
|
||||
result = self.session.execute(stmt).first()
|
||||
return result[0] if result else None
|
||||
try:
|
||||
logger.info(f"get_offer called with offer_id={offer_id}")
|
||||
stmt = select(MarketplaceOffer).where(MarketplaceOffer.id == offer_id)
|
||||
result = (await self.session.execute(stmt)).first()
|
||||
offer = result[0] if result else None
|
||||
logger.info(f"Retrieved offer: {offer_id}, found: {offer is not None}")
|
||||
return offer
|
||||
except Exception as e:
|
||||
logger.error(f"Error in get_offer: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
def create_offer(self, offer_data: dict) -> MarketplaceOffer:
|
||||
async def create_offer(self, offer_data: dict) -> MarketplaceOffer:
|
||||
"""Create a new marketplace offer"""
|
||||
offer = MarketplaceOffer(**offer_data)
|
||||
self.session.add(offer)
|
||||
self.session.commit()
|
||||
self.session.refresh(offer)
|
||||
return offer
|
||||
try:
|
||||
logger.info(f"create_offer called with data keys: {offer_data.keys()}")
|
||||
offer = MarketplaceOffer(**offer_data)
|
||||
self.session.add(offer)
|
||||
await self.session.commit()
|
||||
await self.session.refresh(offer)
|
||||
logger.info(f"Created offer with id: {offer.id}")
|
||||
return offer
|
||||
except Exception as e:
|
||||
logger.error(f"Error in create_offer: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
def list_bids(
|
||||
async def list_bids(
|
||||
self,
|
||||
status: str | None = None,
|
||||
provider: str | None = None,
|
||||
) -> list[MarketplaceBid]:
|
||||
"""List marketplace bids"""
|
||||
stmt = select(MarketplaceBid)
|
||||
if status:
|
||||
stmt = stmt.where(MarketplaceBid.status == status)
|
||||
if provider:
|
||||
stmt = stmt.where(MarketplaceBid.provider == provider)
|
||||
return list(self.session.execute(stmt).all())
|
||||
try:
|
||||
logger.info(f"list_bids called with filters: status={status}, provider={provider}")
|
||||
stmt = select(MarketplaceBid)
|
||||
if status:
|
||||
stmt = stmt.where(MarketplaceBid.status == status)
|
||||
if provider:
|
||||
stmt = stmt.where(MarketplaceBid.provider == provider)
|
||||
logger.info("Executing database query for bids")
|
||||
result = list((await self.session.execute(stmt)).all())
|
||||
logger.info(f"Retrieved {len(result)} bids")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Error in list_bids: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
def create_bid(self, bid_data: dict) -> MarketplaceBid:
|
||||
async def create_bid(self, bid_data: dict) -> MarketplaceBid:
|
||||
"""Create a new marketplace bid"""
|
||||
bid = MarketplaceBid(**bid_data)
|
||||
self.session.add(bid)
|
||||
self.session.commit()
|
||||
self.session.refresh(bid)
|
||||
return bid
|
||||
try:
|
||||
logger.info(f"create_bid called with data keys: {bid_data.keys()}")
|
||||
bid = MarketplaceBid(**bid_data)
|
||||
self.session.add(bid)
|
||||
await self.session.commit()
|
||||
await self.session.refresh(bid)
|
||||
logger.info(f"Created bid with id: {bid.id}")
|
||||
return bid
|
||||
except Exception as e:
|
||||
logger.error(f"Error in create_bid: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
async def get_analytics(self, period_type: str = "daily") -> dict[str, Any]:
|
||||
"""Get marketplace analytics"""
|
||||
|
||||
@@ -36,8 +36,14 @@ async def init_db() -> None:
|
||||
logger.info("Marketplace service database initialized")
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def get_session() -> AsyncIterator[AsyncSession]:
|
||||
"""Get database session"""
|
||||
async with AsyncSession(engine) as session:
|
||||
yield session
|
||||
try:
|
||||
logger.debug("Creating database session")
|
||||
async with AsyncSession(engine) as session:
|
||||
logger.debug("Database session created successfully")
|
||||
yield session
|
||||
logger.debug("Database session closed")
|
||||
except Exception as e:
|
||||
logger.error(f"Error in get_session: {type(e).__name__}: {str(e)}")
|
||||
raise
|
||||
|
||||
Reference in New Issue
Block a user