def _calculate_dynamic_batch_size(self, gap_size: int) -> int:
    """Choose a bulk-sync batch size scaled to the local/remote height gap.

    Strategy (piecewise-linear, monotonically non-decreasing in gap_size):
      - small gaps  (< 100):  scale from min_batch toward 50 — precision
      - medium gaps (100-499): scale from 50 toward 100
      - large gaps  (>= 500): scale from 100 toward max_batch — throughput

    Args:
        gap_size: Number of blocks the local chain is behind the remote
            (callers compute it as remote_height - local_height, so it is
            normally >= 1).

    Returns:
        A batch size clamped into the configured
        [min_bulk_sync_batch_size, max_bulk_sync_batch_size] range.
    """
    # getattr fallbacks keep this working against older ChainSettings
    # instances that predate the two batch-size options.
    min_batch = getattr(settings, 'min_bulk_sync_batch_size', 20)
    max_batch = getattr(settings, 'max_bulk_sync_batch_size', 200)

    if gap_size < 100:
        # Small gaps: min_batch plus half the gap, capped at 50.
        size = min(min_batch + gap_size // 2, 50)
    elif gap_size < 500:
        # Medium gaps: 50 plus a quarter of the excess over 100, capped at 100.
        size = min(50 + (gap_size - 100) // 4, 100)
    else:
        # Large gaps: 100 plus a fifth of the excess over 500, capped at max_batch.
        size = min(100 + (gap_size - 500) // 5, max_batch)

    # Fix: the per-branch caps (50 / 100) could undercut a configured
    # min_batch > 50, and a degenerate gap_size <= 0 could produce a
    # non-positive size. Clamp into the configured bounds so the documented
    # contract holds for any settings/gap combination. With the default
    # bounds (20/200) every branch already lands in range, so behavior for
    # existing deployments is unchanged.
    return max(min_batch, min(size, max_batch))