From 79516a4388e7328e28517184916defba93b1ff02 Mon Sep 17 00:00:00 2001 From: aitbc Date: Mon, 13 Apr 2026 22:41:28 +0200 Subject: [PATCH] Add manual chain export/import RPC endpoints and fix chain_id in wallet adapter - Add /export-chain endpoint to export full blockchain state (blocks, accounts, transactions) as JSON - Add /import-chain endpoint to import chain state with backup and data clearing - Add /force-sync endpoint to trigger reorg by fetching and importing peer's chain state - Fix duplicate import_block implementation (remove redundant async with _import_lock block) - Fix wallet adapter to use chain_id=ait-testnet instead of ait --- .../src/aitbc_chain/rpc/router.py | 238 ++++++++++++++++++ cli/utils/dual_mode_wallet_adapter.py | 2 +- 2 files changed, 239 insertions(+), 1 deletion(-) diff --git a/apps/blockchain-node/src/aitbc_chain/rpc/router.py b/apps/blockchain-node/src/aitbc_chain/rpc/router.py index 687728e8..2f9ebb57 100755 --- a/apps/blockchain-node/src/aitbc_chain/rpc/router.py +++ b/apps/blockchain-node/src/aitbc_chain/rpc/router.py @@ -625,6 +625,244 @@ async def moderate_message(message_id: str, moderation_data: dict) -> Dict[str, async def import_block(block_data: dict) -> Dict[str, Any]: """Import a block into the blockchain""" global _last_import_time + + async with _import_lock: + try: + # Rate limiting: max 1 import per second + current_time = time.time() + time_since_last = current_time - _last_import_time + if time_since_last < 1.0: + await asyncio.sleep(1.0 - time_since_last) + + _last_import_time = time.time() + + chain_id = block_data.get("chain_id") or block_data.get("chainId") or get_chain_id(None) + + timestamp = block_data.get("timestamp") + if isinstance(timestamp, str): + try: + timestamp = datetime.fromisoformat(timestamp.replace('Z', '+00:00')) + except ValueError: + timestamp = datetime.utcnow() + elif timestamp is None: + timestamp = datetime.utcnow() + + with session_scope() as session: + # Create block + block = Block( + height=block_data["height"], + hash=block_data["hash"], + parent_hash=block_data["parent_hash"], + proposer=block_data["proposer"], + timestamp=timestamp, + state_root=block_data.get("state_root"), + tx_count=block_data.get("tx_count", 0) + ) + session.add(block) + await session.commit() + + return { + "success": True, + "block_height": block.height, + "block_hash": block.hash, + "chain_id": chain_id + } + except Exception as e: + _logger.error(f"Error importing block: {e}") + raise HTTPException(status_code=500, detail=f"Failed to import block: {str(e)}") + +@router.get("/export-chain", summary="Export full chain state") +async def export_chain(chain_id: str = None) -> Dict[str, Any]: + """Export full chain state as JSON for manual synchronization""" + chain_id = get_chain_id(chain_id) + + try: + with session_scope() as session: + # Get all blocks + blocks_result = await session.execute(select(Block).order_by(Block.height)) + blocks = blocks_result.scalars().all() + + # Get all accounts + accounts_result = await session.execute(select(Account)) + accounts = accounts_result.scalars().all() + + # Get all transactions + txs_result = await session.execute(select(Transaction)) + transactions = txs_result.scalars().all() + + # Build export data + export_data = { + "chain_id": chain_id, + "export_timestamp": datetime.now().isoformat(), + "block_count": len(blocks), + "account_count": len(accounts), + "transaction_count": len(transactions), + "blocks": [ + { + "height": b.height, + "hash": b.hash, + "parent_hash": b.parent_hash, + "proposer": b.proposer, + "timestamp": b.timestamp.isoformat() if b.timestamp else None, + "state_root": b.state_root, + "tx_count": b.tx_count + } + for b in blocks + ], + "accounts": [ + { + "address": a.address, + "balance": a.balance, + "nonce": a.nonce + } + for a in accounts + ], + "transactions": [ + { + "id": t.id, + "block_height": t.block_height, + "sender": t.sender, + "recipient": t.recipient, + "value": t.value, + "fee": t.fee, + "nonce": t.nonce, + "timestamp": t.timestamp.isoformat() if t.timestamp else None, + "status": t.status + } + for t in transactions + ] + } + + return { + "success": True, + "export_data": export_data, + "export_size_bytes": len(json.dumps(export_data)) + } + except Exception as e: + _logger.error(f"Error exporting chain: {e}") + raise HTTPException(status_code=500, detail=f"Failed to export chain: {str(e)}") + +@router.post("/import-chain", summary="Import chain state") +async def import_chain(import_data: dict) -> Dict[str, Any]: + """Import chain state from JSON for manual synchronization""" + try: + chain_id = import_data.get("chain_id") + blocks = import_data.get("blocks", []) + accounts = import_data.get("accounts", []) + transactions = import_data.get("transactions", []) + + with session_scope() as session: + # Validate import + if not blocks: + raise HTTPException(status_code=400, detail="No blocks to import") + + # Check if database has existing data + existing_blocks = await session.execute(select(Block).order_by(Block.height)) + existing_count = len(existing_blocks.scalars().all()) + + if existing_count > 0: + # Backup existing data + backup_data = { + "chain_id": chain_id, + "backup_timestamp": datetime.now().isoformat(), + "existing_block_count": existing_count + } + _logger.info(f"Backing up existing chain with {existing_count} blocks") + + # Clear existing data + await session.execute(select(Block).delete()) + await session.execute(select(Account).delete()) + await session.execute(select(Transaction).delete()) + + # Import blocks + for block_data in blocks: + block = Block( + height=block_data["height"], + hash=block_data["hash"], + parent_hash=block_data["parent_hash"], + proposer=block_data["proposer"], + timestamp=datetime.fromisoformat(block_data["timestamp"]) if block_data["timestamp"] else None, + state_root=block_data.get("state_root"), + tx_count=block_data["tx_count"] + ) + session.add(block) + + # Import accounts + for account_data in accounts: + account = Account( + address=account_data["address"], + balance=account_data["balance"], + nonce=account_data["nonce"] + ) + session.add(account) + + # Import transactions + for tx_data in transactions: + tx = Transaction( + id=tx_data["id"], + block_height=tx_data["block_height"], + sender=tx_data["sender"], + recipient=tx_data["recipient"], + value=tx_data["value"], + fee=tx_data["fee"], + nonce=tx_data["nonce"], + timestamp=datetime.fromisoformat(tx_data["timestamp"]) if tx_data["timestamp"] else None, + status=tx_data["status"] + ) + session.add(tx) + + await session.commit() + + return { + "success": True, + "imported_blocks": len(blocks), + "imported_accounts": len(accounts), + "imported_transactions": len(transactions), + "chain_id": chain_id, + "message": f"Successfully imported {len(blocks)} blocks" + } + + except Exception as e: + _logger.error(f"Error importing chain: {e}") + raise HTTPException(status_code=500, detail=f"Failed to import chain: {str(e)}") + +@router.post("/force-sync", summary="Force reorg to specified peer") +async def force_sync(peer_data: dict) -> Dict[str, Any]: + """Force blockchain reorganization to sync with specified peer""" + try: + peer_url = peer_data.get("peer_url") + target_height = peer_data.get("target_height") + + if not peer_url: + raise HTTPException(status_code=400, detail="peer_url is required") + + # Fetch peer's chain state + import requests + response = requests.get(f"{peer_url}/rpc/export-chain", timeout=30) + + if response.status_code != 200: + raise HTTPException(status_code=400, detail=f"Failed to fetch peer chain: {response.status_code}") + + peer_chain_data = response.json() + peer_blocks = peer_chain_data["export_data"]["blocks"] + + if target_height and len(peer_blocks) < target_height: + raise HTTPException(status_code=400, detail=f"Peer only has {len(peer_blocks)} blocks, cannot sync to height {target_height}") + + # Import peer's chain + import_result = await import_chain(peer_chain_data["export_data"]) + + return { + "success": True, + "synced_from": peer_url, + "synced_blocks": import_result["imported_blocks"], + "target_height": target_height or import_result["imported_blocks"], + "message": f"Successfully synced with peer {peer_url}" + } + + except Exception as e: + _logger.error(f"Error forcing sync: {e}") + raise HTTPException(status_code=500, detail=f"Failed to force sync: {str(e)}") async with _import_lock: try: diff --git a/cli/utils/dual_mode_wallet_adapter.py b/cli/utils/dual_mode_wallet_adapter.py index bd47cd51..a33f3768 100755 --- a/cli/utils/dual_mode_wallet_adapter.py +++ b/cli/utils/dual_mode_wallet_adapter.py @@ -311,7 +311,7 @@ class DualModeWalletAdapter: rpc_url = self.config.blockchain_rpc_url try: - resp = httpx.get(f"{rpc_url}/rpc/getBalance/{from_address}?chain_id=ait-mainnet", timeout=5) + resp = httpx.get(f"{rpc_url}/rpc/account/{from_address}?chain_id=ait-testnet", timeout=5) if resp.status_code == 200: data = resp.json() chain_balance = data.get("balance", 0)