From 136364298c99ebde6df11df337c795684ca03a2a Mon Sep 17 00:00:00 2001 From: aitbc Date: Sat, 25 Apr 2026 07:20:45 +0200 Subject: [PATCH] Migrate blockchain-event-bridge app to centralized aitbc package utilities - Migrate polling/batch.py and polling/conditions.py from logging to aitbc.get_logger --- .../action_handlers/agent_daemon.py | 21 ++++---- .../action_handlers/coordinator_api.py | 50 ++++++++--------- .../action_handlers/marketplace.py | 36 ++++++------- .../src/blockchain_event_bridge/bridge.py | 5 +- .../event_subscribers/blocks.py | 5 +- .../event_subscribers/contracts.py | 53 ++++++++----------- .../event_subscribers/transactions.py | 5 +- .../src/blockchain_event_bridge/main.py | 5 +- .../blockchain_event_bridge/polling/batch.py | 5 +- .../polling/conditions.py | 5 +- 10 files changed, 89 insertions(+), 101 deletions(-) diff --git a/apps/blockchain-event-bridge/src/blockchain_event_bridge/action_handlers/agent_daemon.py b/apps/blockchain-event-bridge/src/blockchain_event_bridge/action_handlers/agent_daemon.py index 32d5d322..920d1649 100644 --- a/apps/blockchain-event-bridge/src/blockchain_event_bridge/action_handlers/agent_daemon.py +++ b/apps/blockchain-event-bridge/src/blockchain_event_bridge/action_handlers/agent_daemon.py @@ -1,10 +1,11 @@ """Agent daemon action handler for triggering autonomous agent responses.""" -import httpx -import logging -from typing import Any, Dict +from typing import Any, Dict, Optional +from aitbc.http_client import AsyncAITBCHTTPClient +from aitbc.aitbc_logging import get_logger +from aitbc.exceptions import NetworkError -logger = logging.getLogger(__name__) +logger = get_logger(__name__) class AgentDaemonHandler: @@ -12,22 +13,20 @@ class AgentDaemonHandler: def __init__(self, blockchain_rpc_url: str) -> None: self.blockchain_rpc_url = blockchain_rpc_url.rstrip("/") - self._client: Optional[httpx.AsyncClient] = None + self._client: Optional[AsyncAITBCHTTPClient] = None - async def _get_client(self) -> httpx.AsyncClient: + async def _get_client(self) -> AsyncAITBCHTTPClient: """Get or create HTTP client.""" if self._client is None: - self._client = httpx.AsyncClient( + self._client = AsyncAITBCHTTPClient( base_url=self.blockchain_rpc_url, - timeout=30.0, + timeout=30 ) return self._client async def close(self) -> None: """Close HTTP client.""" - if self._client: - await self._client.aclose() - self._client = None + self._client = None async def handle_transaction(self, tx_data: Dict[str, Any]) -> None: """Handle a transaction that may require agent daemon response.""" diff --git a/apps/blockchain-event-bridge/src/blockchain_event_bridge/action_handlers/coordinator_api.py b/apps/blockchain-event-bridge/src/blockchain_event_bridge/action_handlers/coordinator_api.py index ef0ed303..55539cea 100644 --- a/apps/blockchain-event-bridge/src/blockchain_event_bridge/action_handlers/coordinator_api.py +++ b/apps/blockchain-event-bridge/src/blockchain_event_bridge/action_handlers/coordinator_api.py @@ -1,10 +1,11 @@ """Coordinator API action handler for triggering OpenClaw agent actions.""" -import httpx -import logging from typing import Any, Dict, List, Optional +from aitbc.http_client import AsyncAITBCHTTPClient +from aitbc.aitbc_logging import get_logger +from aitbc.exceptions import NetworkError -logger = logging.getLogger(__name__) +logger = get_logger(__name__) class CoordinatorAPIHandler: @@ -13,27 +14,25 @@ class CoordinatorAPIHandler: def __init__(self, base_url: str, api_key: Optional[str] = None) -> None: self.base_url = base_url.rstrip("/") self.api_key = api_key - self._client: Optional[httpx.AsyncClient] = None + headers = {} + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + self._client: Optional[AsyncAITBCHTTPClient] = None + self._headers = headers - async def _get_client(self) -> httpx.AsyncClient: + async def _get_client(self) -> AsyncAITBCHTTPClient: """Get or create HTTP client.""" if self._client is None: - headers = {} - if self.api_key: - headers["Authorization"] = f"Bearer {self.api_key}" - - self._client = httpx.AsyncClient( + self._client = AsyncAITBCHTTPClient( base_url=self.base_url, - headers=headers, - timeout=30.0, + headers=self._headers, + timeout=30 ) return self._client async def close(self) -> None: """Close HTTP client.""" - if self._client: - await self._client.aclose() - self._client = None + self._client = None async def handle_block(self, block_data: Dict[str, Any], transactions: List[Dict[str, Any]]) -> None: """Handle a new block by triggering coordinator API actions.""" @@ -69,12 +68,11 @@ class CoordinatorAPIHandler: if job_id: # Notify coordinator about new AI job - response = await client.post(f"/v1/ai-jobs/{job_id}/notify", json=tx_data) - response.raise_for_status() + await client.async_post(f"/v1/ai-jobs/{job_id}/notify", json=tx_data) logger.info(f"Successfully notified coordinator about AI job {job_id}") - except httpx.HTTPError as e: - logger.error(f"HTTP error triggering AI job processing: {e}") + except NetworkError as e: + logger.error(f"Network error triggering AI job processing: {e}") except Exception as e: logger.error(f"Error triggering AI job processing: {e}", exc_info=True) @@ -89,15 +87,14 @@ class CoordinatorAPIHandler: if recipient: # Notify coordinator about agent message - response = await client.post( + await client.async_post( f"/v1/agents/{recipient}/message", json={"transaction": tx_data, "payload": payload} ) - response.raise_for_status() logger.info(f"Successfully notified coordinator about message to {recipient}") - except httpx.HTTPError as e: - logger.error(f"HTTP error triggering agent message processing: {e}") + except NetworkError as e: + logger.error(f"Network error triggering agent message processing: {e}") except Exception as e: logger.error(f"Error triggering agent message processing: {e}", exc_info=True) @@ -112,14 +109,13 @@ class CoordinatorAPIHandler: if listing_id: # Update marketplace state - response = await client.post( + await client.async_post( f"/v1/marketplace/{listing_id}/sync", json={"transaction": tx_data} ) - response.raise_for_status() logger.info(f"Successfully updated marketplace listing {listing_id}") - except httpx.HTTPError as e: - logger.error(f"HTTP error triggering marketplace update: {e}") + except NetworkError as e: + logger.error(f"Network error triggering marketplace update: {e}") except Exception as e: logger.error(f"Error triggering marketplace update: {e}", exc_info=True) diff --git a/apps/blockchain-event-bridge/src/blockchain_event_bridge/action_handlers/marketplace.py b/apps/blockchain-event-bridge/src/blockchain_event_bridge/action_handlers/marketplace.py index 858d4020..e4ebd549 100644 --- a/apps/blockchain-event-bridge/src/blockchain_event_bridge/action_handlers/marketplace.py +++ b/apps/blockchain-event-bridge/src/blockchain_event_bridge/action_handlers/marketplace.py @@ -1,10 +1,11 @@ """Marketplace action handler for triggering marketplace state updates.""" -import httpx -import logging from typing import Any, Dict, List +from aitbc.http_client import AsyncAITBCHTTPClient +from aitbc.aitbc_logging import get_logger +from aitbc.exceptions import NetworkError -logger = logging.getLogger(__name__) +logger = get_logger(__name__) class MarketplaceHandler: @@ -13,27 +14,25 @@ class MarketplaceHandler: def __init__(self, coordinator_api_url: str, api_key: str | None = None) -> None: self.base_url = coordinator_api_url.rstrip("/") self.api_key = api_key - self._client: httpx.AsyncClient | None = None + headers = {} + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + self._client: AsyncAITBCHTTPClient | None = None + self._headers = headers - async def _get_client(self) -> httpx.AsyncClient: + async def _get_client(self) -> AsyncAITBCHTTPClient: """Get or create HTTP client.""" if self._client is None: - headers = {} - if self.api_key: - headers["Authorization"] = f"Bearer {self.api_key}" - - self._client = httpx.AsyncClient( + self._client = AsyncAITBCHTTPClient( base_url=self.base_url, - headers=headers, - timeout=30.0, + headers=self._headers, + timeout=30 ) return self._client async def close(self) -> None: """Close HTTP client.""" - if self._client: - await self._client.aclose() - self._client = None + self._client = None async def handle_block(self, block_data: Dict[str, Any], transactions: List[Dict[str, Any]]) -> None: """Handle a new block by updating marketplace state.""" @@ -69,16 +68,15 @@ class MarketplaceHandler: client = await self._get_client() # Send batch of marketplace transactions for processing - response = await client.post( + result = await client.async_post( "/v1/marketplace/sync", json={"transactions": transactions} ) - response.raise_for_status() logger.info(f"Successfully synced {len(transactions)} marketplace transactions") - except httpx.HTTPError as e: - logger.error(f"HTTP error syncing marketplace state: {e}") + except NetworkError as e: + logger.error(f"Network error syncing marketplace state: {e}") except Exception as e: logger.error(f"Error syncing marketplace state: {e}", exc_info=True) diff --git a/apps/blockchain-event-bridge/src/blockchain_event_bridge/bridge.py b/apps/blockchain-event-bridge/src/blockchain_event_bridge/bridge.py index 92f0fc20..6d643f46 100644 --- a/apps/blockchain-event-bridge/src/blockchain_event_bridge/bridge.py +++ b/apps/blockchain-event-bridge/src/blockchain_event_bridge/bridge.py @@ -1,9 +1,10 @@ """Core bridge logic for blockchain event to OpenClaw agent trigger mapping.""" import asyncio -import logging from typing import Any, Dict, Optional +from aitbc.aitbc_logging import get_logger + from .config import Settings from .event_subscribers.blocks import BlockEventSubscriber from .event_subscribers.transactions import TransactionEventSubscriber @@ -20,7 +21,7 @@ from .metrics import ( action_execution_duration_seconds, ) -logger = logging.getLogger(__name__) +logger = get_logger(__name__) class BlockchainEventBridge: diff --git a/apps/blockchain-event-bridge/src/blockchain_event_bridge/event_subscribers/blocks.py b/apps/blockchain-event-bridge/src/blockchain_event_bridge/event_subscribers/blocks.py index edb24e3f..a2e83cc3 100644 --- a/apps/blockchain-event-bridge/src/blockchain_event_bridge/event_subscribers/blocks.py +++ b/apps/blockchain-event-bridge/src/blockchain_event_bridge/event_subscribers/blocks.py @@ -1,9 +1,10 @@ """Block event subscriber for gossip broker.""" import asyncio -import logging from typing import TYPE_CHECKING, Any, Dict +from aitbc.aitbc_logging import get_logger + from ..config import Settings from ..metrics import event_queue_size, gossip_subscribers_total @@ -11,7 +12,7 @@ if TYPE_CHECKING: from ..bridge import BlockchainEventBridge -logger = logging.getLogger(__name__) +logger = get_logger(__name__) class BlockEventSubscriber: diff --git a/apps/blockchain-event-bridge/src/blockchain_event_bridge/event_subscribers/contracts.py b/apps/blockchain-event-bridge/src/blockchain_event_bridge/event_subscribers/contracts.py index 3cee38e3..9ddf987d 100644 --- a/apps/blockchain-event-bridge/src/blockchain_event_bridge/event_subscribers/contracts.py +++ b/apps/blockchain-event-bridge/src/blockchain_event_bridge/event_subscribers/contracts.py @@ -1,10 +1,11 @@ """Contract event subscriber for smart contract event monitoring.""" import asyncio -import logging from typing import TYPE_CHECKING, Any, Dict, Optional -import httpx +from aitbc.http_client import AsyncAITBCHTTPClient +from aitbc.aitbc_logging import get_logger +from aitbc.exceptions import NetworkError from ..config import Settings from ..metrics import event_queue_size @@ -13,7 +14,7 @@ if TYPE_CHECKING: from ..bridge import BlockchainEventBridge -logger = logging.getLogger(__name__) +logger = get_logger(__name__) class ContractEventSubscriber: @@ -23,7 +24,7 @@ class ContractEventSubscriber: self.settings = settings self._running = False self._bridge: "BlockchainEventBridge | None" = None - self._client: Optional[httpx.AsyncClient] = None + self._client: Optional[AsyncAITBCHTTPClient] = None # Contract addresses from configuration self.contract_addresses: Dict[str, str] = { @@ -67,12 +68,12 @@ class ContractEventSubscriber: """Set the bridge instance for event handling.""" self._bridge = bridge - async def _get_client(self) -> httpx.AsyncClient: + async def _get_client(self) -> AsyncAITBCHTTPClient: """Get or create HTTP client.""" if self._client is None: - self._client = httpx.AsyncClient( + self._client = AsyncAITBCHTTPClient( base_url=self.settings.blockchain_rpc_url, - timeout=30.0, + timeout=30 ) return self._client @@ -107,14 +108,14 @@ class ContractEventSubscriber: """Initialize block tracking from current chain height.""" try: client = await self._get_client() - response = await client.get("/head") - if response.status_code == 200: - head_data = response.json() - current_height = head_data.get("height", 0) - for contract in self.contract_addresses: - if self.contract_addresses[contract]: - self.last_processed_blocks[contract] = current_height - logger.info(f"Initialized block tracking at height {current_height}") + head_data = await client.async_get("/head") + current_height = head_data.get("height", 0) + for contract in self.contract_addresses: + if self.contract_addresses[contract]: + self.last_processed_blocks[contract] = current_height + logger.info(f"Initialized block tracking at height {current_height}") + except NetworkError as e: + logger.error(f"Network error initializing block tracking: {e}") except Exception as e: logger.error(f"Error initializing block tracking: {e}") @@ -128,17 +129,12 @@ class ContractEventSubscriber: try: # Get current chain height - response = await client.get("/head") - if response.status_code != 200: - logger.error(f"Failed to get chain head: {response.status_code}") - continue - - head_data = response.json() + head_data = await client.async_get("/head") current_height = head_data.get("height", 0) last_height = self.last_processed_blocks.get(contract_name, current_height - 100) # Query events for this contract - logs_response = await client.post( + logs_data = await client.async_post( "/eth_getLogs", json={ "address": contract_address, @@ -148,11 +144,6 @@ class ContractEventSubscriber: } ) - if logs_response.status_code != 200: - logger.error(f"Failed to get logs for {contract_name}: {logs_response.status_code}") - continue - - logs_data = logs_response.json() logs = logs_data.get("logs", []) if logs: @@ -165,6 +156,8 @@ class ContractEventSubscriber: # Update last processed block self.last_processed_blocks[contract_name] = current_height + except NetworkError as e: + logger.error(f"Network error polling events for {contract_name}: {e}") except Exception as e: logger.error(f"Error polling events for {contract_name}: {e}", exc_info=True) @@ -215,9 +208,5 @@ class ContractEventSubscriber: async def stop(self) -> None: """Stop the contract event subscriber.""" self._running = False - - if self._client: - await self._client.aclose() - self._client = None - + self._client = None logger.info("Contract event subscriber stopped") diff --git a/apps/blockchain-event-bridge/src/blockchain_event_bridge/event_subscribers/transactions.py b/apps/blockchain-event-bridge/src/blockchain_event_bridge/event_subscribers/transactions.py index a8a1def6..447f47d5 100644 --- a/apps/blockchain-event-bridge/src/blockchain_event_bridge/event_subscribers/transactions.py +++ b/apps/blockchain-event-bridge/src/blockchain_event_bridge/event_subscribers/transactions.py @@ -1,9 +1,10 @@ """Transaction event subscriber for gossip broker.""" import asyncio -import logging from typing import TYPE_CHECKING, Any, Dict +from aitbc.aitbc_logging import get_logger + from ..config import Settings from ..metrics import event_queue_size, gossip_subscribers_total @@ -11,7 +12,7 @@ if TYPE_CHECKING: from ..bridge import BlockchainEventBridge -logger = logging.getLogger(__name__) +logger = get_logger(__name__) class TransactionEventSubscriber: diff --git a/apps/blockchain-event-bridge/src/blockchain_event_bridge/main.py b/apps/blockchain-event-bridge/src/blockchain_event_bridge/main.py index d6422b61..4f3201ca 100644 --- a/apps/blockchain-event-bridge/src/blockchain_event_bridge/main.py +++ b/apps/blockchain-event-bridge/src/blockchain_event_bridge/main.py @@ -1,9 +1,10 @@ """Main FastAPI application for blockchain event bridge.""" import asyncio -import logging from contextlib import asynccontextmanager +from aitbc.aitbc_logging import get_logger + from fastapi import FastAPI from prometheus_client import make_asgi_app @@ -16,7 +17,7 @@ from .metrics import ( actions_failed_total, ) -logger = logging.getLogger(__name__) +logger = get_logger(__name__) bridge_instance: BlockchainEventBridge | None = None diff --git a/apps/blockchain-event-bridge/src/blockchain_event_bridge/polling/batch.py b/apps/blockchain-event-bridge/src/blockchain_event_bridge/polling/batch.py index ebad01d9..ffb2e290 100644 --- a/apps/blockchain-event-bridge/src/blockchain_event_bridge/polling/batch.py +++ b/apps/blockchain-event-bridge/src/blockchain_event_bridge/polling/batch.py @@ -1,10 +1,11 @@ """Batch processing for aggregated operations.""" import asyncio -import logging from typing import Any, Dict, List -logger = logging.getLogger(__name__) +from aitbc import get_logger + +logger = get_logger(__name__) class BatchProcessor: diff --git a/apps/blockchain-event-bridge/src/blockchain_event_bridge/polling/conditions.py b/apps/blockchain-event-bridge/src/blockchain_event_bridge/polling/conditions.py index c7f4c640..5e6880a2 100644 --- a/apps/blockchain-event-bridge/src/blockchain_event_bridge/polling/conditions.py +++ b/apps/blockchain-event-bridge/src/blockchain_event_bridge/polling/conditions.py @@ -1,10 +1,11 @@ """Condition-based polling for batch operations.""" import asyncio -import logging from typing import Any, Dict -logger = logging.getLogger(__name__) +from aitbc import get_logger + +logger = get_logger(__name__) class ConditionPoller: