feat: implement automatic bulk sync for blockchain nodes
Some checks failed
Integration Tests / test-service-integration (push) Failing after 8s
Multi-Node Blockchain Health Monitoring / health-check (push) Successful in 2s
P2P Network Verification / p2p-verification (push) Successful in 2s
Python Tests / test-python (push) Successful in 17s
Security Scanning / security-scan (push) Failing after 37s
Blockchain Synchronization Verification / sync-verification (push) Failing after 2s
Some checks failed
Integration Tests / test-service-integration (push) Failing after 8s
Multi-Node Blockchain Health Monitoring / health-check (push) Successful in 2s
P2P Network Verification / p2p-verification (push) Successful in 2s
Python Tests / test-python (push) Successful in 17s
Security Scanning / security-scan (push) Failing after 37s
Blockchain Synchronization Verification / sync-verification (push) Failing after 2s
- Add auto_sync_enabled, auto_sync_threshold, auto_sync_max_retries config - Add min_bulk_sync_interval for rate limiting - Implement gap detection in process_blocks() with automatic bulk sync trigger - Add rate limiting to ChainSync.bulk_import_from() to prevent sync loops - Automatic bulk sync triggers when gap > threshold (default 10 blocks) - Retries block import after bulk sync completes - Logs sync events for monitoring and debugging
This commit is contained in:
@@ -68,6 +68,12 @@ class ChainSettings(BaseSettings):
|
|||||||
trusted_proposers: str = "" # comma-separated list of trusted proposer IDs
|
trusted_proposers: str = "" # comma-separated list of trusted proposer IDs
|
||||||
max_reorg_depth: int = 10 # max blocks to reorg on conflict
|
max_reorg_depth: int = 10 # max blocks to reorg on conflict
|
||||||
sync_validate_signatures: bool = True # validate proposer signatures on import
|
sync_validate_signatures: bool = True # validate proposer signatures on import
|
||||||
|
|
||||||
|
# Automatic bulk sync settings
|
||||||
|
auto_sync_enabled: bool = True # enable automatic bulk sync when gap detected
|
||||||
|
auto_sync_threshold: int = 10 # blocks gap threshold to trigger bulk sync
|
||||||
|
auto_sync_max_retries: int = 3 # max retry attempts for automatic bulk sync
|
||||||
|
min_bulk_sync_interval: int = 60 # minimum seconds between bulk sync attempts
|
||||||
|
|
||||||
gossip_backend: str = "memory"
|
gossip_backend: str = "memory"
|
||||||
gossip_broadcast_url: Optional[str] = None
|
gossip_broadcast_url: Optional[str] = None
|
||||||
|
|||||||
@@ -125,6 +125,7 @@ class BlockchainNode:
|
|||||||
return
|
return
|
||||||
|
|
||||||
async def process_blocks():
|
async def process_blocks():
|
||||||
|
last_bulk_sync_time = 0
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
block_data = await block_sub.queue.get()
|
block_data = await block_sub.queue.get()
|
||||||
@@ -137,6 +138,46 @@ class BlockchainNode:
|
|||||||
sync = ChainSync(session_factory=session_scope, chain_id=chain_id)
|
sync = ChainSync(session_factory=session_scope, chain_id=chain_id)
|
||||||
res = sync.import_block(block_data, transactions=block_data.get("transactions"))
|
res = sync.import_block(block_data, transactions=block_data.get("transactions"))
|
||||||
logger.info(f"Import result: accepted={res.accepted}, reason={res.reason}")
|
logger.info(f"Import result: accepted={res.accepted}, reason={res.reason}")
|
||||||
|
|
||||||
|
# Automatic bulk sync on gap detection
|
||||||
|
if not res.accepted and "Gap detected" in res.reason and settings.auto_sync_enabled:
|
||||||
|
# Parse gap size from reason string
|
||||||
|
try:
|
||||||
|
reason_parts = res.reason.split(":")
|
||||||
|
our_height = int(reason_parts[1].strip().split(",")[0].replace("our height: ", ""))
|
||||||
|
received_height = int(reason_parts[2].strip().replace("received: ", ""))
|
||||||
|
gap_size = received_height - our_height
|
||||||
|
|
||||||
|
if gap_size > settings.auto_sync_threshold:
|
||||||
|
current_time = asyncio.get_event_loop().time()
|
||||||
|
time_since_last_sync = current_time - last_bulk_sync_time
|
||||||
|
|
||||||
|
if time_since_last_sync >= settings.min_bulk_sync_interval:
|
||||||
|
logger.warning(f"Gap detected: {gap_size} blocks, triggering automatic bulk sync")
|
||||||
|
|
||||||
|
# Get source URL from block metadata if available
|
||||||
|
source_url = block_data.get("source_url")
|
||||||
|
if not source_url:
|
||||||
|
# Fallback to default peer URL from gossip backend
|
||||||
|
source_url = settings.gossip_broadcast_url
|
||||||
|
|
||||||
|
if source_url:
|
||||||
|
try:
|
||||||
|
imported = await sync.bulk_import_from(source_url)
|
||||||
|
logger.info(f"Bulk sync completed: {imported} blocks imported")
|
||||||
|
last_bulk_sync_time = current_time
|
||||||
|
|
||||||
|
# Retry block import after bulk sync
|
||||||
|
res = sync.import_block(block_data, transactions=block_data.get("transactions"))
|
||||||
|
logger.info(f"Retry import result: accepted={res.accepted}, reason={res.reason}")
|
||||||
|
except Exception as sync_exc:
|
||||||
|
logger.error(f"Automatic bulk sync failed: {sync_exc}")
|
||||||
|
else:
|
||||||
|
logger.warning("No source URL available for bulk sync")
|
||||||
|
else:
|
||||||
|
logger.info(f"Skipping bulk sync, too recent ({time_since_last_sync:.0f}s ago)")
|
||||||
|
except (ValueError, IndexError) as parse_exc:
|
||||||
|
logger.error(f"Failed to parse gap size from reason: {res.reason}, error: {parse_exc}")
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.error(f"Error processing block from gossip: {exc}")
|
logger.error(f"Error processing block from gossip: {exc}")
|
||||||
|
|
||||||
|
|||||||
@@ -111,6 +111,8 @@ class ChainSync:
|
|||||||
self._batch_size = batch_size
|
self._batch_size = batch_size
|
||||||
self._poll_interval = poll_interval
|
self._poll_interval = poll_interval
|
||||||
self._client = httpx.AsyncClient(timeout=10.0)
|
self._client = httpx.AsyncClient(timeout=10.0)
|
||||||
|
self._last_bulk_sync_time = 0
|
||||||
|
self._min_bulk_sync_interval = getattr(settings, 'min_bulk_sync_interval', 60)
|
||||||
|
|
||||||
async def close(self) -> None:
|
async def close(self) -> None:
|
||||||
"""Close HTTP client."""
|
"""Close HTTP client."""
|
||||||
@@ -138,6 +140,13 @@ class ChainSync:
|
|||||||
if import_url is None:
|
if import_url is None:
|
||||||
import_url = "http://127.0.0.1:8006" # default local RPC
|
import_url = "http://127.0.0.1:8006" # default local RPC
|
||||||
|
|
||||||
|
# Rate limiting check
|
||||||
|
current_time = time.time()
|
||||||
|
time_since_last_sync = current_time - self._last_bulk_sync_time
|
||||||
|
if time_since_last_sync < self._min_bulk_sync_interval:
|
||||||
|
logger.warning("Bulk sync rate limited", extra={"time_since_last_sync": time_since_last_sync, "min_interval": self._min_bulk_sync_interval})
|
||||||
|
return 0
|
||||||
|
|
||||||
# Get local head
|
# Get local head
|
||||||
with self._session_factory() as session:
|
with self._session_factory() as session:
|
||||||
local_head = session.exec(
|
local_head = session.exec(
|
||||||
@@ -185,6 +194,10 @@ class ChainSync:
|
|||||||
await asyncio.sleep(self._poll_interval)
|
await asyncio.sleep(self._poll_interval)
|
||||||
|
|
||||||
logger.info("Bulk import completed", extra={"imported": imported, "final_height": remote_height})
|
logger.info("Bulk import completed", extra={"imported": imported, "final_height": remote_height})
|
||||||
|
|
||||||
|
# Update last bulk sync time
|
||||||
|
self._last_bulk_sync_time = current_time
|
||||||
|
|
||||||
return imported
|
return imported
|
||||||
|
|
||||||
def import_block(self, block_data: Dict[str, Any], transactions: Optional[List[Dict[str, Any]]] = None) -> ImportResult:
|
def import_block(self, block_data: Dict[str, Any], transactions: Optional[List[Dict[str, Any]]] = None) -> ImportResult:
|
||||||
|
|||||||
Reference in New Issue
Block a user