feat: implement dynamic batch size for bulk sync
Some checks failed
Blockchain Synchronization Verification / sync-verification (push) Failing after 3s
Integration Tests / test-service-integration (push) Successful in 12s
Multi-Node Blockchain Health Monitoring / health-check (push) Successful in 2s
P2P Network Verification / p2p-verification (push) Successful in 2s
Python Tests / test-python (push) Successful in 9s
Security Scanning / security-scan (push) Successful in 32s
Some checks failed
Blockchain Synchronization Verification / sync-verification (push) Failing after 3s
Integration Tests / test-service-integration (push) Successful in 12s
Multi-Node Blockchain Health Monitoring / health-check (push) Successful in 2s
P2P Network Verification / p2p-verification (push) Successful in 2s
Python Tests / test-python (push) Successful in 9s
Security Scanning / security-scan (push) Successful in 32s
- Add `_calculate_dynamic_batch_size` method to sync.py
- Scale batch size based on gap size:
  - Small gaps (< 100): 20-50 blocks for precision
  - Medium gaps (100-500): 50-100 blocks
  - Large gaps (> 500): 100-200 blocks for speed
- Add `min_bulk_sync_batch_size` and `max_bulk_sync_batch_size` config params
- Update `bulk_import_from` to use the dynamic batch size
- Replaces the fixed 50-block batch size with adaptive sizing
This commit is contained in:
@@ -74,6 +74,8 @@ class ChainSettings(BaseSettings):
|
|||||||
auto_sync_threshold: int = 10 # blocks gap threshold to trigger bulk sync
|
auto_sync_threshold: int = 10 # blocks gap threshold to trigger bulk sync
|
||||||
auto_sync_max_retries: int = 3 # max retry attempts for automatic bulk sync
|
auto_sync_max_retries: int = 3 # max retry attempts for automatic bulk sync
|
||||||
min_bulk_sync_interval: int = 60 # minimum seconds between bulk sync attempts
|
min_bulk_sync_interval: int = 60 # minimum seconds between bulk sync attempts
|
||||||
|
min_bulk_sync_batch_size: int = 20 # minimum batch size for dynamic bulk sync
|
||||||
|
max_bulk_sync_batch_size: int = 200 # maximum batch size for dynamic bulk sync
|
||||||
|
|
||||||
gossip_backend: str = "memory"
|
gossip_backend: str = "memory"
|
||||||
gossip_broadcast_url: Optional[str] = None
|
gossip_broadcast_url: Optional[str] = None
|
||||||
|
|||||||
@@ -118,6 +118,27 @@ class ChainSync:
|
|||||||
"""Close HTTP client."""
|
"""Close HTTP client."""
|
||||||
await self._client.aclose()
|
await self._client.aclose()
|
||||||
|
|
||||||
|
def _calculate_dynamic_batch_size(self, gap_size: int) -> int:
    """Calculate an adaptive bulk-sync batch size from the block gap.

    Strategy:
    - Small gaps (< 100): smaller batches (min_batch..50) for precision
    - Medium gaps (100-500): medium batches (50..100)
    - Large gaps (> 500): larger batches (100..max_batch) for speed

    Args:
        gap_size: Number of blocks between local and remote chain height.

    Returns:
        A batch size clamped to the configured
        [min_bulk_sync_batch_size, max_bulk_sync_batch_size] range.
    """
    # Defaults keep this working against an older ChainSettings that
    # lacks the new fields.
    min_batch = getattr(settings, 'min_bulk_sync_batch_size', 20)
    max_batch = getattr(settings, 'max_bulk_sync_batch_size', 200)

    if gap_size < 100:
        # Small gaps: scale from min_batch toward 50.
        size = min(min_batch + gap_size // 2, 50)
    elif gap_size < 500:
        # Medium gaps: scale from 50 toward 100.
        size = min(50 + (gap_size - 100) // 4, 100)
    else:
        # Large gaps: scale from 100 toward max_batch.
        size = min(100 + (gap_size - 500) // 5, max_batch)

    # Fix: the hard per-branch caps (50, 100) could return a value below a
    # configured min_bulk_sync_batch_size > 50, or above a configured
    # max_bulk_sync_batch_size < 100. Clamp so the configured bounds always
    # win; behavior is unchanged with the default 20/200 settings.
    return max(min_batch, min(size, max_batch))
|
||||||
|
|
||||||
async def fetch_blocks_range(self, start: int, end: int, source_url: str) -> List[Dict[str, Any]]:
|
async def fetch_blocks_range(self, start: int, end: int, source_url: str) -> List[Dict[str, Any]]:
|
||||||
"""Fetch a range of blocks from a source RPC."""
|
"""Fetch a range of blocks from a source RPC."""
|
||||||
try:
|
try:
|
||||||
@@ -168,12 +189,14 @@ class ChainSync:
|
|||||||
logger.info("Already up to date", extra={"local_height": local_height, "remote_height": remote_height})
|
logger.info("Already up to date", extra={"local_height": local_height, "remote_height": remote_height})
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
logger.info("Starting bulk import", extra={"local_height": local_height, "remote_height": remote_height, "batch_size": self._batch_size})
|
gap_size = remote_height - local_height
|
||||||
|
dynamic_batch_size = self._calculate_dynamic_batch_size(gap_size)
|
||||||
|
logger.info("Starting bulk import", extra={"local_height": local_height, "remote_height": remote_height, "gap_size": gap_size, "batch_size": dynamic_batch_size})
|
||||||
|
|
||||||
imported = 0
|
imported = 0
|
||||||
start_height = local_height + 1
|
start_height = local_height + 1
|
||||||
while start_height <= remote_height:
|
while start_height <= remote_height:
|
||||||
end_height = min(start_height + self._batch_size - 1, remote_height)
|
end_height = min(start_height + dynamic_batch_size - 1, remote_height)
|
||||||
batch = await self.fetch_blocks_range(start_height, end_height, source_url)
|
batch = await self.fetch_blocks_range(start_height, end_height, source_url)
|
||||||
if not batch:
|
if not batch:
|
||||||
logger.warning("No blocks returned for range", extra={"start": start_height, "end": end_height})
|
logger.warning("No blocks returned for range", extra={"start": start_height, "end": end_height})
|
||||||
|
|||||||
Reference in New Issue
Block a user