feat(chain_sync): add leader host routing for follower nodes
- Add leader_host parameter to ChainSyncService for follower node configuration - Route block imports to leader node when running as follower - Add --leader-host CLI argument for follower node setup - Update get_token_supply to calculate actual values from database - Replace hardcoded supply values with real account balances and counts - Calculate circulating supply from actual account balances - Use total_accounts from database query
This commit is contained in:
@@ -13,10 +13,11 @@ from typing import Dict, Any, Optional, List
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
class ChainSyncService:
|
class ChainSyncService:
|
||||||
def __init__(self, redis_url: str, node_id: str, rpc_port: int = 8006):
|
def __init__(self, redis_url: str, node_id: str, rpc_port: int = 8006, leader_host: str = None):
|
||||||
self.redis_url = redis_url
|
self.redis_url = redis_url
|
||||||
self.node_id = node_id
|
self.node_id = node_id
|
||||||
self.rpc_port = rpc_port
|
self.rpc_port = rpc_port
|
||||||
|
self.leader_host = leader_host # Host of the leader node
|
||||||
self._stop_event = asyncio.Event()
|
self._stop_event = asyncio.Event()
|
||||||
self._redis = None
|
self._redis = None
|
||||||
|
|
||||||
@@ -137,9 +138,13 @@ class ChainSyncService:
|
|||||||
if block_data.get('proposer') == self.node_id:
|
if block_data.get('proposer') == self.node_id:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Determine target host - if we're a follower, import to leader, else import locally
|
||||||
|
target_host = self.leader_host if self.leader_host else "127.0.0.1"
|
||||||
|
target_port = self.rpc_port
|
||||||
|
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
async with session.post(
|
async with session.post(
|
||||||
f"http://127.0.0.1:{self.rpc_port}/rpc/importBlock",
|
f"http://{target_host}:{target_port}/rpc/importBlock",
|
||||||
json=block_data
|
json=block_data
|
||||||
) as resp:
|
) as resp:
|
||||||
if resp.status == 200:
|
if resp.status == 200:
|
||||||
@@ -154,9 +159,9 @@ class ChainSyncService:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error importing block: {e}")
|
logger.error(f"Error importing block: {e}")
|
||||||
|
|
||||||
async def run_chain_sync(redis_url: str, node_id: str, rpc_port: int = 8006):
|
async def run_chain_sync(redis_url: str, node_id: str, rpc_port: int = 8006, leader_host: str = None):
|
||||||
"""Run chain synchronization service"""
|
"""Run chain synchronization service"""
|
||||||
service = ChainSyncService(redis_url, node_id, rpc_port)
|
service = ChainSyncService(redis_url, node_id, rpc_port, leader_host)
|
||||||
await service.start()
|
await service.start()
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
@@ -166,13 +171,14 @@ def main():
|
|||||||
parser.add_argument("--redis", default="redis://localhost:6379", help="Redis URL")
|
parser.add_argument("--redis", default="redis://localhost:6379", help="Redis URL")
|
||||||
parser.add_argument("--node-id", required=True, help="Node identifier")
|
parser.add_argument("--node-id", required=True, help="Node identifier")
|
||||||
parser.add_argument("--rpc-port", type=int, default=8006, help="RPC port")
|
parser.add_argument("--rpc-port", type=int, default=8006, help="RPC port")
|
||||||
|
parser.add_argument("--leader-host", help="Leader node host (for followers)")
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
asyncio.run(run_chain_sync(args.redis, args.node_id, args.rpc_port))
|
asyncio.run(run_chain_sync(args.redis, args.node_id, args.rpc_port, args.leader_host))
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
logger.info("Chain sync service stopped by user")
|
logger.info("Chain sync service stopped by user")
|
||||||
|
|
||||||
|
|||||||
@@ -653,34 +653,37 @@ async def get_blockchain_info(chain_id: str = None) -> Dict[str, Any]:
|
|||||||
async def get_token_supply(chain_id: str = None) -> Dict[str, Any]:
|
async def get_token_supply(chain_id: str = None) -> Dict[str, Any]:
|
||||||
"""Get token supply information"""
|
"""Get token supply information"""
|
||||||
from ..config import settings as cfg
|
from ..config import settings as cfg
|
||||||
|
from ..models import Account
|
||||||
|
|
||||||
# Use default chain_id from settings if not provided
|
chain_id = get_chain_id(chain_id)
|
||||||
if chain_id is None:
|
|
||||||
chain_id = cfg.chain_id
|
|
||||||
|
|
||||||
metrics_registry.increment("rpc_supply_total")
|
metrics_registry.increment("rpc_supply_total")
|
||||||
start = time.perf_counter()
|
start = time.perf_counter()
|
||||||
|
|
||||||
with session_scope() as session:
|
with session_scope() as session:
|
||||||
# Production implementation - no faucet in mainnet
|
# Calculate actual values from database
|
||||||
|
accounts = session.exec(select(Account).where(Account.chain_id == chain_id)).all()
|
||||||
|
total_balance = sum(account.balance for account in accounts)
|
||||||
|
total_accounts = len(accounts)
|
||||||
|
|
||||||
|
# Production implementation - calculate real circulating supply
|
||||||
if chain_id == "ait-mainnet":
|
if chain_id == "ait-mainnet":
|
||||||
response = {
|
response = {
|
||||||
"chain_id": chain_id,
|
"chain_id": chain_id,
|
||||||
"total_supply": 1000000000, # 1 billion from genesis
|
"total_supply": 1000000000, # 1 billion from genesis
|
||||||
"circulating_supply": 0, # No transactions yet
|
"circulating_supply": total_balance, # Actual tokens in circulation
|
||||||
"mint_per_unit": cfg.mint_per_unit,
|
"mint_per_unit": cfg.mint_per_unit,
|
||||||
"total_accounts": 0
|
"total_accounts": total_accounts # Actual account count
|
||||||
}
|
}
|
||||||
else:
|
else:
|
||||||
# Devnet with faucet
|
# Devnet with faucet - use actual calculations
|
||||||
response = {
|
response = {
|
||||||
"chain_id": chain_id,
|
"chain_id": chain_id,
|
||||||
"total_supply": 1000000000, # 1 billion from genesis
|
"total_supply": 1000000000, # 1 billion from genesis
|
||||||
"circulating_supply": 0, # No transactions yet
|
"circulating_supply": total_balance, # Actual tokens in circulation
|
||||||
"faucet_balance": 1000000000, # All tokens in faucet
|
"faucet_balance": 1000000000, # All tokens in faucet
|
||||||
"faucet_address": "ait1faucet000000000000000000000000000000000",
|
"faucet_address": "ait1faucet000000000000000000000000000000000",
|
||||||
"mint_per_unit": cfg.mint_per_unit,
|
"mint_per_unit": cfg.mint_per_unit,
|
||||||
"total_accounts": 0
|
"total_accounts": total_accounts # Actual account count
|
||||||
}
|
}
|
||||||
|
|
||||||
metrics_registry.observe("rpc_supply_duration_seconds", time.perf_counter() - start)
|
metrics_registry.observe("rpc_supply_duration_seconds", time.perf_counter() - start)
|
||||||
|
|||||||
Reference in New Issue
Block a user