From 5f2ab48b9a3032936b239155c8e9c95721e3247a Mon Sep 17 00:00:00 2001 From: aitbc1 Date: Thu, 19 Mar 2026 12:49:08 +0100 Subject: [PATCH] Shared Chain Implementation --- .../src/aitbc_chain/chain_sync.py | 180 ++++++++++++++++++ .../src/aitbc_chain/p2p_network.py | 106 +++++++++++ 2 files changed, 286 insertions(+) create mode 100644 apps/blockchain-node/src/aitbc_chain/chain_sync.py create mode 100644 apps/blockchain-node/src/aitbc_chain/p2p_network.py diff --git a/apps/blockchain-node/src/aitbc_chain/chain_sync.py b/apps/blockchain-node/src/aitbc_chain/chain_sync.py new file mode 100644 index 00000000..a59b1b44 --- /dev/null +++ b/apps/blockchain-node/src/aitbc_chain/chain_sync.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python3 +""" +Chain Synchronization Service +Keeps blockchain nodes synchronized by sharing blocks via P2P and Redis gossip +""" + +import asyncio +import json +import logging +import time +from typing import Dict, Any, Optional, List + +logger = logging.getLogger(__name__) + +class ChainSyncService: + def __init__(self, redis_url: str, node_id: str, rpc_port: int = 8006): + self.redis_url = redis_url + self.node_id = node_id + self.rpc_port = rpc_port + self._stop_event = asyncio.Event() + self._redis = None + + async def start(self): + """Start chain synchronization service""" + logger.info(f"Starting chain sync service for node {self.node_id}") + + try: + import redis.asyncio as redis + self._redis = redis.from_url(self.redis_url) + await self._redis.ping() + logger.info("Connected to Redis for chain sync") + except Exception as e: + logger.error(f"Failed to connect to Redis: {e}") + return + + # Start block broadcasting task + broadcast_task = asyncio.create_task(self._broadcast_blocks()) + + # Start block receiving task + receive_task = asyncio.create_task(self._receive_blocks()) + + try: + await self._stop_event.wait() + finally: + broadcast_task.cancel() + receive_task.cancel() + await asyncio.gather(broadcast_task, receive_task, return_exceptions=True) + + if self._redis: + await self._redis.close() + + async def stop(self): + """Stop chain synchronization service""" + logger.info("Stopping chain sync service") + self._stop_event.set() + + async def _broadcast_blocks(self): + """Broadcast local blocks to other nodes""" + import aiohttp + + last_broadcast_height = 0 + + while not self._stop_event.is_set(): + try: + # Get current head from local RPC + async with aiohttp.ClientSession() as session: + async with session.get(f"http://127.0.0.1:{self.rpc_port}/rpc/head") as resp: + if resp.status == 200: + head_data = await resp.json() + current_height = head_data.get('height', 0) + + # Broadcast new blocks + if current_height > last_broadcast_height: + for height in range(last_broadcast_height + 1, current_height + 1): + block_data = await self._get_block_by_height(height, session) + if block_data: + await self._broadcast_block(block_data) + + last_broadcast_height = current_height + logger.info(f"Broadcasted blocks up to height {current_height}") + + except Exception as e: + logger.error(f"Error in block broadcast: {e}") + + await asyncio.sleep(2) # Check every 2 seconds + + async def _receive_blocks(self): + """Receive blocks from other nodes via Redis""" + if not self._redis: + return + + pubsub = self._redis.pubsub() + await pubsub.subscribe("blocks") + + logger.info("Subscribed to block broadcasts") + + async for message in pubsub.listen(): + if self._stop_event.is_set(): + break + + if message['type'] == 'message': + try: + block_data = json.loads(message['data']) + await self._import_block(block_data) + except Exception as e: + logger.error(f"Error processing received block: {e}") + + async def _get_block_by_height(self, height: int, session) -> Optional[Dict[str, Any]]: + """Get block data by height from local RPC""" + try: + async with session.get(f"http://127.0.0.1:{self.rpc_port}/rpc/blocks?start={height}&end={height}") as resp: + if resp.status == 200: + blocks_data = await resp.json() + blocks = blocks_data.get('blocks', []) + return blocks[0] if blocks else None + except Exception as e: + logger.error(f"Error getting block {height}: {e}") + return None + + async def _broadcast_block(self, block_data: Dict[str, Any]): + """Broadcast block to other nodes via Redis""" + if not self._redis: + return + + try: + await self._redis.publish("blocks", json.dumps(block_data)) + logger.debug(f"Broadcasted block {block_data.get('height')}") + except Exception as e: + logger.error(f"Error broadcasting block: {e}") + + async def _import_block(self, block_data: Dict[str, Any]): + """Import block from another node""" + import aiohttp + + try: + # Don't import our own blocks + if block_data.get('proposer') == self.node_id: + return + + async with aiohttp.ClientSession() as session: + async with session.post( + f"http://127.0.0.1:{self.rpc_port}/rpc/importBlock", + json=block_data + ) as resp: + if resp.status == 200: + result = await resp.json() + if result.get('accepted'): + logger.info(f"Imported block {block_data.get('height')} from {block_data.get('proposer')}") + else: + logger.debug(f"Rejected block {block_data.get('height')}: {result.get('reason')}") + else: + logger.warning(f"Failed to import block: {resp.status}") + + except Exception as e: + logger.error(f"Error importing block: {e}") + +async def run_chain_sync(redis_url: str, node_id: str, rpc_port: int = 8006): + """Run chain synchronization service""" + service = ChainSyncService(redis_url, node_id, rpc_port) + await service.start() + +def main(): + import argparse + + parser = argparse.ArgumentParser(description="AITBC Chain Synchronization Service") + parser.add_argument("--redis", default="redis://localhost:6379", help="Redis URL") + parser.add_argument("--node-id", required=True, help="Node identifier") + parser.add_argument("--rpc-port", type=int, default=8006, help="RPC port") + + args = parser.parse_args() + + logging.basicConfig(level=logging.INFO) + + try: + asyncio.run(run_chain_sync(args.redis, args.node_id, args.rpc_port)) + except KeyboardInterrupt: + logger.info("Chain sync service stopped by user") + +if __name__ == "__main__": + main() diff --git a/apps/blockchain-node/src/aitbc_chain/p2p_network.py b/apps/blockchain-node/src/aitbc_chain/p2p_network.py new file mode 100644 index 00000000..8754ce72 --- /dev/null +++ b/apps/blockchain-node/src/aitbc_chain/p2p_network.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 +""" +P2P Network Service using Redis Gossip +Handles peer-to-peer communication between blockchain nodes +""" + +import asyncio +import json +import logging +import socket +from typing import Dict, Any, Optional + +logger = logging.getLogger(__name__) + +class P2PNetworkService: + def __init__(self, host: str, port: int, redis_url: str, node_id: str): + self.host = host + self.port = port + self.redis_url = redis_url + self.node_id = node_id + self._server = None + self._stop_event = asyncio.Event() + + async def start(self): + """Start P2P network service""" + logger.info(f"Starting P2P network service on {self.host}:{self.port}") + + # Create TCP server for P2P connections + self._server = await asyncio.start_server( + self._handle_connection, + self.host, + self.port + ) + + logger.info(f"P2P service listening on {self.host}:{self.port}") + + try: + await self._stop_event.wait() + finally: + await self.stop() + + async def stop(self): + """Stop P2P network service""" + logger.info("Stopping P2P network service") + if self._server: + self._server.close() + await self._server.wait_closed() + + async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + """Handle incoming P2P connections""" + addr = writer.get_extra_info('peername') + logger.info(f"P2P connection from {addr}") + + try: + while True: + data = await reader.read(1024) + if not data: + break + + try: + message = json.loads(data.decode()) + logger.info(f"P2P received: {message}") + + # Handle different message types + if message.get('type') == 'ping': + response = {'type': 'pong', 'node_id': self.node_id} + writer.write(json.dumps(response).encode() + b'\n') + await writer.drain() + + except json.JSONDecodeError: + logger.warning(f"Invalid JSON from {addr}") + + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"P2P connection error: {e}") + finally: + writer.close() + await writer.wait_closed() + logger.info(f"P2P connection closed from {addr}") + +async def run_p2p_service(host: str, port: int, redis_url: str, node_id: str): + """Run P2P service""" + service = P2PNetworkService(host, port, redis_url, node_id) + await service.start() + +def main(): + import argparse + + parser = argparse.ArgumentParser(description="AITBC P2P Network Service") + parser.add_argument("--host", default="0.0.0.0", help="Bind host") + parser.add_argument("--port", type=int, default=8005, help="Bind port") + parser.add_argument("--redis", default="redis://localhost:6379", help="Redis URL") + parser.add_argument("--node-id", help="Node identifier") + + args = parser.parse_args() + + logging.basicConfig(level=logging.INFO) + + try: + asyncio.run(run_p2p_service(args.host, args.port, args.redis, args.node_id)) + except KeyboardInterrupt: + logger.info("P2P service stopped by user") + +if __name__ == "__main__": + main()