diff --git a/apps/blockchain-node/src/aitbc_chain/config.py b/apps/blockchain-node/src/aitbc_chain/config.py
index 7f8a1999..8f6ed6f8 100755
--- a/apps/blockchain-node/src/aitbc_chain/config.py
+++ b/apps/blockchain-node/src/aitbc_chain/config.py
@@ -49,7 +49,11 @@ class ChainSettings(BaseSettings):
     max_txs_per_block: int = 500
 
     # Only propose blocks if mempool is not empty (prevents empty blocks)
-    propose_only_if_mempool_not_empty: bool = False
+    propose_only_if_mempool_not_empty: bool = False  # Deprecated: use block_generation_mode
+
+    # Hybrid block generation settings
+    block_generation_mode: str = "hybrid"  # "always", "mempool-only", "hybrid"
+    max_empty_block_interval: int = 60  # seconds before forcing empty block (heartbeat)
 
     # Monitoring interval (in seconds)
     blockchain_monitoring_interval_seconds: int = 60
@@ -77,6 +81,16 @@ class ChainSettings(BaseSettings):
     min_bulk_sync_batch_size: int = 20  # minimum batch size for dynamic bulk sync
     max_bulk_sync_batch_size: int = 200  # maximum batch size for dynamic bulk sync
 
+    # Adaptive sync settings
+    initial_sync_threshold: int = 10000  # blocks gap threshold for initial sync mode
+    initial_sync_max_batch_size: int = 1000  # max batch size during initial sync
+    initial_sync_poll_interval: float = 2.0  # poll interval during initial sync (seconds)
+    initial_sync_bulk_interval: int = 10  # min seconds between bulk sync during initial sync
+    large_gap_threshold: int = 1000  # blocks gap threshold for large gap mode
+    large_gap_max_batch_size: int = 500  # max batch size during large gap sync
+    large_gap_poll_interval: float = 3.0  # poll interval during large gap sync (seconds)
+    large_gap_bulk_interval: int = 30  # min seconds between bulk sync during large gap
+
     gossip_backend: str = "memory"
     gossip_broadcast_url: Optional[str] = None
     default_peer_rpc_url: Optional[str] = None  # HTTP RPC URL of default peer for bulk sync
diff --git a/apps/blockchain-node/src/aitbc_chain/consensus/poa.py b/apps/blockchain-node/src/aitbc_chain/consensus/poa.py
index c5e28828..31047936 100755
--- a/apps/blockchain-node/src/aitbc_chain/consensus/poa.py
+++ b/apps/blockchain-node/src/aitbc_chain/consensus/poa.py
@@ -107,6 +107,7 @@ class PoAProposer:
         self._stop_event = asyncio.Event()
         self._task: Optional[asyncio.Task[None]] = None
         self._last_proposer_id: Optional[str] = None
+        self._last_block_timestamp: Optional[datetime] = None
 
     async def start(self) -> None:
         if self._task is not None:
@@ -127,6 +128,8 @@ class PoAProposer:
     async def _run_loop(self) -> None:
         # Initial sleep so we don't start proposing immediately
         await asyncio.sleep(self._config.interval_seconds)
+        from ..config import settings
+        block_generation_mode = getattr(settings, "block_generation_mode", "hybrid")
         while not self._stop_event.is_set():
             if self._stop_event.is_set():
                 break
@@ -135,11 +138,17 @@ class PoAProposer:
                 if proposed:
                     await self._wait_until_next_slot()
                 else:
-                    # If we skipped proposing, wait a regular interval
-                    try:
-                        await asyncio.wait_for(self._stop_event.wait(), timeout=self._config.interval_seconds)
-                    except asyncio.TimeoutError:
-                        pass
+                    # If we skipped proposing, wait based on mode. Hybrid mode re-checks at
+                    # 1/4 of the normal interval so a due heartbeat block is picked up
+                    # promptly; other modes wait a full interval.
+                    if block_generation_mode == "hybrid":
+                        wait_timeout = self._config.interval_seconds / 4
+                    else:
+                        wait_timeout = self._config.interval_seconds
+                    try:
+                        await asyncio.wait_for(self._stop_event.wait(), timeout=wait_timeout)
+                    except asyncio.TimeoutError:
+                        pass
             except Exception as exc:  # pragma: no cover - defensive logging
                 self._logger.exception("Failed to propose block", extra={"error": str(exc)})
                 await asyncio.sleep(1.0)
@@ -165,12 +174,40 @@ class PoAProposer:
         from ..config import settings
         mempool = get_mempool()
 
-        # Check if we should only propose when mempool is not empty (disabled for testing)
-        # if getattr(settings, "propose_only_if_mempool_not_empty", True):
-        #     mempool_size = mempool.size(self._config.chain_id)
-        #     if mempool_size == 0:
-        #         self._logger.info(f"[PROPOSE] Skipping block proposal: mempool is empty (chain={self._config.chain_id})")
-        #         return False
+        # Block generation policy (settings.block_generation_mode):
+        #   "mempool-only" - never propose while the mempool is empty
+        #   "hybrid"       - skip empty slots, but force a heartbeat block once
+        #                    max_empty_block_interval seconds pass without one
+        #   any other value (e.g. "always") - no mempool check is applied here
+        block_generation_mode = getattr(settings, "block_generation_mode", "hybrid")
+        max_empty_block_interval = getattr(settings, "max_empty_block_interval", 60)
+
+        if block_generation_mode in ("mempool-only", "hybrid") and mempool.size(self._config.chain_id) == 0:
+            if block_generation_mode == "mempool-only":
+                self._logger.info(f"[PROPOSE] Skipping block proposal: mempool is empty (chain={self._config.chain_id}, mode=mempool-only)")
+                metrics_registry.increment("sync_empty_blocks_skipped_total")
+                return False
+
+            if self._last_block_timestamp is None:
+                # No block proposed by this process yet (fresh start or restart).
+                # Start the heartbeat clock now; without this an idle chain would
+                # skip forever and never emit a heartbeat until a transaction arrives.
+                self._last_block_timestamp = datetime.utcnow()
+                self._logger.info(f"[PROPOSE] Skipping block proposal: no previous block timestamp (chain={self._config.chain_id}, mode=hybrid)")
+                metrics_registry.increment("sync_empty_blocks_skipped_total")
+                return False
+
+            # NOTE(review): assumes the `timestamp` stored after a proposal is a naive
+            # UTC datetime like datetime.utcnow(); mixing naive/aware would raise here.
+            time_since_last_block = (datetime.utcnow() - self._last_block_timestamp).total_seconds()
+            if time_since_last_block < max_empty_block_interval:
+                self._logger.info(f"[PROPOSE] Skipping block proposal: mempool empty, heartbeat not yet due (chain={self._config.chain_id}, mode=hybrid, idle_time={time_since_last_block:.1f}s)")
+                metrics_registry.increment("sync_empty_blocks_skipped_total")
+                return False
+
+            self._logger.info(f"[PROPOSE] Forcing heartbeat block: idle for {time_since_last_block:.1f}s (chain={self._config.chain_id}, mode=hybrid)")
+            metrics_registry.increment("sync_heartbeat_blocks_forced_total")
+            metrics_registry.observe("sync_time_since_last_block_seconds", time_since_last_block)
 
         with self._session_factory() as session:
             head = session.exec(select(Block).where(Block.chain_id == self._config.chain_id).order_by(Block.height.desc()).limit(1)).first()
@@ -324,6 +361,9 @@ class PoAProposer:
             metrics_registry.increment("poa_proposer_switches_total")
         self._last_proposer_id = self._config.proposer_id
 
+        # Update last block timestamp for heartbeat logic
+        self._last_block_timestamp = timestamp
+
         self._logger.info(
             "Proposed block",
             extra={
diff --git a/apps/blockchain-node/src/aitbc_chain/sync.py b/apps/blockchain-node/src/aitbc_chain/sync.py
index 8764fb1d..ef8e8394 100755
--- a/apps/blockchain-node/src/aitbc_chain/sync.py
+++ b/apps/blockchain-node/src/aitbc_chain/sync.py
@@ -101,7 +101,7 @@ class ChainSync:
         validator: Optional[ProposerSignatureValidator] = None,
         validate_signatures: bool = True,
         batch_size: int = 50,
-        poll_interval: float = 0.5,
+        poll_interval: float = 5.0,
     ) -> None:
         self._session_factory = session_factory
         self._chain_id = chain_id or settings.chain_id
@@ -120,24 +120,91 @@ class ChainSync:
 
     def _calculate_dynamic_batch_size(self, gap_size: int) -> int:
         """Calculate dynamic batch size based on gap size.
-        
+
         Strategy:
-        - Small gaps (< 100): Use smaller batches (20-50) for precision
-        - Medium gaps (100-500): Use medium batches (50-100)
-        - Large gaps (> 500): Use larger batches (100-200) for speed
+        - Initial sync gaps (>10,000): Very large batches (500-1000) for maximum throughput
+        - Large gaps (1,000-10,000): Accelerated batches (200-500)
+        - Medium gaps (500-1,000): Standard batches (100-200)
+        - Small gaps (<500): Precision batches (20-100)
         """
         min_batch = getattr(settings, 'min_bulk_sync_batch_size', 20)
         max_batch = getattr(settings, 'max_bulk_sync_batch_size', 200)
-        
-        if gap_size < 100:
-            # Small gaps: scale from min to 50
-            return min(min_batch + gap_size // 2, 50)
-        elif gap_size < 500:
-            # Medium gaps: scale from 50 to 100
+        initial_sync_threshold = getattr(settings, 'initial_sync_threshold', 10000)
+        initial_sync_max_batch = getattr(settings, 'initial_sync_max_batch_size', 1000)
+        large_gap_threshold = getattr(settings, 'large_gap_threshold', 1000)
+        large_gap_max_batch = getattr(settings, 'large_gap_max_batch_size', 500)
+
+        if gap_size > initial_sync_threshold:
+            # Initial sync: very large batches for maximum throughput
+            return min(500 + (gap_size - initial_sync_threshold) // 20, initial_sync_max_batch)
+        elif gap_size > large_gap_threshold:
+            # Large gap: accelerated sync
+            return min(200 + (gap_size - large_gap_threshold) // 10, large_gap_max_batch)
+        elif gap_size > 500:
+            # Medium gap: standard sync
+            return min(100 + (gap_size - 500) // 5, max_batch)
+        elif gap_size > 100:
+            # Medium-small gaps: scale from 50 to 100
             return min(50 + (gap_size - 100) // 4, 100)
         else:
-            # Large gaps: scale from 100 to max
-            return min(100 + (gap_size - 500) // 5, max_batch)
+            # Small gaps: scale from min to 50
+            return min(min_batch + gap_size // 2, 50)
+
+    def _get_adaptive_poll_interval(self, gap_size: int) -> float:
+        """Get adaptive polling interval based on sync mode.
+
+        Strategy (defaults shown; thresholds and intervals come from settings):
+        - Initial sync gaps (>10,000): Fast polling (2s) for maximum throughput
+        - Large gaps (1,000-10,000): Moderate polling (3s)
+        - Medium gaps (500-1,000): The poll_interval this instance was built with
+        - Small gaps (<500): The poll_interval this instance was built with
+        """
+        initial_sync_threshold = getattr(settings, 'initial_sync_threshold', 10000)
+        initial_sync_poll_interval = getattr(settings, 'initial_sync_poll_interval', 2.0)
+        large_gap_threshold = getattr(settings, 'large_gap_threshold', 1000)
+        large_gap_poll_interval = getattr(settings, 'large_gap_poll_interval', 3.0)
+
+        if gap_size > initial_sync_threshold:
+            return initial_sync_poll_interval
+        elif gap_size > large_gap_threshold:
+            return large_gap_poll_interval
+        else:
+            return self._poll_interval  # Use configured steady-state poll interval
+
+    def _get_adaptive_bulk_sync_interval(self, gap_size: int) -> int:
+        """Get adaptive bulk sync interval based on sync mode.
+
+        Strategy (defaults shown; thresholds and intervals come from settings):
+        - Initial sync gaps (>10,000): Frequent bulk sync (10s) for maximum throughput
+        - Large gaps (1,000-10,000): Moderate bulk sync (30s)
+        - Medium gaps (500-1,000): This instance's _min_bulk_sync_interval
+        - Small gaps (<500): This instance's _min_bulk_sync_interval
+        """
+        initial_sync_threshold = getattr(settings, 'initial_sync_threshold', 10000)
+        initial_sync_bulk_interval = getattr(settings, 'initial_sync_bulk_interval', 10)
+        large_gap_threshold = getattr(settings, 'large_gap_threshold', 1000)
+        large_gap_bulk_interval = getattr(settings, 'large_gap_bulk_interval', 30)
+
+        if gap_size > initial_sync_threshold:
+            return initial_sync_bulk_interval
+        elif gap_size > large_gap_threshold:
+            return large_gap_bulk_interval
+        else:
+            return self._min_bulk_sync_interval  # Use configured steady-state interval
+
+    def _get_sync_mode(self, gap_size: int) -> str:
+        """Determine current sync mode based on gap size."""
+        initial_sync_threshold = getattr(settings, 'initial_sync_threshold', 10000)
+        large_gap_threshold = getattr(settings, 'large_gap_threshold', 1000)
+
+        if gap_size > initial_sync_threshold:
+            return "initial_sync"
+        elif gap_size > large_gap_threshold:
+            return "large_gap"
+        elif gap_size > 500:
+            return "medium_gap"
+        else:
+            return "steady_state"
 
     async def fetch_blocks_range(self, start: int, end: int, source_url: str) -> List[Dict[str, Any]]:
         """Fetch a range of blocks from a source RPC."""
@@ -161,13 +228,6 @@ class ChainSync:
         if import_url is None:
             import_url = "http://127.0.0.1:8006"  # default local RPC
 
-        # Rate limiting check
-        current_time = time.time()
-        time_since_last_sync = current_time - self._last_bulk_sync_time
-        if time_since_last_sync < self._min_bulk_sync_interval:
-            logger.warning("Bulk sync rate limited", extra={"time_since_last_sync": time_since_last_sync, "min_interval": self._min_bulk_sync_interval})
-            return 0
-
         # Get local head
         with self._session_factory() as session:
             local_head = session.exec(
@@ -190,8 +250,38 @@ class ChainSync:
             return 0
 
         gap_size = remote_height - local_height
+        sync_mode = self._get_sync_mode(gap_size)
         dynamic_batch_size = self._calculate_dynamic_batch_size(gap_size)
-        logger.info("Starting bulk import", extra={"local_height": local_height, "remote_height": remote_height, "gap_size": gap_size, "batch_size": dynamic_batch_size})
+        adaptive_bulk_interval = self._get_adaptive_bulk_sync_interval(gap_size)
+        adaptive_poll_interval = self._get_adaptive_poll_interval(gap_size)
+
+        # Rate limiting check with adaptive interval
+        current_time = time.time()
+        time_since_last_sync = current_time - self._last_bulk_sync_time
+        if time_since_last_sync < adaptive_bulk_interval:
+            logger.warning("Bulk sync rate limited", extra={
+                "time_since_last_sync": time_since_last_sync,
+                "min_interval": adaptive_bulk_interval,
+                "sync_mode": sync_mode
+            })
+            return 0
+
+        logger.info("Starting bulk import", extra={
+            "local_height": local_height,
+            "remote_height": remote_height,
+            "gap_size": gap_size,
+            "batch_size": dynamic_batch_size,
+            "sync_mode": sync_mode,
+            "bulk_interval": adaptive_bulk_interval,
+            "poll_interval": adaptive_poll_interval
+        })
+
+        # Record sync mode metrics. Exactly one mode gauge is 1.0; the others are
+        # reset to 0.0 so a previously active mode does not stay flagged forever.
+        for mode in ("initial_sync", "large_gap", "medium_gap", "steady_state"):
+            metrics_registry.set_gauge(f"sync_mode_{mode}", 1.0 if mode == sync_mode else 0.0)
+        metrics_registry.set_gauge("sync_gap_size", float(gap_size))
+        metrics_registry.set_gauge("sync_batch_size", float(dynamic_batch_size))
 
         imported = 0
         start_height = local_height + 1
@@ -213,14 +303,26 @@ class ChainSync:
                 break
 
             start_height = end_height + 1
-            # Brief pause to avoid overwhelming the DB
-            await asyncio.sleep(self._poll_interval)
+            # Brief pause to avoid overwhelming the DB (use adaptive poll interval)
+            await asyncio.sleep(adaptive_poll_interval)
+
+        logger.info("Bulk import completed", extra={
+            "imported": imported,
+            "final_height": remote_height,
+            "sync_mode": sync_mode
+        })
+
+        # Record completion metrics (rate is skipped when no time elapsed on a coarse clock)
+        sync_duration = time.time() - current_time
+        metrics_registry.observe("sync_bulk_duration_seconds", sync_duration)
+        if imported > 0 and sync_duration > 0:
+            sync_rate = imported / sync_duration
+            metrics_registry.observe("sync_blocks_per_second", sync_rate)
+        metrics_registry.set_gauge("sync_chain_height", float(remote_height))
 
-        logger.info("Bulk import completed", extra={"imported": imported, "final_height": remote_height})
-        
         # Update last bulk sync time
         self._last_bulk_sync_time = current_time
-        
+
         return imported
 
     def import_block(self, block_data: Dict[str, Any], transactions: Optional[List[Dict[str, Any]]] = None) -> ImportResult: