fix: stabilize multichain hub and follower sync flow
This commit is contained in:
@@ -7,6 +7,7 @@ Handles decentralized peer-to-peer mesh communication between blockchain nodes
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from .config import settings
|
||||
from .mempool import get_mempool, compute_tx_hash
|
||||
from .network.nat_traversal import NATTraversalService
|
||||
from .network.island_manager import IslandManager
|
||||
@@ -89,7 +90,7 @@ class P2PNetworkService:
|
||||
self.port,
|
||||
self.island_id,
|
||||
self.island_name,
|
||||
self.config.redis_url
|
||||
settings.redis_url
|
||||
)
|
||||
await self.hub_manager.register_as_hub(self.public_endpoint[0] if self.public_endpoint else None,
|
||||
self.public_endpoint[1] if self.public_endpoint else None)
|
||||
@@ -158,6 +159,29 @@ class P2PNetworkService:
|
||||
await self._server.wait_closed()
|
||||
|
||||
|
||||
async def _send_message(self, writer: asyncio.StreamWriter, message: Dict[str, Any]):
|
||||
"""Serialize and send a newline-delimited JSON message"""
|
||||
payload = json.dumps(message).encode() + b"\n"
|
||||
writer.write(payload)
|
||||
await writer.drain()
|
||||
|
||||
|
||||
async def _ping_peers_loop(self):
|
||||
"""Periodically ping active peers to keep connections healthy"""
|
||||
while not self._stop_event.is_set():
|
||||
try:
|
||||
writers = list(self.active_connections.items())
|
||||
for peer_id, writer in writers:
|
||||
try:
|
||||
await self._send_message(writer, {'type': 'ping', 'node_id': self.node_id})
|
||||
except Exception as e:
|
||||
logger.debug(f"Failed to ping {peer_id}: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error in ping loop: {e}")
|
||||
|
||||
await asyncio.sleep(10)
|
||||
|
||||
|
||||
async def _mempool_sync_loop(self):
|
||||
"""Periodically check local mempool and broadcast new transactions to peers"""
|
||||
self.seen_txs = set()
|
||||
@@ -170,23 +194,26 @@ class P2PNetworkService:
|
||||
|
||||
if hasattr(mempool, '_transactions'): # InMemoryMempool
|
||||
with mempool._lock:
|
||||
for tx_hash, pending_tx in mempool._transactions.items():
|
||||
if tx_hash not in self.seen_txs:
|
||||
self.seen_txs.add(tx_hash)
|
||||
txs_to_broadcast.append(pending_tx.content)
|
||||
for chain_id, chain_transactions in mempool._transactions.items():
|
||||
for tx_hash, pending_tx in chain_transactions.items():
|
||||
seen_key = (chain_id, tx_hash)
|
||||
if seen_key not in self.seen_txs:
|
||||
self.seen_txs.add(seen_key)
|
||||
txs_to_broadcast.append(pending_tx.content)
|
||||
|
||||
elif hasattr(mempool, '_conn'): # DatabaseMempool
|
||||
with mempool._lock:
|
||||
cursor = mempool._conn.execute(
|
||||
"SELECT tx_hash, content FROM mempool WHERE chain_id = ?",
|
||||
('ait-mainnet',)
|
||||
"SELECT chain_id, tx_hash, content FROM mempool"
|
||||
)
|
||||
for row in cursor.fetchall():
|
||||
tx_hash = row[0]
|
||||
if tx_hash not in self.seen_txs:
|
||||
self.seen_txs.add(tx_hash)
|
||||
chain_id = row[0]
|
||||
tx_hash = row[1]
|
||||
seen_key = (chain_id, tx_hash)
|
||||
if seen_key not in self.seen_txs:
|
||||
self.seen_txs.add(seen_key)
|
||||
import json
|
||||
txs_to_broadcast.append(json.loads(row[1]))
|
||||
txs_to_broadcast.append(json.loads(row[2]))
|
||||
|
||||
logger.debug(f"Mempool sync loop iteration. txs_to_broadcast: {len(txs_to_broadcast)}")
|
||||
for tx in txs_to_broadcast:
|
||||
@@ -297,32 +324,32 @@ class P2PNetworkService:
|
||||
|
||||
# Store peer's island information
|
||||
logger.info(f"Peer {peer_node_id} from island {peer_island_id} (hub: {peer_is_hub})")
|
||||
|
||||
|
||||
# Store peer's public endpoint if provided
|
||||
if peer_public_address and peer_public_port:
|
||||
logger.info(f"Peer {peer_node_id} public endpoint: {peer_public_address}:{peer_public_port}")
|
||||
|
||||
|
||||
# Accept handshake and store connection
|
||||
logger.info(f"Handshake accepted from node {peer_node_id} at {addr}")
|
||||
|
||||
|
||||
# If we already have a connection to this node, drop the new one to prevent duplicates
|
||||
if peer_node_id in self.active_connections:
|
||||
logger.info(f"Already connected to node {peer_node_id}. Dropping duplicate inbound.")
|
||||
writer.close()
|
||||
return
|
||||
|
||||
|
||||
self.active_connections[peer_node_id] = writer
|
||||
|
||||
|
||||
# Map their listening endpoint so we don't try to dial them
|
||||
remote_ip = addr[0]
|
||||
self.connected_endpoints.add((remote_ip, peer_listen_port))
|
||||
|
||||
|
||||
# Add peer to island manager if available
|
||||
if self.island_manager and peer_island_id:
|
||||
self.island_manager.add_island_peer(peer_island_id, peer_node_id)
|
||||
|
||||
# Add peer to hub manager if available and peer is a hub
|
||||
if self.hub_manager and peer_is_hub:
|
||||
|
||||
# Add peer to hub manager if available
|
||||
if self.hub_manager:
|
||||
from .network.hub_manager import PeerInfo
|
||||
self.hub_manager.register_peer(PeerInfo(
|
||||
node_id=peer_node_id,
|
||||
@@ -334,7 +361,7 @@ class P2PNetworkService:
|
||||
public_port=peer_public_port,
|
||||
last_seen=asyncio.get_event_loop().time()
|
||||
))
|
||||
|
||||
|
||||
# Reply with our handshake including island information
|
||||
reply_handshake = {
|
||||
'type': 'handshake',
|
||||
@@ -348,10 +375,10 @@ class P2PNetworkService:
|
||||
'public_port': self.public_endpoint[1] if self.public_endpoint else None
|
||||
}
|
||||
await self._send_message(writer, reply_handshake)
|
||||
|
||||
|
||||
# Listen for messages
|
||||
await self._listen_to_stream(reader, writer, (remote_ip, peer_listen_port), outbound=False, peer_id=peer_node_id)
|
||||
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"Timeout waiting for handshake from {addr}")
|
||||
writer.close()
|
||||
@@ -359,7 +386,8 @@ class P2PNetworkService:
|
||||
logger.error(f"Error handling inbound connection from {addr}: {e}")
|
||||
writer.close()
|
||||
|
||||
async def _listen_to_stream(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, endpoint: Tuple[str, int], outbound: bool, peer_id: str = None):
|
||||
async def _listen_to_stream(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, endpoint: Tuple[str, int],
|
||||
outbound: bool, peer_id: str = None):
|
||||
"""Read loop for an established TCP stream (both inbound and outbound)"""
|
||||
addr = endpoint
|
||||
try:
|
||||
@@ -367,35 +395,35 @@ class P2PNetworkService:
|
||||
data = await reader.readline()
|
||||
if not data:
|
||||
break # Connection closed remotely
|
||||
|
||||
|
||||
try:
|
||||
message = json.loads(data.decode().strip())
|
||||
|
||||
|
||||
msg_type = message.get('type')
|
||||
|
||||
|
||||
# If this is an outbound connection, the first message MUST be their handshake reply
|
||||
if outbound and peer_id is None:
|
||||
if msg_type == 'handshake':
|
||||
peer_id = message.get('node_id')
|
||||
peer_island_id = message.get('island_id', '')
|
||||
peer_is_hub = message.get('is_hub', False)
|
||||
|
||||
|
||||
if not peer_id or peer_id == self.node_id:
|
||||
logger.warning(f"Invalid handshake reply from {addr}. Closing.")
|
||||
break
|
||||
|
||||
|
||||
if peer_id in self.active_connections:
|
||||
logger.info(f"Already connected to node {peer_id}. Closing duplicate outbound.")
|
||||
break
|
||||
|
||||
|
||||
self.active_connections[peer_id] = writer
|
||||
|
||||
|
||||
# Add peer to island manager if available
|
||||
if self.island_manager and peer_island_id:
|
||||
self.island_manager.add_island_peer(peer_island_id, peer_id)
|
||||
|
||||
# Add peer to hub manager if available and peer is a hub
|
||||
if self.hub_manager and peer_is_hub:
|
||||
|
||||
# Add peer to hub manager if available
|
||||
if self.hub_manager:
|
||||
from .network.hub_manager import PeerInfo
|
||||
self.hub_manager.register_peer(PeerInfo(
|
||||
node_id=peer_id,
|
||||
@@ -407,23 +435,23 @@ class P2PNetworkService:
|
||||
public_port=message.get('public_port'),
|
||||
last_seen=asyncio.get_event_loop().time()
|
||||
))
|
||||
|
||||
|
||||
logger.info(f"Outbound handshake complete. Connected to node {peer_id} (island: {peer_island_id})")
|
||||
continue
|
||||
else:
|
||||
logger.warning(f"Expected handshake reply from {addr}, got {msg_type}")
|
||||
break
|
||||
|
||||
|
||||
# Normal message handling
|
||||
if msg_type == 'ping':
|
||||
logger.debug(f"Received ping from {peer_id}")
|
||||
await self._send_message(writer, {'type': 'pong', 'node_id': self.node_id})
|
||||
|
||||
|
||||
elif msg_type == 'pong':
|
||||
logger.debug(f"Received pong from {peer_id}")
|
||||
|
||||
|
||||
elif msg_type == 'handshake':
|
||||
pass # Ignore subsequent handshakes
|
||||
pass # Ignore subsequent handshakes
|
||||
elif msg_type == 'join_request':
|
||||
# Handle island join request (only if we're a hub)
|
||||
if self.hub_manager:
|
||||
@@ -463,35 +491,36 @@ class P2PNetworkService:
|
||||
if tx_data:
|
||||
try:
|
||||
tx_hash = compute_tx_hash(tx_data)
|
||||
chain_id = tx_data.get('chain_id', settings.chain_id)
|
||||
if not hasattr(self, 'seen_txs'):
|
||||
self.seen_txs = set()
|
||||
|
||||
if tx_hash not in self.seen_txs:
|
||||
|
||||
seen_key = (chain_id, tx_hash)
|
||||
if seen_key not in self.seen_txs:
|
||||
logger.info(f"Received new P2P transaction: {tx_hash}")
|
||||
self.seen_txs.add(tx_hash)
|
||||
self.seen_txs.add(seen_key)
|
||||
mempool = get_mempool()
|
||||
# Add to local mempool
|
||||
mempool.add(tx_data)
|
||||
|
||||
mempool.add(tx_data, chain_id=chain_id)
|
||||
|
||||
# Forward to other peers (Gossip)
|
||||
forward_msg = {'type': 'new_transaction', 'tx': tx_data}
|
||||
writers = list(self.active_connections.values())
|
||||
for w in writers:
|
||||
if w != writer: # Don't send back to sender
|
||||
if w != writer: # Don't send back to sender
|
||||
await self._send_message(w, forward_msg)
|
||||
except ValueError as e:
|
||||
logger.debug(f"P2P tx rejected by mempool: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"P2P tx handling error: {e}")
|
||||
|
||||
|
||||
else:
|
||||
logger.info(f"Received {msg_type} from {peer_id}: {message}")
|
||||
# In a real node, we would forward blocks/txs to the internal event bus here
|
||||
|
||||
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(f"Invalid JSON received from {addr}")
|
||||
|
||||
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as e:
|
||||
@@ -542,7 +571,8 @@ class P2PNetworkService:
|
||||
logger.error(f"Error getting GPU specs: {e}")
|
||||
return {}
|
||||
|
||||
async def send_join_request(self, hub_address: str, hub_port: int, island_id: str, island_name: str, node_id: str, public_key_pem: str) -> Optional[dict]:
|
||||
async def send_join_request(self, hub_address: str, hub_port: int, island_id: str, island_name: str, node_id: str,
|
||||
public_key_pem: str) -> Optional[dict]:
|
||||
"""
|
||||
Send join request to a hub and wait for response
|
||||
|
||||
@@ -562,6 +592,34 @@ class P2PNetworkService:
|
||||
reader, writer = await asyncio.open_connection(hub_address, hub_port)
|
||||
logger.info(f"Connected to hub {hub_address}:{hub_port}")
|
||||
|
||||
handshake = {
|
||||
'type': 'handshake',
|
||||
'node_id': node_id,
|
||||
'listen_port': self.port,
|
||||
'island_id': island_id,
|
||||
'island_name': island_name,
|
||||
'is_hub': self.is_hub,
|
||||
'island_chain_id': self.island_chain_id,
|
||||
'public_address': self.public_endpoint[0] if self.public_endpoint else None,
|
||||
'public_port': self.public_endpoint[1] if self.public_endpoint else None,
|
||||
}
|
||||
await self._send_message(writer, handshake)
|
||||
logger.info("Sent handshake to hub")
|
||||
|
||||
data = await asyncio.wait_for(reader.readline(), timeout=10.0)
|
||||
if not data:
|
||||
logger.warning("No handshake response from hub")
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
return None
|
||||
|
||||
response = json.loads(data.decode().strip())
|
||||
if response.get('type') != 'handshake':
|
||||
logger.warning(f"Unexpected handshake response type: {response.get('type')}")
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
return None
|
||||
|
||||
# Send join request
|
||||
join_request = {
|
||||
'type': 'join_request',
|
||||
@@ -604,20 +662,32 @@ class P2PNetworkService:
|
||||
|
||||
async def run_p2p_service(host: str, port: int, node_id: str, peers: str):
|
||||
"""Run P2P service"""
|
||||
service = P2PNetworkService(host, port, node_id, peers)
|
||||
stun_servers = [server.strip() for server in settings.stun_servers.split(',') if server.strip()]
|
||||
service = P2PNetworkService(
|
||||
host,
|
||||
port,
|
||||
node_id,
|
||||
peers,
|
||||
stun_servers=stun_servers or None,
|
||||
island_id=settings.island_id,
|
||||
island_name=settings.island_name,
|
||||
is_hub=settings.is_hub,
|
||||
island_chain_id=settings.island_chain_id or settings.chain_id,
|
||||
)
|
||||
await service.start()
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
|
||||
|
||||
parser = argparse.ArgumentParser(description="AITBC Direct TCP P2P Mesh Network")
|
||||
parser.add_argument("--host", default="0.0.0.0", help="Bind host")
|
||||
parser.add_argument("--port", type=int, default=7070, help="Bind port")
|
||||
parser.add_argument("--node-id", required=True, help="Node identifier (required for handshake)")
|
||||
parser.add_argument("--peers", default="", help="Comma separated list of initial peers to dial (ip:port)")
|
||||
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
|
||||
Reference in New Issue
Block a user