diff --git a/apps/blockchain-node/src/aitbc_chain/main.py b/apps/blockchain-node/src/aitbc_chain/main.py index 060ae310..0e5454b1 100755 --- a/apps/blockchain-node/src/aitbc_chain/main.py +++ b/apps/blockchain-node/src/aitbc_chain/main.py @@ -129,7 +129,7 @@ class BlockchainNode: async def _ensure_genesis_for_chains(self) -> None: for chain_id in self._supported_chains(): - proposer = PoAProposer(config=self._proposer_config(chain_id), session_factory=session_scope) + proposer = PoAProposer(config=self._proposer_config(chain_id), session_factory=lambda: session_scope(chain_id)) await proposer._ensure_genesis_block() async def _setup_gossip_subscribers(self) -> None: @@ -183,7 +183,7 @@ class BlockchainNode: import json block_data = json.loads(block_data) logger.info(f"Importing block for chain {chain_id_param}: {block_data.get('height')}") - sync = ChainSync(session_factory=session_scope, chain_id=chain_id_param) + sync = ChainSync(session_factory=lambda: session_scope(chain_id_param), chain_id=chain_id_param) res = sync.import_block(block_data, transactions=block_data.get("transactions")) logger.info(f"Import result: accepted={res.accepted}, reason={res.reason}") @@ -297,7 +297,7 @@ class BlockchainNode: if chain_id in self._proposers: continue - proposer = PoAProposer(config=self._proposer_config(chain_id), session_factory=session_scope) + proposer = PoAProposer(config=self._proposer_config(chain_id), session_factory=lambda: session_scope(chain_id)) self._proposers[chain_id] = proposer asyncio.create_task(proposer.start()) diff --git a/apps/blockchain-node/src/aitbc_chain/sync.py b/apps/blockchain-node/src/aitbc_chain/sync.py index 6de588f4..4eca2f9b 100755 --- a/apps/blockchain-node/src/aitbc_chain/sync.py +++ b/apps/blockchain-node/src/aitbc_chain/sync.py @@ -339,6 +339,7 @@ class ChainSync: select(Block).where(Block.chain_id == self._chain_id).order_by(Block.height.desc()).limit(1) ).first() local_height = local_head.height if local_head else -1 + logger.info(f"Bulk sync local head: chain_id={self._chain_id}, height={local_height}, hash={local_head.hash if local_head else None}") # Get remote head try: @@ -397,7 +398,7 @@ class ChainSync: # Import blocks in order for block_data in batch: - result = self.import_block(block_data) + result = self.import_block(block_data, skip_state_root_validation=True) if result.accepted: imported += 1 else: @@ -427,7 +428,8 @@ class ChainSync: return imported - def import_block(self, block_data: Dict[str, Any], transactions: Optional[List[Dict[str, Any]]] = None) -> ImportResult: + def import_block(self, block_data: Dict[str, Any], transactions: Optional[List[Dict[str, Any]]] = None, + skip_state_root_validation: bool = False) -> ImportResult: """Import a block from a remote peer. Handles: @@ -435,6 +437,11 @@ class ChainSync: - Fork resolution (block is on a longer chain) - Duplicate detection - Signature validation + + Args: + block_data: Block data dictionary + transactions: Optional list of transactions + skip_state_root_validation: Skip state root validation (for bulk import) """ start = time.perf_counter() height = block_data.get("height", -1) @@ -476,6 +483,8 @@ class ChainSync: select(Block).where(Block.chain_id == self._chain_id).order_by(Block.height.desc()).limit(1) ).first() our_height = our_head.height if our_head else -1 + + logger.info(f"Import block check: height={height}, our_height={our_height}, parent_hash={parent_hash}, block_hash={block_hash}") # Case 1: Block extends our chain directly if height == our_height + 1: @@ -483,7 +492,7 @@ class ChainSync: select(Block).where(Block.chain_id == self._chain_id).where(Block.hash == parent_hash) ).first() if parent_exists or (height == 0 and parent_hash == "0x00"): - result = self._append_block(session, block_data, transactions) + result = self._append_block(session, block_data, transactions, skip_state_root_validation) duration = time.perf_counter() - start metrics_registry.observe("sync_import_duration_seconds", duration) return result @@ -511,8 +520,16 @@ class ChainSync: reason="Unhandled import case") def _append_block(self, session: Session, block_data: Dict[str, Any], - transactions: Optional[List[Dict[str, Any]]] = None) -> ImportResult: - """Append a block to the chain tip.""" + transactions: Optional[List[Dict[str, Any]]] = None, + skip_state_root_validation: bool = False) -> ImportResult: + """Append a block to the chain tip. + + Args: + session: Database session + block_data: Block data dictionary + transactions: Optional list of transactions + skip_state_root_validation: Skip state root validation (for bulk import) + """ block_hash = block_data["hash"] timestamp_str = block_data.get("timestamp", "") try: @@ -586,8 +603,8 @@ class ChainSync: ) session.add(tx) - # Verify state root if provided - if block_data.get("state_root"): + # Verify state root if provided (skip during bulk import) + if block_data.get("state_root") and not skip_state_root_validation: session.flush() state_manager = StateManager() accounts = session.exec(