Shared Chain Implementation
This commit is contained in:
180
apps/blockchain-node/src/aitbc_chain/chain_sync.py
Normal file
180
apps/blockchain-node/src/aitbc_chain/chain_sync.py
Normal file
@@ -0,0 +1,180 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Chain Synchronization Service
|
||||||
|
Keeps blockchain nodes synchronized by sharing blocks via P2P and Redis gossip
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from typing import Dict, Any, Optional, List
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class ChainSyncService:
|
||||||
|
def __init__(self, redis_url: str, node_id: str, rpc_port: int = 8006):
|
||||||
|
self.redis_url = redis_url
|
||||||
|
self.node_id = node_id
|
||||||
|
self.rpc_port = rpc_port
|
||||||
|
self._stop_event = asyncio.Event()
|
||||||
|
self._redis = None
|
||||||
|
|
||||||
|
async def start(self):
|
||||||
|
"""Start chain synchronization service"""
|
||||||
|
logger.info(f"Starting chain sync service for node {self.node_id}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
import redis.asyncio as redis
|
||||||
|
self._redis = redis.from_url(self.redis_url)
|
||||||
|
await self._redis.ping()
|
||||||
|
logger.info("Connected to Redis for chain sync")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to connect to Redis: {e}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Start block broadcasting task
|
||||||
|
broadcast_task = asyncio.create_task(self._broadcast_blocks())
|
||||||
|
|
||||||
|
# Start block receiving task
|
||||||
|
receive_task = asyncio.create_task(self._receive_blocks())
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self._stop_event.wait()
|
||||||
|
finally:
|
||||||
|
broadcast_task.cancel()
|
||||||
|
receive_task.cancel()
|
||||||
|
await asyncio.gather(broadcast_task, receive_task, return_exceptions=True)
|
||||||
|
|
||||||
|
if self._redis:
|
||||||
|
await self._redis.close()
|
||||||
|
|
||||||
|
async def stop(self):
|
||||||
|
"""Stop chain synchronization service"""
|
||||||
|
logger.info("Stopping chain sync service")
|
||||||
|
self._stop_event.set()
|
||||||
|
|
||||||
|
async def _broadcast_blocks(self):
|
||||||
|
"""Broadcast local blocks to other nodes"""
|
||||||
|
import aiohttp
|
||||||
|
|
||||||
|
last_broadcast_height = 0
|
||||||
|
|
||||||
|
while not self._stop_event.is_set():
|
||||||
|
try:
|
||||||
|
# Get current head from local RPC
|
||||||
|
async with aiohttp.ClientSession() as session:
|
||||||
|
async with session.get(f"http://127.0.0.1:{self.rpc_port}/rpc/head") as resp:
|
||||||
|
if resp.status == 200:
|
||||||
|
head_data = await resp.json()
|
||||||
|
current_height = head_data.get('height', 0)
|
||||||
|
|
||||||
|
# Broadcast new blocks
|
||||||
|
if current_height > last_broadcast_height:
|
||||||
|
for height in range(last_broadcast_height + 1, current_height + 1):
|
||||||
|
block_data = await self._get_block_by_height(height, session)
|
||||||
|
if block_data:
|
||||||
|
await self._broadcast_block(block_data)
|
||||||
|
|
||||||
|
last_broadcast_height = current_height
|
||||||
|
logger.info(f"Broadcasted blocks up to height {current_height}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error in block broadcast: {e}")
|
||||||
|
|
||||||
|
await asyncio.sleep(2) # Check every 2 seconds
|
||||||
|
|
||||||
|
async def _receive_blocks(self):
|
||||||
|
"""Receive blocks from other nodes via Redis"""
|
||||||
|
if not self._redis:
|
||||||
|
return
|
||||||
|
|
||||||
|
pubsub = self._redis.pubsub()
|
||||||
|
await pubsub.subscribe("blocks")
|
||||||
|
|
||||||
|
logger.info("Subscribed to block broadcasts")
|
||||||
|
|
||||||
|
async for message in pubsub.listen():
|
||||||
|
if self._stop_event.is_set():
|
||||||
|
break
|
||||||
|
|
||||||
|
if message['type'] == 'message':
|
||||||
|
try:
|
||||||
|
block_data = json.loads(message['data'])
|
||||||
|
await self._import_block(block_data)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error processing received block: {e}")
|
||||||
|
|
||||||
|
async def _get_block_by_height(self, height: int, session) -> Optional[Dict[str, Any]]:
|
||||||
|
"""Get block data by height from local RPC"""
|
||||||
|
try:
|
||||||
|
async with session.get(f"http://127.0.0.1:{self.rpc_port}/rpc/blocks?start={height}&end={height}") as resp:
|
||||||
|
if resp.status == 200:
|
||||||
|
blocks_data = await resp.json()
|
||||||
|
blocks = blocks_data.get('blocks', [])
|
||||||
|
return blocks[0] if blocks else None
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error getting block {height}: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def _broadcast_block(self, block_data: Dict[str, Any]):
|
||||||
|
"""Broadcast block to other nodes via Redis"""
|
||||||
|
if not self._redis:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self._redis.publish("blocks", json.dumps(block_data))
|
||||||
|
logger.debug(f"Broadcasted block {block_data.get('height')}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error broadcasting block: {e}")
|
||||||
|
|
||||||
|
async def _import_block(self, block_data: Dict[str, Any]):
|
||||||
|
"""Import block from another node"""
|
||||||
|
import aiohttp
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Don't import our own blocks
|
||||||
|
if block_data.get('proposer') == self.node_id:
|
||||||
|
return
|
||||||
|
|
||||||
|
async with aiohttp.ClientSession() as session:
|
||||||
|
async with session.post(
|
||||||
|
f"http://127.0.0.1:{self.rpc_port}/rpc/importBlock",
|
||||||
|
json=block_data
|
||||||
|
) as resp:
|
||||||
|
if resp.status == 200:
|
||||||
|
result = await resp.json()
|
||||||
|
if result.get('accepted'):
|
||||||
|
logger.info(f"Imported block {block_data.get('height')} from {block_data.get('proposer')}")
|
||||||
|
else:
|
||||||
|
logger.debug(f"Rejected block {block_data.get('height')}: {result.get('reason')}")
|
||||||
|
else:
|
||||||
|
logger.warning(f"Failed to import block: {resp.status}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error importing block: {e}")
|
||||||
|
|
||||||
|
async def run_chain_sync(redis_url: str, node_id: str, rpc_port: int = 8006):
|
||||||
|
"""Run chain synchronization service"""
|
||||||
|
service = ChainSyncService(redis_url, node_id, rpc_port)
|
||||||
|
await service.start()
|
||||||
|
|
||||||
|
def main():
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description="AITBC Chain Synchronization Service")
|
||||||
|
parser.add_argument("--redis", default="redis://localhost:6379", help="Redis URL")
|
||||||
|
parser.add_argument("--node-id", required=True, help="Node identifier")
|
||||||
|
parser.add_argument("--rpc-port", type=int, default=8006, help="RPC port")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
|
||||||
|
try:
|
||||||
|
asyncio.run(run_chain_sync(args.redis, args.node_id, args.rpc_port))
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logger.info("Chain sync service stopped by user")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
106
apps/blockchain-node/src/aitbc_chain/p2p_network.py
Normal file
106
apps/blockchain-node/src/aitbc_chain/p2p_network.py
Normal file
@@ -0,0 +1,106 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
P2P Network Service using Redis Gossip
|
||||||
|
Handles peer-to-peer communication between blockchain nodes
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import socket
|
||||||
|
from typing import Dict, Any, Optional
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class P2PNetworkService:
|
||||||
|
def __init__(self, host: str, port: int, redis_url: str, node_id: str):
|
||||||
|
self.host = host
|
||||||
|
self.port = port
|
||||||
|
self.redis_url = redis_url
|
||||||
|
self.node_id = node_id
|
||||||
|
self._server = None
|
||||||
|
self._stop_event = asyncio.Event()
|
||||||
|
|
||||||
|
async def start(self):
|
||||||
|
"""Start P2P network service"""
|
||||||
|
logger.info(f"Starting P2P network service on {self.host}:{self.port}")
|
||||||
|
|
||||||
|
# Create TCP server for P2P connections
|
||||||
|
self._server = await asyncio.start_server(
|
||||||
|
self._handle_connection,
|
||||||
|
self.host,
|
||||||
|
self.port
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info(f"P2P service listening on {self.host}:{self.port}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self._stop_event.wait()
|
||||||
|
finally:
|
||||||
|
await self.stop()
|
||||||
|
|
||||||
|
async def stop(self):
|
||||||
|
"""Stop P2P network service"""
|
||||||
|
logger.info("Stopping P2P network service")
|
||||||
|
if self._server:
|
||||||
|
self._server.close()
|
||||||
|
await self._server.wait_closed()
|
||||||
|
|
||||||
|
async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
|
||||||
|
"""Handle incoming P2P connections"""
|
||||||
|
addr = writer.get_extra_info('peername')
|
||||||
|
logger.info(f"P2P connection from {addr}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
data = await reader.read(1024)
|
||||||
|
if not data:
|
||||||
|
break
|
||||||
|
|
||||||
|
try:
|
||||||
|
message = json.loads(data.decode())
|
||||||
|
logger.info(f"P2P received: {message}")
|
||||||
|
|
||||||
|
# Handle different message types
|
||||||
|
if message.get('type') == 'ping':
|
||||||
|
response = {'type': 'pong', 'node_id': self.node_id}
|
||||||
|
writer.write(json.dumps(response).encode() + b'\n')
|
||||||
|
await writer.drain()
|
||||||
|
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
logger.warning(f"Invalid JSON from {addr}")
|
||||||
|
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"P2P connection error: {e}")
|
||||||
|
finally:
|
||||||
|
writer.close()
|
||||||
|
await writer.wait_closed()
|
||||||
|
logger.info(f"P2P connection closed from {addr}")
|
||||||
|
|
||||||
|
async def run_p2p_service(host: str, port: int, redis_url: str, node_id: str):
|
||||||
|
"""Run P2P service"""
|
||||||
|
service = P2PNetworkService(host, port, redis_url, node_id)
|
||||||
|
await service.start()
|
||||||
|
|
||||||
|
def main():
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description="AITBC P2P Network Service")
|
||||||
|
parser.add_argument("--host", default="0.0.0.0", help="Bind host")
|
||||||
|
parser.add_argument("--port", type=int, default=8005, help="Bind port")
|
||||||
|
parser.add_argument("--redis", default="redis://localhost:6379", help="Redis URL")
|
||||||
|
parser.add_argument("--node-id", help="Node identifier")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
|
||||||
|
try:
|
||||||
|
asyncio.run(run_p2p_service(args.host, args.port, args.redis, args.node_id))
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logger.info("P2P service stopped by user")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Reference in New Issue
Block a user