diff --git a/apps/blockchain-node/src/aitbc_chain/consensus/poa.py b/apps/blockchain-node/src/aitbc_chain/consensus/poa.py index a6d713ac..e34ba6f0 100644 --- a/apps/blockchain-node/src/aitbc_chain/consensus/poa.py +++ b/apps/blockchain-node/src/aitbc_chain/consensus/poa.py @@ -174,6 +174,7 @@ class PoAProposer: await gossip_broker.publish( "blocks", { + "chain_id": self._config.chain_id, "height": block.height, "hash": block.hash, "parent_hash": block.parent_hash, @@ -210,6 +211,7 @@ class PoAProposer: await gossip_broker.publish( "blocks", { + "chain_id": self._config.chain_id, "height": genesis.height, "hash": genesis.hash, "parent_hash": genesis.parent_hash, diff --git a/apps/blockchain-node/src/aitbc_chain/main.py b/apps/blockchain-node/src/aitbc_chain/main.py index 0135debf..b8cc58ca 100644 --- a/apps/blockchain-node/src/aitbc_chain/main.py +++ b/apps/blockchain-node/src/aitbc_chain/main.py @@ -8,6 +8,8 @@ from .config import settings from .consensus import PoAProposer, ProposerConfig, CircuitBreaker from .database import init_db, session_scope from .logger import get_logger +from .gossip import gossip_broker, create_backend +from .sync import ChainSync from .mempool import init_mempool logger = get_logger(__name__) @@ -18,8 +20,57 @@ class BlockchainNode: self._stop_event = asyncio.Event() self._proposers: dict[str, PoAProposer] = {} + async def _setup_gossip_subscribers(self) -> None: + # Transactions + tx_sub = await gossip_broker.subscribe("transactions") + + async def process_txs(): + from .mempool import get_mempool + mempool = get_mempool() + while True: + try: + tx_data = await tx_sub.queue.get() + if isinstance(tx_data, str): + import json + tx_data = json.loads(tx_data) + chain_id = tx_data.get("chain_id", "ait-devnet") + mempool.add(tx_data, chain_id=chain_id) + except Exception as exc: + logger.error(f"Error processing transaction from gossip: {exc}") + + asyncio.create_task(process_txs()) + + # Blocks + block_sub = await gossip_broker.subscribe("blocks") + + async def process_blocks(): + while True: + try: + block_data = await block_sub.queue.get() + logger.info(f"Received block from gossip") + if isinstance(block_data, str): + import json + block_data = json.loads(block_data) + chain_id = block_data.get("chain_id", "ait-devnet") + logger.info(f"Importing block for chain {chain_id}: {block_data.get('height')}") + sync = ChainSync(session_factory=session_scope, chain_id=chain_id) + res = sync.import_block(block_data) + logger.info(f"Import result: accepted={res.accepted}, reason={res.reason}") + except Exception as exc: + logger.error(f"Error processing block from gossip: {exc}") + + asyncio.create_task(process_blocks()) + async def start(self) -> None: logger.info("Starting blockchain node", extra={"supported_chains": getattr(settings, 'supported_chains', settings.chain_id)}) + + # Initialize Gossip Backend + backend = create_backend( + settings.gossip_backend, + broadcast_url=settings.gossip_broadcast_url, + ) + await gossip_broker.set_backend(backend) + init_db() init_mempool( backend=settings.mempool_backend, @@ -28,6 +79,7 @@ class BlockchainNode: min_fee=settings.min_fee, ) self._start_proposers() + await self._setup_gossip_subscribers() try: await self._stop_event.wait() finally: @@ -61,6 +113,7 @@ class BlockchainNode: for chain_id, proposer in list(self._proposers.items()): await proposer.stop() self._proposers.clear() + await gossip_broker.shutdown() @asynccontextmanager diff --git a/dev/scripts/create_genesis_all.py b/dev/scripts/create_genesis_all.py index ba63fedc..7e217239 100755 --- a/dev/scripts/create_genesis_all.py +++ b/dev/scripts/create_genesis_all.py @@ -10,7 +10,7 @@ from datetime import datetime import hashlib def compute_block_hash(chain_id: str, height: int, parent_hash: str, timestamp: datetime) -> str: - data = f"{chain_id}{height}{parent_hash}{timestamp}".encode() + data = f"{chain_id}{height}{parent_hash}{timestamp.isoformat()}".encode() return "0x" + hashlib.sha256(data).hexdigest() def create_genesis(chain_id: str): @@ -19,17 +19,18 @@ def create_genesis(chain_id: str): with session_scope() as session: existing = session.exec(select(Block).where(Block.chain_id == chain_id).order_by(Block.height.desc()).limit(1)).first() if existing: - print(f"Genesis block already exists for {chain_id}: #{existing.height}") + print(f"Genesis block already exists for {chain_id}: #{existing.height} (hash: {existing.hash})") return - timestamp = datetime.utcnow() + # Use a deterministic timestamp so all nodes agree on the hash + timestamp = datetime(2025, 1, 1, 0, 0, 0) genesis_hash = compute_block_hash(chain_id, 0, "0x00", timestamp) genesis = Block( chain_id=chain_id, height=0, hash=genesis_hash, parent_hash="0x00", - proposer=f"{chain_id}-proposer", + proposer="genesis", timestamp=timestamp, tx_count=0, state_root=None, @@ -39,9 +40,9 @@ def create_genesis(chain_id: str): print(f"Genesis block created for {chain_id}: #{genesis.height}") print(f"Hash: {genesis.hash}") print(f"Proposer: {genesis.proposer}") - print(f"Timestamp: {genesis.timestamp}") + print(f"Timestamp: {genesis.timestamp.isoformat()}") if __name__ == "__main__": init_db() - for chain in ["ait-testnet", "ait-devnet"]: + for chain in ["ait-testnet", "ait-devnet", "ait-healthchain"]: create_genesis(chain) diff --git a/dev/tests/test_sync.py b/dev/tests/test_sync.py new file mode 100644 index 00000000..d2cf6ccc --- /dev/null +++ b/dev/tests/test_sync.py @@ -0,0 +1,31 @@ +import asyncio +import httpx +import time + +async def main(): + async with httpx.AsyncClient() as client: + print("Submitting transaction to aitbc (testnet)...") + tx_data = { + "type": "transfer", + "sender": "0xTEST_SENDER", + "nonce": int(time.time()), + "fee": 1, + "payload": {"amount": 100, "recipient": "0xTEST_RECIPIENT"}, + "sig": "0xSIG" + } + resp = await client.post("http://10.1.223.93:8082/rpc/sendTx?chain_id=ait-testnet", json=tx_data) + print("aitbc response:", resp.status_code, resp.text) + + print("Waiting 5 seconds for gossip propagation and block proposing...") + await asyncio.sleep(5) + + print("Checking head on aitbc...") + resp = await client.get("http://10.1.223.93:8082/rpc/head?chain_id=ait-testnet") + print("aitbc head:", resp.status_code, resp.json()) + + print("Checking head on aitbc1...") + resp = await client.get("http://10.1.223.40:8082/rpc/head?chain_id=ait-testnet") + print("aitbc1 head:", resp.status_code, resp.json()) + +if __name__ == "__main__": + asyncio.run(main())