From 845c64831390ba2cd930ce3e379f16e4be6a84bb Mon Sep 17 00:00:00 2001 From: aitbc1 Date: Thu, 19 Mar 2026 16:08:48 +0100 Subject: [PATCH] feat(chain_sync): add configurable source/import hosts and improve rate limit handling - Add source_host/source_port parameters for flexible block polling configuration - Add import_host/import_port parameters to separate import target from source - Bypass rate limiting for localhost traffic (127.0.0.1, ::1) in middleware - Increase rate limit from 200 to 5000 requests per 60s for RPC throughput - Add receiver ready event to prevent dropping initial block broadcasts - Add special handling for 429 rate --- apps/blockchain-node/src/aitbc_chain/app.py | 6 +- .../src/aitbc_chain/chain_sync.py | 81 +++++++++++++++---- .../src/aitbc_chain/rpc/router.py | 2 + 3 files changed, 74 insertions(+), 15 deletions(-) diff --git a/apps/blockchain-node/src/aitbc_chain/app.py b/apps/blockchain-node/src/aitbc_chain/app.py index 9cf4a6e6..503dc7c3 100755 --- a/apps/blockchain-node/src/aitbc_chain/app.py +++ b/apps/blockchain-node/src/aitbc_chain/app.py @@ -32,6 +32,9 @@ class RateLimitMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): client_ip = request.client.host if request.client else "unknown" + # Bypass rate limiting for localhost (sync/health internal traffic) + if client_ip in {"127.0.0.1", "::1"}: + return await call_next(request) now = time.time() # Clean old entries self._requests[client_ip] = [ @@ -109,7 +112,8 @@ def create_app() -> FastAPI: # Middleware (applied in reverse order) app.add_middleware(RequestLoggingMiddleware) - app.add_middleware(RateLimitMiddleware, max_requests=200, window_seconds=60) + # Allow higher RPC throughput (sync + node traffic) + app.add_middleware(RateLimitMiddleware, max_requests=5000, window_seconds=60) app.add_middleware( CORSMiddleware, allow_origins=[ diff --git a/apps/blockchain-node/src/aitbc_chain/chain_sync.py b/apps/blockchain-node/src/aitbc_chain/chain_sync.py index 17c3589e..a91a0006 100644 --- a/apps/blockchain-node/src/aitbc_chain/chain_sync.py +++ b/apps/blockchain-node/src/aitbc_chain/chain_sync.py @@ -13,13 +13,20 @@ from typing import Dict, Any, Optional, List logger = logging.getLogger(__name__) class ChainSyncService: - def __init__(self, redis_url: str, node_id: str, rpc_port: int = 8006, leader_host: str = None): + def __init__(self, redis_url: str, node_id: str, rpc_port: int = 8006, leader_host: str = None, + source_host: str = "127.0.0.1", source_port: int = None, + import_host: str = "127.0.0.1", import_port: int = None): self.redis_url = redis_url self.node_id = node_id - self.rpc_port = rpc_port - self.leader_host = leader_host # Host of the leader node + self.rpc_port = rpc_port # kept for backward compat (local poll if source_port None) + self.leader_host = leader_host # Host of the leader node (legacy) + self.source_host = source_host + self.source_port = source_port or rpc_port + self.import_host = import_host + self.import_port = import_port or rpc_port self._stop_event = asyncio.Event() self._redis = None + self._receiver_ready = asyncio.Event() async def start(self): """Start chain synchronization service""" @@ -35,10 +42,11 @@ class ChainSyncService: return # Start block broadcasting task - broadcast_task = asyncio.create_task(self._broadcast_blocks()) - # Start block receiving task receive_task = asyncio.create_task(self._receive_blocks()) + # Wait until receiver subscribed so we don't drop the initial burst + await self._receiver_ready.wait() + broadcast_task = asyncio.create_task(self._broadcast_blocks()) try: await self._stop_event.wait() @@ -68,7 +76,7 @@ class ChainSyncService: try: # Get current head from local RPC async with aiohttp.ClientSession() as session: - async with session.get(f"http://127.0.0.1:{self.rpc_port}/rpc/head") as resp: + async with session.get(f"http://{self.source_host}:{self.source_port}/rpc/head") as resp: if resp.status == 200: head_data = await resp.json() current_height = head_data.get('height', 0) @@ -85,11 +93,19 @@ class ChainSyncService: last_broadcast_height = current_height logger.info(f"Broadcasted blocks up to height {current_height}") + elif resp.status == 429: + raise Exception("rate_limit") else: raise Exception(f"RPC returned status {resp.status}") except Exception as e: retry_count += 1 + # If rate-limited, wait longer before retrying + if str(e) == "rate_limit": + delay = base_delay * 30 + logger.warning(f"RPC rate limited, retrying in {delay}s") + await asyncio.sleep(delay) + continue if retry_count <= max_retries: delay = base_delay * (2 ** (retry_count - 1)) # Exponential backoff logger.warning(f"RPC connection failed (attempt {retry_count}/{max_retries}), retrying in {delay}s: {e}") @@ -109,6 +125,7 @@ class ChainSyncService: pubsub = self._redis.pubsub() await pubsub.subscribe("blocks") + self._receiver_ready.set() logger.info("Subscribed to block broadcasts") @@ -126,11 +143,12 @@ class ChainSyncService: async def _get_block_by_height(self, height: int, session) -> Optional[Dict[str, Any]]: """Get block data by height from local RPC""" try: - async with session.get(f"http://127.0.0.1:{self.rpc_port}/rpc/blocks?start={height}&end={height}") as resp: + async with session.get(f"http://{self.source_host}:{self.source_port}/rpc/blocks-range?start={height}&end={height}") as resp: if resp.status == 200: blocks_data = await resp.json() blocks = blocks_data.get('blocks', []) - return blocks[0] if blocks else None + block = blocks[0] if blocks else None + return block except Exception as e: logger.error(f"Error getting block {height}: {e}") return None @@ -156,8 +174,8 @@ class ChainSyncService: return # Determine target host - if we're a follower, import to leader, else import locally - target_host = self.leader_host if self.leader_host else "127.0.0.1" - target_port = self.rpc_port + target_host = self.import_host + target_port = self.import_port # Retry logic for import max_retries = 3 @@ -178,7 +196,11 @@ class ChainSyncService: logger.debug(f"Rejected block {block_data.get('height')}: {result.get('reason')}") return else: - raise Exception(f"HTTP {resp.status}") + try: + body = await resp.text() + except Exception: + body = "" + raise Exception(f"HTTP {resp.status}: {body}") except Exception as e: if attempt < max_retries - 1: @@ -192,9 +214,27 @@ class ChainSyncService: except Exception as e: logger.error(f"Error importing block: {e}") -async def run_chain_sync(redis_url: str, node_id: str, rpc_port: int = 8006, leader_host: str = None): +async def run_chain_sync( + redis_url: str, + node_id: str, + rpc_port: int = 8006, + leader_host: str = None, + source_host: str = "127.0.0.1", + source_port: int = None, + import_host: str = "127.0.0.1", + import_port: int = None, +): """Run chain synchronization service""" - service = ChainSyncService(redis_url, node_id, rpc_port, leader_host) + service = ChainSyncService( + redis_url=redis_url, + node_id=node_id, + rpc_port=rpc_port, + leader_host=leader_host, + source_host=source_host, + source_port=source_port, + import_host=import_host, + import_port=import_port, + ) await service.start() def main(): @@ -205,13 +245,26 @@ def main(): parser.add_argument("--node-id", required=True, help="Node identifier") parser.add_argument("--rpc-port", type=int, default=8006, help="RPC port") parser.add_argument("--leader-host", help="Leader node host (for followers)") + parser.add_argument("--source-host", default="127.0.0.1", help="Host to poll for head/blocks") + parser.add_argument("--source-port", type=int, help="Port to poll for head/blocks") + parser.add_argument("--import-host", default="127.0.0.1", help="Host to import blocks into") + parser.add_argument("--import-port", type=int, help="Port to import blocks into") args = parser.parse_args() logging.basicConfig(level=logging.INFO) try: - asyncio.run(run_chain_sync(args.redis, args.node_id, args.rpc_port, args.leader_host)) + asyncio.run(run_chain_sync( + args.redis, + args.node_id, + args.rpc_port, + args.leader_host, + args.source_host, + args.source_port, + args.import_host, + args.import_port, + )) except KeyboardInterrupt: logger.info("Chain sync service stopped by user") diff --git a/apps/blockchain-node/src/aitbc_chain/rpc/router.py b/apps/blockchain-node/src/aitbc_chain/rpc/router.py index 441f5681..be193a2c 100755 --- a/apps/blockchain-node/src/aitbc_chain/rpc/router.py +++ b/apps/blockchain-node/src/aitbc_chain/rpc/router.py @@ -110,6 +110,7 @@ async def get_block(height: int) -> Dict[str, Any]: "height": block.height, "hash": block.hash, "parent_hash": block.parent_hash, + "proposer": block.proposer, "timestamp": block.timestamp.isoformat(), "tx_count": block.tx_count, "state_root": block.state_root, @@ -154,6 +155,7 @@ async def get_blocks_range(start: int, end: int) -> Dict[str, Any]: "height": block.height, "hash": block.hash, "parent_hash": block.parent_hash, + "proposer": block.proposer, "timestamp": block.timestamp.isoformat(), "tx_count": block.tx_count, "state_root": block.state_root,