Add logging to broadcast subscription task
Some checks failed
Blockchain Synchronization Verification / sync-verification (push) Successful in 9s
Integration Tests / test-service-integration (push) Has started running
Multi-Node Blockchain Health Monitoring / health-check (push) Has been cancelled
P2P Network Verification / p2p-verification (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
Security Scanning / security-scan (push) Has started running
Some checks failed
Blockchain Synchronization Verification / sync-verification (push) Successful in 9s
Integration Tests / test-service-integration (push) Has started running
Multi-Node Blockchain Health Monitoring / health-check (push) Has been cancelled
P2P Network Verification / p2p-verification (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
Security Scanning / security-scan (push) Has started running
This commit is contained in:
@@ -150,16 +150,27 @@ class BroadcastGossipBackend(GossipBackend):
|
||||
stop_event = asyncio.Event()
|
||||
|
||||
async def _run_subscription() -> None:
|
||||
async with self._broadcast.subscribe(topic) as subscriber: # type: ignore[attr-defined,union-attr]
|
||||
async for event in subscriber: # type: ignore[union-attr]
|
||||
if stop_event.is_set():
|
||||
break
|
||||
data = _decode_message(getattr(event, "message", event))
|
||||
try:
|
||||
await queue.put(data)
|
||||
_set_queue_gauge(topic, queue.qsize())
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.info(f"Starting broadcast subscription for topic: {topic}")
|
||||
try:
|
||||
async with self._broadcast.subscribe(topic) as subscriber: # type: ignore[attr-defined,union-attr]
|
||||
logger.info(f"Successfully subscribed to broadcast topic: {topic}")
|
||||
async for event in subscriber: # type: ignore[union-attr]
|
||||
if stop_event.is_set():
|
||||
logger.info(f"Stop event set for topic: {topic}")
|
||||
break
|
||||
data = _decode_message(getattr(event, "message", event))
|
||||
logger.info(f"Received message from broadcast for topic {topic}")
|
||||
try:
|
||||
await queue.put(data)
|
||||
_set_queue_gauge(topic, queue.qsize())
|
||||
except asyncio.CancelledError:
|
||||
logger.warning(f"Subscription cancelled for topic: {topic}")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Broadcast subscription error for topic {topic}: {e}")
|
||||
logger.info(f"Broadcast subscription ended for topic: {topic}")
|
||||
|
||||
task = asyncio.create_task(_run_subscription(), name=f"broadcast-sub:{topic}")
|
||||
async with self._lock:
|
||||
|
||||
Reference in New Issue
Block a user