Files
aitbc/apps/blockchain-event-bridge/tests/test_event_subscribers.py
aitbc e60cc3226c
Some checks failed
API Endpoint Tests / test-api-endpoints (push) Successful in 9s
Blockchain Synchronization Verification / sync-verification (push) Failing after 1s
CLI Tests / test-cli (push) Failing after 3s
Documentation Validation / validate-docs (push) Successful in 6s
Documentation Validation / validate-policies-strict (push) Successful in 2s
Integration Tests / test-service-integration (push) Successful in 40s
Multi-Node Blockchain Health Monitoring / health-check (push) Successful in 1s
P2P Network Verification / p2p-verification (push) Successful in 2s
Production Tests / Production Integration Tests (push) Successful in 21s
Python Tests / test-python (push) Successful in 13s
Security Scanning / security-scan (push) Failing after 46s
Smart Contract Tests / test-solidity (map[name:aitbc-token path:packages/solidity/aitbc-token]) (push) Successful in 17s
Smart Contract Tests / lint-solidity (push) Successful in 10s
Add sys import to test files and remove obsolete integration tests
- Add sys import to 29 test files across agent-coordinator, blockchain-event-bridge, blockchain-node, and coordinator-api
- Remove apps/blockchain-event-bridge/tests/test_integration.py (obsolete bridge integration tests)
- Remove apps/coordinator-api/tests/test_integration.py (obsolete API integration tests)
- Implement GPU registration in marketplace_gpu.py with GPURegistry model persistence
2026-04-23 16:43:17 +02:00

71 lines
1.9 KiB
Python

"""Tests for event subscribers."""
import pytest
import asyncio
import sys
from unittest.mock import Mock, AsyncMock
from blockchain_event_bridge.event_subscribers.blocks import BlockEventSubscriber
from blockchain_event_bridge.event_subscribers.transactions import TransactionEventSubscriber
@pytest.mark.asyncio
async def test_block_subscriber_initialization():
    """A freshly built block subscriber keeps its settings and is idle."""
    from blockchain_event_bridge.config import Settings

    config = Settings()
    block_subscriber = BlockEventSubscriber(config)

    # The settings handed to the constructor must be exposed unchanged.
    assert block_subscriber.settings == config
    # Construction alone must not mark the subscriber as running.
    assert block_subscriber._running is False
@pytest.mark.asyncio
async def test_block_subscriber_set_bridge():
    """``set_bridge`` stores the given bridge on the block subscriber."""
    from blockchain_event_bridge.config import Settings
    from blockchain_event_bridge.bridge import BlockchainEventBridge

    block_subscriber = BlockEventSubscriber(Settings())

    # A spec'd mock stands in for the real bridge; only storage is checked.
    fake_bridge = Mock(spec=BlockchainEventBridge)
    block_subscriber.set_bridge(fake_bridge)

    assert block_subscriber._bridge == fake_bridge
@pytest.mark.asyncio
async def test_block_subscriber_stop():
    """Stopping a block subscriber leaves it in the not-running state.

    The subscriber's ``run()`` coroutine is scheduled as a background task,
    given a short moment to start, and then stopped via ``stop()``.
    """
    from blockchain_event_bridge.config import Settings

    settings = Settings()
    subscriber = BlockEventSubscriber(settings)

    # Start and immediately stop
    task = asyncio.create_task(subscriber.run())
    try:
        await asyncio.sleep(0.1)  # Let it start
        await subscriber.stop()
    finally:
        # Always reap the background task, even when stop() (or the sleep)
        # raises; otherwise the pending task would leak into the event loop
        # and could interfere with later tests.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    assert subscriber._running is False
@pytest.mark.asyncio
async def test_transaction_subscriber_initialization():
    """A freshly built transaction subscriber keeps its settings and is idle."""
    from blockchain_event_bridge.config import Settings

    config = Settings()
    tx_subscriber = TransactionEventSubscriber(config)

    # The settings handed to the constructor must be exposed unchanged.
    assert tx_subscriber.settings == config
    # Construction alone must not mark the subscriber as running.
    assert tx_subscriber._running is False