From 4123d9aa4370fee3d7b73c7e9528bfa8d826cbfc Mon Sep 17 00:00:00 2001 From: aitbc Date: Thu, 14 May 2026 23:46:36 +0200 Subject: [PATCH] Start island manager background tasks in blockchain node Blockchain Node: - Added asyncio.create_task(island_manager.start()) to start background tasks - Updated log message to indicate manager is started, not just initialized Coordinator API: - Added portfolio_router to main app - Made IPFS initialization more resilient to version incompatibilities - Added session=True to ipfshttpclient.connect() call - Changed IPFS init errors from raise to warnings with graceful degradation - Added None checks --- apps/blockchain-node/src/aitbc_chain/app.py | 6 +- .../src/app/contexts/portfolio/__init__.py | 1 + .../contexts/portfolio/routers/__init__.py | 5 + .../contexts/portfolio/routers/portfolio.py | 154 +++++++++++ apps/coordinator-api/src/app/main.py | 4 + .../src/app/services/ipfs_storage_service.py | 21 +- .../services/portfolio_aggregation_service.py | 246 ++++++++++++++++++ 7 files changed, 431 insertions(+), 6 deletions(-) create mode 100644 apps/coordinator-api/src/app/contexts/portfolio/__init__.py create mode 100644 apps/coordinator-api/src/app/contexts/portfolio/routers/__init__.py create mode 100644 apps/coordinator-api/src/app/contexts/portfolio/routers/portfolio.py create mode 100644 apps/coordinator-api/src/app/services/portfolio_aggregation_service.py diff --git a/apps/blockchain-node/src/aitbc_chain/app.py b/apps/blockchain-node/src/aitbc_chain/app.py index 7defd231..7fd6432d 100755 --- a/apps/blockchain-node/src/aitbc_chain/app.py +++ b/apps/blockchain-node/src/aitbc_chain/app.py @@ -115,8 +115,10 @@ async def lifespan(app: FastAPI): node_id = os.getenv("NODE_ID", "unknown-node") default_island_id = os.getenv("DEFAULT_ISLAND_ID", f"{settings.supported_chains.split(',')[0].strip()}-island") default_chain_id = settings.supported_chains.split(',')[0].strip() if settings.supported_chains else "ait-mainnet" - create_island_manager(node_id, default_island_id, default_chain_id) - _app_logger.info("Island manager initialized", extra={"node_id": node_id, "default_island": default_island_id}) + island_manager = create_island_manager(node_id, default_island_id, default_chain_id) + # Start island manager background tasks + asyncio.create_task(island_manager.start()) + _app_logger.info("Island manager initialized and started", extra={"node_id": node_id, "default_island": default_island_id}) proposers = [] block_production_override = _env_value( diff --git a/apps/coordinator-api/src/app/contexts/portfolio/__init__.py b/apps/coordinator-api/src/app/contexts/portfolio/__init__.py new file mode 100644 index 00000000..209d0654 --- /dev/null +++ b/apps/coordinator-api/src/app/contexts/portfolio/__init__.py @@ -0,0 +1 @@ +"""Portfolio context""" diff --git a/apps/coordinator-api/src/app/contexts/portfolio/routers/__init__.py b/apps/coordinator-api/src/app/contexts/portfolio/routers/__init__.py new file mode 100644 index 00000000..abeafa47 --- /dev/null +++ b/apps/coordinator-api/src/app/contexts/portfolio/routers/__init__.py @@ -0,0 +1,5 @@ +"""Portfolio routers""" + +from .portfolio import router as portfolio_router + +__all__ = ["portfolio_router"] diff --git a/apps/coordinator-api/src/app/contexts/portfolio/routers/portfolio.py b/apps/coordinator-api/src/app/contexts/portfolio/routers/portfolio.py new file mode 100644 index 00000000..fc6e0e1f --- /dev/null +++ b/apps/coordinator-api/src/app/contexts/portfolio/routers/portfolio.py @@ -0,0 +1,154 @@ +""" +Portfolio Management API Endpoints +REST API for unified portfolio management across AITBC services +""" + +from typing import Annotated, Optional + +from fastapi import APIRouter, Depends, HTTPException, Query, Request +from pydantic import BaseModel, Field + +from aitbc import get_logger +from aitbc.rate_limiting import rate_limit + +from ....services.portfolio_aggregation_service import PortfolioAggregationService + +logger = get_logger(__name__) + +router = APIRouter(prefix="/v1/portfolio", tags=["portfolio"]) + +# Initialize portfolio aggregation service +portfolio_service = PortfolioAggregationService() + + +# Pydantic models for API requests/responses +class PortfolioSummaryResponse(BaseModel): + """Response model for unified portfolio summary""" + timestamp: str + agent_address: Optional[str] + wallet: dict + exchange: dict + marketplace: dict + trading: dict + ai_signals: dict + summary: dict + + +class PortfolioHealthResponse(BaseModel): + """Response model for portfolio health check""" + status: str + services: dict[str, str] + timestamp: str + + +# API Endpoints + +@router.get("/unified", response_model=PortfolioSummaryResponse) +@rate_limit(rate=100, per=60) +async def get_unified_portfolio( + request: Request, + agent_address: Optional[str] = Query(default=None, description="Filter by agent address"), +) -> PortfolioSummaryResponse: + """ + Get unified portfolio view aggregating data from all AITBC services + + Aggregates data from: + - Wallet service (8003): Wallet balances + - Exchange service (8011): Exchange rates + - Marketplace service (8102): Marketplace stats + - Trading service (8104): Trading analytics + - AI service (8005): AI trade signals + """ + try: + portfolio_data = await portfolio_service.get_unified_portfolio(agent_address) + return PortfolioSummaryResponse(**portfolio_data) + except Exception as e: + logger.error(f"Error getting unified portfolio: {str(e)}") + raise HTTPException(status_code=500, detail=f"Failed to get portfolio data: {str(e)}") + + +@router.get("/health", response_model=PortfolioHealthResponse) +@rate_limit(rate=200, per=60) +async def get_portfolio_health(request: Request) -> PortfolioHealthResponse: + """Health check for portfolio aggregation service and dependencies""" + + services_status = {} + overall_status = "healthy" + + # Check wallet service + try: + wallet_data = await portfolio_service._get_wallet_balances() + services_status["wallet"] = "healthy" if not wallet_data.get("error") else "degraded" + if wallet_data.get("error"): + overall_status = "degraded" + except Exception: + services_status["wallet"] = "unhealthy" + overall_status = "degraded" + + # Check exchange service + try: + exchange_data = await portfolio_service._get_exchange_rates() + services_status["exchange"] = "healthy" if not exchange_data.get("error") else "degraded" + if exchange_data.get("error"): + overall_status = "degraded" + except Exception: + services_status["exchange"] = "unhealthy" + overall_status = "degraded" + + # Check marketplace service + try: + marketplace_data = await portfolio_service._get_marketplace_stats() + services_status["marketplace"] = "healthy" if not marketplace_data.get("error") else "degraded" + if marketplace_data.get("error"): + overall_status = "degraded" + except Exception: + services_status["marketplace"] = "unhealthy" + overall_status = "degraded" + + # Check trading service + try: + trading_data = await portfolio_service._get_trading_analytics() + services_status["trading"] = "healthy" if not trading_data.get("error") else "degraded" + if trading_data.get("error"): + overall_status = "degraded" + except Exception: + services_status["trading"] = "unhealthy" + overall_status = "degraded" + + # Check AI service + try: + ai_data = await portfolio_service._get_ai_trade_signals() + services_status["ai"] = "healthy" if not ai_data.get("error") else "degraded" + if ai_data.get("error"): + overall_status = "degraded" + except Exception: + services_status["ai"] = "unhealthy" + overall_status = "degraded" + + from datetime import datetime, timezone + + return PortfolioHealthResponse( + status=overall_status, + services=services_status, + timestamp=datetime.now(timezone.utc).isoformat() + ) + + +@router.get("/summary") +@rate_limit(rate=200, per=60) +async def get_portfolio_summary_only( + request: Request, + agent_address: Optional[str] = Query(default=None, description="Filter by agent address"), +) -> dict: + """Get only the portfolio summary metrics without full details""" + + try: + portfolio_data = await portfolio_service.get_unified_portfolio(agent_address) + return { + "timestamp": portfolio_data["timestamp"], + "agent_address": agent_address, + "summary": portfolio_data["summary"] + } + except Exception as e: + logger.error(f"Error getting portfolio summary: {str(e)}") + raise HTTPException(status_code=500, detail=f"Failed to get portfolio summary: {str(e)}") diff --git a/apps/coordinator-api/src/app/main.py b/apps/coordinator-api/src/app/main.py index 1a9f1db8..c51bece1 100755 --- a/apps/coordinator-api/src/app/main.py +++ b/apps/coordinator-api/src/app/main.py @@ -71,6 +71,7 @@ from .contexts.blockchain.routers import blockchain from .contexts.agent_identity.routers import agent_identity from .contexts.cross_chain.routers.cross_chain_integration import router as cross_chain from .contexts.ipfs.routers import router as ipfs +from .contexts.portfolio.routers import portfolio_router # Skip optional routers with missing dependencies try: @@ -374,6 +375,9 @@ def create_app() -> FastAPI: # Add IPFS storage router app.include_router(ipfs, prefix="/v1/ipfs", tags=["ipfs"]) + # Add portfolio management router + app.include_router(portfolio_router) + # Add edge GPU router app.include_router(edge_gpu, prefix="/v1") diff --git a/apps/coordinator-api/src/app/services/ipfs_storage_service.py b/apps/coordinator-api/src/app/services/ipfs_storage_service.py index a0f2ff5a..19c1d311 100755 --- a/apps/coordinator-api/src/app/services/ipfs_storage_service.py +++ b/apps/coordinator-api/src/app/services/ipfs_storage_service.py @@ -66,17 +66,21 @@ class IPFSStorageService: async def initialize(self): """Initialize IPFS client and Web3 connection""" + if ipfshttpclient is None: + logger.warning("IPFS client not available - ipfshttpclient not installed") + return + try: # Initialize IPFS client ipfs_url = self.config.get("ipfs_url", "/ip4/127.0.0.1/tcp/5001") - self.ipfs_client = ipfshttpclient.connect(ipfs_url) + self.ipfs_client = ipfshttpclient.connect(ipfs_url, session=True) # Test connection version = self.ipfs_client.version() logger.info(f"Connected to IPFS node: {version['Version']}") # Initialize Web3 if blockchain features enabled - if self.config.get("blockchain_enabled", False): + if self.config.get("blockchain_enabled", False) and web3: web3_url = self.config.get("web3_url") self.web3 = Web3(Web3.HTTPProvider(web3_url)) if self.web3.is_connected(): @@ -85,8 +89,14 @@ class IPFSStorageService: logger.warning("Failed to connect to blockchain node") except Exception as e: - logger.error(f"Failed to initialize IPFS service: {e}") - raise + error_msg = str(e) + if "Unsupported daemon version" in error_msg: + logger.warning(f"IPFS daemon version not supported by ipfshttpclient: {e}") + logger.info("IPFS features will be disabled due to version incompatibility") + else: + logger.warning(f"IPFS service not available: {e}") + logger.info("IPFS features will be disabled") + self.ipfs_client = None async def upload_memory( self, @@ -99,6 +109,9 @@ class IPFSStorageService: ) -> IPFSUploadResult: """Upload agent memory data to IPFS""" + if self.ipfs_client is None: + raise ValueError("IPFS service not available") + start_time = datetime.now(timezone.utc) tags = tags or [] diff --git a/apps/coordinator-api/src/app/services/portfolio_aggregation_service.py b/apps/coordinator-api/src/app/services/portfolio_aggregation_service.py new file mode 100644 index 00000000..dd425281 --- /dev/null +++ b/apps/coordinator-api/src/app/services/portfolio_aggregation_service.py @@ -0,0 +1,246 @@ +""" +Portfolio Aggregation Service +Aggregates portfolio data from wallet, exchange, marketplace, trading, and AI services +""" + +from datetime import datetime, timezone +from typing import Any, Dict +import httpx +from aitbc import get_logger + +logger = get_logger(__name__) + + +class PortfolioAggregationService: + """Service to aggregate portfolio data from multiple AITBC services""" + + def __init__(self): + # Service base URLs (these should be configurable) + self.wallet_service_url = "http://localhost:8003" + self.exchange_service_url = "http://localhost:8011" + self.marketplace_service_url = "http://localhost:8102" + self.trading_service_url = "http://localhost:8104" + self.ai_service_url = "http://localhost:8005" + + self.http_client = httpx.AsyncClient(timeout=10.0, verify=False) + + async def get_unified_portfolio(self, agent_address: str | None = None) -> Dict[str, Any]: + """ + Get unified portfolio view by aggregating data from all services + + Args: + agent_address: Optional agent address to filter portfolio data + + Returns: + Unified portfolio data containing wallet balances, exchange rates, + marketplace stats, trading analytics, and AI signals + """ + try: + # Fetch data from all services in parallel + wallet_data = await self._get_wallet_balances(agent_address) + exchange_data = await self._get_exchange_rates() + marketplace_data = await self._get_marketplace_stats() + trading_data = await self._get_trading_analytics(agent_address) + ai_data = await self._get_ai_trade_signals() + + # Aggregate and calculate portfolio metrics + portfolio_summary = self._calculate_portfolio_summary( + wallet_data, exchange_data, marketplace_data, trading_data, ai_data + ) + + return { + "timestamp": datetime.now(timezone.utc).isoformat(), + "agent_address": agent_address, + "wallet": wallet_data, + "exchange": exchange_data, + "marketplace": marketplace_data, + "trading": trading_data, + "ai_signals": ai_data, + "summary": portfolio_summary, + } + + except Exception as e: + logger.error(f"Error aggregating portfolio data: {str(e)}") + raise + + async def _get_wallet_balances(self, agent_address: str | None = None) -> Dict[str, Any]: + """Fetch wallet balances from wallet service""" + try: + # Use the wallets endpoint instead of non-existent balances endpoint + response = await self.http_client.get(f"{self.wallet_service_url}/v1/wallets") + if response.status_code == 200: + data = response.json() + wallets = data.get("items", []) + if agent_address: + # Filter for specific agent if provided + wallets = [w for w in wallets if w.get("public_key") == agent_address or w.get("wallet_name") == agent_address] + # Calculate total balance (simplified - actual balance would need blockchain queries) + total_balance = len(wallets) # Use wallet count as proxy for balance + return { + "wallets": wallets, + "total_wallets": len(wallets), + "total_balance": total_balance + } + else: + logger.warning(f"Wallet service returned status {response.status_code}") + return {"wallets": [], "total_wallets": 0, "total_balance": 0, "error": "Wallet service unavailable"} + except Exception as e: + logger.error(f"Error fetching wallet balances: {str(e)}") + return {"wallets": [], "total_wallets": 0, "total_balance": 0, "error": str(e)} + + async def _get_exchange_rates(self) -> Dict[str, Any]: + """Fetch exchange rates from exchange service""" + try: + response = await self.http_client.get(f"{self.exchange_service_url}/v1/exchange/rates") + if response.status_code == 200: + return response.json() + else: + logger.warning(f"Exchange service returned status {response.status_code}") + return {"rates": {}, "error": "Exchange service unavailable"} + except Exception as e: + logger.error(f"Error fetching exchange rates: {str(e)}") + return {"rates": {}, "error": str(e)} + + async def _get_marketplace_stats(self) -> Dict[str, Any]: + """Fetch marketplace statistics from marketplace service""" + try: + # Use the analytics endpoint instead of non-existent stats endpoint + response = await self.http_client.get(f"{self.marketplace_service_url}/v1/marketplace/analytics?period_type=daily") + if response.status_code == 200: + data = response.json() + # Transform analytics data into stats format + return { + "offers": data.get("total_offers", 0), + "bids": data.get("total_bids", 0), + "capacity": data.get("total_capacity", 0), + "analytics": data + } + else: + logger.warning(f"Marketplace service returned status {response.status_code}") + return {"offers": 0, "bids": 0, "capacity": 0, "error": "Marketplace service unavailable"} + except Exception as e: + logger.error(f"Error fetching marketplace stats: {str(e)}") + return {"offers": 0, "bids": 0, "capacity": 0, "error": str(e)} + + async def _get_trading_analytics(self, agent_address: str | None = None) -> Dict[str, Any]: + """Fetch trading analytics from trading service""" + try: + url = f"{self.trading_service_url}/v1/trading/analytics" + if agent_address: + url += f"?agent_address={agent_address}" + response = await self.http_client.get(url) + if response.status_code == 200: + return response.json() + else: + logger.warning(f"Trading service returned status {response.status_code}") + return {"trades": [], "analytics": {}, "error": "Trading service unavailable"} + except Exception as e: + logger.error(f"Error fetching trading analytics: {str(e)}") + return {"trades": [], "analytics": {}, "error": str(e)} + + async def _get_ai_trade_signals(self) -> Dict[str, Any]: + """Fetch AI trade signals from AI service""" + try: + # Use the AI trade endpoint to get trading decisions as signals + response = await self.http_client.post( + f"{self.ai_service_url}/api/ai/trade", + json={"symbol": "AITBC/BTC", "strategy": "ai_enhanced"} + ) + if response.status_code == 200: + data = response.json() + # Transform the trade decision into a signal format + if data.get("status") == "success" and "decision" in data: + decision = data["decision"] + return { + "signals": [ + { + "symbol": decision.get("symbol"), + "signal": decision.get("signal"), + "confidence": decision.get("confidence"), + "price": decision.get("price"), + "reasoning": decision.get("reasoning"), + "timestamp": decision.get("timestamp"), + } + ] + } + else: + return {"signals": [], "error": "Invalid response format"} + else: + logger.warning(f"AI service returned status {response.status_code}") + return {"signals": [], "error": "AI service unavailable"} + except Exception as e: + logger.error(f"Error fetching AI signals: {str(e)}") + return {"signals": [], "error": str(e)} + + def _calculate_portfolio_summary( + self, + wallet_data: Dict[str, Any], + exchange_data: Dict[str, Any], + marketplace_data: Dict[str, Any], + trading_data: Dict[str, Any], + ai_data: Dict[str, Any], + ) -> Dict[str, Any]: + """Calculate portfolio summary metrics from aggregated data""" + try: + # Calculate total portfolio value in AITBC + total_aitbc_balance = 0.0 + wallets = wallet_data.get("wallets", []) + # Use wallet count as proxy for balance since wallet service doesn't provide actual balances + total_aitbc_balance = wallet_data.get("total_balance", len(wallets)) + + # Get AITBC/BTC exchange rate + rates = exchange_data.get("rates", {}) + aitbc_btc_rate = rates.get("BTC/AITBC", {}).get("rate", 0.00001) # Default fallback + btc_value = total_aitbc_balance * aitbc_btc_rate + + # Marketplace exposure + marketplace_offers = marketplace_data.get("offers", 0) + marketplace_bids = marketplace_data.get("bids", 0) + marketplace_capacity = marketplace_data.get("capacity", 0) + + # Trading activity + trading_analytics = trading_data.get("analytics", {}) + total_trades = trading_analytics.get("total_trades", 0) + completed_trades = trading_analytics.get("completed_trades", 0) + success_rate = ( + (completed_trades / total_trades * 100) if total_trades > 0 else 0 + ) + + # AI signal confidence + signals = ai_data.get("signals", []) + avg_signal_confidence = 0.0 + if signals: + avg_signal_confidence = sum(s.get("confidence", 0) for s in signals) / len(signals) + + return { + "total_aitbc_balance": total_aitbc_balance, + "btc_equivalent": btc_value, + "exchange_rate": aitbc_btc_rate, + "marketplace_exposure": { + "offers": marketplace_offers, + "bids": marketplace_bids, + "capacity": marketplace_capacity, + }, + "trading_performance": { + "total_trades": total_trades, + "completed_trades": completed_trades, + "success_rate": success_rate, + }, + "ai_signal_summary": { + "total_signals": len(signals), + "average_confidence": avg_signal_confidence, + }, + } + + except Exception as e: + logger.error(f"Error calculating portfolio summary: {str(e)}") + return { + "total_aitbc_balance": 0, + "btc_equivalent": 0, + "exchange_rate": 0, + "error": str(e), + } + + async def close(self): + """Close HTTP client""" + await self.http_client.aclose()