feat(chain_sync): add configurable source/import hosts and improve rate limit handling
- Add source_host/source_port parameters for flexible block polling configuration - Add import_host/import_port parameters to separate import target from source - Bypass rate limiting for localhost traffic (127.0.0.1, ::1) in middleware - Increase rate limit from 200 to 5000 requests per 60s for RPC throughput - Add receiver ready event to prevent dropping initial block broadcasts - Add special handling for 429 rate
This commit is contained in:
@@ -32,6 +32,9 @@ class RateLimitMiddleware(BaseHTTPMiddleware):
|
||||
|
||||
async def dispatch(self, request: Request, call_next):
|
||||
client_ip = request.client.host if request.client else "unknown"
|
||||
# Bypass rate limiting for localhost (sync/health internal traffic)
|
||||
if client_ip in {"127.0.0.1", "::1"}:
|
||||
return await call_next(request)
|
||||
now = time.time()
|
||||
# Clean old entries
|
||||
self._requests[client_ip] = [
|
||||
@@ -109,7 +112,8 @@ def create_app() -> FastAPI:
|
||||
|
||||
# Middleware (applied in reverse order)
|
||||
app.add_middleware(RequestLoggingMiddleware)
|
||||
app.add_middleware(RateLimitMiddleware, max_requests=200, window_seconds=60)
|
||||
# Allow higher RPC throughput (sync + node traffic)
|
||||
app.add_middleware(RateLimitMiddleware, max_requests=5000, window_seconds=60)
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=[
|
||||
|
||||
@@ -13,13 +13,20 @@ from typing import Dict, Any, Optional, List
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class ChainSyncService:
|
||||
def __init__(self, redis_url: str, node_id: str, rpc_port: int = 8006, leader_host: str = None):
|
||||
def __init__(self, redis_url: str, node_id: str, rpc_port: int = 8006, leader_host: str = None,
|
||||
source_host: str = "127.0.0.1", source_port: int = None,
|
||||
import_host: str = "127.0.0.1", import_port: int = None):
|
||||
self.redis_url = redis_url
|
||||
self.node_id = node_id
|
||||
self.rpc_port = rpc_port
|
||||
self.leader_host = leader_host # Host of the leader node
|
||||
self.rpc_port = rpc_port # kept for backward compat (local poll if source_port None)
|
||||
self.leader_host = leader_host # Host of the leader node (legacy)
|
||||
self.source_host = source_host
|
||||
self.source_port = source_port or rpc_port
|
||||
self.import_host = import_host
|
||||
self.import_port = import_port or rpc_port
|
||||
self._stop_event = asyncio.Event()
|
||||
self._redis = None
|
||||
self._receiver_ready = asyncio.Event()
|
||||
|
||||
async def start(self):
|
||||
"""Start chain synchronization service"""
|
||||
@@ -35,10 +42,11 @@ class ChainSyncService:
|
||||
return
|
||||
|
||||
# Start block broadcasting task
|
||||
broadcast_task = asyncio.create_task(self._broadcast_blocks())
|
||||
|
||||
# Start block receiving task
|
||||
receive_task = asyncio.create_task(self._receive_blocks())
|
||||
# Wait until receiver subscribed so we don't drop the initial burst
|
||||
await self._receiver_ready.wait()
|
||||
broadcast_task = asyncio.create_task(self._broadcast_blocks())
|
||||
|
||||
try:
|
||||
await self._stop_event.wait()
|
||||
@@ -68,7 +76,7 @@ class ChainSyncService:
|
||||
try:
|
||||
# Get current head from local RPC
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(f"http://127.0.0.1:{self.rpc_port}/rpc/head") as resp:
|
||||
async with session.get(f"http://{self.source_host}:{self.source_port}/rpc/head") as resp:
|
||||
if resp.status == 200:
|
||||
head_data = await resp.json()
|
||||
current_height = head_data.get('height', 0)
|
||||
@@ -85,11 +93,19 @@ class ChainSyncService:
|
||||
|
||||
last_broadcast_height = current_height
|
||||
logger.info(f"Broadcasted blocks up to height {current_height}")
|
||||
elif resp.status == 429:
|
||||
raise Exception("rate_limit")
|
||||
else:
|
||||
raise Exception(f"RPC returned status {resp.status}")
|
||||
|
||||
except Exception as e:
|
||||
retry_count += 1
|
||||
# If rate-limited, wait longer before retrying
|
||||
if str(e) == "rate_limit":
|
||||
delay = base_delay * 30
|
||||
logger.warning(f"RPC rate limited, retrying in {delay}s")
|
||||
await asyncio.sleep(delay)
|
||||
continue
|
||||
if retry_count <= max_retries:
|
||||
delay = base_delay * (2 ** (retry_count - 1)) # Exponential backoff
|
||||
logger.warning(f"RPC connection failed (attempt {retry_count}/{max_retries}), retrying in {delay}s: {e}")
|
||||
@@ -109,6 +125,7 @@ class ChainSyncService:
|
||||
|
||||
pubsub = self._redis.pubsub()
|
||||
await pubsub.subscribe("blocks")
|
||||
self._receiver_ready.set()
|
||||
|
||||
logger.info("Subscribed to block broadcasts")
|
||||
|
||||
@@ -126,11 +143,12 @@ class ChainSyncService:
|
||||
async def _get_block_by_height(self, height: int, session) -> Optional[Dict[str, Any]]:
|
||||
"""Get block data by height from local RPC"""
|
||||
try:
|
||||
async with session.get(f"http://127.0.0.1:{self.rpc_port}/rpc/blocks?start={height}&end={height}") as resp:
|
||||
async with session.get(f"http://{self.source_host}:{self.source_port}/rpc/blocks-range?start={height}&end={height}") as resp:
|
||||
if resp.status == 200:
|
||||
blocks_data = await resp.json()
|
||||
blocks = blocks_data.get('blocks', [])
|
||||
return blocks[0] if blocks else None
|
||||
block = blocks[0] if blocks else None
|
||||
return block
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting block {height}: {e}")
|
||||
return None
|
||||
@@ -156,8 +174,8 @@ class ChainSyncService:
|
||||
return
|
||||
|
||||
# Determine target host - if we're a follower, import to leader, else import locally
|
||||
target_host = self.leader_host if self.leader_host else "127.0.0.1"
|
||||
target_port = self.rpc_port
|
||||
target_host = self.import_host
|
||||
target_port = self.import_port
|
||||
|
||||
# Retry logic for import
|
||||
max_retries = 3
|
||||
@@ -178,7 +196,11 @@ class ChainSyncService:
|
||||
logger.debug(f"Rejected block {block_data.get('height')}: {result.get('reason')}")
|
||||
return
|
||||
else:
|
||||
raise Exception(f"HTTP {resp.status}")
|
||||
try:
|
||||
body = await resp.text()
|
||||
except Exception:
|
||||
body = "<no body>"
|
||||
raise Exception(f"HTTP {resp.status}: {body}")
|
||||
|
||||
except Exception as e:
|
||||
if attempt < max_retries - 1:
|
||||
@@ -192,9 +214,27 @@ class ChainSyncService:
|
||||
except Exception as e:
|
||||
logger.error(f"Error importing block: {e}")
|
||||
|
||||
async def run_chain_sync(redis_url: str, node_id: str, rpc_port: int = 8006, leader_host: str = None):
|
||||
async def run_chain_sync(
|
||||
redis_url: str,
|
||||
node_id: str,
|
||||
rpc_port: int = 8006,
|
||||
leader_host: str = None,
|
||||
source_host: str = "127.0.0.1",
|
||||
source_port: int = None,
|
||||
import_host: str = "127.0.0.1",
|
||||
import_port: int = None,
|
||||
):
|
||||
"""Run chain synchronization service"""
|
||||
service = ChainSyncService(redis_url, node_id, rpc_port, leader_host)
|
||||
service = ChainSyncService(
|
||||
redis_url=redis_url,
|
||||
node_id=node_id,
|
||||
rpc_port=rpc_port,
|
||||
leader_host=leader_host,
|
||||
source_host=source_host,
|
||||
source_port=source_port,
|
||||
import_host=import_host,
|
||||
import_port=import_port,
|
||||
)
|
||||
await service.start()
|
||||
|
||||
def main():
|
||||
@@ -205,13 +245,26 @@ def main():
|
||||
parser.add_argument("--node-id", required=True, help="Node identifier")
|
||||
parser.add_argument("--rpc-port", type=int, default=8006, help="RPC port")
|
||||
parser.add_argument("--leader-host", help="Leader node host (for followers)")
|
||||
parser.add_argument("--source-host", default="127.0.0.1", help="Host to poll for head/blocks")
|
||||
parser.add_argument("--source-port", type=int, help="Port to poll for head/blocks")
|
||||
parser.add_argument("--import-host", default="127.0.0.1", help="Host to import blocks into")
|
||||
parser.add_argument("--import-port", type=int, help="Port to import blocks into")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
try:
|
||||
asyncio.run(run_chain_sync(args.redis, args.node_id, args.rpc_port, args.leader_host))
|
||||
asyncio.run(run_chain_sync(
|
||||
args.redis,
|
||||
args.node_id,
|
||||
args.rpc_port,
|
||||
args.leader_host,
|
||||
args.source_host,
|
||||
args.source_port,
|
||||
args.import_host,
|
||||
args.import_port,
|
||||
))
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Chain sync service stopped by user")
|
||||
|
||||
|
||||
@@ -110,6 +110,7 @@ async def get_block(height: int) -> Dict[str, Any]:
|
||||
"height": block.height,
|
||||
"hash": block.hash,
|
||||
"parent_hash": block.parent_hash,
|
||||
"proposer": block.proposer,
|
||||
"timestamp": block.timestamp.isoformat(),
|
||||
"tx_count": block.tx_count,
|
||||
"state_root": block.state_root,
|
||||
@@ -154,6 +155,7 @@ async def get_blocks_range(start: int, end: int) -> Dict[str, Any]:
|
||||
"height": block.height,
|
||||
"hash": block.hash,
|
||||
"parent_hash": block.parent_hash,
|
||||
"proposer": block.proposer,
|
||||
"timestamp": block.timestamp.isoformat(),
|
||||
"tx_count": block.tx_count,
|
||||
"state_root": block.state_root,
|
||||
|
||||
Reference in New Issue
Block a user