From 2db82e37595d23c5c2bb4d19d674b2fdb3a23401 Mon Sep 17 00:00:00 2001
From: aitbc
Date: Tue, 14 Apr 2026 09:20:32 +0200
Subject: [PATCH] Add duplicate block filtering and chain-scoped deletion to
 chain import/export

- Add DISTINCT clause to block query in export_chain to prevent duplicate blocks in export
- Add chain_id filter to all delete operations in import_chain (blocks, accounts, transactions)
- Add commit after deletions before starting imports to ensure clean state
- Add duplicate block filtering by height before import to prevent UNIQUE constraint violations
- Add logging for deletion operations showing chain_id being cleared
---
 .../src/aitbc_chain/rpc/router.py | 30 ++++++++++++++-----
 1 file changed, 23 insertions(+), 7 deletions(-)

diff --git a/apps/blockchain-node/src/aitbc_chain/rpc/router.py b/apps/blockchain-node/src/aitbc_chain/rpc/router.py
index eda8dc8f..386b4810 100755
--- a/apps/blockchain-node/src/aitbc_chain/rpc/router.py
+++ b/apps/blockchain-node/src/aitbc_chain/rpc/router.py
@@ -678,8 +678,8 @@ async def export_chain(chain_id: str = None) -> Dict[str, Any]:
     try:
         # Use session_scope for database operations
         with session_scope() as session:
-            # Get all blocks
-            blocks_result = session.execute(select(Block).order_by(Block.height))
+            # Get all blocks - use DISTINCT to prevent duplicates
+            blocks_result = session.execute(select(Block).distinct().order_by(Block.height))
             blocks = list(blocks_result.scalars().all())
 
             # Get all accounts
@@ -775,13 +775,28 @@ async def import_chain(import_data: dict) -> Dict[str, Any]:
                 _logger.info(f"Backing up existing chain with {existing_count} blocks")
 
             # Clear existing data - only clear accounts if we have accounts to import
-            session.execute(delete(Block))
+            _logger.info(f"Clearing existing blocks for chain {chain_id}")
+            session.execute(delete(Block).where(Block.chain_id == chain_id))
             if accounts:
-                session.execute(delete(Account))
-            session.execute(delete(Transaction))
+                _logger.info(f"Clearing existing accounts for chain {chain_id}")
+                session.execute(delete(Account).where(Account.chain_id == chain_id))
+            _logger.info(f"Clearing existing transactions for chain {chain_id}")
+            session.execute(delete(Transaction).where(Transaction.chain_id == chain_id))
+            session.commit()  # Commit all deletes before imports
 
-            # Import blocks
+            # Import blocks - filter duplicates by height to avoid UNIQUE constraint violations
+            seen_heights = set()
+            unique_blocks = []
             for block_data in blocks:
+                if block_data["height"] not in seen_heights:
+                    seen_heights.add(block_data["height"])
+                    unique_blocks.append(block_data)
+                else:
+                    _logger.warning(f"Skipping duplicate block at height {block_data['height']}")
+
+            _logger.info(f"Importing {len(unique_blocks)} unique blocks (filtered from {len(blocks)} total)")
+
+            for block_data in unique_blocks:
                 block = Block(
                     chain_id=chain_id,
                     height=block_data["height"],
@@ -790,7 +805,8 @@ async def import_chain(import_data: dict) -> Dict[str, Any]:
                     proposer=block_data["proposer"],
                     timestamp=datetime.fromisoformat(block_data["timestamp"]) if block_data["timestamp"] else None,
                     state_root=block_data.get("state_root"),
-                    tx_count=block_data["tx_count"]
+                    tx_count=block_data["tx_count"],
+                    block_metadata=block_data.get("block_metadata")
                 )
                 session.add(block)