From 12fc0d729feb7f2da7fba361e96190f6c1ae5d9a Mon Sep 17 00:00:00 2001 From: aitbc Date: Sat, 2 May 2026 18:05:35 +0200 Subject: [PATCH] debug: add detailed broadcast logging to verify gossip topic usage --- .../src/aitbc_chain/consensus/poa.py | 9 +++++---- apps/blockchain-node/src/aitbc_chain/main.py | 18 +++++++++--------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/apps/blockchain-node/src/aitbc_chain/consensus/poa.py b/apps/blockchain-node/src/aitbc_chain/consensus/poa.py index bac64aea..3cba60ff 100755 --- a/apps/blockchain-node/src/aitbc_chain/consensus/poa.py +++ b/apps/blockchain-node/src/aitbc_chain/consensus/poa.py @@ -390,10 +390,11 @@ class PoAProposer: # Broadcast the new block tx_list = [tx.content for tx in processed_txs] if processed_txs else [] - self._logger.info(f"Broadcasting block {block.height} to gossip") + gossip_topic = f"blocks.{self._config.chain_id}" + self._logger.info(f"[BROADCAST] block={block.height}, topic={gossip_topic}, config.chain_id={self._config.chain_id}, block.chain_id={block.chain_id}") try: await gossip_broker.publish( - f"blocks.{self._config.chain_id}", + gossip_topic, { "chain_id": self._config.chain_id, "height": block.height, @@ -406,7 +407,7 @@ class PoAProposer: "transactions": tx_list, }, ) - self._logger.info(f"Successfully broadcasted block {block.height}") + self._logger.info(f"[BROADCAST SUCCESS] block={block.height}, topic={gossip_topic}") except Exception as e: self._logger.error(f"Failed to broadcast block {block.height}: {e}") @@ -464,7 +465,7 @@ class PoAProposer: # Broadcast genesis block for initial sync await gossip_broker.publish( - "blocks", + f"blocks.{self._config.chain_id}", { "chain_id": self._config.chain_id, "height": genesis.height, diff --git a/apps/blockchain-node/src/aitbc_chain/main.py b/apps/blockchain-node/src/aitbc_chain/main.py index 77d2156c..fb7b145f 100755 --- a/apps/blockchain-node/src/aitbc_chain/main.py +++ b/apps/blockchain-node/src/aitbc_chain/main.py @@ -137,18 +137,18 @@ class BlockchainNode: block_sub = await gossip_broker.subscribe(block_topic) logger.info(f"Successfully subscribed to {block_topic} topic") - async def process_blocks_for_chain(chain_id=chain_id): + async def process_blocks_for_chain(chain_id_param=chain_id, block_sub_param=block_sub): last_bulk_sync_time = 0 - logger.info(f"Block processing task started for chain {chain_id}") + logger.info(f"Block processing task started for chain {chain_id_param}") while True: try: - block_data = await block_sub.queue.get() - logger.info(f"Received block from gossip for chain {chain_id}") + block_data = await block_sub_param.queue.get() + logger.info(f"Received block from gossip for chain {chain_id_param}") if isinstance(block_data, str): import json block_data = json.loads(block_data) - logger.info(f"Importing block for chain {chain_id}: {block_data.get('height')}") - sync = ChainSync(session_factory=session_scope, chain_id=chain_id) + logger.info(f"Importing block for chain {chain_id_param}: {block_data.get('height')}") + sync = ChainSync(session_factory=session_scope, chain_id=chain_id_param) res = sync.import_block(block_data, transactions=block_data.get("transactions")) logger.info(f"Import result: accepted={res.accepted}, reason={res.reason}") @@ -166,7 +166,7 @@ class BlockchainNode: time_since_last_sync = current_time - last_bulk_sync_time if time_since_last_sync >= settings.min_bulk_sync_interval: - logger.warning(f"Gap detected: {gap_size} blocks, triggering automatic bulk sync") + logger.warning(f"Gap detected: {gap_size} blocks, triggering automatic bulk sync (chain={chain_id_param})") # Get source URL from block metadata if available source_url = block_data.get("source_url") @@ -177,7 +177,7 @@ class BlockchainNode: if source_url: try: imported = await sync.bulk_import_from(source_url) - logger.info(f"Bulk sync completed: {imported} blocks imported") + logger.info(f"Bulk sync completed: {imported} blocks imported (chain={chain_id_param})") last_bulk_sync_time = current_time # Retry block import after bulk sync @@ -194,7 +194,7 @@ class BlockchainNode: except Exception as exc: logger.error(f"Error processing block from gossip for chain {chain_id}: {exc}") - asyncio.create_task(process_blocks_for_chain(chain_id)) + asyncio.create_task(process_blocks_for_chain(chain_id_param=chain_id, block_sub_param=block_sub)) except Exception as e: logger.error(f"Failed to subscribe to blocks.{chain_id}: {e}")