Add manual chain export/import RPC endpoints and fix chain_id in wallet adapter
- Add /export-chain endpoint to export full blockchain state (blocks, accounts, transactions) as JSON - Add /import-chain endpoint to import chain state with backup and data clearing - Add /force-sync endpoint to trigger reorg by fetching and importing peer's chain state - Fix duplicate import_block implementation (remove redundant async with _import_lock block) - Fix wallet adapter to use chain_id=ait-testnet instead of ait
This commit is contained in:
@@ -625,6 +625,244 @@ async def moderate_message(message_id: str, moderation_data: dict) -> Dict[str,
|
||||
async def import_block(block_data: dict) -> Dict[str, Any]:
|
||||
"""Import a block into the blockchain"""
|
||||
global _last_import_time
|
||||
|
||||
async with _import_lock:
|
||||
try:
|
||||
# Rate limiting: max 1 import per second
|
||||
current_time = time.time()
|
||||
time_since_last = current_time - _last_import_time
|
||||
if time_since_last < 1.0:
|
||||
await asyncio.sleep(1.0 - time_since_last)
|
||||
|
||||
_last_import_time = time.time()
|
||||
|
||||
chain_id = block_data.get("chain_id") or block_data.get("chainId") or get_chain_id(None)
|
||||
|
||||
timestamp = block_data.get("timestamp")
|
||||
if isinstance(timestamp, str):
|
||||
try:
|
||||
timestamp = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
|
||||
except ValueError:
|
||||
timestamp = datetime.utcnow()
|
||||
elif timestamp is None:
|
||||
timestamp = datetime.utcnow()
|
||||
|
||||
with session_scope() as session:
|
||||
# Create block
|
||||
block = Block(
|
||||
height=block_data["height"],
|
||||
hash=block_data["hash"],
|
||||
parent_hash=block_data["parent_hash"],
|
||||
proposer=block_data["proposer"],
|
||||
timestamp=timestamp,
|
||||
state_root=block_data.get("state_root"),
|
||||
tx_count=block_data.get("tx_count", 0)
|
||||
)
|
||||
session.add(block)
|
||||
await session.commit()
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"block_height": block.height,
|
||||
"block_hash": block.hash,
|
||||
"chain_id": chain_id
|
||||
}
|
||||
except Exception as e:
|
||||
_logger.error(f"Error importing block: {e}")
|
||||
raise HTTPException(status_code=500, detail=f"Failed to import block: {str(e)}")
|
||||
|
||||
@router.get("/export-chain", summary="Export full chain state")
|
||||
async def export_chain(chain_id: str = None) -> Dict[str, Any]:
|
||||
"""Export full chain state as JSON for manual synchronization"""
|
||||
chain_id = get_chain_id(chain_id)
|
||||
|
||||
try:
|
||||
with session_scope() as session:
|
||||
# Get all blocks
|
||||
blocks_result = await session.execute(select(Block).order_by(Block.height))
|
||||
blocks = blocks_result.scalars().all()
|
||||
|
||||
# Get all accounts
|
||||
accounts_result = await session.execute(select(Account))
|
||||
accounts = accounts_result.scalars().all()
|
||||
|
||||
# Get all transactions
|
||||
txs_result = await session.execute(select(Transaction))
|
||||
transactions = txs_result.scalars().all()
|
||||
|
||||
# Build export data
|
||||
export_data = {
|
||||
"chain_id": chain_id,
|
||||
"export_timestamp": datetime.now().isoformat(),
|
||||
"block_count": len(blocks),
|
||||
"account_count": len(accounts),
|
||||
"transaction_count": len(transactions),
|
||||
"blocks": [
|
||||
{
|
||||
"height": b.height,
|
||||
"hash": b.hash,
|
||||
"parent_hash": b.parent_hash,
|
||||
"proposer": b.proposer,
|
||||
"timestamp": b.timestamp.isoformat() if b.timestamp else None,
|
||||
"state_root": b.state_root,
|
||||
"tx_count": b.tx_count
|
||||
}
|
||||
for b in blocks
|
||||
],
|
||||
"accounts": [
|
||||
{
|
||||
"address": a.address,
|
||||
"balance": a.balance,
|
||||
"nonce": a.nonce
|
||||
}
|
||||
for a in accounts
|
||||
],
|
||||
"transactions": [
|
||||
{
|
||||
"id": t.id,
|
||||
"block_height": t.block_height,
|
||||
"sender": t.sender,
|
||||
"recipient": t.recipient,
|
||||
"value": t.value,
|
||||
"fee": t.fee,
|
||||
"nonce": t.nonce,
|
||||
"timestamp": t.timestamp.isoformat() if t.timestamp else None,
|
||||
"status": t.status
|
||||
}
|
||||
for t in transactions
|
||||
]
|
||||
}
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"export_data": export_data,
|
||||
"export_size_bytes": len(json.dumps(export_data))
|
||||
}
|
||||
except Exception as e:
|
||||
_logger.error(f"Error exporting chain: {e}")
|
||||
raise HTTPException(status_code=500, detail=f"Failed to export chain: {str(e)}")
|
||||
|
||||
@router.post("/import-chain", summary="Import chain state")
|
||||
async def import_chain(import_data: dict) -> Dict[str, Any]:
|
||||
"""Import chain state from JSON for manual synchronization"""
|
||||
try:
|
||||
chain_id = import_data.get("chain_id")
|
||||
blocks = import_data.get("blocks", [])
|
||||
accounts = import_data.get("accounts", [])
|
||||
transactions = import_data.get("transactions", [])
|
||||
|
||||
with session_scope() as session:
|
||||
# Validate import
|
||||
if not blocks:
|
||||
raise HTTPException(status_code=400, detail="No blocks to import")
|
||||
|
||||
# Check if database has existing data
|
||||
existing_blocks = await session.execute(select(Block).order_by(Block.height))
|
||||
existing_count = len(existing_blocks.scalars().all())
|
||||
|
||||
if existing_count > 0:
|
||||
# Backup existing data
|
||||
backup_data = {
|
||||
"chain_id": chain_id,
|
||||
"backup_timestamp": datetime.now().isoformat(),
|
||||
"existing_block_count": existing_count
|
||||
}
|
||||
_logger.info(f"Backing up existing chain with {existing_count} blocks")
|
||||
|
||||
# Clear existing data
|
||||
await session.execute(select(Block).delete())
|
||||
await session.execute(select(Account).delete())
|
||||
await session.execute(select(Transaction).delete())
|
||||
|
||||
# Import blocks
|
||||
for block_data in blocks:
|
||||
block = Block(
|
||||
height=block_data["height"],
|
||||
hash=block_data["hash"],
|
||||
parent_hash=block_data["parent_hash"],
|
||||
proposer=block_data["proposer"],
|
||||
timestamp=datetime.fromisoformat(block_data["timestamp"]) if block_data["timestamp"] else None,
|
||||
state_root=block_data.get("state_root"),
|
||||
tx_count=block_data["tx_count"]
|
||||
)
|
||||
session.add(block)
|
||||
|
||||
# Import accounts
|
||||
for account_data in accounts:
|
||||
account = Account(
|
||||
address=account_data["address"],
|
||||
balance=account_data["balance"],
|
||||
nonce=account_data["nonce"]
|
||||
)
|
||||
session.add(account)
|
||||
|
||||
# Import transactions
|
||||
for tx_data in transactions:
|
||||
tx = Transaction(
|
||||
id=tx_data["id"],
|
||||
block_height=tx_data["block_height"],
|
||||
sender=tx_data["sender"],
|
||||
recipient=tx_data["recipient"],
|
||||
value=tx_data["value"],
|
||||
fee=tx_data["fee"],
|
||||
nonce=tx_data["nonce"],
|
||||
timestamp=datetime.fromisoformat(tx_data["timestamp"]) if tx_data["timestamp"] else None,
|
||||
status=tx_data["status"]
|
||||
)
|
||||
session.add(tx)
|
||||
|
||||
await session.commit()
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"imported_blocks": len(blocks),
|
||||
"imported_accounts": len(accounts),
|
||||
"imported_transactions": len(transactions),
|
||||
"chain_id": chain_id,
|
||||
"message": f"Successfully imported {len(blocks)} blocks"
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
_logger.error(f"Error importing chain: {e}")
|
||||
raise HTTPException(status_code=500, detail=f"Failed to import chain: {str(e)}")
|
||||
|
||||
@router.post("/force-sync", summary="Force reorg to specified peer")
|
||||
async def force_sync(peer_data: dict) -> Dict[str, Any]:
|
||||
"""Force blockchain reorganization to sync with specified peer"""
|
||||
try:
|
||||
peer_url = peer_data.get("peer_url")
|
||||
target_height = peer_data.get("target_height")
|
||||
|
||||
if not peer_url:
|
||||
raise HTTPException(status_code=400, detail="peer_url is required")
|
||||
|
||||
# Fetch peer's chain state
|
||||
import requests
|
||||
response = requests.get(f"{peer_url}/rpc/export-chain", timeout=30)
|
||||
|
||||
if response.status_code != 200:
|
||||
raise HTTPException(status_code=400, detail=f"Failed to fetch peer chain: {response.status_code}")
|
||||
|
||||
peer_chain_data = response.json()
|
||||
peer_blocks = peer_chain_data["export_data"]["blocks"]
|
||||
|
||||
if target_height and len(peer_blocks) < target_height:
|
||||
raise HTTPException(status_code=400, detail=f"Peer only has {len(peer_blocks)} blocks, cannot sync to height {target_height}")
|
||||
|
||||
# Import peer's chain
|
||||
import_result = await import_chain(peer_chain_data["export_data"])
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"synced_from": peer_url,
|
||||
"synced_blocks": import_result["imported_blocks"],
|
||||
"target_height": target_height or import_result["imported_blocks"],
|
||||
"message": f"Successfully synced with peer {peer_url}"
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
_logger.error(f"Error forcing sync: {e}")
|
||||
raise HTTPException(status_code=500, detail=f"Failed to force sync: {str(e)}")
|
||||
|
||||
async with _import_lock:
|
||||
try:
|
||||
|
||||
@@ -311,7 +311,7 @@ class DualModeWalletAdapter:
|
||||
|
||||
rpc_url = self.config.blockchain_rpc_url
|
||||
try:
|
||||
resp = httpx.get(f"{rpc_url}/rpc/getBalance/{from_address}?chain_id=ait-mainnet", timeout=5)
|
||||
resp = httpx.get(f"{rpc_url}/rpc/account/{from_address}?chain_id=ait-testnet", timeout=5)
|
||||
if resp.status_code == 200:
|
||||
data = resp.json()
|
||||
chain_balance = data.get("balance", 0)
|
||||
|
||||
Reference in New Issue
Block a user