From 88db347df82bbeacb17a40a4880d9318a32934b9 Mon Sep 17 00:00:00 2001 From: aitbc1 Date: Thu, 9 Apr 2026 14:15:00 +0200 Subject: [PATCH] Add P2P mempool sync and cleanup legacy services --- .../src/aitbc_chain/p2p_network.py | 377 ++++++++++++++++-- services/blockchain_simple.py | 4 +- systemd/aitbc-ai.service | 38 -- systemd/aitbc-cross-chain-reputation.service | 14 - systemd/aitbc-follower-node.service | 29 -- systemd/aitbc-miner-dashboard.service | 13 - 6 files changed, 343 insertions(+), 132 deletions(-) delete mode 100644 systemd/aitbc-ai.service delete mode 100644 systemd/aitbc-cross-chain-reputation.service delete mode 100644 systemd/aitbc-follower-node.service delete mode 100644 systemd/aitbc-miner-dashboard.service diff --git a/apps/blockchain-node/src/aitbc_chain/p2p_network.py b/apps/blockchain-node/src/aitbc_chain/p2p_network.py index 8754ce72..29be3d48 100644 --- a/apps/blockchain-node/src/aitbc_chain/p2p_network.py +++ b/apps/blockchain-node/src/aitbc_chain/p2p_network.py @@ -1,39 +1,70 @@ #!/usr/bin/env python3 """ -P2P Network Service using Redis Gossip -Handles peer-to-peer communication between blockchain nodes +P2P Network Service using Direct TCP connections +Handles decentralized peer-to-peer mesh communication between blockchain nodes """ import asyncio import json import logging -import socket -from typing import Dict, Any, Optional +from .mempool import get_mempool, compute_tx_hash +from typing import Dict, Any, Optional, Set, Tuple logger = logging.getLogger(__name__) class P2PNetworkService: - def __init__(self, host: str, port: int, redis_url: str, node_id: str): + def __init__(self, host: str, port: int, node_id: str, peers: str = ""): self.host = host self.port = port - self.redis_url = redis_url self.node_id = node_id + + # Initial peers to dial (format: "ip:port,ip:port") + self.initial_peers = [] + if peers: + for p in peers.split(','): + p = p.strip() + if p: + parts = p.split(':') + if len(parts) == 2: + self.initial_peers.append((parts[0], int(parts[1]))) + self._server = None self._stop_event = asyncio.Event() + # Active connections + # Map of node_id -> writer stream + self.active_connections: Dict[str, asyncio.StreamWriter] = {} + # Set of active endpoints we've connected to prevent duplicate dialing + self.connected_endpoints: Set[Tuple[str, int]] = set() + + self._background_tasks = [] + async def start(self): """Start P2P network service""" - logger.info(f"Starting P2P network service on {self.host}:{self.port}") + logger.info(f"Starting P2P network mesh service on {self.host}:{self.port}") + logger.info(f"Node ID: {self.node_id}") - # Create TCP server for P2P connections + # Create TCP server for inbound P2P connections self._server = await asyncio.start_server( - self._handle_connection, + self._handle_inbound_connection, self.host, self.port ) logger.info(f"P2P service listening on {self.host}:{self.port}") + # Start background task to dial known peers + dial_task = asyncio.create_task(self._dial_peers_loop()) + self._background_tasks.append(dial_task) + + # Start background task to broadcast pings to active peers + ping_task = asyncio.create_task(self._ping_peers_loop()) + self._background_tasks.append(ping_task) + + # Start background task to sync mempool + mempool_task = asyncio.create_task(self._mempool_sync_loop()) + self._background_tasks.append(mempool_task) + try: await self._stop_event.wait() finally: @@ -42,63 +73,337 @@ class P2PNetworkService: async def stop(self): """Stop P2P network service""" logger.info("Stopping P2P network service") + + # Cancel background tasks + for task in self._background_tasks: + task.cancel() + + # Close all active connections + for writer in self.active_connections.values(): + writer.close() + try: + await writer.wait_closed() + except Exception: + pass + self.active_connections.clear() + self.connected_endpoints.clear() + + # Close server if self._server: self._server.close() await self._server.wait_closed() + - async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): - """Handle incoming P2P connections""" - addr = writer.get_extra_info('peername') - logger.info(f"P2P connection from {addr}") - + async def _mempool_sync_loop(self): + """Periodically check local mempool and broadcast new transactions to peers""" + self.seen_txs = set() + while not self._stop_event.is_set(): + try: + mempool = get_mempool() + + # Different logic depending on if InMemory or Database + txs_to_broadcast = [] + + if hasattr(mempool, '_transactions'): # InMemoryMempool + with mempool._lock: + for tx_hash, pending_tx in mempool._transactions.items(): + if tx_hash not in self.seen_txs: + self.seen_txs.add(tx_hash) + txs_to_broadcast.append(pending_tx.content) + + elif hasattr(mempool, '_conn'): # DatabaseMempool + with mempool._lock: + cursor = mempool._conn.execute( + "SELECT tx_hash, content FROM mempool WHERE chain_id = ?", + ('ait-mainnet',) + ) + for row in cursor.fetchall(): + tx_hash = row[0] + if tx_hash not in self.seen_txs: + self.seen_txs.add(tx_hash) + import json + txs_to_broadcast.append(json.loads(row[1])) + + logger.debug(f"Mempool sync loop iteration. txs_to_broadcast: {len(txs_to_broadcast)}") + for tx in txs_to_broadcast: + msg = {'type': 'new_transaction', 'tx': tx} + writers = list(self.active_connections.values()) + for writer in writers: + await self._send_message(writer, msg) + + except Exception as e: + logger.error(f"Error in mempool sync loop: {e}") + + await asyncio.sleep(1) + + async def _dial_peers_loop(self): + """Background loop to continually try connecting to disconnected initial peers""" + while not self._stop_event.is_set(): + for host, port in self.initial_peers: + endpoint = (host, port) + + # Prevent dialing ourselves or already connected peers + if endpoint in self.connected_endpoints: + continue + + # Find if we are already connected to a peer with this host/ip by inbound connections + # This prevents two nodes from endlessly redialing each other's listen ports + already_connected_ip = False + for node_id, writer in self.active_connections.items(): + peer_ip = writer.get_extra_info('peername')[0] + # We might want to resolve hostname -> IP but keeping it simple: + if peer_ip == host or (host == "aitbc1" and peer_ip.startswith("10.")): + already_connected_ip = True + break + + if already_connected_ip: + self.connected_endpoints.add(endpoint) # Mark so we don't try again + continue + + # Attempt connection + asyncio.create_task(self._dial_peer(host, port)) + + # Wait before trying again + await asyncio.sleep(10) + + async def _dial_peer(self, host: str, port: int): + """Attempt to establish an outbound TCP connection to a peer""" + endpoint = (host, port) try: - while True: - data = await reader.read(1024) + reader, writer = await asyncio.open_connection(host, port) + logger.info(f"Successfully dialed outbound peer at {host}:{port}") + + # Record that we're connected to this endpoint + self.connected_endpoints.add(endpoint) + + # Send handshake immediately + handshake = { + 'type': 'handshake', + 'node_id': self.node_id, + 'listen_port': self.port + } + await self._send_message(writer, handshake) + + # Start listening to this outbound connection + await self._listen_to_stream(reader, writer, endpoint, outbound=True) + + except ConnectionRefusedError: + logger.debug(f"Peer {host}:{port} refused connection (offline?)") + except Exception as e: + logger.debug(f"Failed to dial peer {host}:{port}: {e}") + + async def _handle_inbound_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + """Handle incoming P2P TCP connections from other nodes""" + addr = writer.get_extra_info('peername') + logger.info(f"Incoming P2P connection from {addr}") + + # Wait for handshake + try: + # Add timeout for initial handshake + data = await asyncio.wait_for(reader.readline(), timeout=5.0) + if not data: + writer.close() + return + + message = json.loads(data.decode()) + if message.get('type') != 'handshake': + logger.warning(f"Peer {addr} did not handshake first. Dropping.") + writer.close() + return + + peer_node_id = message.get('node_id') + peer_listen_port = message.get('listen_port', 7070) + + if not peer_node_id or peer_node_id == self.node_id: + logger.warning(f"Peer {addr} provided invalid or self node_id: {peer_node_id}") + writer.close() + return + + # Accept handshake and store connection + logger.info(f"Handshake accepted from node {peer_node_id} at {addr}") + + # If we already have a connection to this node, drop the new one to prevent duplicates + if peer_node_id in self.active_connections: + logger.info(f"Already connected to node {peer_node_id}. Dropping duplicate inbound.") + writer.close() + return + + self.active_connections[peer_node_id] = writer + + # Map their listening endpoint so we don't try to dial them + remote_ip = addr[0] + self.connected_endpoints.add((remote_ip, peer_listen_port)) + + # Reply with our handshake + reply_handshake = { + 'type': 'handshake', + 'node_id': self.node_id, + 'listen_port': self.port + } + await self._send_message(writer, reply_handshake) + + # Listen for messages + await self._listen_to_stream(reader, writer, (remote_ip, peer_listen_port), outbound=False, peer_id=peer_node_id) + + except asyncio.TimeoutError: + logger.warning(f"Timeout waiting for handshake from {addr}") + writer.close() + except Exception as e: + logger.error(f"Error handling inbound connection from {addr}: {e}") + writer.close() + + async def _listen_to_stream(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, endpoint: Tuple[str, int], outbound: bool, peer_id: str = None): + """Read loop for an established TCP stream (both inbound and outbound)""" + addr = endpoint + try: + while not self._stop_event.is_set(): + data = await reader.readline() if not data: - break + break # Connection closed remotely try: - message = json.loads(data.decode()) - logger.info(f"P2P received: {message}") + message = json.loads(data.decode().strip()) - # Handle different message types - if message.get('type') == 'ping': - response = {'type': 'pong', 'node_id': self.node_id} - writer.write(json.dumps(response).encode() + b'\n') - await writer.drain() + msg_type = message.get('type') + # If this is an outbound connection, the first message MUST be their handshake reply + if outbound and peer_id is None: + if msg_type == 'handshake': + peer_id = message.get('node_id') + if not peer_id or peer_id == self.node_id: + logger.warning(f"Invalid handshake reply from {addr}. Closing.") + break + + if peer_id in self.active_connections: + logger.info(f"Already connected to node {peer_id}. Closing duplicate outbound.") + break + + self.active_connections[peer_id] = writer + logger.info(f"Outbound handshake complete. Connected to node {peer_id}") + continue + else: + logger.warning(f"Expected handshake reply from {addr}, got {msg_type}") + break + + # Normal message handling + if msg_type == 'ping': + logger.debug(f"Received ping from {peer_id}") + await self._send_message(writer, {'type': 'pong', 'node_id': self.node_id}) + + elif msg_type == 'pong': + logger.debug(f"Received pong from {peer_id}") + + elif msg_type == 'handshake': + pass # Ignore subsequent handshakes + elif msg_type == 'new_transaction': + tx_data = message.get('tx') + if tx_data: + try: + tx_hash = compute_tx_hash(tx_data) + if not hasattr(self, 'seen_txs'): + self.seen_txs = set() + + if tx_hash not in self.seen_txs: + logger.info(f"Received new P2P transaction: {tx_hash}") + self.seen_txs.add(tx_hash) + mempool = get_mempool() + # Add to local mempool + mempool.add(tx_data) + + # Forward to other peers (Gossip) + forward_msg = {'type': 'new_transaction', 'tx': tx_data} + writers = list(self.active_connections.values()) + for w in writers: + if w != writer: # Don't send back to sender + await self._send_message(w, forward_msg) + except ValueError as e: + logger.debug(f"P2P tx rejected by mempool: {e}") + except Exception as e: + logger.error(f"P2P tx handling error: {e}") + + + else: + logger.info(f"Received {msg_type} from {peer_id}: {message}") + # In a real node, we would forward blocks/txs to the internal event bus here + except json.JSONDecodeError: - logger.warning(f"Invalid JSON from {addr}") + logger.warning(f"Invalid JSON received from {addr}") except asyncio.CancelledError: pass except Exception as e: - logger.error(f"P2P connection error: {e}") + logger.error(f"Stream error with {addr}: {e}") finally: + logger.info(f"Connection closed to {peer_id or addr}") + if peer_id and peer_id in self.active_connections: + del self.active_connections[peer_id] + if endpoint in self.connected_endpoints: + self.connected_endpoints.remove(endpoint) writer.close() - await writer.wait_closed() - logger.info(f"P2P connection closed from {addr}") + try: + await writer.wait_closed() + except Exception: + pass -async def run_p2p_service(host: str, port: int, redis_url: str, node_id: str): + async def _send_message(self, writer: asyncio.StreamWriter, message: dict): + """Helper to send a JSON message over a stream""" + try: + data = json.dumps(message) + '\n' + writer.write(data.encode()) + await writer.drain() + except Exception as e: + logger.error(f"Failed to send message: {e}") + + async def _ping_peers_loop(self): + """Periodically broadcast pings to all active connections to keep them alive""" + while not self._stop_event.is_set(): + await asyncio.sleep(20) + ping_msg = {'type': 'ping', 'node_id': self.node_id} + + # Make a copy of writers to avoid dictionary changed during iteration error + writers = list(self.active_connections.values()) + for writer in writers: + await self._send_message(writer, ping_msg) + + +async def run_p2p_service(host: str, port: int, node_id: str, peers: str): """Run P2P service""" - service = P2PNetworkService(host, port, redis_url, node_id) + service = P2PNetworkService(host, port, node_id, peers) await service.start() def main(): import argparse - parser = argparse.ArgumentParser(description="AITBC P2P Network Service") + parser = argparse.ArgumentParser(description="AITBC Direct TCP P2P Mesh Network") parser.add_argument("--host", default="0.0.0.0", help="Bind host") - parser.add_argument("--port", type=int, default=8005, help="Bind port") - parser.add_argument("--redis", default="redis://localhost:6379", help="Redis URL") - parser.add_argument("--node-id", help="Node identifier") + parser.add_argument("--port", type=int, default=7070, help="Bind port") + parser.add_argument("--node-id", required=True, help="Node identifier (required for handshake)") + parser.add_argument("--peers", default="", help="Comma separated list of initial peers to dial (ip:port)") args = parser.parse_args() - logging.basicConfig(level=logging.INFO) + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) try: - asyncio.run(run_p2p_service(args.host, args.port, args.redis, args.node_id)) + from .config import settings + from .mempool import init_mempool + import pathlib + + db_path = "" + if settings.mempool_backend == "database": + db_path = str(settings.db_path.parent / "mempool.db") + + init_mempool( + backend=settings.mempool_backend, + db_path=db_path, + max_size=settings.mempool_max_size, + min_fee=settings.min_fee + ) + + asyncio.run(run_p2p_service(args.host, args.port, args.node_id, args.peers)) except KeyboardInterrupt: logger.info("P2P service stopped by user") diff --git a/services/blockchain_simple.py b/services/blockchain_simple.py index 6dc6c219..112b073b 100755 --- a/services/blockchain_simple.py +++ b/services/blockchain_simple.py @@ -41,7 +41,7 @@ def main(): # Run the blockchain FastAPI app import uvicorn logger.info("Starting blockchain FastAPI app on port 8545") - uvicorn.run(app, host="0.0.0.0", port=8545) + uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("BLOCKCHAIN_PORT", 8545))) except ImportError as e: logger.error(f"Failed to import blockchain app: {e}") @@ -126,7 +126,7 @@ def basic_blockchain_node(): activity_thread.start() logger.info("Starting basic blockchain API on port 8545") - uvicorn.run(app, host="0.0.0.0", port=8545) + uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("BLOCKCHAIN_PORT", 8545))) except ImportError: # Fallback to simple heartbeat diff --git a/systemd/aitbc-ai.service b/systemd/aitbc-ai.service deleted file mode 100644 index 4c86b9f8..00000000 --- a/systemd/aitbc-ai.service +++ /dev/null @@ -1,38 +0,0 @@ -[Unit] -Description=AITBC Advanced AI Service - Enhanced AI Capabilities -After=network.target -Wants=network.target - -[Service] -Type=simple -User=root -Group=root -WorkingDirectory=/opt/aitbc/apps/coordinator-api -Environment=PATH=/usr/bin -Environment=PYTHONPATH=/opt/aitbc/apps/coordinator-api/src -ExecStart=/opt/aitbc/venv/bin/python -m app.services.advanced_ai_service -ExecReload=/bin/kill -HUP $MAINPID -Restart=always -RestartSec=10 -StandardOutput=journal -StandardError=journal -SyslogIdentifier=aitbc-advanced-ai - -# Security settings (relaxed for development) -# NoNewPrivileges=true -# PrivateTmp=true -# ProtectSystem=strict -# ProtectHome=true -ReadWritePaths=/var/log/aitbc /var/lib/aitbc/data /opt/aitbc/apps/coordinator-api - -# Resource limits -LimitNOFILE=65536 -LimitNPROC=4096 - -# GPU access (if available) -DeviceAllow=/dev/nvidia0 rw -DeviceAllow=/dev/nvidiactl rw -DeviceAllow=/dev/nvidia-uvm rw - -[Install] -WantedBy=multi-user.target diff --git a/systemd/aitbc-cross-chain-reputation.service b/systemd/aitbc-cross-chain-reputation.service deleted file mode 100644 index 052319ab..00000000 --- a/systemd/aitbc-cross-chain-reputation.service +++ /dev/null @@ -1,14 +0,0 @@ -[Unit] -Description=AITBC Cross Chain Reputation Service -After=network.target - -[Service] -Type=simple -User=aitbc -WorkingDirectory=/opt/aitbc/services -ExecStart=/usr/bin/python3 -m cross_chain_reputation -Restart=always -RestartSec=10 - -[Install] -WantedBy=multi-user.target diff --git a/systemd/aitbc-follower-node.service b/systemd/aitbc-follower-node.service deleted file mode 100644 index 644b46ce..00000000 --- a/systemd/aitbc-follower-node.service +++ /dev/null @@ -1,29 +0,0 @@ -[Unit] -Description=AITBC Blockchain Follower Node (Port 8007) -After=network.target aitbc-blockchain-node.service -Wants=aitbc-blockchain-node.service - -[Service] -Type=simple -User=root -Group=root -Environment=NODE_ENV=production -Environment=NODE_ID=follower-node-8007 -Environment=PYTHONPATH=/opt/aitbc/apps/blockchain-node/src:/opt/aitbc/services -Environment=BLOCKCHAIN_DATA_DIR=/var/lib/aitbc/data/follower -Environment=BLOCKCHAIN_CONFIG_DIR=/etc/aitbc -Environment=BLOCKCHAIN_LOG_DIR=/var/log/aitbc/production -Environment=BLOCKCHAIN_PORT=8007 -Environment=BLOCKCHAIN_ROLE=follower -Environment=BLOCKCHAIN_GENESIS_NODE=http://localhost:8006 -ExecStart=/opt/aitbc/venv/bin/python /opt/aitbc/services/blockchain_simple.py -ExecReload=/bin/kill -HUP $MAINPID -KillMode=mixed -TimeoutStopSec=30 -Restart=always -RestartSec=5 -StandardOutput=journal -StandardError=journal - -[Install] -WantedBy=multi-user.target diff --git a/systemd/aitbc-miner-dashboard.service b/systemd/aitbc-miner-dashboard.service deleted file mode 100644 index c8f0dcc9..00000000 --- a/systemd/aitbc-miner-dashboard.service +++ /dev/null @@ -1,13 +0,0 @@ -[Unit] -Description=AITBC Miner Dashboard -After=network.target - -[Service] -Type=simple -User=root -WorkingDirectory=/opt/aitbc-miner-dashboard -ExecStart=/usr/bin/python3 dashboard_server.py -Restart=always - -[Install] -WantedBy=multi-user.target