feat: add P2P transaction gossip protocol with mempool synchronization
- Added mempool sync loop to broadcast new transactions every 2 seconds - Implemented transaction deduplication using seen_txs set with tx_hash tracking - Added support for both InMemoryMempool and DatabaseMempool transaction retrieval - Added new_transaction message handler for receiving P2P transactions - Implemented gossip forwarding to propagate transactions across mesh network - Added automatic mempool.add() for received
This commit is contained in:
@@ -7,6 +7,7 @@ Handles decentralized peer-to-peer mesh communication between blockchain nodes
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from .mempool import get_mempool, compute_tx_hash
|
||||
from typing import Dict, Any, Optional, Set, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -88,6 +89,48 @@ class P2PNetworkService:
|
||||
self._server.close()
|
||||
await self._server.wait_closed()
|
||||
|
||||
|
||||
async def _mempool_sync_loop(self):
|
||||
"""Periodically check local mempool and broadcast new transactions to peers"""
|
||||
self.seen_txs = set()
|
||||
while not self._stop_event.is_set():
|
||||
try:
|
||||
mempool = get_mempool()
|
||||
|
||||
# Different logic depending on if InMemory or Database
|
||||
txs_to_broadcast = []
|
||||
|
||||
if hasattr(mempool, '_transactions'): # InMemoryMempool
|
||||
with mempool._lock:
|
||||
for tx_hash, pending_tx in mempool._transactions.items():
|
||||
if tx_hash not in self.seen_txs:
|
||||
self.seen_txs.add(tx_hash)
|
||||
txs_to_broadcast.append(pending_tx.content)
|
||||
|
||||
elif hasattr(mempool, '_conn'): # DatabaseMempool
|
||||
with mempool._lock:
|
||||
cursor = mempool._conn.execute(
|
||||
"SELECT tx_hash, content FROM mempool WHERE chain_id = ?",
|
||||
('ait-mainnet',)
|
||||
)
|
||||
for row in cursor.fetchall():
|
||||
tx_hash = row[0]
|
||||
if tx_hash not in self.seen_txs:
|
||||
self.seen_txs.add(tx_hash)
|
||||
import json
|
||||
txs_to_broadcast.append(json.loads(row[1]))
|
||||
|
||||
for tx in txs_to_broadcast:
|
||||
msg = {'type': 'new_transaction', 'tx': tx}
|
||||
writers = list(self.active_connections.values())
|
||||
for writer in writers:
|
||||
await self._send_message(writer, msg)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in mempool sync loop: {e}")
|
||||
|
||||
await asyncio.sleep(2)
|
||||
|
||||
async def _dial_peers_loop(self):
|
||||
"""Background loop to continually try connecting to disconnected initial peers"""
|
||||
while not self._stop_event.is_set():
|
||||
@@ -247,6 +290,32 @@ class P2PNetworkService:
|
||||
|
||||
elif msg_type == 'handshake':
|
||||
pass # Ignore subsequent handshakes
|
||||
elif msg_type == 'new_transaction':
|
||||
tx_data = message.get('tx')
|
||||
if tx_data:
|
||||
try:
|
||||
tx_hash = compute_tx_hash(tx_data)
|
||||
if not hasattr(self, 'seen_txs'):
|
||||
self.seen_txs = set()
|
||||
|
||||
if tx_hash not in self.seen_txs:
|
||||
logger.info(f"Received new P2P transaction: {tx_hash}")
|
||||
self.seen_txs.add(tx_hash)
|
||||
mempool = get_mempool()
|
||||
# Add to local mempool
|
||||
mempool.add(tx_data)
|
||||
|
||||
# Forward to other peers (Gossip)
|
||||
forward_msg = {'type': 'new_transaction', 'tx': tx_data}
|
||||
writers = list(self.active_connections.values())
|
||||
for w in writers:
|
||||
if w != writer: # Don't send back to sender
|
||||
await self._send_message(w, forward_msg)
|
||||
except ValueError as e:
|
||||
logger.debug(f"P2P tx rejected by mempool: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"P2P tx handling error: {e}")
|
||||
|
||||
|
||||
else:
|
||||
logger.info(f"Received {msg_type} from {peer_id}: {message}")
|
||||
|
||||
Reference in New Issue
Block a user