From 4f157e21ee309d808852c0c2012fc214e02be43c Mon Sep 17 00:00:00 2001 From: aitbc Date: Mon, 20 Apr 2026 21:11:01 +0200 Subject: [PATCH] feat: implement automatic bulk sync for blockchain nodes - Add auto_sync_enabled, auto_sync_threshold, auto_sync_max_retries config - Add min_bulk_sync_interval for rate limiting - Implement gap detection in process_blocks() with automatic bulk sync trigger - Add rate limiting to ChainSync.bulk_import_from() to prevent sync loops - Automatic bulk sync triggers when gap > threshold (default 10 blocks) - Retries block import after bulk sync completes - Logs sync events for monitoring and debugging --- .../blockchain-node/src/aitbc_chain/config.py | 6 +++ apps/blockchain-node/src/aitbc_chain/main.py | 41 +++++++++++++++++++ apps/blockchain-node/src/aitbc_chain/sync.py | 13 ++++++ 3 files changed, 60 insertions(+) diff --git a/apps/blockchain-node/src/aitbc_chain/config.py b/apps/blockchain-node/src/aitbc_chain/config.py index 268c9043..a52a58fa 100755 --- a/apps/blockchain-node/src/aitbc_chain/config.py +++ b/apps/blockchain-node/src/aitbc_chain/config.py @@ -68,6 +68,12 @@ class ChainSettings(BaseSettings): trusted_proposers: str = "" # comma-separated list of trusted proposer IDs max_reorg_depth: int = 10 # max blocks to reorg on conflict sync_validate_signatures: bool = True # validate proposer signatures on import + + # Automatic bulk sync settings + auto_sync_enabled: bool = True # enable automatic bulk sync when gap detected + auto_sync_threshold: int = 10 # blocks gap threshold to trigger bulk sync + auto_sync_max_retries: int = 3 # max retry attempts for automatic bulk sync + min_bulk_sync_interval: int = 60 # minimum seconds between bulk sync attempts gossip_backend: str = "memory" gossip_broadcast_url: Optional[str] = None diff --git a/apps/blockchain-node/src/aitbc_chain/main.py b/apps/blockchain-node/src/aitbc_chain/main.py index 059ecc12..e7c5672b 100755 --- a/apps/blockchain-node/src/aitbc_chain/main.py +++ b/apps/blockchain-node/src/aitbc_chain/main.py @@ -125,6 +125,7 @@ class BlockchainNode: return async def process_blocks(): + last_bulk_sync_time = 0 while True: try: block_data = await block_sub.queue.get() @@ -137,6 +138,46 @@ class BlockchainNode: sync = ChainSync(session_factory=session_scope, chain_id=chain_id) res = sync.import_block(block_data, transactions=block_data.get("transactions")) logger.info(f"Import result: accepted={res.accepted}, reason={res.reason}") + + # Automatic bulk sync on gap detection + if not res.accepted and "Gap detected" in res.reason and settings.auto_sync_enabled: + # Parse gap size from reason string + try: + reason_parts = res.reason.split(":") + our_height = int(reason_parts[1].strip().split(",")[0].replace("our height: ", "")) + received_height = int(reason_parts[2].strip().replace("received: ", "")) + gap_size = received_height - our_height + + if gap_size > settings.auto_sync_threshold: + current_time = asyncio.get_event_loop().time() + time_since_last_sync = current_time - last_bulk_sync_time + + if time_since_last_sync >= settings.min_bulk_sync_interval: + logger.warning(f"Gap detected: {gap_size} blocks, triggering automatic bulk sync") + + # Get source URL from block metadata if available + source_url = block_data.get("source_url") + if not source_url: + # Fallback to default peer URL from gossip backend + source_url = settings.gossip_broadcast_url + + if source_url: + try: + imported = await sync.bulk_import_from(source_url) + logger.info(f"Bulk sync completed: {imported} blocks imported") + last_bulk_sync_time = current_time + + # Retry block import after bulk sync + res = sync.import_block(block_data, transactions=block_data.get("transactions")) + logger.info(f"Retry import result: accepted={res.accepted}, reason={res.reason}") + except Exception as sync_exc: + logger.error(f"Automatic bulk sync failed: {sync_exc}") + else: + logger.warning("No source URL available for bulk sync") + else: + logger.info(f"Skipping bulk sync, too recent ({time_since_last_sync:.0f}s ago)") + except (ValueError, IndexError) as parse_exc: + logger.error(f"Failed to parse gap size from reason: {res.reason}, error: {parse_exc}") except Exception as exc: logger.error(f"Error processing block from gossip: {exc}") diff --git a/apps/blockchain-node/src/aitbc_chain/sync.py b/apps/blockchain-node/src/aitbc_chain/sync.py index 77509a7c..90aaa484 100755 --- a/apps/blockchain-node/src/aitbc_chain/sync.py +++ b/apps/blockchain-node/src/aitbc_chain/sync.py @@ -111,6 +111,8 @@ class ChainSync: self._batch_size = batch_size self._poll_interval = poll_interval self._client = httpx.AsyncClient(timeout=10.0) + self._last_bulk_sync_time = 0 + self._min_bulk_sync_interval = getattr(settings, 'min_bulk_sync_interval', 60) async def close(self) -> None: """Close HTTP client.""" @@ -138,6 +140,13 @@ class ChainSync: if import_url is None: import_url = "http://127.0.0.1:8006" # default local RPC + # Rate limiting check + current_time = time.time() + time_since_last_sync = current_time - self._last_bulk_sync_time + if time_since_last_sync < self._min_bulk_sync_interval: + logger.warning("Bulk sync rate limited", extra={"time_since_last_sync": time_since_last_sync, "min_interval": self._min_bulk_sync_interval}) + return 0 + # Get local head with self._session_factory() as session: local_head = session.exec( @@ -185,6 +194,10 @@ class ChainSync: await asyncio.sleep(self._poll_interval) logger.info("Bulk import completed", extra={"imported": imported, "final_height": remote_height}) + + # Update last bulk sync time + self._last_bulk_sync_time = current_time + return imported def import_block(self, block_data: Dict[str, Any], transactions: Optional[List[Dict[str, Any]]] = None) -> ImportResult: