Fix chain import/export to handle duplicates, add metadata fields, and improve datetime parsing
Some checks failed
Integration Tests / test-service-integration (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled

- Add chain_id field to Block creation in import_block endpoint
- Remove await from synchronous session.commit in import_block
- Add _serialize_optional_timestamp helper to handle various timestamp formats
- Add _parse_datetime_value helper with proper datetime parsing and error handling
- Add _select_export_blocks to filter duplicate blocks by height during export
- Add _dedupe_import_blocks to filter duplicate blocks by height during import
This commit is contained in:
aitbc
2026-04-14 12:33:44 +02:00
parent 2db82e3759
commit f9fb3ea053
4 changed files with 474 additions and 143 deletions

View File

@@ -649,6 +649,7 @@ async def import_block(block_data: dict) -> Dict[str, Any]:
with session_scope() as session:
# Create block
block = Block(
chain_id=chain_id,
height=block_data["height"],
hash=block_data["hash"],
parent_hash=block_data["parent_hash"],
@@ -658,7 +659,7 @@ async def import_block(block_data: dict) -> Dict[str, Any]:
tx_count=block_data.get("tx_count", 0)
)
session.add(block)
await session.commit()
session.commit()
return {
"success": True,
@@ -670,24 +671,93 @@ async def import_block(block_data: dict) -> Dict[str, Any]:
_logger.error(f"Error importing block: {e}")
raise HTTPException(status_code=500, detail=f"Failed to import block: {str(e)}")
def _serialize_optional_timestamp(value: Any) -> Optional[str]:
if value is None:
return None
if isinstance(value, str):
return value
if hasattr(value, "isoformat"):
return value.isoformat()
return str(value)
def _parse_datetime_value(value: Any, field_name: str) -> Optional[datetime]:
if value in (None, ""):
return None
if isinstance(value, datetime):
return value
if isinstance(value, str):
try:
return datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError as exc:
raise HTTPException(status_code=400, detail=f"Invalid {field_name}: {value}") from exc
raise HTTPException(status_code=400, detail=f"Invalid {field_name} type: {type(value).__name__}")
def _select_export_blocks(session, chain_id: str) -> List[Block]:
    """Load the blocks of *chain_id* for export, keeping one block per height.

    Rows are ordered by (height asc, id desc) so that when a height appears
    more than once the newest row (highest id) is kept; the later duplicates
    are dropped and their count is logged as a warning.
    """
    rows = session.execute(
        select(Block)
        .where(Block.chain_id == chain_id)
        .order_by(Block.height.asc(), Block.id.desc())
    )
    unique_blocks: List[Block] = []
    heights_seen = set()
    skipped = 0
    for candidate in rows.scalars().all():
        if candidate.height in heights_seen:
            skipped += 1
            continue
        heights_seen.add(candidate.height)
        unique_blocks.append(candidate)
    if skipped:
        _logger.warning(f"Filtered {skipped} duplicate exported blocks for chain {chain_id}")
    return unique_blocks
def _dedupe_import_blocks(blocks: List[Dict[str, Any]], chain_id: str) -> List[Dict[str, Any]]:
latest_by_height: Dict[int, Dict[str, Any]] = {}
duplicate_count = 0
for block_data in blocks:
if "height" not in block_data:
raise HTTPException(status_code=400, detail="Block height is required")
try:
height = int(block_data["height"])
except (TypeError, ValueError) as exc:
raise HTTPException(status_code=400, detail=f"Invalid block height: {block_data.get('height')}") from exc
block_chain_id = block_data.get("chain_id")
if block_chain_id and block_chain_id != chain_id:
raise HTTPException(
status_code=400,
detail=f"Mismatched block chain_id '{block_chain_id}' for import chain '{chain_id}'",
)
normalized_block = dict(block_data)
normalized_block["height"] = height
normalized_block["chain_id"] = chain_id
if height in latest_by_height:
duplicate_count += 1
latest_by_height[height] = normalized_block
if duplicate_count:
_logger.warning(f"Filtered {duplicate_count} duplicate imported blocks for chain {chain_id}")
return [latest_by_height[height] for height in sorted(latest_by_height)]
@router.get("/export-chain", summary="Export full chain state")
async def export_chain(chain_id: str = None) -> Dict[str, Any]:
"""Export full chain state as JSON for manual synchronization"""
chain_id = get_chain_id(chain_id)
try:
# Use session_scope for database operations
with session_scope() as session:
# Get all blocks - use DISTINCT to prevent duplicates
blocks_result = session.execute(select(Block).distinct().order_by(Block.height))
blocks = list(blocks_result.scalars().all())
blocks = _select_export_blocks(session, chain_id)
# Get all accounts
accounts_result = session.execute(select(Account))
accounts_result = session.execute(
select(Account)
.where(Account.chain_id == chain_id)
.order_by(Account.address)
)
accounts = list(accounts_result.scalars().all())
# Get all transactions
txs_result = session.execute(select(Transaction))
txs_result = session.execute(
select(Transaction)
.where(Transaction.chain_id == chain_id)
.order_by(Transaction.block_height, Transaction.id)
)
transactions = list(txs_result.scalars().all())
# Build export data
@@ -699,13 +769,15 @@ async def export_chain(chain_id: str = None) -> Dict[str, Any]:
"transaction_count": len(transactions),
"blocks": [
{
"chain_id": b.chain_id,
"height": b.height,
"hash": b.hash,
"parent_hash": b.parent_hash,
"proposer": b.proposer,
"timestamp": b.timestamp.isoformat() if b.timestamp else None,
"state_root": b.state_root,
"tx_count": b.tx_count
"tx_count": b.tx_count,
"block_metadata": b.block_metadata,
}
for b in blocks
],
@@ -721,14 +793,19 @@ async def export_chain(chain_id: str = None) -> Dict[str, Any]:
"transactions": [
{
"id": t.id,
"chain_id": t.chain_id,
"tx_hash": t.tx_hash,
"block_height": t.block_height,
"sender": t.sender,
"recipient": t.recipient,
"payload": t.payload,
"value": t.value,
"fee": t.fee,
"nonce": t.nonce,
"timestamp": t.timestamp.isoformat() if t.timestamp else None,
"status": t.status
"timestamp": _serialize_optional_timestamp(t.timestamp),
"status": t.status,
"created_at": t.created_at.isoformat() if t.created_at else None,
"tx_metadata": t.tx_metadata,
}
for t in transactions
]
@@ -739,151 +816,163 @@ async def export_chain(chain_id: str = None) -> Dict[str, Any]:
"export_data": export_data,
"export_size_bytes": len(json.dumps(export_data))
}
except HTTPException:
raise
except Exception as e:
_logger.error(f"Error exporting chain: {e}")
raise HTTPException(status_code=500, detail=f"Failed to export chain: {str(e)}")
@router.post("/import-chain", summary="Import chain state")
async def import_chain(import_data: dict) -> Dict[str, Any]:
    """Import chain state from JSON for manual synchronization.

    Replaces the stored chain identified by the payload's chain_id (falling
    back to the first block's chain_id, then to the configured default via
    get_chain_id). Duplicate blocks are filtered by height before insert,
    and the whole operation is serialized behind _import_lock so concurrent
    imports cannot interleave their delete/insert phases.

    Raises:
        HTTPException 400: no blocks, invalid heights/timestamps, or
            mismatched chain_ids on blocks/accounts/transactions.
        HTTPException 500: any unexpected failure.
    """
    # NOTE(review): this region of the file was an interleaved old/new diff;
    # reconstructed here as the coherent post-commit implementation.
    async with _import_lock:
        try:
            chain_id = import_data.get("chain_id")
            blocks = import_data.get("blocks", [])
            accounts = import_data.get("accounts", [])
            transactions = import_data.get("transactions", [])
            # If chain_id is not in import_data, fall back to the first block's.
            if not chain_id and blocks:
                chain_id = blocks[0].get("chain_id")
            chain_id = get_chain_id(chain_id)
            # Validate and dedupe before touching the database.
            unique_blocks = _dedupe_import_blocks(blocks, chain_id)
            with session_scope() as session:
                if not unique_blocks:
                    raise HTTPException(status_code=400, detail="No blocks to import")
                existing_blocks = session.execute(
                    select(Block)
                    .where(Block.chain_id == chain_id)
                    .order_by(Block.height)
                )
                existing_count = len(list(existing_blocks.scalars().all()))
                if existing_count > 0:
                    _logger.info(f"Backing up existing chain with {existing_count} blocks")
                # Clear transactions first (they reference blocks); accounts are
                # only cleared when the payload supplies replacements.
                _logger.info(f"Clearing existing transactions for chain {chain_id}")
                session.execute(delete(Transaction).where(Transaction.chain_id == chain_id))
                if accounts:
                    _logger.info(f"Clearing existing accounts for chain {chain_id}")
                    session.execute(delete(Account).where(Account.chain_id == chain_id))
                _logger.info(f"Clearing existing blocks for chain {chain_id}")
                session.execute(delete(Block).where(Block.chain_id == chain_id))
                # Commit all deletes before inserting the imported rows.
                session.commit()
                session.expire_all()
                _logger.info(f"Importing {len(unique_blocks)} unique blocks (filtered from {len(blocks)} total)")
                for block_data in unique_blocks:
                    block_timestamp = _parse_datetime_value(block_data.get("timestamp"), "block timestamp") or datetime.utcnow()
                    block = Block(
                        chain_id=chain_id,
                        height=block_data["height"],
                        hash=block_data["hash"],
                        parent_hash=block_data["parent_hash"],
                        proposer=block_data["proposer"],
                        timestamp=block_timestamp,
                        state_root=block_data.get("state_root"),
                        tx_count=block_data.get("tx_count", 0),
                        block_metadata=block_data.get("block_metadata"),
                    )
                    session.add(block)
                for account_data in accounts:
                    account_chain_id = account_data.get("chain_id", chain_id)
                    if account_chain_id != chain_id:
                        raise HTTPException(
                            status_code=400,
                            detail=f"Mismatched account chain_id '{account_chain_id}' for import chain '{chain_id}'",
                        )
                    account = Account(
                        chain_id=account_chain_id,
                        address=account_data["address"],
                        balance=account_data["balance"],
                        nonce=account_data["nonce"],
                    )
                    session.add(account)
                for tx_data in transactions:
                    tx_chain_id = tx_data.get("chain_id", chain_id)
                    if tx_chain_id != chain_id:
                        raise HTTPException(
                            status_code=400,
                            detail=f"Mismatched transaction chain_id '{tx_chain_id}' for import chain '{chain_id}'",
                        )
                    tx = Transaction(
                        id=tx_data.get("id"),
                        chain_id=tx_chain_id,
                        tx_hash=str(tx_data.get("tx_hash") or tx_data.get("id") or ""),
                        block_height=tx_data.get("block_height"),
                        sender=tx_data["sender"],
                        recipient=tx_data["recipient"],
                        payload=tx_data.get("payload", {}),
                        value=tx_data.get("value", 0),
                        fee=tx_data.get("fee", 0),
                        nonce=tx_data.get("nonce", 0),
                        # NOTE(review): timestamp is stored serialized (string) here
                        # while blocks store datetimes — presumably the Transaction
                        # column is string-typed; confirm against the model.
                        timestamp=_serialize_optional_timestamp(tx_data.get("timestamp")),
                        status=tx_data.get("status", "pending"),
                        tx_metadata=tx_data.get("tx_metadata"),
                    )
                    created_at = _parse_datetime_value(tx_data.get("created_at"), "transaction created_at")
                    if created_at is not None:
                        tx.created_at = created_at
                    session.add(tx)
                session.commit()
                return {
                    "success": True,
                    "imported_blocks": len(unique_blocks),
                    "imported_accounts": len(accounts),
                    "imported_transactions": len(transactions),
                    "chain_id": chain_id,
                    "message": f"Successfully imported {len(unique_blocks)} blocks",
                }
        except HTTPException:
            # Preserve deliberate 4xx responses instead of wrapping them as 500s.
            raise
        except Exception as e:
            _logger.error(f"Error importing chain: {e}")
            raise HTTPException(status_code=500, detail=f"Failed to import chain: {str(e)}")
@router.post("/force-sync", summary="Force reorg to specified peer")
async def force_sync(peer_data: dict) -> Dict[str, Any]:
    """Force blockchain reorganization to sync with specified peer"""
    # NOTE(review): this region of the file was an interleaved old/new diff;
    # reconstructed here as the coherent post-commit implementation.
    try:
        peer_url = peer_data.get("peer_url")
        target_height = peer_data.get("target_height")
        if not peer_url:
            raise HTTPException(status_code=400, detail="peer_url is required")
        # Fetch the peer's full exported chain state.
        import requests
        response = requests.get(f"{peer_url}/rpc/export-chain", timeout=30)
        if response.status_code != 200:
            raise HTTPException(status_code=400, detail=f"Failed to fetch peer chain: {response.status_code}")
        peer_chain_data = response.json()
        peer_blocks = peer_chain_data["export_data"]["blocks"]
        if target_height and len(peer_blocks) < target_height:
            raise HTTPException(status_code=400, detail=f"Peer only has {len(peer_blocks)} blocks, cannot sync to height {target_height}")
        # Replace the local chain with the peer's exported state.
        import_result = await import_chain(peer_chain_data["export_data"])
        return {
            "success": True,
            "synced_from": peer_url,
            "synced_blocks": import_result["imported_blocks"],
            "target_height": target_height or import_result["imported_blocks"],
            "message": f"Successfully synced with peer {peer_url}"
        }
    except HTTPException:
        # Let deliberate 4xx responses pass through unwrapped.
        raise
    except Exception as e:
        _logger.error(f"Error forcing sync: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to force sync: {str(e)}")

View File

@@ -16,6 +16,7 @@ from sqlmodel import Session, select
from .config import settings
from .logger import get_logger
from .state.merkle_patricia_trie import StateManager
from .state.state_transition import get_state_transition
from .metrics import metrics_registry
from .models import Block, Account
from aitbc_chain.models import Transaction as ChainTransaction