diff --git a/apps/blockchain-node/src/aitbc_chain/p2p_network.py b/apps/blockchain-node/src/aitbc_chain/p2p_network.py index 131382e1..0a781cee 100644 --- a/apps/blockchain-node/src/aitbc_chain/p2p_network.py +++ b/apps/blockchain-node/src/aitbc_chain/p2p_network.py @@ -7,6 +7,7 @@ Handles decentralized peer-to-peer mesh communication between blockchain nodes import asyncio import json import logging +from .mempool import get_mempool, compute_tx_hash from typing import Dict, Any, Optional, Set, Tuple logger = logging.getLogger(__name__) @@ -88,6 +89,48 @@ class P2PNetworkService: self._server.close() await self._server.wait_closed() + + async def _mempool_sync_loop(self): + """Periodically check local mempool and broadcast new transactions to peers""" + self.seen_txs = set() + while not self._stop_event.is_set(): + try: + mempool = get_mempool() + + # Different logic depending on if InMemory or Database + txs_to_broadcast = [] + + if hasattr(mempool, '_transactions'): # InMemoryMempool + with mempool._lock: + for tx_hash, pending_tx in mempool._transactions.items(): + if tx_hash not in self.seen_txs: + self.seen_txs.add(tx_hash) + txs_to_broadcast.append(pending_tx.content) + + elif hasattr(mempool, '_conn'): # DatabaseMempool + with mempool._lock: + cursor = mempool._conn.execute( + "SELECT tx_hash, content FROM mempool WHERE chain_id = ?", + ('ait-mainnet',) + ) + for row in cursor.fetchall(): + tx_hash = row[0] + if tx_hash not in self.seen_txs: + self.seen_txs.add(tx_hash) + import json + txs_to_broadcast.append(json.loads(row[1])) + + for tx in txs_to_broadcast: + msg = {'type': 'new_transaction', 'tx': tx} + writers = list(self.active_connections.values()) + for writer in writers: + await self._send_message(writer, msg) + + except Exception as e: + logger.error(f"Error in mempool sync loop: {e}") + + await asyncio.sleep(2) + async def _dial_peers_loop(self): """Background loop to continually try connecting to disconnected initial peers""" while not self._stop_event.is_set(): @@ -247,6 +290,32 @@ class P2PNetworkService: elif msg_type == 'handshake': pass # Ignore subsequent handshakes + elif msg_type == 'new_transaction': + tx_data = message.get('tx') + if tx_data: + try: + tx_hash = compute_tx_hash(tx_data) + if not hasattr(self, 'seen_txs'): + self.seen_txs = set() + + if tx_hash not in self.seen_txs: + logger.info(f"Received new P2P transaction: {tx_hash}") + self.seen_txs.add(tx_hash) + mempool = get_mempool() + # Add to local mempool + mempool.add(tx_data) + + # Forward to other peers (Gossip) + forward_msg = {'type': 'new_transaction', 'tx': tx_data} + writers = list(self.active_connections.values()) + for w in writers: + if w != writer: # Don't send back to sender + await self._send_message(w, forward_msg) + except ValueError as e: + logger.debug(f"P2P tx rejected by mempool: {e}") + except Exception as e: + logger.error(f"P2P tx handling error: {e}") + else: logger.info(f"Received {msg_type} from {peer_id}: {message}")