From d7fb2eae95a107c797fa374a23fb4c71b8d51e91 Mon Sep 17 00:00:00 2001 From: aitbc Date: Mon, 13 Apr 2026 23:20:04 +0200 Subject: [PATCH] Fix async/sync mismatch in chain export/import and remove duplicate import_block code - Replace manual session creation with session_scope() in export_chain - Convert query results to lists to avoid lazy loading issues after session close - Remove await from session.execute calls in import_chain (use synchronous queries) - Change force_sync back to async and restore await on import_chain call - Remove 69 lines of duplicate import_block implementation at end of file --- .../src/aitbc_chain/rpc/router.py | 98 +++---------------- 1 file changed, 12 insertions(+), 86 deletions(-) diff --git a/apps/blockchain-node/src/aitbc_chain/rpc/router.py b/apps/blockchain-node/src/aitbc_chain/rpc/router.py index 8d0f47dd..49ee5254 100755 --- a/apps/blockchain-node/src/aitbc_chain/rpc/router.py +++ b/apps/blockchain-node/src/aitbc_chain/rpc/router.py @@ -677,27 +677,19 @@ async def export_chain(chain_id: str = None) -> Dict[str, Any]: chain_id = get_chain_id(chain_id) try: - # Use synchronous database operations in async context - from sqlalchemy import create_engine - from sqlalchemy.orm import sessionmaker - from ..database import _db_path - - engine = create_engine(f"sqlite:///{_db_path}") - Session = sessionmaker(bind=engine) - session = Session() - - try: + # Use session_scope for database operations + with session_scope() as session: # Get all blocks blocks_result = session.execute(select(Block).order_by(Block.height)) - blocks = blocks_result.scalars().all() + blocks = list(blocks_result.scalars().all()) # Get all accounts accounts_result = session.execute(select(Account)) - accounts = accounts_result.scalars().all() + accounts = list(accounts_result.scalars().all()) # Get all transactions txs_result = session.execute(select(Transaction)) - transactions = txs_result.scalars().all() + transactions = list(txs_result.scalars().all()) # Build export data export_data = { @@ -768,8 +760,8 @@ async def import_chain(import_data: dict) -> Dict[str, Any]: raise HTTPException(status_code=400, detail="No blocks to import") # Check if database has existing data - existing_blocks = await session.execute(select(Block).order_by(Block.height)) - existing_count = len(existing_blocks.scalars().all()) + existing_blocks = session.execute(select(Block).order_by(Block.height)) + existing_count = len(list(existing_blocks.scalars().all())) if existing_count > 0: # Backup existing data @@ -781,9 +773,9 @@ async def import_chain(import_data: dict) -> Dict[str, Any]: _logger.info(f"Backing up existing chain with {existing_count} blocks") # Clear existing data - await session.execute(select(Block).delete()) - await session.execute(select(Account).delete()) - await session.execute(select(Transaction).delete()) + session.execute(select(Block).delete()) + session.execute(select(Account).delete()) + session.execute(select(Transaction).delete()) # Import blocks for block_data in blocks: @@ -838,7 +830,7 @@ async def import_chain(import_data: dict) -> Dict[str, Any]: raise HTTPException(status_code=500, detail=f"Failed to import chain: {str(e)}") @router.post("/force-sync", summary="Force reorg to specified peer") -def force_sync(peer_data: dict) -> Dict[str, Any]: +async def force_sync(peer_data: dict) -> Dict[str, Any]: """Force blockchain reorganization to sync with specified peer""" try: peer_url = peer_data.get("peer_url") @@ -861,7 +853,7 @@ def force_sync(peer_data: dict) -> Dict[str, Any]: raise HTTPException(status_code=400, detail=f"Peer only has {len(peer_blocks)} blocks, cannot sync to height {target_height}") # Import peer's chain - import_result = import_chain(peer_chain_data["export_data"]) + import_result = await import_chain(peer_chain_data["export_data"]) return { "success": True, @@ -874,69 +866,3 @@ def force_sync(peer_data: dict) -> Dict[str, Any]: except Exception as e: _logger.error(f"Error forcing sync: {e}") raise HTTPException(status_code=500, detail=f"Failed to force sync: {str(e)}") - - async with _import_lock: - try: - # Rate limiting: max 1 import per second - current_time = time.time() - time_since_last = current_time - _last_import_time - if False: # time_since_last < 1.0: # 1 second minimum between imports - await asyncio.sleep(1.0 - time_since_last) - - _last_import_time = time.time() - - chain_id = block_data.get("chain_id") or block_data.get("chainId") or get_chain_id(None) - - timestamp = block_data.get("timestamp") - if isinstance(timestamp, str): - try: - timestamp = datetime.fromisoformat(timestamp.replace('Z', '+00:00')) - except ValueError: - timestamp = datetime.utcnow() - elif timestamp is None: - timestamp = datetime.utcnow() - - height = block_data.get("number") or block_data.get("height") - if height is None: - raise ValueError("Block height is required") - - transactions = block_data.get("transactions", []) - normalized_block = { - "chain_id": chain_id, - "height": int(height), - "hash": block_data.get("hash"), - "parent_hash": block_data.get("parent_hash") or block_data.get("parentHash", ""), - "proposer": block_data.get("proposer") or block_data.get("miner", ""), - "timestamp": timestamp.isoformat() if isinstance(timestamp, datetime) else timestamp, - "tx_count": block_data.get("tx_count", len(transactions)), - "state_root": block_data.get("state_root") or block_data.get("stateRoot"), - } - - from ..config import settings as cfg - sync = ChainSync( - session_factory=session_scope, - chain_id=chain_id, - validate_signatures=cfg.sync_validate_signatures, - ) - result = sync.import_block(normalized_block, transactions=transactions) - - if result.accepted: - _logger.info(f"Successfully imported block {result.height}") - metrics_registry.increment("blocks_imported_total") - - return { - "success": result.accepted, - "accepted": result.accepted, - "block_number": result.height, - "block_hash": result.block_hash, - "chain_id": chain_id, - "reason": result.reason, - } - - except Exception as e: - _logger.error(f"Failed to import block: {e}") - metrics_registry.increment("block_import_errors_total") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to import block: {str(e)}" - )