Fix async/sync mismatch in chain export/import and remove duplicate import_block code
- Replace manual session creation with session_scope() in export_chain - Convert query results to lists to avoid lazy loading issues after session close - Remove await from session.execute calls in import_chain (use synchronous queries) - Change force_sync back to async and restore await on import_chain call - Remove 69 lines of duplicate import_block implementation at end of file
This commit is contained in:
@@ -677,27 +677,19 @@ async def export_chain(chain_id: str = None) -> Dict[str, Any]:
|
||||
chain_id = get_chain_id(chain_id)
|
||||
|
||||
try:
|
||||
# Use synchronous database operations in async context
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from ..database import _db_path
|
||||
|
||||
engine = create_engine(f"sqlite:///{_db_path}")
|
||||
Session = sessionmaker(bind=engine)
|
||||
session = Session()
|
||||
|
||||
try:
|
||||
# Use session_scope for database operations
|
||||
with session_scope() as session:
|
||||
# Get all blocks
|
||||
blocks_result = session.execute(select(Block).order_by(Block.height))
|
||||
blocks = blocks_result.scalars().all()
|
||||
blocks = list(blocks_result.scalars().all())
|
||||
|
||||
# Get all accounts
|
||||
accounts_result = session.execute(select(Account))
|
||||
accounts = accounts_result.scalars().all()
|
||||
accounts = list(accounts_result.scalars().all())
|
||||
|
||||
# Get all transactions
|
||||
txs_result = session.execute(select(Transaction))
|
||||
transactions = txs_result.scalars().all()
|
||||
transactions = list(txs_result.scalars().all())
|
||||
|
||||
# Build export data
|
||||
export_data = {
|
||||
@@ -768,8 +760,8 @@ async def import_chain(import_data: dict) -> Dict[str, Any]:
|
||||
raise HTTPException(status_code=400, detail="No blocks to import")
|
||||
|
||||
# Check if database has existing data
|
||||
existing_blocks = await session.execute(select(Block).order_by(Block.height))
|
||||
existing_count = len(existing_blocks.scalars().all())
|
||||
existing_blocks = session.execute(select(Block).order_by(Block.height))
|
||||
existing_count = len(list(existing_blocks.scalars().all()))
|
||||
|
||||
if existing_count > 0:
|
||||
# Backup existing data
|
||||
@@ -781,9 +773,9 @@ async def import_chain(import_data: dict) -> Dict[str, Any]:
|
||||
_logger.info(f"Backing up existing chain with {existing_count} blocks")
|
||||
|
||||
# Clear existing data
|
||||
await session.execute(select(Block).delete())
|
||||
await session.execute(select(Account).delete())
|
||||
await session.execute(select(Transaction).delete())
|
||||
session.execute(select(Block).delete())
|
||||
session.execute(select(Account).delete())
|
||||
session.execute(select(Transaction).delete())
|
||||
|
||||
# Import blocks
|
||||
for block_data in blocks:
|
||||
@@ -838,7 +830,7 @@ async def import_chain(import_data: dict) -> Dict[str, Any]:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to import chain: {str(e)}")
|
||||
|
||||
@router.post("/force-sync", summary="Force reorg to specified peer")
|
||||
def force_sync(peer_data: dict) -> Dict[str, Any]:
|
||||
async def force_sync(peer_data: dict) -> Dict[str, Any]:
|
||||
"""Force blockchain reorganization to sync with specified peer"""
|
||||
try:
|
||||
peer_url = peer_data.get("peer_url")
|
||||
@@ -861,7 +853,7 @@ def force_sync(peer_data: dict) -> Dict[str, Any]:
|
||||
raise HTTPException(status_code=400, detail=f"Peer only has {len(peer_blocks)} blocks, cannot sync to height {target_height}")
|
||||
|
||||
# Import peer's chain
|
||||
import_result = import_chain(peer_chain_data["export_data"])
|
||||
import_result = await import_chain(peer_chain_data["export_data"])
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
@@ -874,69 +866,3 @@ def force_sync(peer_data: dict) -> Dict[str, Any]:
|
||||
except Exception as e:
|
||||
_logger.error(f"Error forcing sync: {e}")
|
||||
raise HTTPException(status_code=500, detail=f"Failed to force sync: {str(e)}")
|
||||
|
||||
async with _import_lock:
|
||||
try:
|
||||
# Rate limiting: max 1 import per second
|
||||
current_time = time.time()
|
||||
time_since_last = current_time - _last_import_time
|
||||
if False: # time_since_last < 1.0: # 1 second minimum between imports
|
||||
await asyncio.sleep(1.0 - time_since_last)
|
||||
|
||||
_last_import_time = time.time()
|
||||
|
||||
chain_id = block_data.get("chain_id") or block_data.get("chainId") or get_chain_id(None)
|
||||
|
||||
timestamp = block_data.get("timestamp")
|
||||
if isinstance(timestamp, str):
|
||||
try:
|
||||
timestamp = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
|
||||
except ValueError:
|
||||
timestamp = datetime.utcnow()
|
||||
elif timestamp is None:
|
||||
timestamp = datetime.utcnow()
|
||||
|
||||
height = block_data.get("number") or block_data.get("height")
|
||||
if height is None:
|
||||
raise ValueError("Block height is required")
|
||||
|
||||
transactions = block_data.get("transactions", [])
|
||||
normalized_block = {
|
||||
"chain_id": chain_id,
|
||||
"height": int(height),
|
||||
"hash": block_data.get("hash"),
|
||||
"parent_hash": block_data.get("parent_hash") or block_data.get("parentHash", ""),
|
||||
"proposer": block_data.get("proposer") or block_data.get("miner", ""),
|
||||
"timestamp": timestamp.isoformat() if isinstance(timestamp, datetime) else timestamp,
|
||||
"tx_count": block_data.get("tx_count", len(transactions)),
|
||||
"state_root": block_data.get("state_root") or block_data.get("stateRoot"),
|
||||
}
|
||||
|
||||
from ..config import settings as cfg
|
||||
sync = ChainSync(
|
||||
session_factory=session_scope,
|
||||
chain_id=chain_id,
|
||||
validate_signatures=cfg.sync_validate_signatures,
|
||||
)
|
||||
result = sync.import_block(normalized_block, transactions=transactions)
|
||||
|
||||
if result.accepted:
|
||||
_logger.info(f"Successfully imported block {result.height}")
|
||||
metrics_registry.increment("blocks_imported_total")
|
||||
|
||||
return {
|
||||
"success": result.accepted,
|
||||
"accepted": result.accepted,
|
||||
"block_number": result.height,
|
||||
"block_hash": result.block_hash,
|
||||
"chain_id": chain_id,
|
||||
"reason": result.reason,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
_logger.error(f"Failed to import block: {e}")
|
||||
metrics_registry.increment("block_import_errors_total")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to import block: {str(e)}"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user