Files
aitbc/apps/blockchain-node/src/aitbc_chain/rpc/router.py
aitbc d5a03acabb
Some checks failed
Blockchain Synchronization Verification / sync-verification (push) Successful in 3s
Integration Tests / test-service-integration (push) Failing after 10s
Multi-Node Blockchain Health Monitoring / health-check (push) Successful in 2s
P2P Network Verification / p2p-verification (push) Successful in 2s
Python Tests / test-python (push) Successful in 14s
Security Scanning / security-scan (push) Successful in 24s
Remove debug logging from router.py and poa.py
2026-04-22 13:42:08 +02:00

1033 lines
41 KiB
Python
Executable File

from __future__ import annotations
import asyncio
import json
import time
from typing import Any, Dict, Optional, List
from datetime import datetime, timedelta
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel, Field, model_validator
from sqlmodel import select, delete
from ..database import session_scope
from ..gossip import gossip_broker
from ..mempool import get_mempool
from ..metrics import metrics_registry
from ..models import Account, Block, Receipt, Transaction
from ..logger import get_logger
from ..sync import ChainSync
from ..contracts.agent_messaging_contract import messaging_contract
_logger = get_logger(__name__)

# Router the FastAPI app mounts; every endpoint below attaches to it.
router = APIRouter()

# Rate-limiter state for /importBlock: epoch seconds of the last accepted
# import, plus the lock that serializes all import endpoints.
_last_import_time = 0
_import_lock = asyncio.Lock()

# Registry of PoA proposer instances keyed by chain_id
# (populated via set_poa_proposer, read via get_poa_proposer).
_poa_proposers: Dict[str, Any] = {}
def set_poa_proposer(proposer, chain_id: Optional[str] = None) -> None:
    """Register the PoA proposer instance for a chain.

    When *chain_id* is omitted it is taken from the proposer's own config
    (``proposer._config.chain_id``), falling back to the node's default
    chain from settings.
    """
    if chain_id is None:
        chain_id = getattr(getattr(proposer, "_config", None), "chain_id", None) or get_chain_id(None)
    _poa_proposers[chain_id] = proposer
def get_poa_proposer(chain_id: Optional[str] = None):
    """Return the registered PoA proposer for *chain_id* (default chain if
    None), or None when no proposer has been registered for it."""
    chain_id = get_chain_id(chain_id)
    return _poa_proposers.get(chain_id)
def get_chain_id(chain_id: str = None) -> str:
    """Resolve a chain id: pass an explicit value through unchanged,
    otherwise fall back to the node's configured default chain."""
    if chain_id is not None:
        return chain_id
    from ..config import settings
    return settings.chain_id
def get_supported_chains() -> List[str]:
    """Return the configured chain ids (comma-separated setting), falling
    back to the default chain id when the list is empty."""
    from ..config import settings
    parsed = [entry.strip() for entry in settings.supported_chains.split(",") if entry.strip()]
    if parsed:
        return parsed
    if settings.chain_id:
        return [settings.chain_id]
    return parsed
def _normalize_transaction_data(tx_data: Dict[str, Any], chain_id: str) -> Dict[str, Any]:
sender = tx_data.get("from")
recipient = tx_data.get("to")
if not isinstance(sender, str) or not sender.strip():
raise ValueError("transaction.from is required")
if not isinstance(recipient, str) or not recipient.strip():
raise ValueError("transaction.to is required")
try:
amount = int(tx_data["amount"])
except KeyError as exc:
raise ValueError("transaction.amount is required") from exc
except (TypeError, ValueError) as exc:
raise ValueError("transaction.amount must be an integer") from exc
try:
fee = int(tx_data.get("fee", 10))
except (TypeError, ValueError) as exc:
raise ValueError("transaction.fee must be an integer") from exc
try:
nonce = int(tx_data.get("nonce", 0))
except (TypeError, ValueError) as exc:
raise ValueError("transaction.nonce must be an integer") from exc
if amount < 0:
raise ValueError("transaction.amount must be non-negative")
if fee < 0:
raise ValueError("transaction.fee must be non-negative")
if nonce < 0:
raise ValueError("transaction.nonce must be non-negative")
payload = tx_data.get("payload", {})
if payload is None:
payload = {}
tx_type = tx_data.get("type", "TRANSFER")
if tx_type:
tx_type = tx_type.upper()
# Ensure payload is a dict
if isinstance(payload, str):
try:
import json
payload = json.loads(payload)
except:
payload = {}
if not isinstance(payload, dict):
payload = {}
return {
"chain_id": chain_id,
"type": tx_type,
"from": sender.strip(),
"to": recipient.strip(),
"amount": amount,
"fee": fee,
"nonce": nonce,
"payload": payload,
"signature": tx_data.get("signature") or tx_data.get("sig"),
}
def _validate_transaction_admission(tx_data: Dict[str, Any], mempool: Any) -> None:
    """Reject a normalized transaction that cannot be admitted to the mempool.

    Checks run in a fixed order (the order determines which error message a
    client sees): supported chain id, sender account exists, balance covers
    amount + fee, nonce equals the account's current nonce, not already
    confirmed (by hash or by sender+nonce), and not already pending in the
    mempool. Raises ValueError on the first failure; returns None on success.
    """
    # Local import on purpose: the /mempool route function below shadows the
    # module-level name imported from ..mempool.
    from ..mempool import compute_tx_hash
    chain_id = tx_data["chain_id"]
    supported_chains = get_supported_chains()
    if not chain_id:
        raise ValueError("transaction.chain_id is required")
    if supported_chains and chain_id not in supported_chains:
        raise ValueError(f"unsupported chain_id '{chain_id}'. Supported chains: {supported_chains}")
    tx_hash = compute_tx_hash(tx_data)
    with session_scope() as session:
        # Accounts are keyed by the composite (chain_id, address).
        sender_account = session.get(Account, (chain_id, tx_data["from"]))
        if sender_account is None:
            raise ValueError(f"sender account not found on chain '{chain_id}'")
        total_cost = tx_data["amount"] + tx_data["fee"]
        if sender_account.balance < total_cost:
            raise ValueError(
                f"insufficient balance for sender '{tx_data['from']}' on chain '{chain_id}': has {sender_account.balance}, needs {total_cost}"
            )
        # Strict nonce equality: no gaps, no replays.
        if tx_data["nonce"] != sender_account.nonce:
            raise ValueError(
                f"invalid nonce for sender '{tx_data['from']}' on chain '{chain_id}': expected {sender_account.nonce}, got {tx_data['nonce']}"
            )
        # Double-spend guard against already-confirmed transactions,
        # first by hash ...
        existing_tx = session.exec(
            select(Transaction)
            .where(Transaction.chain_id == chain_id)
            .where(Transaction.tx_hash == tx_hash)
        ).first()
        if existing_tx is not None:
            raise ValueError(f"transaction '{tx_hash}' is already confirmed on chain '{chain_id}'")
        # ... then by (sender, nonce).
        existing_nonce = session.exec(
            select(Transaction)
            .where(Transaction.chain_id == chain_id)
            .where(Transaction.sender == tx_data["from"])
            .where(Transaction.nonce == tx_data["nonce"])
        ).first()
        if existing_nonce is not None:
            raise ValueError(
                f"sender '{tx_data['from']}' already used nonce {tx_data['nonce']} on chain '{chain_id}'"
            )
        # Same two guards against transactions still waiting in the mempool.
        pending_txs = mempool.list_transactions(chain_id=chain_id)
        if any(pending_tx.tx_hash == tx_hash for pending_tx in pending_txs):
            raise ValueError(f"transaction '{tx_hash}' is already pending on chain '{chain_id}'")
        if any(
            pending_tx.content.get("from") == tx_data["from"] and pending_tx.content.get("nonce") == tx_data["nonce"]
            for pending_tx in pending_txs
        ):
            raise ValueError(
                f"sender '{tx_data['from']}' already has pending nonce {tx_data['nonce']} on chain '{chain_id}'"
            )
def _serialize_receipt(receipt: Receipt) -> Dict[str, Any]:
return {
"receipt_id": receipt.receipt_id,
"job_id": receipt.job_id,
"payload": receipt.payload,
"miner_signature": receipt.miner_signature,
"coordinator_attestations": receipt.coordinator_attestations,
"minted_amount": receipt.minted_amount,
"recorded_at": receipt.recorded_at.isoformat(),
}
class TransactionRequest(BaseModel):
    """Payload for POST /transaction.

    The sender address is accepted under either key: the wire alias
    ``from`` or the field name ``sender``.
    """

    type: str = Field(description="Transaction type, e.g. TRANSFER, RECEIPT_CLAIM, GPU_MARKETPLACE, EXCHANGE, MESSAGE")
    sender: str = Field(alias="from")
    nonce: int
    fee: int = Field(ge=0)
    payload: Dict[str, Any]
    sig: Optional[str] = Field(default=None, description="Signature payload")

    # Fix: the original comment promised both "sender" and "from" would be
    # accepted, but pydantic v2 only honors the alias unless
    # populate_by_name is enabled. Backward compatible: "from" still works.
    model_config = {"populate_by_name": True}

    @model_validator(mode="after")
    def normalize_type(self) -> "TransactionRequest":  # type: ignore[override]
        """Uppercase the type and reject anything outside the known set."""
        normalized = self.type.upper()
        valid_types = {"TRANSFER", "RECEIPT_CLAIM", "GPU_MARKETPLACE", "EXCHANGE", "MESSAGE"}
        if normalized not in valid_types:
            raise ValueError(f"unsupported transaction type: {normalized}. Valid types: {valid_types}")
        self.type = normalized
        return self
class ReceiptSubmissionRequest(BaseModel):
    """Payload for submitting a mining receipt claim."""

    # Address of the account submitting the claim.
    sender: str
    # Sender's current account nonce.
    nonce: int
    # Transaction fee; must be non-negative.
    fee: int = Field(ge=0)
    # Receipt payload (opaque to this router).
    payload: Dict[str, Any]
    # Optional signature over the payload.
    sig: Optional[str] = None
class EstimateFeeRequest(BaseModel):
    """Payload for fee-estimation requests."""

    # Optional transaction type the estimate applies to.
    type: Optional[str] = None
    # Prospective transaction payload; defaults to empty.
    payload: Dict[str, Any] = Field(default_factory=dict)
@router.get("/head", summary="Get current chain head")
async def get_head(chain_id: Optional[str] = None) -> Dict[str, Any]:
    """Return the highest block of the chain.

    Raises:
        HTTPException 404: when the chain has no blocks yet.
    """
    # Consistency fix: resolve the default chain through the shared
    # get_chain_id() helper like every other route, instead of re-reading
    # settings inline.
    chain_id = get_chain_id(chain_id)
    metrics_registry.increment("rpc_get_head_total")
    start = time.perf_counter()
    with session_scope() as session:
        result = session.exec(
            select(Block)
            .where(Block.chain_id == chain_id)
            .order_by(Block.height.desc())
            .limit(1)
        ).first()
        if result is None:
            metrics_registry.increment("rpc_get_head_not_found_total")
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="no blocks yet")
        metrics_registry.increment("rpc_get_head_success_total")
        metrics_registry.observe("rpc_get_head_duration_seconds", time.perf_counter() - start)
        return {
            "height": result.height,
            "hash": result.hash,
            "timestamp": result.timestamp.isoformat(),
            "tx_count": result.tx_count,
        }
@router.get("/blocks/{height}", summary="Get block by height")
async def get_block(height: int, chain_id: str = None) -> Dict[str, Any]:
    """Return a single block, including its transactions, by height."""
    chain_id = get_chain_id(chain_id)
    metrics_registry.increment("rpc_get_block_total")
    started = time.perf_counter()
    with session_scope() as session:
        found = session.exec(
            select(Block).where(Block.chain_id == chain_id).where(Block.height == height)
        ).first()
        if found is None:
            metrics_registry.increment("rpc_get_block_not_found_total")
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="block not found")
        metrics_registry.increment("rpc_get_block_success_total")
        rows = session.exec(
            select(Transaction)
            .where(Transaction.chain_id == chain_id)
            .where(Transaction.block_height == height)
        ).all()
        # Each transaction is rendered as its payload plus identifying hash.
        transactions = [dict(row.payload or {}, tx_hash=row.tx_hash) for row in rows]
        metrics_registry.observe("rpc_get_block_duration_seconds", time.perf_counter() - started)
        return {
            "chain_id": found.chain_id,
            "height": found.height,
            "hash": found.hash,
            "parent_hash": found.parent_hash,
            "proposer": found.proposer,
            "timestamp": found.timestamp.isoformat(),
            "tx_count": found.tx_count,
            "state_root": found.state_root,
            "transactions": transactions,
        }
@router.post("/transaction", summary="Submit transaction")
async def submit_transaction(tx_data: TransactionRequest) -> Dict[str, Any]:
    """Validate an incoming transaction and enqueue it in the mempool."""
    # Local import on purpose: the /mempool route function below shadows the
    # module-level `get_mempool` name.
    from ..mempool import get_mempool
    try:
        mempool = get_mempool()
        chain_id = get_chain_id(None)
        # Build the raw dict shape _normalize_transaction_data expects
        # ("from"/"to" keys); destination and amount live in the payload.
        raw_tx = {
            "from": tx_data.sender,
            "to": tx_data.payload.get("to", tx_data.sender),
            "amount": tx_data.payload.get("amount", 0),
            "fee": tx_data.fee,
            "nonce": tx_data.nonce,
            "payload": tx_data.payload,
            "type": tx_data.type,
            "signature": tx_data.sig,
        }
        normalized = _normalize_transaction_data(raw_tx, chain_id)
        _validate_transaction_admission(normalized, mempool)
        tx_hash = mempool.add(normalized, chain_id=chain_id)
        return {
            "success": True,
            "transaction_hash": tx_hash,
            "message": "Transaction submitted to mempool",
        }
    except Exception as e:
        _logger.error("Failed to submit transaction", extra={"error": str(e)})
        raise HTTPException(status_code=400, detail=f"Failed to submit transaction: {str(e)}")
@router.get("/mempool", summary="Get pending transactions")
async def get_mempool(chain_id: Optional[str] = None, limit: int = 100) -> Dict[str, Any]:
    """Return up to *limit* pending mempool transactions, optionally
    filtered to one chain.

    NOTE(review): this route function shadows the module-level
    ``get_mempool`` imported from ``..mempool`` — that is why other handlers
    in this file re-import it locally. Consider renaming the route function.
    """
    from ..mempool import get_mempool
    try:
        mempool = get_mempool()
        # NOTE(review): _validate_transaction_admission uses
        # mempool.list_transactions(); confirm get_pending_transactions also
        # exists on the mempool implementation.
        pending_txs = mempool.get_pending_transactions(chain_id=chain_id, limit=limit)
        return {
            "success": True,
            "transactions": pending_txs,
            "count": len(pending_txs)
        }
    except Exception as e:
        _logger.error(f"Failed to get mempool", extra={"error": str(e)})
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to get mempool: {str(e)}")
@router.get("/account/{address}", summary="Get account information")
async def get_account(address: str, chain_id: str = None) -> Dict[str, Any]:
    """Look up an account by address on the given (or default) chain."""
    chain_id = get_chain_id(chain_id)
    with session_scope() as session:
        row = session.exec(
            select(Account)
            .where(Account.address == address)
            .where(Account.chain_id == chain_id)
        ).first()
        if row is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Account not found")
        return {
            "address": row.address,
            "balance": row.balance,
            "nonce": row.nonce,
            "chain_id": row.chain_id,
        }
@router.get("/accounts/{address}", summary="Get account information (alias)")
async def get_account_alias(address: str, chain_id: str = None) -> Dict[str, Any]:
    """Alias of GET /account/{address}, kept for client compatibility."""
    result = await get_account(address, chain_id)
    return result
@router.post("/transactions/marketplace", summary="Submit marketplace transaction")
async def submit_marketplace_transaction(tx_data: Dict[str, Any]) -> Dict[str, Any]:
    """Submit a marketplace purchase transaction to the blockchain.

    Validates the sender's balance and nonce, creates the recipient account
    on demand, records the transaction as ``pending`` and applies balance /
    nonce changes optimistically in the same session.

    Raises:
        HTTPException 400: validation failure (unknown sender, insufficient
            balance, bad nonce).
        HTTPException 500: any unexpected error.
    """
    # Bug fix: compute_tx_hash was never imported in this scope (it is only
    # imported function-locally inside _validate_transaction_admission), so
    # the compute_tx_hash() call below raised NameError at runtime.
    from ..mempool import compute_tx_hash
    chain_id = get_chain_id(tx_data.get("chain_id"))
    metrics_registry.increment("rpc_marketplace_transaction_total")
    start = time.perf_counter()
    try:
        with session_scope() as session:
            # Validate the sender account exists on this chain.
            sender_addr = tx_data.get("from")
            sender_account = session.get(Account, (chain_id, sender_addr))
            if not sender_account:
                raise ValueError(f"Sender account not found: {sender_addr}")
            # Validate the sender can cover value + fee.
            amount = tx_data.get("value", 0)
            fee = tx_data.get("fee", 0)
            total_cost = amount + fee
            if sender_account.balance < total_cost:
                raise ValueError(f"Insufficient balance: {sender_account.balance} < {total_cost}")
            # Strict nonce equality guards against replays and gaps.
            tx_nonce = tx_data.get("nonce", 0)
            if tx_nonce != sender_account.nonce:
                raise ValueError(f"Invalid nonce: expected {sender_account.nonce}, got {tx_nonce}")
            # Get or create the recipient account.
            recipient_addr = tx_data.get("to")
            recipient_account = session.get(Account, (chain_id, recipient_addr))
            if not recipient_account:
                recipient_account = Account(
                    chain_id=chain_id,
                    address=recipient_addr,
                    balance=0,
                    nonce=0
                )
                session.add(recipient_account)
            # Record the transaction in "pending" state.
            tx_hash = compute_tx_hash(tx_data)
            transaction = Transaction(
                chain_id=chain_id,
                tx_hash=tx_hash,
                sender=sender_addr,
                recipient=recipient_addr,
                payload=tx_data.get("payload", {}),
                created_at=datetime.utcnow(),
                nonce=tx_nonce,
                value=amount,
                fee=fee,
                status="pending",
                timestamp=datetime.utcnow().isoformat()
            )
            session.add(transaction)
            # Apply balance / nonce changes optimistically alongside the
            # pending transaction row.
            sender_account.balance -= total_cost
            sender_account.nonce += 1
            recipient_account.balance += amount
            metrics_registry.increment("rpc_marketplace_transaction_success")
            duration = time.perf_counter() - start
            metrics_registry.observe("rpc_marketplace_transaction_duration_seconds", duration)
            _logger.info(f"Marketplace transaction submitted: {tx_hash[:16]}... from {sender_addr[:16]}... to {recipient_addr[:16]}... amount={amount}")
            return {
                "success": True,
                "tx_hash": tx_hash,
                "status": "pending",
                "chain_id": chain_id,
                "amount": amount,
                "fee": fee,
                "from": sender_addr,
                "to": recipient_addr
            }
    except ValueError as e:
        metrics_registry.increment("rpc_marketplace_transaction_validation_errors_total")
        _logger.error(f"Marketplace transaction validation failed: {str(e)}")
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        metrics_registry.increment("rpc_marketplace_transaction_errors_total")
        _logger.error(f"Failed to submit marketplace transaction", extra={"error": str(e)})
        raise HTTPException(status_code=500, detail=f"Failed to submit marketplace transaction: {str(e)}")
@router.get("/transactions", summary="Query transactions")
async def query_transactions(
    transaction_type: Optional[str] = None,
    island_id: Optional[str] = None,
    pair: Optional[str] = None,
    status: Optional[str] = None,
    order_id: Optional[str] = None,
    limit: Optional[int] = 100,
    chain_id: str = None
) -> List[Dict[str, Any]]:
    """Query confirmed transactions with optional payload filters.

    Filters target JSON payload fields (type, island_id, pair, status,
    order_id/offer_id/bid_id), so they are applied in Python rather than
    pushed into SQL. NOTE(review): the `status` query parameter shadows the
    fastapi `status` module inside this function (harmless here, as the
    module is not used in the body).
    """
    chain_id = get_chain_id(chain_id)
    with session_scope() as session:
        transactions = session.exec(
            select(Transaction).where(Transaction.chain_id == chain_id)
        ).all()
        results: List[Dict[str, Any]] = []
        for tx in transactions:
            # Robustness fix: payload may be NULL in the database; the
            # original `tx.payload.get(...)` raised AttributeError then.
            payload = tx.payload or {}
            if transaction_type and payload.get('type') != transaction_type:
                continue
            if island_id and payload.get('island_id') != island_id:
                continue
            if pair and payload.get('pair') != pair:
                continue
            if status and payload.get('status') != status:
                continue
            # order_id matches any of the three id fields.
            if order_id and order_id not in (
                payload.get('order_id'),
                payload.get('offer_id'),
                payload.get('bid_id'),
            ):
                continue
            results.append({
                "transaction_id": tx.id,
                "tx_hash": tx.tx_hash,
                "sender": tx.sender,
                "recipient": tx.recipient,
                "payload": tx.payload,
                "status": tx.status,
                "created_at": tx.created_at.isoformat(),
                "timestamp": tx.timestamp,
                "nonce": tx.nonce,
                "value": tx.value,
                "fee": tx.fee
            })
            # Perf: stop scanning once the limit is reached instead of
            # building the full result set and slicing afterwards.
            if limit and len(results) >= limit:
                break
        return results
@router.get("/blocks-range", summary="Get blocks in height range")
async def get_blocks_range(start: int = 0, end: int = 10, include_tx: bool = True, chain_id: str = None) -> Dict[str, Any]:
    """Return every block with start <= height <= end on one chain.

    Args:
        start: Starting block height (inclusive).
        end: Ending block height (inclusive).
        include_tx: Whether to embed each block's transactions.
    """
    with session_scope() as session:
        from ..models import Transaction
        chain_id = get_chain_id(chain_id)
        rows = session.exec(
            select(Block).where(
                Block.chain_id == chain_id,
                Block.height >= start,
                Block.height <= end,
            ).order_by(Block.height.asc())
        ).all()
        serialized = []
        for block in rows:
            entry = {
                "height": block.height,
                "hash": block.hash,
                "parent_hash": block.parent_hash,
                "proposer": block.proposer,
                "timestamp": block.timestamp.isoformat(),
                "tx_count": block.tx_count,
                "state_root": block.state_root,
            }
            if include_tx:
                # Attach this block's transactions when requested.
                txs = session.exec(
                    select(Transaction)
                    .where(Transaction.chain_id == chain_id)
                    .where(Transaction.block_height == block.height)
                ).all()
                entry["transactions"] = [tx.model_dump() for tx in txs]
            serialized.append(entry)
        return {
            "success": True,
            "blocks": serialized,
            "count": len(rows),
        }
@router.post("/contracts/deploy/messaging", summary="Deploy messaging contract")
async def deploy_messaging_contract(deploy_data: dict) -> Dict[str, Any]:
    """Report the fixed address of the agent messaging contract.

    The contract is built into the node, so "deployment" simply echoes its
    well-known address; *deploy_data* is accepted but unused.
    """
    return {
        "success": True,
        "contract_address": "0xagent_messaging_001",
        "status": "deployed",
    }
@router.get("/contracts/messaging/state", summary="Get messaging contract state")
async def get_messaging_contract_state() -> Dict[str, Any]:
    """Summarize topic, message and agent counts held by the contract."""
    return {
        "success": True,
        "contract_state": {
            "total_topics": len(messaging_contract.topics),
            "total_messages": len(messaging_contract.messages),
            "total_agents": len(messaging_contract.agent_reputations),
        },
    }
@router.get("/messaging/topics", summary="Get forum topics")
async def get_forum_topics(limit: int = 50, offset: int = 0, sort_by: str = "last_activity") -> Dict[str, Any]:
    """List forum topics; thin pass-through to the messaging contract."""
    topics = messaging_contract.get_topics(limit, offset, sort_by)
    return topics
@router.post("/messaging/topics/create", summary="Create forum topic")
async def create_forum_topic(topic_data: dict) -> Dict[str, Any]:
    """Create a new forum topic on behalf of an agent."""
    agent_id = topic_data.get("agent_id")
    agent_address = topic_data.get("agent_address")
    title = topic_data.get("title")
    description = topic_data.get("description")
    tags = topic_data.get("tags", [])
    return messaging_contract.create_topic(agent_id, agent_address, title, description, tags)
@router.get("/messaging/topics/{topic_id}/messages", summary="Get topic messages")
async def get_topic_messages(topic_id: str, limit: int = 50, offset: int = 0, sort_by: str = "timestamp") -> Dict[str, Any]:
    """Fetch a page of messages for one forum topic."""
    page = messaging_contract.get_messages(topic_id, limit, offset, sort_by)
    return page
@router.post("/messaging/messages/post", summary="Post message")
async def post_message(message_data: dict) -> Dict[str, Any]:
    """Post a message (or reply, via parent_message_id) into a topic."""
    author_id = message_data.get("agent_id")
    author_address = message_data.get("agent_address")
    topic_id = message_data.get("topic_id")
    content = message_data.get("content")
    message_type = message_data.get("message_type", "post")
    parent_id = message_data.get("parent_message_id")
    return messaging_contract.post_message(author_id, author_address, topic_id, content, message_type, parent_id)
@router.post("/messaging/messages/{message_id}/vote", summary="Vote on message")
async def vote_message(message_id: str, vote_data: dict) -> Dict[str, Any]:
    """Apply an upvote/downvote to a message on behalf of an agent."""
    voter_id = vote_data.get("agent_id")
    voter_address = vote_data.get("agent_address")
    vote_type = vote_data.get("vote_type")
    return messaging_contract.vote_message(voter_id, voter_address, message_id, vote_type)
@router.get("/messaging/messages/search", summary="Search messages")
async def search_messages(query: str, limit: int = 50) -> Dict[str, Any]:
    """Search forum messages by content (delegated to the contract)."""
    hits = messaging_contract.search_messages(query, limit)
    return hits
@router.get("/messaging/agents/{agent_id}/reputation", summary="Get agent reputation")
async def get_agent_reputation(agent_id: str) -> Dict[str, Any]:
    """Look up the reputation record for a single agent."""
    reputation = messaging_contract.get_agent_reputation(agent_id)
    return reputation
@router.post("/messaging/messages/{message_id}/moderate", summary="Moderate message")
async def moderate_message(message_id: str, moderation_data: dict) -> Dict[str, Any]:
    """Apply a moderation action to a message (moderator only)."""
    moderator_id = moderation_data.get("moderator_agent_id")
    moderator_address = moderation_data.get("moderator_address")
    action = moderation_data.get("action")
    reason = moderation_data.get("reason", "")
    return messaging_contract.moderate_message(moderator_id, moderator_address, message_id, action, reason)
@router.post("/importBlock", summary="Import a block")
async def import_block(block_data: dict) -> Dict[str, Any]:
    """Import a single externally-produced block into the local chain.

    Serialized through the module-wide import lock and throttled to at most
    one import per second. A stored block whose hash matches the incoming
    one (on ANY chain) is deleted first, so the incoming block always wins.
    """
    global _last_import_time
    async with _import_lock:
        try:
            # Rate limiting: max 1 import per second. Sleeping here keeps
            # the lock held, so concurrent importers queue behind the sleep.
            current_time = time.time()
            time_since_last = current_time - _last_import_time
            if time_since_last < 1.0:
                await asyncio.sleep(1.0 - time_since_last)
            _last_import_time = time.time()
            # Accept both snake_case and camelCase chain-id keys; fall back
            # to the node's default chain.
            chain_id = block_data.get("chain_id") or block_data.get("chainId") or get_chain_id(None)
            timestamp = block_data.get("timestamp")
            if isinstance(timestamp, str):
                try:
                    # fromisoformat (pre-3.11) rejects a literal 'Z' suffix.
                    timestamp = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
                except ValueError:
                    # Unparsable timestamps fall back to "now" rather than
                    # rejecting the block.
                    timestamp = datetime.utcnow()
            elif timestamp is None:
                timestamp = datetime.utcnow()
            with session_scope() as session:
                # Check for hash conflicts across chains.
                block_hash = block_data["hash"]
                existing_block = session.execute(
                    select(Block).where(Block.hash == block_hash)
                ).first()
                if existing_block:
                    # Destructive: drop the previously stored block so the
                    # incoming one can take its place.
                    _logger.warning(f"Deleting existing block with conflicting hash {block_hash} from chain {existing_block[0].chain_id}")
                    session.execute(delete(Block).where(Block.hash == block_hash))
                    session.commit()
                # Create and persist the new block row.
                block = Block(
                    chain_id=chain_id,
                    height=block_data["height"],
                    hash=block_hash,
                    parent_hash=block_data["parent_hash"],
                    proposer=block_data["proposer"],
                    timestamp=timestamp,
                    state_root=block_data.get("state_root"),
                    tx_count=block_data.get("tx_count", 0)
                )
                session.add(block)
                session.commit()
                return {
                    "success": True,
                    "block_height": block.height,
                    "block_hash": block.hash,
                    "chain_id": chain_id
                }
        except Exception as e:
            _logger.error(f"Error importing block: {e}")
            raise HTTPException(status_code=500, detail=f"Failed to import block: {str(e)}")
def _serialize_optional_timestamp(value: Any) -> Optional[str]:
if value is None:
return None
if isinstance(value, str):
return value
if hasattr(value, "isoformat"):
return value.isoformat()
return str(value)
def _parse_datetime_value(value: Any, field_name: str) -> Optional[datetime]:
if value in (None, ""):
return None
if isinstance(value, datetime):
return value
if isinstance(value, str):
try:
return datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError as exc:
raise HTTPException(status_code=400, detail=f"Invalid {field_name}: {value}") from exc
raise HTTPException(status_code=400, detail=f"Invalid {field_name} type: {type(value).__name__}")
def _select_export_blocks(session, chain_id: str) -> List[Block]:
    """Fetch a chain's blocks ordered by height, keeping only one row per
    height (ordering by id desc means the newest row wins; duplicates can
    exist after conflicting imports)."""
    rows = session.execute(
        select(Block)
        .where(Block.chain_id == chain_id)
        .order_by(Block.height.asc(), Block.id.desc())
    )
    unique: List[Block] = []
    heights_seen = set()
    dropped = 0
    for candidate in rows.scalars().all():
        if candidate.height in heights_seen:
            dropped += 1
        else:
            heights_seen.add(candidate.height)
            unique.append(candidate)
    if dropped:
        _logger.warning(f"Filtered {dropped} duplicate exported blocks for chain {chain_id}")
    return unique
def _dedupe_import_blocks(blocks: List[Dict[str, Any]], chain_id: str) -> List[Dict[str, Any]]:
latest_by_height: Dict[int, Dict[str, Any]] = {}
duplicate_count = 0
for block_data in blocks:
if "height" not in block_data:
raise HTTPException(status_code=400, detail="Block height is required")
try:
height = int(block_data["height"])
except (TypeError, ValueError) as exc:
raise HTTPException(status_code=400, detail=f"Invalid block height: {block_data.get('height')}") from exc
block_chain_id = block_data.get("chain_id")
if block_chain_id and block_chain_id != chain_id:
raise HTTPException(
status_code=400,
detail=f"Mismatched block chain_id '{block_chain_id}' for import chain '{chain_id}'",
)
normalized_block = dict(block_data)
normalized_block["height"] = height
normalized_block["chain_id"] = chain_id
if height in latest_by_height:
duplicate_count += 1
latest_by_height[height] = normalized_block
if duplicate_count:
_logger.warning(f"Filtered {duplicate_count} duplicate imported blocks for chain {chain_id}")
return [latest_by_height[height] for height in sorted(latest_by_height)]
@router.get("/export-chain", summary="Export full chain state")
async def export_chain(chain_id: str = None) -> Dict[str, Any]:
    """Export the full chain state (blocks, accounts, transactions) as JSON.

    Intended for manual node-to-node synchronization; the ``export_data``
    member feeds directly into POST /import-chain.
    """
    chain_id = get_chain_id(chain_id)
    try:
        with session_scope() as session:
            # Blocks deduplicated per height; accounts and transactions in a
            # stable order so exports are reproducible.
            blocks = _select_export_blocks(session, chain_id)
            accounts_result = session.execute(
                select(Account)
                .where(Account.chain_id == chain_id)
                .order_by(Account.address)
            )
            accounts = list(accounts_result.scalars().all())
            txs_result = session.execute(
                select(Transaction)
                .where(Transaction.chain_id == chain_id)
                .order_by(Transaction.block_height, Transaction.id)
            )
            transactions = list(txs_result.scalars().all())
            # Build export data.
            export_data = {
                "chain_id": chain_id,
                # Consistency fix: stamp the export in UTC like every other
                # timestamp in this module (was naive local time via
                # datetime.now()).
                "export_timestamp": datetime.utcnow().isoformat(),
                "block_count": len(blocks),
                "account_count": len(accounts),
                "transaction_count": len(transactions),
                "blocks": [
                    {
                        "chain_id": b.chain_id,
                        "height": b.height,
                        "hash": b.hash,
                        "parent_hash": b.parent_hash,
                        "proposer": b.proposer,
                        "timestamp": b.timestamp.isoformat() if b.timestamp else None,
                        "state_root": b.state_root,
                        "tx_count": b.tx_count,
                        "block_metadata": b.block_metadata,
                    }
                    for b in blocks
                ],
                "accounts": [
                    {
                        "chain_id": a.chain_id,
                        "address": a.address,
                        "balance": a.balance,
                        "nonce": a.nonce
                    }
                    for a in accounts
                ],
                "transactions": [
                    {
                        "id": t.id,
                        "chain_id": t.chain_id,
                        "tx_hash": t.tx_hash,
                        "block_height": t.block_height,
                        "sender": t.sender,
                        "recipient": t.recipient,
                        "payload": t.payload,
                        "value": t.value,
                        "fee": t.fee,
                        "nonce": t.nonce,
                        "timestamp": _serialize_optional_timestamp(t.timestamp),
                        "status": t.status,
                        "created_at": t.created_at.isoformat() if t.created_at else None,
                        "tx_metadata": t.tx_metadata,
                    }
                    for t in transactions
                ]
            }
            return {
                "success": True,
                "export_data": export_data,
                "export_size_bytes": len(json.dumps(export_data))
            }
    except HTTPException:
        raise
    except Exception as e:
        _logger.error(f"Error exporting chain: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to export chain: {str(e)}")
@router.post("/import-chain", summary="Import chain state")
async def import_chain(import_data: dict) -> Dict[str, Any]:
    """Import a full chain snapshot produced by GET /export-chain.

    DESTRUCTIVE: existing transactions and blocks for the target chain are
    deleted (accounts too, when the snapshot includes any) before the
    snapshot rows are inserted. Serialized via the module-wide import lock.

    Raises:
        HTTPException 400: empty block list or chain-id mismatch.
        HTTPException 500: any unexpected failure.
    """
    async with _import_lock:
        try:
            chain_id = import_data.get("chain_id")
            blocks = import_data.get("blocks", [])
            accounts = import_data.get("accounts", [])
            transactions = import_data.get("transactions", [])
            # Fall back to the first block's chain id, then the node default.
            if not chain_id and blocks:
                chain_id = blocks[0].get("chain_id")
            chain_id = get_chain_id(chain_id)
            unique_blocks = _dedupe_import_blocks(blocks, chain_id)
            with session_scope() as session:
                if not unique_blocks:
                    raise HTTPException(status_code=400, detail="No blocks to import")
                existing_blocks = session.execute(
                    select(Block)
                    .where(Block.chain_id == chain_id)
                    .order_by(Block.height)
                )
                existing_count = len(list(existing_blocks.scalars().all()))
                if existing_count > 0:
                    # NOTE(review): this logs a "backup" but no backup is
                    # actually taken — the rows below are simply deleted.
                    _logger.info(f"Backing up existing chain with {existing_count} blocks")
                _logger.info(f"Clearing existing transactions for chain {chain_id}")
                session.execute(delete(Transaction).where(Transaction.chain_id == chain_id))
                if accounts:
                    # Accounts are only wiped when the snapshot replaces them.
                    _logger.info(f"Clearing existing accounts for chain {chain_id}")
                    session.execute(delete(Account).where(Account.chain_id == chain_id))
                _logger.info(f"Clearing existing blocks for chain {chain_id}")
                session.execute(delete(Block).where(Block.chain_id == chain_id))
                # Blocks on OTHER chains sharing a hash with an incoming
                # block are deleted too, so the incoming blocks can insert.
                import_hashes = {block_data["hash"] for block_data in unique_blocks}
                if import_hashes:
                    hash_conflict_result = session.execute(
                        select(Block.hash, Block.chain_id)
                        .where(Block.hash.in_(import_hashes))
                    )
                    hash_conflicts = hash_conflict_result.all()
                    if hash_conflicts:
                        conflict_chains = {chain_id for _, chain_id in hash_conflicts}
                        _logger.warning(f"Clearing {len(hash_conflicts)} blocks with conflicting hashes across chains: {conflict_chains}")
                        session.execute(delete(Block).where(Block.hash.in_(import_hashes)))
                # Make the deletions visible before re-inserting.
                session.commit()
                session.expire_all()
                _logger.info(f"Importing {len(unique_blocks)} unique blocks (filtered from {len(blocks)} total)")
                for block_data in unique_blocks:
                    # Missing/empty timestamps default to "now".
                    block_timestamp = _parse_datetime_value(block_data.get("timestamp"), "block timestamp") or datetime.utcnow()
                    block = Block(
                        chain_id=chain_id,
                        height=block_data["height"],
                        hash=block_data["hash"],
                        parent_hash=block_data["parent_hash"],
                        proposer=block_data["proposer"],
                        timestamp=block_timestamp,
                        state_root=block_data.get("state_root"),
                        tx_count=block_data.get("tx_count", 0),
                        block_metadata=block_data.get("block_metadata"),
                    )
                    session.add(block)
                for account_data in accounts:
                    account_chain_id = account_data.get("chain_id", chain_id)
                    if account_chain_id != chain_id:
                        raise HTTPException(
                            status_code=400,
                            detail=f"Mismatched account chain_id '{account_chain_id}' for import chain '{chain_id}'",
                        )
                    account = Account(
                        chain_id=account_chain_id,
                        address=account_data["address"],
                        balance=account_data["balance"],
                        nonce=account_data["nonce"],
                    )
                    session.add(account)
                for tx_data in transactions:
                    tx_chain_id = tx_data.get("chain_id", chain_id)
                    if tx_chain_id != chain_id:
                        raise HTTPException(
                            status_code=400,
                            detail=f"Mismatched transaction chain_id '{tx_chain_id}' for import chain '{chain_id}'",
                        )
                    tx = Transaction(
                        id=tx_data.get("id"),
                        chain_id=tx_chain_id,
                        tx_hash=str(tx_data.get("tx_hash") or tx_data.get("id") or ""),
                        block_height=tx_data.get("block_height"),
                        sender=tx_data["sender"],
                        recipient=tx_data["recipient"],
                        payload=tx_data.get("payload", {}),
                        value=tx_data.get("value", 0),
                        fee=tx_data.get("fee", 0),
                        nonce=tx_data.get("nonce", 0),
                        timestamp=_serialize_optional_timestamp(tx_data.get("timestamp")),
                        status=tx_data.get("status", "pending"),
                        tx_metadata=tx_data.get("tx_metadata"),
                    )
                    # created_at is parsed separately so an invalid value
                    # raises a 400 instead of being silently stringified.
                    created_at = _parse_datetime_value(tx_data.get("created_at"), "transaction created_at")
                    if created_at is not None:
                        tx.created_at = created_at
                    session.add(tx)
                session.commit()
                return {
                    "success": True,
                    "imported_blocks": len(unique_blocks),
                    "imported_accounts": len(accounts),
                    "imported_transactions": len(transactions),
                    "chain_id": chain_id,
                    "message": f"Successfully imported {len(unique_blocks)} blocks",
                }
        except HTTPException:
            raise
        except Exception as e:
            _logger.error(f"Error importing chain: {e}")
            raise HTTPException(status_code=500, detail=f"Failed to import chain: {str(e)}")
@router.post("/force-sync", summary="Force reorg to specified peer")
async def force_sync(peer_data: dict) -> Dict[str, Any]:
    """Force blockchain reorganization to sync with a specified peer.

    Fetches the peer's full exported chain over HTTP and replays it through
    import_chain (which replaces local chain state).

    Raises:
        HTTPException 400: missing peer_url, peer fetch failure, or peer too
            short for the requested target_height.
        HTTPException 500: any unexpected failure.
    """
    try:
        peer_url = peer_data.get("peer_url")
        target_height = peer_data.get("target_height")
        if not peer_url:
            raise HTTPException(status_code=400, detail="peer_url is required")
        import requests
        # Fix: requests.get is blocking; run it in a worker thread so a slow
        # peer cannot stall the event loop for up to the 30s timeout.
        response = await asyncio.to_thread(
            requests.get, f"{peer_url}/rpc/export-chain", timeout=30
        )
        if response.status_code != 200:
            raise HTTPException(status_code=400, detail=f"Failed to fetch peer chain: {response.status_code}")
        peer_chain_data = response.json()
        peer_blocks = peer_chain_data["export_data"]["blocks"]
        # NOTE(review): compares block *count* against target height —
        # off-by-one if heights start at 0; confirm intended semantics.
        if target_height and len(peer_blocks) < target_height:
            raise HTTPException(status_code=400, detail=f"Peer only has {len(peer_blocks)} blocks, cannot sync to height {target_height}")
        import_result = await import_chain(peer_chain_data["export_data"])
        return {
            "success": True,
            "synced_from": peer_url,
            "synced_blocks": import_result["imported_blocks"],
            "target_height": target_height or import_result["imported_blocks"],
            "message": f"Successfully synced with peer {peer_url}"
        }
    except HTTPException:
        raise
    except Exception as e:
        _logger.error(f"Error forcing sync: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to force sync: {str(e)}")