Add blockchain event bridge service with smart contract event integration
Some checks failed
Blockchain Synchronization Verification / sync-verification (push) Failing after 2s
Integration Tests / test-service-integration (push) Failing after 15s
Multi-Node Blockchain Health Monitoring / health-check (push) Successful in 2s
P2P Network Verification / p2p-verification (push) Successful in 2s
Python Tests / test-python (push) Successful in 12s
Security Scanning / security-scan (push) Successful in 41s
Systemd Sync / sync-systemd (push) Successful in 7s

- Phase 1: Core bridge service with gossip broker subscription
- Phase 2: Smart contract event integration via eth_getLogs RPC endpoint
- Add contract event subscriber for AgentStaking, PerformanceVerifier, Marketplace, Bounty, CrossChainBridge
- Add contract event handlers in agent_daemon.py and marketplace.py
- Add systemd service file for blockchain-event-bridge
- Update blockchain node router.py with eth_getLogs endpoint
- Add configuration for contract addresses
- Add tests for contract subscriber and handlers (27 tests passing)
This commit is contained in:
aitbc
2026-04-23 10:58:00 +02:00
parent ab45a81bd7
commit 90edea2da2
29 changed files with 3704 additions and 0 deletions

View File

@@ -0,0 +1,37 @@
# Blockchain Event Bridge Configuration
# Service Configuration
BIND_HOST=127.0.0.1
BIND_PORT=8204
# Blockchain RPC
BLOCKCHAIN_RPC_URL=http://localhost:8006
# Gossip Broker
GOSSIP_BACKEND=memory
# GOSSIP_BROADCAST_URL=redis://localhost:6379
# Coordinator API
COORDINATOR_API_URL=http://localhost:8011
# COORDINATOR_API_KEY=your_api_key_here
# Event Subscription Filters
SUBSCRIBE_BLOCKS=true
SUBSCRIBE_TRANSACTIONS=true
SUBSCRIBE_CONTRACTS=false
# Smart Contract Addresses (Phase 2)
# AGENT_STAKING_ADDRESS=0x...
# PERFORMANCE_VERIFIER_ADDRESS=0x...
# MARKETPLACE_ADDRESS=0x...
# BOUNTY_ADDRESS=0x...
# BRIDGE_ADDRESS=0x...
# Action Handler Enable/Disable Flags
ENABLE_AGENT_DAEMON_TRIGGER=true
ENABLE_COORDINATOR_API_TRIGGER=true
ENABLE_MARKETPLACE_TRIGGER=true
# Polling Configuration
ENABLE_POLLING=false
POLLING_INTERVAL_SECONDS=60

View File

@@ -0,0 +1,112 @@
# Blockchain Event Bridge
Bridge between AITBC blockchain events and OpenClaw agent triggers using a hybrid event-driven and polling approach.
## Overview
This service connects AITBC blockchain events (blocks, transactions, smart contract events) to OpenClaw agent actions through:
- **Event-driven**: Subscribe to gossip broker topics for real-time critical triggers
- **Polling**: Periodic checks for batch operations and conditions
- **Smart Contract Events**: Monitor contract events via blockchain RPC (Phase 2)
## Features
- Subscribes to blockchain block events via gossip broker
- Subscribes to transaction events (when available)
- Monitors smart contract events via blockchain RPC:
- AgentStaking (stake creation, rewards, tier updates)
- PerformanceVerifier (performance verification, penalties, rewards)
- AgentServiceMarketplace (service listings, purchases)
- BountyIntegration (bounty creation, completion)
- CrossChainBridge (bridge initiation, completion)
- Triggers coordinator API actions based on blockchain events
- Triggers agent daemon actions for agent wallet transactions
- Triggers marketplace state updates
- Configurable action handlers (enable/disable per type)
- Prometheus metrics for monitoring
- Health check endpoint
## Installation
```bash
cd apps/blockchain-event-bridge
poetry install
```
## Configuration
Environment variables:
- `BLOCKCHAIN_RPC_URL` - Blockchain RPC endpoint (default: `http://localhost:8006`)
- `GOSSIP_BACKEND` - Gossip broker backend: `memory`, `broadcast`, or `redis` (default: `memory`)
- `GOSSIP_BROADCAST_URL` - Broadcast URL for Redis backend (optional)
- `COORDINATOR_API_URL` - Coordinator API endpoint (default: `http://localhost:8011`)
- `COORDINATOR_API_KEY` - Coordinator API key (optional)
- `SUBSCRIBE_BLOCKS` - Subscribe to block events (default: `true`)
- `SUBSCRIBE_TRANSACTIONS` - Subscribe to transaction events (default: `true`)
- `ENABLE_AGENT_DAEMON_TRIGGER` - Enable agent daemon triggers (default: `true`)
- `ENABLE_COORDINATOR_API_TRIGGER` - Enable coordinator API triggers (default: `true`)
- `ENABLE_MARKETPLACE_TRIGGER` - Enable marketplace triggers (default: `true`)
- `ENABLE_POLLING` - Enable polling layer (default: `false`)
- `POLLING_INTERVAL_SECONDS` - Polling interval in seconds (default: `60`)
## Running
### Development
```bash
poetry run uvicorn blockchain_event_bridge.main:app --reload --host 127.0.0.1 --port 8204
```
### Production (Systemd)
```bash
sudo systemctl start aitbc-blockchain-event-bridge
sudo systemctl enable aitbc-blockchain-event-bridge
```
## API Endpoints
- `GET /` - Service information
- `GET /health` - Health check
- `GET /metrics` - Prometheus metrics
## Architecture
```
blockchain-event-bridge/
├── src/blockchain_event_bridge/
│ ├── main.py # FastAPI app
│ ├── config.py # Settings
│ ├── bridge.py # Core bridge logic
│ ├── metrics.py # Prometheus metrics
│ ├── event_subscribers/ # Event subscription modules
│ ├── action_handlers/ # Action handler modules
│ └── polling/ # Polling modules
└── tests/
```
## Event Flow
1. Blockchain publishes block event to gossip broker (topic: "blocks")
2. Block event subscriber receives event
3. Bridge parses block data and extracts transactions
4. Bridge triggers appropriate action handlers:
- Coordinator API handler for AI jobs, agent messages
- Agent daemon handler for agent wallet transactions
- Marketplace handler for marketplace listings
5. Action handlers make HTTP calls to respective services
6. Metrics are recorded for monitoring
## Testing
```bash
poetry run pytest
```
## Future Enhancements
- Phase 2: Smart contract event subscription
- Phase 3: Enhanced polling layer for batch operations
- WebSocket support for real-time event streaming
- Event replay for missed events

1398
apps/blockchain-event-bridge/poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,28 @@
[tool.poetry]
name = "blockchain-event-bridge"
version = "0.1.0"
description = "Bridge between AITBC blockchain events and OpenClaw agent triggers"
authors = ["AITBC Team"]
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.13"
fastapi = "^0.115.0"
uvicorn = {extras = ["standard"], version = "^0.32.0"}
httpx = "^0.27.0"
pydantic = "^2.9.0"
pydantic-settings = "^2.6.0"
prometheus-client = "^0.21.0"
aiosqlite = "^0.20.0"
[tool.poetry.group.dev.dependencies]
pytest = "^8.3.0"
pytest-asyncio = "^0.24.0"
pytest-cov = "^6.0.0"
black = "^24.10.0"
ruff = "^0.8.0"
mypy = "^1.13.0"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@@ -0,0 +1,29 @@
[pytest]
# pytest configuration for blockchain-event-bridge
# Test discovery
python_files = test_*.py
python_classes = Test*
python_functions = test_*
# Custom markers
markers =
unit: Unit tests (fast, isolated)
integration: Integration tests (may require external services)
slow: Slow running tests
# Additional options
addopts =
--verbose
--tb=short
# Python path for imports
pythonpath =
.
src
# Warnings
filterwarnings =
ignore::UserWarning
ignore::DeprecationWarning
ignore::PendingDeprecationWarning

View File

@@ -0,0 +1,3 @@
"""Blockchain Event Bridge - Connects AITBC blockchain events to OpenClaw agent triggers."""
__version__ = "0.1.0"

View File

@@ -0,0 +1,7 @@
"""Action handler modules for OpenClaw agent triggers."""
from .coordinator_api import CoordinatorAPIHandler
from .agent_daemon import AgentDaemonHandler
from .marketplace import MarketplaceHandler
__all__ = ["CoordinatorAPIHandler", "AgentDaemonHandler", "MarketplaceHandler"]

View File

@@ -0,0 +1,246 @@
"""Agent daemon action handler for triggering autonomous agent responses."""
import httpx
import logging
from typing import Any, Dict
logger = logging.getLogger(__name__)
class AgentDaemonHandler:
"""Handles actions that trigger the agent daemon to process transactions."""
def __init__(self, blockchain_rpc_url: str) -> None:
self.blockchain_rpc_url = blockchain_rpc_url.rstrip("/")
self._client: Optional[httpx.AsyncClient] = None
async def _get_client(self) -> httpx.AsyncClient:
"""Get or create HTTP client."""
if self._client is None:
self._client = httpx.AsyncClient(
base_url=self.blockchain_rpc_url,
timeout=30.0,
)
return self._client
async def close(self) -> None:
"""Close HTTP client."""
if self._client:
await self._client.aclose()
self._client = None
async def handle_transaction(self, tx_data: Dict[str, Any]) -> None:
"""Handle a transaction that may require agent daemon response."""
tx_hash = tx_data.get("hash", "unknown")
tx_type = tx_data.get("type", "unknown")
recipient = tx_data.get("to")
logger.info(f"Checking transaction {tx_hash} for agent daemon trigger")
# Check if this is a message to an agent wallet
if self._is_agent_transaction(tx_data):
await self._notify_agent_daemon(tx_data)
def _is_agent_transaction(self, tx_data: Dict[str, Any]) -> bool:
"""Check if transaction is addressed to an agent wallet."""
# In a real implementation, this would check against a registry of agent addresses
# For now, we'll check if the transaction has a payload that looks like an agent message
payload = tx_data.get("payload", {})
# Check for agent message indicators
if isinstance(payload, dict):
# Check for trigger message or agent-specific fields
if "trigger" in payload or "agent" in payload or "command" in payload:
return True
return False
async def _notify_agent_daemon(self, tx_data: Dict[str, Any]) -> None:
"""Notify agent daemon about a transaction requiring processing."""
try:
# The agent daemon currently polls the blockchain database directly
# This handler could be enhanced to send a direct notification
# For now, we'll log that the agent daemon should pick this up on its next poll
tx_hash = tx_data.get("hash", "unknown")
recipient = tx_data.get("to")
logger.info(f"Agent daemon should process transaction {tx_hash} to {recipient}")
# Future enhancement: send direct notification via agent-coordinator API
# client = await self._get_client()
# response = await client.post(f"/v1/agents/{recipient}/notify", json=tx_data)
except Exception as e:
logger.error(f"Error notifying agent daemon: {e}", exc_info=True)
# Phase 2: Contract event handlers
async def handle_staking_event(self, event_log: Dict[str, Any]) -> None:
"""Handle AgentStaking contract event."""
event_type = event_log.get("topics", [""])[0] if event_log.get("topics") else "Unknown"
logger.info(f"Handling staking event: {event_type}")
# Route based on event type
if "StakeCreated" in event_type:
await self._handle_stake_created(event_log)
elif "RewardsDistributed" in event_type:
await self._handle_rewards_distributed(event_log)
elif "AgentTierUpdated" in event_type:
await self._handle_agent_tier_updated(event_log)
async def _handle_stake_created(self, event_log: Dict[str, Any]) -> None:
"""Handle StakeCreated event."""
try:
# Extract event data
data = event_log.get("data", "{}")
logger.info(f"StakeCreated event: {data}")
# Call coordinator API to update agent reputation
# This would call the reputation service to update agent tier based on stake
logger.info("Would call coordinator API reputation service to update agent stake")
except Exception as e:
logger.error(f"Error handling StakeCreated event: {e}", exc_info=True)
async def _handle_rewards_distributed(self, event_log: Dict[str, Any]) -> None:
"""Handle RewardsDistributed event."""
try:
data = event_log.get("data", "{}")
logger.info(f"RewardsDistributed event: {data}")
# Call coordinator API to update agent rewards
logger.info("Would call coordinator API to update agent rewards")
except Exception as e:
logger.error(f"Error handling RewardsDistributed event: {e}", exc_info=True)
async def _handle_agent_tier_updated(self, event_log: Dict[str, Any]) -> None:
"""Handle AgentTierUpdated event."""
try:
data = event_log.get("data", "{}")
logger.info(f"AgentTierUpdated event: {data}")
# Call coordinator API to update agent tier
logger.info("Would call coordinator API reputation service to update agent tier")
except Exception as e:
logger.error(f"Error handling AgentTierUpdated event: {e}", exc_info=True)
async def handle_performance_event(self, event_log: Dict[str, Any]) -> None:
"""Handle PerformanceVerifier contract event."""
event_type = event_log.get("topics", [""])[0] if event_log.get("topics") else "Unknown"
logger.info(f"Handling performance event: {event_type}")
# Route based on event type
if "PerformanceVerified" in event_type:
await self._handle_performance_verified(event_log)
elif "PenaltyApplied" in event_type:
await self._handle_penalty_applied(event_log)
elif "RewardIssued" in event_type:
await self._handle_reward_issued(event_log)
async def _handle_performance_verified(self, event_log: Dict[str, Any]) -> None:
"""Handle PerformanceVerified event."""
try:
data = event_log.get("data", "{}")
logger.info(f"PerformanceVerified event: {data}")
# Call coordinator API to update performance metrics
logger.info("Would call coordinator API performance service to update metrics")
except Exception as e:
logger.error(f"Error handling PerformanceVerified event: {e}", exc_info=True)
async def _handle_penalty_applied(self, event_log: Dict[str, Any]) -> None:
"""Handle PenaltyApplied event."""
try:
data = event_log.get("data", "{}")
logger.info(f"PenaltyApplied event: {data}")
# Call coordinator API to update agent penalties
logger.info("Would call coordinator API performance service to apply penalty")
except Exception as e:
logger.error(f"Error handling PenaltyApplied event: {e}", exc_info=True)
async def _handle_reward_issued(self, event_log: Dict[str, Any]) -> None:
"""Handle RewardIssued event."""
try:
data = event_log.get("data", "{}")
logger.info(f"RewardIssued event: {data}")
# Call coordinator API to update agent rewards
logger.info("Would call coordinator API performance service to issue reward")
except Exception as e:
logger.error(f"Error handling RewardIssued event: {e}", exc_info=True)
async def handle_bounty_event(self, event_log: Dict[str, Any]) -> None:
"""Handle BountyIntegration contract event."""
event_type = event_log.get("topics", [""])[0] if event_log.get("topics") else "Unknown"
logger.info(f"Handling bounty event: {event_type}")
# Route based on event type
if "BountyCreated" in event_type:
await self._handle_bounty_created(event_log)
elif "BountyCompleted" in event_type:
await self._handle_bounty_completed(event_log)
async def _handle_bounty_created(self, event_log: Dict[str, Any]) -> None:
"""Handle BountyCreated event."""
try:
data = event_log.get("data", "{}")
logger.info(f"BountyCreated event: {data}")
# Call coordinator API to sync new bounty
logger.info("Would call coordinator API bounty service to sync bounty")
except Exception as e:
logger.error(f"Error handling BountyCreated event: {e}", exc_info=True)
async def _handle_bounty_completed(self, event_log: Dict[str, Any]) -> None:
"""Handle BountyCompleted event."""
try:
data = event_log.get("data", "{}")
logger.info(f"BountyCompleted event: {data}")
# Call coordinator API to complete bounty
logger.info("Would call coordinator API bounty service to complete bounty")
except Exception as e:
logger.error(f"Error handling BountyCompleted event: {e}", exc_info=True)
async def handle_bridge_event(self, event_log: Dict[str, Any]) -> None:
"""Handle CrossChainBridge contract event."""
event_type = event_log.get("topics", [""])[0] if event_log.get("topics") else "Unknown"
logger.info(f"Handling bridge event: {event_type}")
# Route based on event type
if "BridgeInitiated" in event_type:
await self._handle_bridge_initiated(event_log)
elif "BridgeCompleted" in event_type:
await self._handle_bridge_completed(event_log)
async def _handle_bridge_initiated(self, event_log: Dict[str, Any]) -> None:
"""Handle BridgeInitiated event."""
try:
data = event_log.get("data", "{}")
logger.info(f"BridgeInitiated event: {data}")
# Call coordinator API to track bridge
logger.info("Would call coordinator API cross-chain service to track bridge")
except Exception as e:
logger.error(f"Error handling BridgeInitiated event: {e}", exc_info=True)
async def _handle_bridge_completed(self, event_log: Dict[str, Any]) -> None:
"""Handle BridgeCompleted event."""
try:
data = event_log.get("data", "{}")
logger.info(f"BridgeCompleted event: {data}")
# Call coordinator API to complete bridge
logger.info("Would call coordinator API cross-chain service to complete bridge")
except Exception as e:
logger.error(f"Error handling BridgeCompleted event: {e}", exc_info=True)

View File

@@ -0,0 +1,125 @@
"""Coordinator API action handler for triggering OpenClaw agent actions."""
import httpx
import logging
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
class CoordinatorAPIHandler:
"""Handles actions that trigger coordinator API endpoints."""
def __init__(self, base_url: str, api_key: Optional[str] = None) -> None:
self.base_url = base_url.rstrip("/")
self.api_key = api_key
self._client: Optional[httpx.AsyncClient] = None
async def _get_client(self) -> httpx.AsyncClient:
"""Get or create HTTP client."""
if self._client is None:
headers = {}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
self._client = httpx.AsyncClient(
base_url=self.base_url,
headers=headers,
timeout=30.0,
)
return self._client
async def close(self) -> None:
"""Close HTTP client."""
if self._client:
await self._client.aclose()
self._client = None
async def handle_block(self, block_data: Dict[str, Any], transactions: List[Dict[str, Any]]) -> None:
"""Handle a new block by triggering coordinator API actions."""
logger.info(f"Handling block {block_data.get('height')} with {len(transactions)} transactions")
# Filter relevant transactions (AI jobs, agent messages, etc.)
for tx in transactions:
await self.handle_transaction(tx)
async def handle_transaction(self, tx_data: Dict[str, Any]) -> None:
"""Handle a single transaction."""
tx_type = tx_data.get("type", "unknown")
tx_hash = tx_data.get("hash", "unknown")
logger.info(f"Handling transaction {tx_hash} of type {tx_type}")
# Route based on transaction type
if tx_type == "ai_job":
await self._trigger_ai_job_processing(tx_data)
elif tx_type == "agent_message":
await self._trigger_agent_message_processing(tx_data)
elif tx_type == "marketplace":
await self._trigger_marketplace_update(tx_data)
async def _trigger_ai_job_processing(self, tx_data: Dict[str, Any]) -> None:
"""Trigger AI job processing via coordinator API."""
try:
client = await self._get_client()
# Extract job details from transaction payload
payload = tx_data.get("payload", {})
job_id = payload.get("job_id")
if job_id:
# Notify coordinator about new AI job
response = await client.post(f"/v1/ai-jobs/{job_id}/notify", json=tx_data)
response.raise_for_status()
logger.info(f"Successfully notified coordinator about AI job {job_id}")
except httpx.HTTPError as e:
logger.error(f"HTTP error triggering AI job processing: {e}")
except Exception as e:
logger.error(f"Error triggering AI job processing: {e}", exc_info=True)
async def _trigger_agent_message_processing(self, tx_data: Dict[str, Any]) -> None:
"""Trigger agent message processing via coordinator API."""
try:
client = await self._get_client()
# Extract message details
payload = tx_data.get("payload", {})
recipient = tx_data.get("to")
if recipient:
# Notify coordinator about agent message
response = await client.post(
f"/v1/agents/{recipient}/message",
json={"transaction": tx_data, "payload": payload}
)
response.raise_for_status()
logger.info(f"Successfully notified coordinator about message to {recipient}")
except httpx.HTTPError as e:
logger.error(f"HTTP error triggering agent message processing: {e}")
except Exception as e:
logger.error(f"Error triggering agent message processing: {e}", exc_info=True)
async def _trigger_marketplace_update(self, tx_data: Dict[str, Any]) -> None:
"""Trigger marketplace state update via coordinator API."""
try:
client = await self._get_client()
# Extract marketplace details
payload = tx_data.get("payload", {})
listing_id = payload.get("listing_id")
if listing_id:
# Update marketplace state
response = await client.post(
f"/v1/marketplace/{listing_id}/sync",
json={"transaction": tx_data}
)
response.raise_for_status()
logger.info(f"Successfully updated marketplace listing {listing_id}")
except httpx.HTTPError as e:
logger.error(f"HTTP error triggering marketplace update: {e}")
except Exception as e:
logger.error(f"Error triggering marketplace update: {e}", exc_info=True)

View File

@@ -0,0 +1,119 @@
"""Marketplace action handler for triggering marketplace state updates."""
import httpx
import logging
from typing import Any, Dict, List
logger = logging.getLogger(__name__)
class MarketplaceHandler:
"""Handles actions that trigger marketplace state updates."""
def __init__(self, coordinator_api_url: str, api_key: str | None = None) -> None:
self.base_url = coordinator_api_url.rstrip("/")
self.api_key = api_key
self._client: httpx.AsyncClient | None = None
async def _get_client(self) -> httpx.AsyncClient:
"""Get or create HTTP client."""
if self._client is None:
headers = {}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
self._client = httpx.AsyncClient(
base_url=self.base_url,
headers=headers,
timeout=30.0,
)
return self._client
async def close(self) -> None:
"""Close HTTP client."""
if self._client:
await self._client.aclose()
self._client = None
async def handle_block(self, block_data: Dict[str, Any], transactions: List[Dict[str, Any]]) -> None:
"""Handle a new block by updating marketplace state."""
logger.info(f"Processing block {block_data.get('height')} for marketplace updates")
# Filter marketplace-related transactions
marketplace_txs = self._filter_marketplace_transactions(transactions)
if marketplace_txs:
await self._sync_marketplace_state(marketplace_txs)
def _filter_marketplace_transactions(self, transactions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Filter transactions that affect marketplace state."""
marketplace_txs = []
for tx in transactions:
tx_type = tx.get("type", "unknown")
payload = tx.get("payload", {})
# Check for marketplace-related transaction types
if tx_type in ["marketplace", "listing", "purchase", "service"]:
marketplace_txs.append(tx)
elif isinstance(payload, dict):
# Check for marketplace-related payload fields
if any(key in payload for key in ["listing_id", "service_id", "marketplace"]):
marketplace_txs.append(tx)
return marketplace_txs
async def _sync_marketplace_state(self, transactions: List[Dict[str, Any]]) -> None:
"""Synchronize marketplace state with blockchain."""
try:
client = await self._get_client()
# Send batch of marketplace transactions for processing
response = await client.post(
"/v1/marketplace/sync",
json={"transactions": transactions}
)
response.raise_for_status()
logger.info(f"Successfully synced {len(transactions)} marketplace transactions")
except httpx.HTTPError as e:
logger.error(f"HTTP error syncing marketplace state: {e}")
except Exception as e:
logger.error(f"Error syncing marketplace state: {e}", exc_info=True)
# Phase 2: Contract event handlers
async def handle_contract_event(self, event_log: Dict[str, Any]) -> None:
"""Handle AgentServiceMarketplace contract event."""
event_type = event_log.get("topics", [""])[0] if event_log.get("topics") else "Unknown"
logger.info(f"Handling marketplace contract event: {event_type}")
# Route based on event type
if "ServiceListed" in event_type:
await self._handle_service_listed(event_log)
elif "ServicePurchased" in event_type:
await self._handle_service_purchased(event_log)
async def _handle_service_listed(self, event_log: Dict[str, Any]) -> None:
"""Handle ServiceListed event."""
try:
data = event_log.get("data", "{}")
logger.info(f"ServiceListed event: {data}")
# Call coordinator API to sync marketplace listing
logger.info("Would call coordinator API marketplace service to sync listing")
except Exception as e:
logger.error(f"Error handling ServiceListed event: {e}", exc_info=True)
async def _handle_service_purchased(self, event_log: Dict[str, Any]) -> None:
"""Handle ServicePurchased event."""
try:
data = event_log.get("data", "{}")
logger.info(f"ServicePurchased event: {data}")
# Call coordinator API to sync marketplace purchase
logger.info("Would call coordinator API marketplace service to sync purchase")
except Exception as e:
logger.error(f"Error handling ServicePurchased event: {e}", exc_info=True)

View File

@@ -0,0 +1,287 @@
"""Core bridge logic for blockchain event to OpenClaw agent trigger mapping."""
import asyncio
import logging
from typing import Any, Dict, Optional
from .config import Settings
from .event_subscribers.blocks import BlockEventSubscriber
from .event_subscribers.transactions import TransactionEventSubscriber
from .event_subscribers.contracts import ContractEventSubscriber
from .action_handlers.coordinator_api import CoordinatorAPIHandler
from .action_handlers.agent_daemon import AgentDaemonHandler
from .action_handlers.marketplace import MarketplaceHandler
from .metrics import (
events_received_total,
events_processed_total,
actions_triggered_total,
actions_failed_total,
event_processing_duration_seconds,
action_execution_duration_seconds,
)
logger = logging.getLogger(__name__)
class BlockchainEventBridge:
"""Main bridge service connecting blockchain events to OpenClaw agent actions."""
def __init__(self, settings: Settings) -> None:
self.settings = settings
self._running = False
self._tasks: set[asyncio.Task] = set()
# Event subscribers
self.block_subscriber: Optional[BlockEventSubscriber] = None
self.transaction_subscriber: Optional[TransactionEventSubscriber] = None
self.contract_subscriber: Optional[ContractEventSubscriber] = None
# Action handlers
self.coordinator_handler: Optional[CoordinatorAPIHandler] = None
self.agent_daemon_handler: Optional[AgentDaemonHandler] = None
self.marketplace_handler: Optional[MarketplaceHandler] = None
async def start(self) -> None:
"""Start the bridge service."""
if self._running:
logger.warning("Bridge already running")
return
logger.info("Initializing blockchain event bridge...")
# Initialize action handlers
if self.settings.enable_coordinator_api_trigger:
self.coordinator_handler = CoordinatorAPIHandler(
self.settings.coordinator_api_url,
self.settings.coordinator_api_key,
)
logger.info("Coordinator API handler initialized")
if self.settings.enable_agent_daemon_trigger:
self.agent_daemon_handler = AgentDaemonHandler(self.settings.blockchain_rpc_url)
logger.info("Agent daemon handler initialized")
if self.settings.enable_marketplace_trigger:
self.marketplace_handler = MarketplaceHandler(
self.settings.coordinator_api_url,
self.settings.coordinator_api_key,
)
logger.info("Marketplace handler initialized")
# Initialize event subscribers
if self.settings.subscribe_blocks:
self.block_subscriber = BlockEventSubscriber(self.settings)
self.block_subscriber.set_bridge(self)
task = asyncio.create_task(self.block_subscriber.run(), name="block-subscriber")
self._tasks.add(task)
logger.info("Block event subscriber started")
if self.settings.subscribe_transactions:
self.transaction_subscriber = TransactionEventSubscriber(self.settings)
self.transaction_subscriber.set_bridge(self)
task = asyncio.create_task(self.transaction_subscriber.run(), name="transaction-subscriber")
self._tasks.add(task)
logger.info("Transaction event subscriber started")
# Initialize contract event subscriber (Phase 2)
if self.settings.subscribe_contracts:
self.contract_subscriber = ContractEventSubscriber(self.settings)
self.contract_subscriber.set_bridge(self)
task = asyncio.create_task(self.contract_subscriber.run(), name="contract-subscriber")
self._tasks.add(task)
logger.info("Contract event subscriber started")
self._running = True
logger.info("Blockchain event bridge started successfully")
async def stop(self) -> None:
"""Stop the bridge service."""
if not self._running:
return
logger.info("Stopping blockchain event bridge...")
# Cancel all tasks
for task in self._tasks:
task.cancel()
# Wait for tasks to complete
if self._tasks:
await asyncio.gather(*self._tasks, return_exceptions=True)
self._tasks.clear()
self._running = False
logger.info("Blockchain event bridge stopped")
@property
def is_running(self) -> bool:
"""Check if the bridge is running."""
return self._running
async def handle_block_event(self, block_data: Dict[str, Any]) -> None:
"""Handle a new block event."""
event_type = "block"
events_received_total.labels(event_type=event_type).inc()
with event_processing_duration_seconds.labels(event_type=event_type).time():
try:
# Extract transactions from block
transactions = block_data.get("transactions", [])
# Trigger actions based on block content
if transactions and self.settings.enable_coordinator_api_trigger:
await self._trigger_coordinator_actions(block_data, transactions)
if transactions and self.settings.enable_marketplace_trigger:
await self._trigger_marketplace_actions(block_data, transactions)
events_processed_total.labels(event_type=event_type, status="success").inc()
logger.info(f"Processed block event: height={block_data.get('height')}, txs={len(transactions)}")
except Exception as e:
events_processed_total.labels(event_type=event_type, status="error").inc()
logger.error(f"Error processing block event: {e}", exc_info=True)
async def handle_transaction_event(self, tx_data: Dict[str, Any]) -> None:
"""Handle a transaction event."""
event_type = "transaction"
events_received_total.labels(event_type=event_type).inc()
with event_processing_duration_seconds.labels(event_type=event_type).time():
try:
# Trigger actions based on transaction type
if self.settings.enable_agent_daemon_trigger:
await self._trigger_agent_daemon_actions(tx_data)
if self.settings.enable_coordinator_api_trigger:
await self._trigger_coordinator_transaction_actions(tx_data)
events_processed_total.labels(event_type=event_type, status="success").inc()
logger.info(f"Processed transaction event: hash={tx_data.get('hash')}")
except Exception as e:
events_processed_total.labels(event_type=event_type, status="error").inc()
logger.error(f"Error processing transaction event: {e}", exc_info=True)
async def _trigger_coordinator_actions(self, block_data: Dict[str, Any], transactions: list) -> None:
"""Trigger coordinator API actions based on block data."""
if not self.coordinator_handler:
return
with action_execution_duration_seconds.labels(action_type="coordinator_api").time():
try:
await self.coordinator_handler.handle_block(block_data, transactions)
actions_triggered_total.labels(action_type="coordinator_api").inc()
except Exception as e:
actions_failed_total.labels(action_type="coordinator_api").inc()
logger.error(f"Error triggering coordinator API actions: {e}", exc_info=True)
async def _trigger_marketplace_actions(self, block_data: Dict[str, Any], transactions: list) -> None:
"""Trigger marketplace actions based on block data."""
if not self.marketplace_handler:
return
with action_execution_duration_seconds.labels(action_type="marketplace").time():
try:
await self.marketplace_handler.handle_block(block_data, transactions)
actions_triggered_total.labels(action_type="marketplace").inc()
except Exception as e:
actions_failed_total.labels(action_type="marketplace").inc()
logger.error(f"Error triggering marketplace actions: {e}", exc_info=True)
async def _trigger_agent_daemon_actions(self, tx_data: Dict[str, Any]) -> None:
"""Trigger agent daemon actions based on transaction data."""
if not self.agent_daemon_handler:
return
with action_execution_duration_seconds.labels(action_type="agent_daemon").time():
try:
await self.agent_daemon_handler.handle_transaction(tx_data)
actions_triggered_total.labels(action_type="agent_daemon").inc()
except Exception as e:
actions_failed_total.labels(action_type="agent_daemon").inc()
logger.error(f"Error triggering agent daemon actions: {e}", exc_info=True)
async def _trigger_coordinator_transaction_actions(self, tx_data: Dict[str, Any]) -> None:
"""Trigger coordinator API actions based on transaction data."""
if not self.coordinator_handler:
return
with action_execution_duration_seconds.labels(action_type="coordinator_api").time():
try:
await self.coordinator_handler.handle_transaction(tx_data)
actions_triggered_total.labels(action_type="coordinator_api").inc()
except Exception as e:
actions_failed_total.labels(action_type="coordinator_api").inc()
logger.error(f"Error triggering coordinator API transaction actions: {e}", exc_info=True)
# Phase 2: Contract event handlers
async def handle_staking_event(self, event_log: Dict[str, Any]) -> None:
"""Handle AgentStaking contract event."""
event_type = "staking_event"
events_received_total.labels(event_type=event_type).inc()
with event_processing_duration_seconds.labels(event_type=event_type).time():
try:
if self.agent_daemon_handler:
await self.agent_daemon_handler.handle_staking_event(event_log)
events_processed_total.labels(event_type=event_type, status="success").inc()
except Exception as e:
events_processed_total.labels(event_type=event_type, status="error").inc()
logger.error(f"Error processing staking event: {e}", exc_info=True)
async def handle_performance_event(self, event_log: Dict[str, Any]) -> None:
"""Handle PerformanceVerifier contract event."""
event_type = "performance_event"
events_received_total.labels(event_type=event_type).inc()
with event_processing_duration_seconds.labels(event_type=event_type).time():
try:
if self.agent_daemon_handler:
await self.agent_daemon_handler.handle_performance_event(event_log)
events_processed_total.labels(event_type=event_type, status="success").inc()
except Exception as e:
events_processed_total.labels(event_type=event_type, status="error").inc()
logger.error(f"Error processing performance event: {e}", exc_info=True)
async def handle_marketplace_event(self, event_log: Dict[str, Any]) -> None:
"""Handle AgentServiceMarketplace contract event."""
event_type = "marketplace_event"
events_received_total.labels(event_type=event_type).inc()
with event_processing_duration_seconds.labels(event_type=event_type).time():
try:
if self.marketplace_handler:
await self.marketplace_handler.handle_contract_event(event_log)
events_processed_total.labels(event_type=event_type, status="success").inc()
except Exception as e:
events_processed_total.labels(event_type=event_type, status="error").inc()
logger.error(f"Error processing marketplace event: {e}", exc_info=True)
async def handle_bounty_event(self, event_log: Dict[str, Any]) -> None:
"""Handle BountyIntegration contract event."""
event_type = "bounty_event"
events_received_total.labels(event_type=event_type).inc()
with event_processing_duration_seconds.labels(event_type=event_type).time():
try:
if self.agent_daemon_handler:
await self.agent_daemon_handler.handle_bounty_event(event_log)
events_processed_total.labels(event_type=event_type, status="success").inc()
except Exception as e:
events_processed_total.labels(event_type=event_type, status="error").inc()
logger.error(f"Error processing bounty event: {e}", exc_info=True)
async def handle_bridge_event(self, event_log: Dict[str, Any]) -> None:
"""Handle CrossChainBridge contract event."""
event_type = "bridge_event"
events_received_total.labels(event_type=event_type).inc()
with event_processing_duration_seconds.labels(event_type=event_type).time():
try:
if self.agent_daemon_handler:
await self.agent_daemon_handler.handle_bridge_event(event_log)
events_processed_total.labels(event_type=event_type, status="success").inc()
except Exception as e:
events_processed_total.labels(event_type=event_type, status="error").inc()
logger.error(f"Error processing bridge event: {e}", exc_info=True)

View File

@@ -0,0 +1,54 @@
"""Configuration settings for blockchain event bridge."""
from typing import Optional
from pydantic import Field
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
"""Configuration settings for the blockchain event bridge."""
# Service configuration
app_name: str = "Blockchain Event Bridge"
bind_host: str = Field(default="127.0.0.1")
bind_port: int = Field(default=8204)
# Blockchain RPC
blockchain_rpc_url: str = Field(default="http://localhost:8006")
# Gossip broker
gossip_backend: str = Field(default="memory") # memory, broadcast, redis
gossip_broadcast_url: Optional[str] = Field(default=None)
# Coordinator API
coordinator_api_url: str = Field(default="http://localhost:8011")
coordinator_api_key: Optional[str] = Field(default=None)
# Event subscription filters
subscribe_blocks: bool = Field(default=True)
subscribe_transactions: bool = Field(default=True)
subscribe_contracts: bool = Field(default=False) # Phase 2
# Smart contract addresses (Phase 2)
agent_staking_address: Optional[str] = Field(default=None)
performance_verifier_address: Optional[str] = Field(default=None)
marketplace_address: Optional[str] = Field(default=None)
bounty_address: Optional[str] = Field(default=None)
bridge_address: Optional[str] = Field(default=None)
# Action handler enable/disable flags
enable_agent_daemon_trigger: bool = Field(default=True)
enable_coordinator_api_trigger: bool = Field(default=True)
enable_marketplace_trigger: bool = Field(default=True)
# Polling configuration (Phase 3)
enable_polling: bool = Field(default=False)
polling_interval_seconds: int = Field(default=60)
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
extra = "ignore"
settings = Settings()

View File

@@ -0,0 +1,7 @@
"""Event subscriber modules for blockchain events."""
from .blocks import BlockEventSubscriber
from .transactions import TransactionEventSubscriber
from .contracts import ContractEventSubscriber
__all__ = ["BlockEventSubscriber", "TransactionEventSubscriber", "ContractEventSubscriber"]

View File

@@ -0,0 +1,107 @@
"""Block event subscriber for gossip broker."""
import asyncio
import logging
from typing import TYPE_CHECKING, Any, Dict
from ..config import Settings
from ..metrics import event_queue_size, gossip_subscribers_total
if TYPE_CHECKING:
from ..bridge import BlockchainEventBridge
logger = logging.getLogger(__name__)
class BlockEventSubscriber:
"""Subscribes to block events from the gossip broker."""
def __init__(self, settings: Settings) -> None:
self.settings = settings
self._running = False
self._bridge: "BlockchainEventBridge | None" = None
self._subscription = None
def set_bridge(self, bridge: "BlockchainEventBridge") -> None:
"""Set the bridge instance for event handling."""
self._bridge = bridge
async def run(self) -> None:
"""Run the block event subscriber."""
if self._running:
logger.warning("Block event subscriber already running")
return
self._running = True
logger.info("Starting block event subscriber...")
# Import gossip broker from blockchain-node
try:
# Add blockchain-node to path for import
import sys
from pathlib import Path
blockchain_node_src = Path("/opt/aitbc/apps/blockchain-node/src")
if str(blockchain_node_src) not in sys.path:
sys.path.insert(0, str(blockchain_node_src))
from aitbc_chain.gossip.broker import create_backend, GossipBroker
# Create gossip backend
backend = create_backend(
self.settings.gossip_backend,
broadcast_url=self.settings.gossip_broadcast_url
)
self._broker = GossipBroker(backend)
# Subscribe to blocks topic
self._subscription = await self._broker.subscribe("blocks", max_queue_size=100)
gossip_subscribers_total.set(1)
logger.info("Successfully subscribed to blocks topic")
except ImportError as e:
logger.error(f"Failed to import gossip broker: {e}")
logger.info("Using mock implementation for development")
await self._run_mock()
return
# Process block events
while self._running:
try:
block_data = await self._subscription.get()
event_queue_size.labels(topic="blocks").set(self._subscription.queue.qsize())
logger.info(f"Received block event: height={block_data.get('height')}")
if self._bridge:
await self._bridge.handle_block_event(block_data)
except asyncio.CancelledError:
logger.info("Block event subscriber cancelled")
break
except Exception as e:
logger.error(f"Error processing block event: {e}", exc_info=True)
await asyncio.sleep(1) # Avoid tight error loop
async def _run_mock(self) -> None:
"""Run a mock subscriber for development/testing when gossip broker is unavailable."""
logger.warning("Using mock block event subscriber - no real events will be processed")
await asyncio.sleep(60) # Keep alive but do nothing
async def stop(self) -> None:
"""Stop the block event subscriber."""
if not self._running:
return
logger.info("Stopping block event subscriber...")
self._running = False
if self._subscription:
self._subscription.close()
if hasattr(self, '_broker'):
await self._broker.shutdown()
gossip_subscribers_total.set(0)
logger.info("Block event subscriber stopped")

View File

@@ -0,0 +1,223 @@
"""Contract event subscriber for smart contract event monitoring."""
import asyncio
import logging
from typing import TYPE_CHECKING, Any, Dict, Optional
import httpx
from ..config import Settings
from ..metrics import event_queue_size
if TYPE_CHECKING:
from ..bridge import BlockchainEventBridge
logger = logging.getLogger(__name__)
class ContractEventSubscriber:
"""Subscribes to smart contract events via blockchain RPC."""
def __init__(self, settings: Settings) -> None:
self.settings = settings
self._running = False
self._bridge: "BlockchainEventBridge | None" = None
self._client: Optional[httpx.AsyncClient] = None
# Contract addresses from configuration
self.contract_addresses: Dict[str, str] = {
"AgentStaking": settings.agent_staking_address or "",
"PerformanceVerifier": settings.performance_verifier_address or "",
"AgentServiceMarketplace": settings.marketplace_address or "",
"BountyIntegration": settings.bounty_address or "",
"CrossChainBridge": settings.bridge_address or "",
}
# Event topics/signatures for each contract
self.event_topics: Dict[str, list[str]] = {
"AgentStaking": [
"StakeCreated",
"RewardsDistributed",
"AgentTierUpdated",
],
"PerformanceVerifier": [
"PerformanceVerified",
"PenaltyApplied",
"RewardIssued",
],
"AgentServiceMarketplace": [
"ServiceListed",
"ServicePurchased",
],
"BountyIntegration": [
"BountyCreated",
"BountyCompleted",
],
"CrossChainBridge": [
"BridgeInitiated",
"BridgeCompleted",
],
}
# Track last processed block for each contract
self.last_processed_blocks: Dict[str, int] = {}
def set_bridge(self, bridge: "BlockchainEventBridge") -> None:
"""Set the bridge instance for event handling."""
self._bridge = bridge
async def _get_client(self) -> httpx.AsyncClient:
"""Get or create HTTP client."""
if self._client is None:
self._client = httpx.AsyncClient(
base_url=self.settings.blockchain_rpc_url,
timeout=30.0,
)
return self._client
async def run(self) -> None:
"""Run the contract event subscriber."""
if not self.settings.subscribe_contracts:
logger.info("Contract event subscription disabled")
return
if self._running:
logger.warning("Contract event subscriber already running")
return
self._running = True
logger.info("Starting contract event subscriber...")
# Initialize last processed blocks from current chain height
await self._initialize_block_tracking()
while self._running:
try:
await self._poll_contract_events()
await asyncio.sleep(self.settings.polling_interval_seconds)
except asyncio.CancelledError:
logger.info("Contract event subscriber cancelled")
break
except Exception as e:
logger.error(f"Error in contract event subscriber: {e}", exc_info=True)
await asyncio.sleep(5)
async def _initialize_block_tracking(self) -> None:
"""Initialize block tracking from current chain height."""
try:
client = await self._get_client()
response = await client.get("/head")
if response.status_code == 200:
head_data = response.json()
current_height = head_data.get("height", 0)
for contract in self.contract_addresses:
if self.contract_addresses[contract]:
self.last_processed_blocks[contract] = current_height
logger.info(f"Initialized block tracking at height {current_height}")
except Exception as e:
logger.error(f"Error initializing block tracking: {e}")
async def _poll_contract_events(self) -> None:
"""Poll for contract events from blockchain."""
client = await self._get_client()
for contract_name, contract_address in self.contract_addresses.items():
if not contract_address:
continue
try:
# Get current chain height
response = await client.get("/head")
if response.status_code != 200:
logger.error(f"Failed to get chain head: {response.status_code}")
continue
head_data = response.json()
current_height = head_data.get("height", 0)
last_height = self.last_processed_blocks.get(contract_name, current_height - 100)
# Query events for this contract
logs_response = await client.post(
"/eth_getLogs",
json={
"address": contract_address,
"from_block": last_height + 1,
"to_block": current_height,
"topics": self.event_topics.get(contract_name, []),
}
)
if logs_response.status_code != 200:
logger.error(f"Failed to get logs for {contract_name}: {logs_response.status_code}")
continue
logs_data = logs_response.json()
logs = logs_data.get("logs", [])
if logs:
logger.info(f"Found {len(logs)} events for {contract_name}")
# Process each log
for log in logs:
await self._process_contract_event(contract_name, log)
# Update last processed block
self.last_processed_blocks[contract_name] = current_height
except Exception as e:
logger.error(f"Error polling events for {contract_name}: {e}", exc_info=True)
async def _process_contract_event(self, contract_name: str, log: Dict[str, Any]) -> None:
"""Process a contract event."""
event_type = log.get("topics", [""])[0] if log.get("topics") else "Unknown"
logger.info(f"Processing {contract_name} event: {event_type}")
if self._bridge:
# Route event to appropriate handler based on contract type
if contract_name == "AgentStaking":
await self._handle_staking_event(log)
elif contract_name == "PerformanceVerifier":
await self._handle_performance_event(log)
elif contract_name == "AgentServiceMarketplace":
await self._handle_marketplace_event(log)
elif contract_name == "BountyIntegration":
await self._handle_bounty_event(log)
elif contract_name == "CrossChainBridge":
await self._handle_bridge_event(log)
async def _handle_staking_event(self, log: Dict[str, Any]) -> None:
"""Handle AgentStaking contract event."""
if self._bridge:
await self._bridge.handle_staking_event(log)
async def _handle_performance_event(self, log: Dict[str, Any]) -> None:
"""Handle PerformanceVerifier contract event."""
if self._bridge:
await self._bridge.handle_performance_event(log)
async def _handle_marketplace_event(self, log: Dict[str, Any]) -> None:
"""Handle AgentServiceMarketplace contract event."""
if self._bridge:
await self._bridge.handle_marketplace_event(log)
async def _handle_bounty_event(self, log: Dict[str, Any]) -> None:
"""Handle BountyIntegration contract event."""
if self._bridge:
await self._bridge.handle_bounty_event(log)
async def _handle_bridge_event(self, log: Dict[str, Any]) -> None:
"""Handle CrossChainBridge contract event."""
if self._bridge:
await self._bridge.handle_bridge_event(log)
async def stop(self) -> None:
"""Stop the contract event subscriber."""
self._running = False
if self._client:
await self._client.aclose()
self._client = None
logger.info("Contract event subscriber stopped")

View File

@@ -0,0 +1,113 @@
"""Transaction event subscriber for gossip broker."""
import asyncio
import logging
from typing import TYPE_CHECKING, Any, Dict
from ..config import Settings
from ..metrics import event_queue_size, gossip_subscribers_total
if TYPE_CHECKING:
from ..bridge import BlockchainEventBridge
logger = logging.getLogger(__name__)
class TransactionEventSubscriber:
"""Subscribes to transaction events from the gossip broker."""
def __init__(self, settings: Settings) -> None:
self.settings = settings
self._running = False
self._bridge: "BlockchainEventBridge | None" = None
self._subscription = None
def set_bridge(self, bridge: "BlockchainEventBridge") -> None:
"""Set the bridge instance for event handling."""
self._bridge = bridge
async def run(self) -> None:
"""Run the transaction event subscriber."""
if self._running:
logger.warning("Transaction event subscriber already running")
return
self._running = True
logger.info("Starting transaction event subscriber...")
# Import gossip broker from blockchain-node
try:
# Add blockchain-node to path for import
import sys
from pathlib import Path
blockchain_node_src = Path("/opt/aitbc/apps/blockchain-node/src")
if str(blockchain_node_src) not in sys.path:
sys.path.insert(0, str(blockchain_node_src))
from aitbc_chain.gossip.broker import create_backend, GossipBroker
# Create gossip backend
backend = create_backend(
self.settings.gossip_backend,
broadcast_url=self.settings.gossip_broadcast_url
)
self._broker = GossipBroker(backend)
# Subscribe to transactions topic (if available)
# Note: Currently transactions are embedded in block events
# This subscriber will be enhanced when transaction events are published separately
try:
self._subscription = await self._broker.subscribe("transactions", max_queue_size=100)
gossip_subscribers_total.inc()
logger.info("Successfully subscribed to transactions topic")
except Exception:
logger.info("Transactions topic not available - will extract from block events")
await self._run_mock()
return
except ImportError as e:
logger.error(f"Failed to import gossip broker: {e}")
logger.info("Using mock implementation for development")
await self._run_mock()
return
# Process transaction events
while self._running:
try:
tx_data = await self._subscription.get()
event_queue_size.labels(topic="transactions").set(self._subscription.queue.qsize())
logger.info(f"Received transaction event: hash={tx_data.get('hash')}")
if self._bridge:
await self._bridge.handle_transaction_event(tx_data)
except asyncio.CancelledError:
logger.info("Transaction event subscriber cancelled")
break
except Exception as e:
logger.error(f"Error processing transaction event: {e}", exc_info=True)
await asyncio.sleep(1)
async def _run_mock(self) -> None:
"""Run a mock subscriber for development/testing."""
logger.warning("Using mock transaction event subscriber - no real events will be processed")
await asyncio.sleep(60)
async def stop(self) -> None:
"""Stop the transaction event subscriber."""
if not self._running:
return
logger.info("Stopping transaction event subscriber...")
self._running = False
if self._subscription:
self._subscription.close()
if hasattr(self, '_broker'):
await self._broker.shutdown()
gossip_subscribers_total.dec()
logger.info("Transaction event subscriber stopped")

View File

@@ -0,0 +1,74 @@
"""Main FastAPI application for blockchain event bridge."""
import asyncio
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from prometheus_client import make_asgi_app
from .config import settings
from .bridge import BlockchainEventBridge
from .metrics import (
events_received_total,
events_processed_total,
actions_triggered_total,
actions_failed_total,
)
logger = logging.getLogger(__name__)
bridge_instance: BlockchainEventBridge | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifespan context manager for startup/shutdown."""
global bridge_instance
logger.info(f"Starting {settings.app_name}...")
# Initialize and start the bridge
bridge_instance = BlockchainEventBridge(settings)
await bridge_instance.start()
logger.info(f"{settings.app_name} started successfully")
yield
# Shutdown
logger.info(f"Shutting down {settings.app_name}...")
if bridge_instance:
await bridge_instance.stop()
logger.info(f"{settings.app_name} shut down successfully")
app = FastAPI(
title=settings.app_name,
description="Bridge between AITBC blockchain events and OpenClaw agent triggers",
version="0.1.0",
lifespan=lifespan,
)
# Add Prometheus metrics endpoint
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {
"status": "healthy",
"bridge_running": bridge_instance is not None and bridge_instance.is_running,
}
@app.get("/")
async def root():
"""Root endpoint."""
return {
"service": settings.app_name,
"version": "0.1.0",
"status": "running",
}

View File

@@ -0,0 +1,61 @@
"""Prometheus metrics for blockchain event bridge."""
from prometheus_client import Counter, Histogram, Gauge
# Event metrics
events_received_total = Counter(
"bridge_events_received_total",
"Total number of events received from blockchain",
["event_type"]
)
events_processed_total = Counter(
"bridge_events_processed_total",
"Total number of events processed",
["event_type", "status"]
)
# Action metrics
actions_triggered_total = Counter(
"bridge_actions_triggered_total",
"Total number of actions triggered",
["action_type"]
)
actions_failed_total = Counter(
"bridge_actions_failed_total",
"Total number of actions that failed",
["action_type"]
)
# Performance metrics
event_processing_duration_seconds = Histogram(
"bridge_event_processing_duration_seconds",
"Time spent processing events",
["event_type"]
)
action_execution_duration_seconds = Histogram(
"bridge_action_execution_duration_seconds",
"Time spent executing actions",
["action_type"]
)
# Queue metrics
event_queue_size = Gauge(
"bridge_event_queue_size",
"Current size of event queue",
["topic"]
)
# Connection metrics
gossip_subscribers_total = Gauge(
"bridge_gossip_subscribers_total",
"Number of active gossip broker subscriptions"
)
coordinator_api_requests_total = Counter(
"bridge_coordinator_api_requests_total",
"Total number of coordinator API requests",
["endpoint", "method", "status"]
)

View File

@@ -0,0 +1,6 @@
"""Polling modules for batch operations and condition-based triggers."""
from .conditions import ConditionPoller
from .batch import BatchProcessor
__all__ = ["ConditionPoller", "BatchProcessor"]

View File

@@ -0,0 +1,70 @@
"""Batch processing for aggregated operations."""
import asyncio
import logging
from typing import Any, Dict, List
logger = logging.getLogger(__name__)
class BatchProcessor:
"""Processes events in batches for efficiency."""
def __init__(self, settings: Any) -> None:
self.settings = settings
self._running = False
self._batch_queue: List[Dict[str, Any]] = []
self._batch_size = 50
async def run(self) -> None:
"""Run the batch processor."""
if not self.settings.enable_polling:
logger.info("Batch processing disabled")
return
self._running = True
logger.info("Starting batch processor...")
while self._running:
try:
await self._process_batch()
await asyncio.sleep(self.settings.polling_interval_seconds)
except asyncio.CancelledError:
logger.info("Batch processor cancelled")
break
except Exception as e:
logger.error(f"Error in batch processor: {e}", exc_info=True)
await asyncio.sleep(5)
async def add_to_batch(self, event: Dict[str, Any]) -> None:
"""Add an event to the batch queue."""
self._batch_queue.append(event)
if len(self._batch_queue) >= self._batch_size:
await self._process_batch()
async def _process_batch(self) -> None:
"""Process the current batch of events."""
if not self._batch_queue:
return
batch = self._batch_queue.copy()
self._batch_queue.clear()
logger.info(f"Processing batch of {len(batch)} events")
# Placeholder for Phase 3 implementation
# Examples:
# - Batch agent reputation updates
# - Batch marketplace state synchronization
# - Batch performance metric aggregation
async def stop(self) -> None:
"""Stop the batch processor."""
self._running = False
# Process remaining events
if self._batch_queue:
await self._process_batch()
logger.info("Batch processor stopped")

View File

@@ -0,0 +1,50 @@
"""Condition-based polling for batch operations."""
import asyncio
import logging
from typing import Any, Dict
logger = logging.getLogger(__name__)
class ConditionPoller:
"""Polls for specific conditions that should trigger OpenClaw actions."""
def __init__(self, settings: Any) -> None:
self.settings = settings
self._running = False
async def run(self) -> None:
"""Run the condition poller."""
if not self.settings.enable_polling:
logger.info("Condition polling disabled")
return
self._running = True
logger.info("Starting condition poller...")
while self._running:
try:
await self._check_conditions()
await asyncio.sleep(self.settings.polling_interval_seconds)
except asyncio.CancelledError:
logger.info("Condition poller cancelled")
break
except Exception as e:
logger.error(f"Error in condition poller: {e}", exc_info=True)
await asyncio.sleep(5)
async def _check_conditions(self) -> None:
"""Check for conditions that should trigger actions."""
# Placeholder for Phase 3 implementation
# Examples:
# - Agent performance thresholds (SLA violations)
# - Marketplace capacity planning
# - Governance proposal voting deadlines
# - Cross-chain bridge status
pass
async def stop(self) -> None:
"""Stop the condition poller."""
self._running = False
logger.info("Condition poller stopped")

View File

@@ -0,0 +1 @@
"""Tests for blockchain event bridge."""

View File

@@ -0,0 +1,116 @@
"""Tests for action handlers."""
import pytest
from unittest.mock import Mock, AsyncMock, patch
from blockchain_event_bridge.action_handlers.coordinator_api import CoordinatorAPIHandler
from blockchain_event_bridge.action_handlers.agent_daemon import AgentDaemonHandler
from blockchain_event_bridge.action_handlers.marketplace import MarketplaceHandler
@pytest.mark.asyncio
async def test_coordinator_api_handler_initialization():
"""Test coordinator API handler initialization."""
handler = CoordinatorAPIHandler("http://localhost:8011", "test-key")
assert handler.base_url == "http://localhost:8011"
assert handler.api_key == "test-key"
assert handler._client is None
@pytest.mark.asyncio
async def test_coordinator_api_handler_handle_block():
"""Test coordinator API handler handling a block."""
handler = CoordinatorAPIHandler("http://localhost:8011")
block_data = {"height": 100, "hash": "0x123"}
transactions = [{"type": "ai_job", "hash": "0x456"}]
with patch.object(handler, "handle_transaction", new_callable=AsyncMock) as mock_handle_tx:
await handler.handle_block(block_data, transactions)
mock_handle_tx.assert_called_once_with(transactions[0])
@pytest.mark.asyncio
async def test_coordinator_api_handler_close():
"""Test closing coordinator API handler."""
handler = CoordinatorAPIHandler("http://localhost:8011")
# Create a client first
await handler._get_client()
assert handler._client is not None
await handler.close()
assert handler._client is None
@pytest.mark.asyncio
async def test_agent_daemon_handler_initialization():
"""Test agent daemon handler initialization."""
handler = AgentDaemonHandler("http://localhost:8006")
assert handler.blockchain_rpc_url == "http://localhost:8006"
assert handler._client is None
@pytest.mark.asyncio
async def test_agent_daemon_handler_handle_transaction():
"""Test agent daemon handler handling a transaction."""
handler = AgentDaemonHandler("http://localhost:8006")
tx_data = {
"hash": "0x123",
"type": "agent_message",
"to": "agent_address",
"payload": {"trigger": "process"}
}
with patch.object(handler, "_notify_agent_daemon", new_callable=AsyncMock) as mock_notify:
await handler.handle_transaction(tx_data)
mock_notify.assert_called_once_with(tx_data)
@pytest.mark.asyncio
async def test_agent_daemon_handler_is_agent_transaction():
"""Test checking if transaction is an agent transaction."""
handler = AgentDaemonHandler("http://localhost:8006")
# Agent transaction
assert handler._is_agent_transaction({"payload": {"trigger": "test"}}) is True
assert handler._is_agent_transaction({"payload": {"agent": "test"}}) is True
# Not an agent transaction
assert handler._is_agent_transaction({"payload": {"other": "test"}}) is False
assert handler._is_agent_transaction({}) is False
@pytest.mark.asyncio
async def test_marketplace_handler_initialization():
"""Test marketplace handler initialization."""
handler = MarketplaceHandler("http://localhost:8011", "test-key")
assert handler.base_url == "http://localhost:8011"
assert handler.api_key == "test-key"
assert handler._client is None
@pytest.mark.asyncio
async def test_marketplace_handler_filter_marketplace_transactions():
"""Test filtering marketplace transactions."""
handler = MarketplaceHandler("http://localhost:8011")
transactions = [
{"type": "marketplace", "hash": "0x1"},
{"type": "transfer", "hash": "0x2"},
{"type": "listing", "hash": "0x3"},
{"type": "transfer", "payload": {"listing_id": "123"}, "hash": "0x4"},
]
filtered = handler._filter_marketplace_transactions(transactions)
assert len(filtered) == 3
assert filtered[0]["hash"] == "0x1"
assert filtered[1]["hash"] == "0x3"
assert filtered[2]["hash"] == "0x4"

View File

@@ -0,0 +1,77 @@
"""Tests for contract event handlers."""
import pytest
from unittest.mock import Mock, AsyncMock
from blockchain_event_bridge.action_handlers.agent_daemon import AgentDaemonHandler
from blockchain_event_bridge.action_handlers.marketplace import MarketplaceHandler
@pytest.mark.asyncio
async def test_agent_daemon_handle_staking_event():
"""Test agent daemon handler for staking events."""
handler = AgentDaemonHandler("http://localhost:8006")
event_log = {
"topics": ["StakeCreated"],
"data": '{"stakeId": "123", "staker": "0xabc"}'
}
# Should not raise an error
await handler.handle_staking_event(event_log)
@pytest.mark.asyncio
async def test_agent_daemon_handle_performance_event():
"""Test agent daemon handler for performance events."""
handler = AgentDaemonHandler("http://localhost:8006")
event_log = {
"topics": ["PerformanceVerified"],
"data": '{"verificationId": "456", "withinSLA": true}'
}
# Should not raise an error
await handler.handle_performance_event(event_log)
@pytest.mark.asyncio
async def test_agent_daemon_handle_bounty_event():
"""Test agent daemon handler for bounty events."""
handler = AgentDaemonHandler("http://localhost:8006")
event_log = {
"topics": ["BountyCreated"],
"data": '{"bountyId": "789", "creator": "0xdef"}'
}
# Should not raise an error
await handler.handle_bounty_event(event_log)
@pytest.mark.asyncio
async def test_agent_daemon_handle_bridge_event():
"""Test agent daemon handler for bridge events."""
handler = AgentDaemonHandler("http://localhost:8006")
event_log = {
"topics": ["BridgeInitiated"],
"data": '{"requestId": "101", "sourceChain": "ethereum"}'
}
# Should not raise an error
await handler.handle_bridge_event(event_log)
@pytest.mark.asyncio
async def test_marketplace_handle_contract_event():
"""Test marketplace handler for contract events."""
handler = MarketplaceHandler("http://localhost:8011")
event_log = {
"topics": ["ServiceListed"],
"data": '{"serviceId": "202", "provider": "0x123"}'
}
# Should not raise an error
await handler.handle_contract_event(event_log)

View File

@@ -0,0 +1,103 @@
"""Tests for contract event subscriber."""
import pytest
from unittest.mock import Mock, AsyncMock, patch
from blockchain_event_bridge.event_subscribers.contracts import ContractEventSubscriber
@pytest.mark.asyncio
async def test_contract_subscriber_initialization():
"""Test contract subscriber initialization."""
from blockchain_event_bridge.config import Settings
settings = Settings()
subscriber = ContractEventSubscriber(settings)
assert subscriber.settings == settings
assert subscriber._running is False
assert subscriber.contract_addresses is not None
@pytest.mark.asyncio
async def test_contract_subscriber_set_bridge():
"""Test setting bridge on contract subscriber."""
from blockchain_event_bridge.config import Settings
from blockchain_event_bridge.bridge import BlockchainEventBridge
settings = Settings()
subscriber = ContractEventSubscriber(settings)
bridge = Mock(spec=BlockchainEventBridge)
subscriber.set_bridge(bridge)
assert subscriber._bridge == bridge
@pytest.mark.asyncio
async def test_contract_subscriber_disabled():
"""Test contract subscriber when disabled."""
from blockchain_event_bridge.config import Settings
settings = Settings(subscribe_contracts=False)
subscriber = ContractEventSubscriber(settings)
await subscriber.run()
assert subscriber._running is False
@pytest.mark.asyncio
async def test_contract_subscriber_stop():
"""Test stopping contract subscriber."""
from blockchain_event_bridge.config import Settings
settings = Settings(subscribe_contracts=False)
subscriber = ContractEventSubscriber(settings)
await subscriber.stop()
assert subscriber._running is False
@pytest.mark.asyncio
async def test_process_staking_event():
"""Test processing staking event."""
from blockchain_event_bridge.config import Settings
from blockchain_event_bridge.bridge import BlockchainEventBridge
settings = Settings()
subscriber = ContractEventSubscriber(settings)
bridge = Mock(spec=BlockchainEventBridge)
bridge.handle_staking_event = AsyncMock()
subscriber.set_bridge(bridge)
event_log = {
"topics": ["StakeCreated"],
"data": "{}",
"address": "0x123"
}
await subscriber._handle_staking_event(event_log)
bridge.handle_staking_event.assert_called_once_with(event_log)
@pytest.mark.asyncio
async def test_process_performance_event():
"""Test processing performance event."""
from blockchain_event_bridge.config import Settings
from blockchain_event_bridge.bridge import BlockchainEventBridge
settings = Settings()
subscriber = ContractEventSubscriber(settings)
bridge = Mock(spec=BlockchainEventBridge)
bridge.handle_performance_event = AsyncMock()
subscriber.set_bridge(bridge)
event_log = {
"topics": ["PerformanceVerified"],
"data": "{}",
"address": "0x123"
}
await subscriber._handle_performance_event(event_log)
bridge.handle_performance_event.assert_called_once_with(event_log)

View File

@@ -0,0 +1,69 @@
"""Tests for event subscribers."""
import pytest
import asyncio
from unittest.mock import Mock, AsyncMock
from blockchain_event_bridge.event_subscribers.blocks import BlockEventSubscriber
from blockchain_event_bridge.event_subscribers.transactions import TransactionEventSubscriber
@pytest.mark.asyncio
async def test_block_subscriber_initialization():
"""Test block subscriber initialization."""
from blockchain_event_bridge.config import Settings
settings = Settings()
subscriber = BlockEventSubscriber(settings)
assert subscriber.settings == settings
assert subscriber._running is False
@pytest.mark.asyncio
async def test_block_subscriber_set_bridge():
"""Test setting bridge on block subscriber."""
from blockchain_event_bridge.config import Settings
from blockchain_event_bridge.bridge import BlockchainEventBridge
settings = Settings()
subscriber = BlockEventSubscriber(settings)
bridge = Mock(spec=BlockchainEventBridge)
subscriber.set_bridge(bridge)
assert subscriber._bridge == bridge
@pytest.mark.asyncio
async def test_block_subscriber_stop():
"""Test stopping block subscriber."""
from blockchain_event_bridge.config import Settings
settings = Settings()
subscriber = BlockEventSubscriber(settings)
# Start and immediately stop
task = asyncio.create_task(subscriber.run())
await asyncio.sleep(0.1) # Let it start
await subscriber.stop()
# Cancel the task
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
assert subscriber._running is False
@pytest.mark.asyncio
async def test_transaction_subscriber_initialization():
"""Test transaction subscriber initialization."""
from blockchain_event_bridge.config import Settings
settings = Settings()
subscriber = TransactionEventSubscriber(settings)
assert subscriber.settings == settings
assert subscriber._running is False

View File

@@ -0,0 +1,70 @@
"""Integration tests for blockchain event bridge."""
import pytest
from unittest.mock import Mock, AsyncMock, patch
from blockchain_event_bridge.bridge import BlockchainEventBridge
from blockchain_event_bridge.config import Settings
@pytest.mark.asyncio
async def test_bridge_initialization():
"""Test bridge initialization."""
settings = Settings()
bridge = BlockchainEventBridge(settings)
assert bridge.settings == settings
assert bridge.is_running is False
@pytest.mark.asyncio
async def test_bridge_start_stop():
"""Test bridge start and stop."""
settings = Settings(
subscribe_blocks=False,
subscribe_transactions=False,
)
bridge = BlockchainEventBridge(settings)
await bridge.start()
assert bridge.is_running is True
await bridge.stop()
assert bridge.is_running is False
@pytest.mark.asyncio
async def test_bridge_handle_block_event():
"""Test bridge handling a block event."""
settings = Settings(
enable_coordinator_api_trigger=False,
enable_marketplace_trigger=False,
)
bridge = BlockchainEventBridge(settings)
block_data = {
"height": 100,
"hash": "0x123",
"transactions": []
}
# Should not raise an error even without handlers
await bridge.handle_block_event(block_data)
@pytest.mark.asyncio
async def test_bridge_handle_transaction_event():
"""Test bridge handling a transaction event."""
settings = Settings(
enable_agent_daemon_trigger=False,
enable_coordinator_api_trigger=False,
)
bridge = BlockchainEventBridge(settings)
tx_data = {
"hash": "0x456",
"type": "transfer"
}
# Should not raise an error even without handlers
await bridge.handle_transaction_event(tx_data)

View File

@@ -1030,3 +1030,82 @@ async def force_sync(peer_data: dict) -> Dict[str, Any]:
except Exception as e:
_logger.error(f"Error forcing sync: {e}")
raise HTTPException(status_code=500, detail=f"Failed to force sync: {str(e)}")
class GetLogsRequest(BaseModel):
"""Request model for eth_getLogs RPC endpoint."""
address: Optional[str] = Field(None, description="Contract address to filter logs")
from_block: Optional[int] = Field(None, description="Starting block height")
to_block: Optional[int] = Field(None, description="Ending block height")
topics: Optional[List[str]] = Field(None, description="Event topics to filter")
class LogEntry(BaseModel):
"""Single log entry from smart contract event."""
address: str
topics: List[str]
data: str
block_number: int
transaction_hash: str
log_index: int
class GetLogsResponse(BaseModel):
"""Response model for eth_getLogs RPC endpoint."""
logs: List[LogEntry]
count: int
@router.post("/eth_getLogs", summary="Query smart contract event logs")
async def get_logs(
request: GetLogsRequest,
chain_id: Optional[str] = None
) -> GetLogsResponse:
"""
Query smart contract event logs using eth_getLogs-compatible endpoint.
Filters Receipt model for logs matching contract address and event topics.
"""
chain_id = get_chain_id(chain_id)
with session_scope() as session:
# Build query for receipts
query = select(Receipt).where(Receipt.chain_id == chain_id)
# Filter by block range
if request.from_block is not None:
query = query.where(Receipt.block_height >= request.from_block)
if request.to_block is not None:
query = query.where(Receipt.block_height <= request.to_block)
# Execute query
receipts = session.execute(query).scalars().all()
logs = []
for receipt in receipts:
# Extract event logs from receipt payload
payload = receipt.payload or {}
events = payload.get("events", [])
for event in events:
# Filter by contract address if specified
if request.address and event.get("address") != request.address:
continue
# Filter by topics if specified
if request.topics:
event_topics = event.get("topics", [])
if not any(topic in event_topics for topic in request.topics):
continue
# Create log entry
log_entry = LogEntry(
address=event.get("address", ""),
topics=event.get("topics", []),
data=str(event.get("data", "")),
block_number=receipt.block_height or 0,
transaction_hash=receipt.receipt_id,
log_index=event.get("logIndex", 0)
)
logs.append(log_entry)
return GetLogsResponse(logs=logs, count=len(logs))

View File

@@ -0,0 +1,33 @@
[Unit]
Description=AITBC Blockchain Event Bridge Service
After=network.target aitbc-blockchain-node.service
Wants=aitbc-blockchain-node.service
[Service]
Type=simple
User=aitbc
Group=aitbc
WorkingDirectory=/opt/aitbc/apps/blockchain-event-bridge
Environment="PATH=/opt/aitbc/apps/blockchain-event-bridge/.venv/bin:/usr/local/bin:/usr/bin:/bin"
EnvironmentFile=/etc/aitbc/blockchain-event-bridge.env
# Poetry virtualenv
ExecStart=/opt/aitbc/apps/blockchain-event-bridge/.venv/bin/uvicorn blockchain_event_bridge.main:app --host 127.0.0.1 --port 8204
# Restart policy
Restart=always
RestartSec=10
# Security
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/var/lib/aitbc
ReadWritePaths=/var/log/aitbc
# Resource limits
LimitNOFILE=65536
[Install]
WantedBy=multi-user.target