debug: add detailed broadcast logging to verify gossip topic usage
Some checks failed
Blockchain Synchronization Verification / sync-verification (push) Failing after 2s
Cross-Chain Functionality Tests / test-cross-chain-sync (push) Successful in 3s
Cross-Chain Functionality Tests / test-cross-chain-transactions (push) Successful in 2s
Cross-Chain Functionality Tests / test-cross-chain-bridge (push) Has been skipped
Cross-Chain Functionality Tests / test-multi-chain-consensus (push) Successful in 2s
Cross-Chain Functionality Tests / aggregate-results (push) Has been skipped
Cross-Node Transaction Testing / transaction-test (push) Successful in 3s
Deploy to Testnet / deploy-testnet (push) Successful in 1m15s
Integration Tests / test-service-integration (push) Successful in 1m8s
Multi-Node Blockchain Health Monitoring / health-check (push) Failing after 3s
Multi-Node Stress Testing / stress-test (push) Successful in 5s
Node Failover Simulation / failover-test (push) Successful in 3s
P2P Network Verification / p2p-verification (push) Successful in 3s
Python Tests / test-python (push) Successful in 38s
Security Scanning / security-scan (push) Successful in 33s

This commit is contained in:
aitbc
2026-05-02 18:05:35 +02:00
parent c627bba425
commit 12fc0d729f
2 changed files with 14 additions and 13 deletions

View File

@@ -390,10 +390,11 @@ class PoAProposer:
# Broadcast the new block
tx_list = [tx.content for tx in processed_txs] if processed_txs else []
self._logger.info(f"Broadcasting block {block.height} to gossip")
gossip_topic = f"blocks.{self._config.chain_id}"
self._logger.info(f"[BROADCAST] block={block.height}, topic={gossip_topic}, config.chain_id={self._config.chain_id}, block.chain_id={block.chain_id}")
try:
await gossip_broker.publish(
f"blocks.{self._config.chain_id}",
gossip_topic,
{
"chain_id": self._config.chain_id,
"height": block.height,
@@ -406,7 +407,7 @@ class PoAProposer:
"transactions": tx_list,
},
)
self._logger.info(f"Successfully broadcasted block {block.height}")
self._logger.info(f"[BROADCAST SUCCESS] block={block.height}, topic={gossip_topic}")
except Exception as e:
self._logger.error(f"Failed to broadcast block {block.height}: {e}")
@@ -464,7 +465,7 @@ class PoAProposer:
# Broadcast genesis block for initial sync
await gossip_broker.publish(
"blocks",
f"blocks.{self._config.chain_id}",
{
"chain_id": self._config.chain_id,
"height": genesis.height,

View File

@@ -137,18 +137,18 @@ class BlockchainNode:
block_sub = await gossip_broker.subscribe(block_topic)
logger.info(f"Successfully subscribed to {block_topic} topic")
async def process_blocks_for_chain(chain_id=chain_id):
async def process_blocks_for_chain(chain_id_param=chain_id, block_sub_param=block_sub):
last_bulk_sync_time = 0
logger.info(f"Block processing task started for chain {chain_id}")
logger.info(f"Block processing task started for chain {chain_id_param}")
while True:
try:
block_data = await block_sub.queue.get()
logger.info(f"Received block from gossip for chain {chain_id}")
block_data = await block_sub_param.queue.get()
logger.info(f"Received block from gossip for chain {chain_id_param}")
if isinstance(block_data, str):
import json
block_data = json.loads(block_data)
logger.info(f"Importing block for chain {chain_id}: {block_data.get('height')}")
sync = ChainSync(session_factory=session_scope, chain_id=chain_id)
logger.info(f"Importing block for chain {chain_id_param}: {block_data.get('height')}")
sync = ChainSync(session_factory=session_scope, chain_id=chain_id_param)
res = sync.import_block(block_data, transactions=block_data.get("transactions"))
logger.info(f"Import result: accepted={res.accepted}, reason={res.reason}")
@@ -166,7 +166,7 @@ class BlockchainNode:
time_since_last_sync = current_time - last_bulk_sync_time
if time_since_last_sync >= settings.min_bulk_sync_interval:
logger.warning(f"Gap detected: {gap_size} blocks, triggering automatic bulk sync")
logger.warning(f"Gap detected: {gap_size} blocks, triggering automatic bulk sync (chain={chain_id_param})")
# Get source URL from block metadata if available
source_url = block_data.get("source_url")
@@ -177,7 +177,7 @@ class BlockchainNode:
if source_url:
try:
imported = await sync.bulk_import_from(source_url)
logger.info(f"Bulk sync completed: {imported} blocks imported")
logger.info(f"Bulk sync completed: {imported} blocks imported (chain={chain_id_param})")
last_bulk_sync_time = current_time
# Retry block import after bulk sync
@@ -194,7 +194,7 @@ class BlockchainNode:
except Exception as exc:
logger.error(f"Error processing block from gossip for chain {chain_id}: {exc}")
asyncio.create_task(process_blocks_for_chain(chain_id))
asyncio.create_task(process_blocks_for_chain(chain_id_param=chain_id, block_sub_param=block_sub))
except Exception as e:
logger.error(f"Failed to subscribe to blocks.{chain_id}: {e}")