Files
aitbc/apps/blockchain-event-bridge/tests/test_contract_subscriber.py
aitbc 90edea2da2
Some checks failed
Blockchain Synchronization Verification / sync-verification (push) Failing after 2s
Integration Tests / test-service-integration (push) Failing after 15s
Multi-Node Blockchain Health Monitoring / health-check (push) Successful in 2s
P2P Network Verification / p2p-verification (push) Successful in 2s
Python Tests / test-python (push) Successful in 12s
Security Scanning / security-scan (push) Successful in 41s
Systemd Sync / sync-systemd (push) Successful in 7s
Add blockchain event bridge service with smart contract event integration
- Phase 1: Core bridge service with gossip broker subscription
- Phase 2: Smart contract event integration via eth_getLogs RPC endpoint
- Add contract event subscriber for AgentStaking, PerformanceVerifier, Marketplace, Bounty, CrossChainBridge
- Add contract event handlers in agent_daemon.py and marketplace.py
- Add systemd service file for blockchain-event-bridge
- Update blockchain node router.py with eth_getLogs endpoint
- Add configuration for contract addresses
- Add tests for contract subscriber and handlers (27 tests passing)
2026-04-23 10:58:00 +02:00

104 lines
3.0 KiB
Python

"""Tests for contract event subscriber."""
import pytest
from unittest.mock import Mock, AsyncMock, patch
from blockchain_event_bridge.event_subscribers.contracts import ContractEventSubscriber
@pytest.mark.asyncio
async def test_contract_subscriber_initialization():
    """Check the state of a subscriber right after construction.

    A new ``ContractEventSubscriber`` must keep the settings it was given,
    must not be flagged as running, and must expose a non-``None``
    ``contract_addresses`` attribute.
    """
    from blockchain_event_bridge.config import Settings

    config = Settings()

    sub = ContractEventSubscriber(config)

    # Settings handed to the constructor are retained on the instance.
    assert sub.settings == config
    # Construction alone must not mark the subscriber as running.
    assert sub._running is False
    assert sub.contract_addresses is not None
@pytest.mark.asyncio
async def test_contract_subscriber_set_bridge():
    """Check that ``set_bridge`` stores the bridge on the subscriber.

    A ``Mock`` constrained to the ``BlockchainEventBridge`` spec stands in
    for a real bridge; after ``set_bridge`` the private ``_bridge``
    attribute must compare equal to it.
    """
    from blockchain_event_bridge.bridge import BlockchainEventBridge
    from blockchain_event_bridge.config import Settings

    sub = ContractEventSubscriber(Settings())
    mock_bridge = Mock(spec=BlockchainEventBridge)

    sub.set_bridge(mock_bridge)

    assert sub._bridge == mock_bridge
@pytest.mark.asyncio
async def test_contract_subscriber_disabled():
    """Check ``run()`` when contract subscription is switched off.

    With ``subscribe_contracts=False`` the awaited ``run()`` call is
    expected to return, and the subscriber must not be flagged as running
    afterwards.
    """
    from blockchain_event_bridge.config import Settings

    disabled_config = Settings(subscribe_contracts=False)
    sub = ContractEventSubscriber(disabled_config)

    await sub.run()

    assert sub._running is False
@pytest.mark.asyncio
async def test_contract_subscriber_stop():
    """Test stopping contract subscriber.

    The running flag is raised by hand before ``stop()`` is awaited, so the
    final assertion can only pass if ``stop()`` itself lowers the flag.
    Without that step the flag is expected to already be ``False`` on a
    newly built subscriber, and the assertion would hold even if ``stop()``
    did nothing.
    """
    from blockchain_event_bridge.config import Settings

    settings = Settings(subscribe_contracts=False)
    subscriber = ContractEventSubscriber(settings)

    # Simulate a subscriber that is currently running without entering the
    # real run() loop.
    # NOTE(review): assumes stop() only needs ``_running`` to be set and does
    # not depend on other state created by run() - confirm against the
    # subscriber implementation.
    subscriber._running = True

    await subscriber.stop()

    assert subscriber._running is False
@pytest.mark.asyncio
async def test_process_staking_event():
    """Check that a staking event log is forwarded to the bridge.

    The bridge is a ``Mock`` constrained to the ``BlockchainEventBridge``
    spec whose ``handle_staking_event`` is replaced by an ``AsyncMock``.
    After ``_handle_staking_event`` is awaited with a sample log, that mock
    must have been called exactly once with the same log.
    """
    from blockchain_event_bridge.bridge import BlockchainEventBridge
    from blockchain_event_bridge.config import Settings

    mock_bridge = Mock(spec=BlockchainEventBridge)
    mock_bridge.handle_staking_event = AsyncMock()

    sub = ContractEventSubscriber(Settings())
    sub.set_bridge(mock_bridge)

    # Minimal stand-in for a contract log entry.
    staking_log = dict(
        topics=["StakeCreated"],
        data="{}",
        address="0x123",
    )

    await sub._handle_staking_event(staking_log)

    # NOTE(review): this checks the call only; ``assert_awaited_once_with``
    # would also check the await - confirm which the subscriber guarantees.
    mock_bridge.handle_staking_event.assert_called_once_with(staking_log)
@pytest.mark.asyncio
async def test_process_performance_event():
    """Check that a performance event log is forwarded to the bridge.

    The bridge is a ``Mock`` constrained to the ``BlockchainEventBridge``
    spec whose ``handle_performance_event`` is replaced by an ``AsyncMock``.
    After ``_handle_performance_event`` is awaited with a sample log, that
    mock must have been called exactly once with the same log.
    """
    from blockchain_event_bridge.bridge import BlockchainEventBridge
    from blockchain_event_bridge.config import Settings

    mock_bridge = Mock(spec=BlockchainEventBridge)
    mock_bridge.handle_performance_event = AsyncMock()

    sub = ContractEventSubscriber(Settings())
    sub.set_bridge(mock_bridge)

    # Minimal stand-in for a contract log entry.
    performance_log = dict(
        topics=["PerformanceVerified"],
        data="{}",
        address="0x123",
    )

    await sub._handle_performance_event(performance_log)

    # NOTE(review): this checks the call only; ``assert_awaited_once_with``
    # would also check the await - confirm which the subscriber guarantees.
    mock_bridge.handle_performance_event.assert_called_once_with(performance_log)