fix(sync): resolve multi-chain gossip block propagation and sync across nodes
This commit is contained in:
@@ -174,6 +174,7 @@ class PoAProposer:
|
||||
await gossip_broker.publish(
|
||||
"blocks",
|
||||
{
|
||||
"chain_id": self._config.chain_id,
|
||||
"height": block.height,
|
||||
"hash": block.hash,
|
||||
"parent_hash": block.parent_hash,
|
||||
@@ -210,6 +211,7 @@ class PoAProposer:
|
||||
await gossip_broker.publish(
|
||||
"blocks",
|
||||
{
|
||||
"chain_id": self._config.chain_id,
|
||||
"height": genesis.height,
|
||||
"hash": genesis.hash,
|
||||
"parent_hash": genesis.parent_hash,
|
||||
|
||||
@@ -8,6 +8,8 @@ from .config import settings
|
||||
from .consensus import PoAProposer, ProposerConfig, CircuitBreaker
|
||||
from .database import init_db, session_scope
|
||||
from .logger import get_logger
|
||||
from .gossip import gossip_broker, create_backend
|
||||
from .sync import ChainSync
|
||||
from .mempool import init_mempool
|
||||
|
||||
logger = get_logger(__name__)
|
||||
@@ -18,8 +20,57 @@ class BlockchainNode:
|
||||
self._stop_event = asyncio.Event()
|
||||
self._proposers: dict[str, PoAProposer] = {}
|
||||
|
||||
async def _setup_gossip_subscribers(self) -> None:
|
||||
# Transactions
|
||||
tx_sub = await gossip_broker.subscribe("transactions")
|
||||
|
||||
async def process_txs():
|
||||
from .mempool import get_mempool
|
||||
mempool = get_mempool()
|
||||
while True:
|
||||
try:
|
||||
tx_data = await tx_sub.queue.get()
|
||||
if isinstance(tx_data, str):
|
||||
import json
|
||||
tx_data = json.loads(tx_data)
|
||||
chain_id = tx_data.get("chain_id", "ait-devnet")
|
||||
mempool.add(tx_data, chain_id=chain_id)
|
||||
except Exception as exc:
|
||||
logger.error(f"Error processing transaction from gossip: {exc}")
|
||||
|
||||
asyncio.create_task(process_txs())
|
||||
|
||||
# Blocks
|
||||
block_sub = await gossip_broker.subscribe("blocks")
|
||||
|
||||
async def process_blocks():
|
||||
while True:
|
||||
try:
|
||||
block_data = await block_sub.queue.get()
|
||||
logger.info(f"Received block from gossip")
|
||||
if isinstance(block_data, str):
|
||||
import json
|
||||
block_data = json.loads(block_data)
|
||||
chain_id = block_data.get("chain_id", "ait-devnet")
|
||||
logger.info(f"Importing block for chain {chain_id}: {block_data.get('height')}")
|
||||
sync = ChainSync(session_factory=session_scope, chain_id=chain_id)
|
||||
res = sync.import_block(block_data)
|
||||
logger.info(f"Import result: accepted={res.accepted}, reason={res.reason}")
|
||||
except Exception as exc:
|
||||
logger.error(f"Error processing block from gossip: {exc}")
|
||||
|
||||
asyncio.create_task(process_blocks())
|
||||
|
||||
async def start(self) -> None:
|
||||
logger.info("Starting blockchain node", extra={"supported_chains": getattr(settings, 'supported_chains', settings.chain_id)})
|
||||
|
||||
# Initialize Gossip Backend
|
||||
backend = create_backend(
|
||||
settings.gossip_backend,
|
||||
broadcast_url=settings.gossip_broadcast_url,
|
||||
)
|
||||
await gossip_broker.set_backend(backend)
|
||||
|
||||
init_db()
|
||||
init_mempool(
|
||||
backend=settings.mempool_backend,
|
||||
@@ -28,6 +79,7 @@ class BlockchainNode:
|
||||
min_fee=settings.min_fee,
|
||||
)
|
||||
self._start_proposers()
|
||||
await self._setup_gossip_subscribers()
|
||||
try:
|
||||
await self._stop_event.wait()
|
||||
finally:
|
||||
@@ -61,6 +113,7 @@ class BlockchainNode:
|
||||
for chain_id, proposer in list(self._proposers.items()):
|
||||
await proposer.stop()
|
||||
self._proposers.clear()
|
||||
await gossip_broker.shutdown()
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
|
||||
Reference in New Issue
Block a user