Files
aitbc/apps/blockchain-node/src/aitbc_chain/main.py
aitbc 74e5a880b0
Some checks failed
Integration Tests / test-service-integration (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled
Add logging and error handling to gossip subscriber setup in blockchain node
- Add info logs for gossip subscriber setup start and completion
- Add try-except blocks around transactions and blocks topic subscriptions
- Add success logs after each subscription
- Add error logs with exception details if subscription fails
- Add early return if subscription fails to prevent further processing
- Add logging for gossip backend initialization with backend type and URL
- Add success log after gossip backend initialization compl
2026-04-14 08:56:24 +02:00

227 lines
8.8 KiB
Python
Executable File

from __future__ import annotations
import asyncio
import json
import os
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Optional
from .config import settings
from .consensus import PoAProposer, ProposerConfig, CircuitBreaker
from .database import init_db, session_scope
from .logger import get_logger
from .gossip import gossip_broker, create_backend
from .sync import ChainSync
from .mempool import init_mempool
logger = get_logger(__name__)
def _load_keystore_password() -> str:
"""Load keystore password from file or environment."""
pwd_file = settings.keystore_password_file
if pwd_file.exists():
return pwd_file.read_text().strip()
env_pwd = os.getenv("KEYSTORE_PASSWORD")
if env_pwd:
return env_pwd
raise RuntimeError(f"Keystore password not found. Set in {pwd_file} or KEYSTORE_PASSWORD env.")
def _load_private_key_from_keystore(keystore_dir: Path, password: str, target_address: Optional[str] = None) -> Optional[bytes]:
"""Load an ed25519 private key from the keystore.
If target_address is given, find the keystore file with matching address.
Otherwise, return the first key found.
"""
if not keystore_dir.exists():
return None
for kf in keystore_dir.glob("*.json"):
try:
with open(kf) as f:
data = json.load(f)
addr = data.get("address")
if target_address and addr != target_address:
continue
# Decrypt
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.backends import default_backend
crypto = data["crypto"]
kdfparams = crypto["kdfparams"]
salt = bytes.fromhex(kdfparams["salt"])
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=kdfparams["c"],
backend=default_backend()
)
key = kdf.derive(password.encode('utf-8'))
nonce = bytes.fromhex(crypto["cipherparams"]["nonce"])
ciphertext = bytes.fromhex(crypto["ciphertext"])
aesgcm = AESGCM(key)
private_bytes = aesgcm.decrypt(nonce, ciphertext, None)
# Verify it's ed25519
priv_key = ed25519.Ed25519PrivateKey.from_private_bytes(private_bytes)
return private_bytes
except Exception:
continue
return None
# Attempt to load proposer private key from keystore if not set
if not settings.proposer_key:
try:
pwd = _load_keystore_password()
key_bytes = _load_private_key_from_keystore(settings.keystore_path, pwd, target_address=settings.proposer_id)
if key_bytes:
# Encode as hex for easy storage; not yet used for signing
settings.proposer_key = key_bytes.hex()
logger.info("Loaded proposer private key from keystore", extra={"proposer_id": settings.proposer_id})
else:
logger.warning("Proposer private key not found in keystore; block signing disabled", extra={"proposer_id": settings.proposer_id})
except Exception as e:
logger.warning("Failed to load proposer key from keystore", extra={"error": str(e)})
class BlockchainNode:
def __init__(self) -> None:
self._stop_event = asyncio.Event()
self._proposers: dict[str, PoAProposer] = {}
async def _setup_gossip_subscribers(self) -> None:
logger.info("Setting up gossip subscribers")
# Transactions
try:
tx_sub = await gossip_broker.subscribe("transactions")
logger.info("Successfully subscribed to transactions topic")
except Exception as e:
logger.error(f"Failed to subscribe to transactions: {e}")
return
async def process_txs():
from .mempool import get_mempool
mempool = get_mempool()
while True:
try:
tx_data = await tx_sub.queue.get()
if isinstance(tx_data, str):
import json
tx_data = json.loads(tx_data)
chain_id = tx_data.get("chain_id", settings.chain_id)
mempool.add(tx_data, chain_id=chain_id)
except Exception as exc:
logger.error(f"Error processing transaction from gossip: {exc}")
asyncio.create_task(process_txs())
# Blocks
try:
block_sub = await gossip_broker.subscribe("blocks")
logger.info("Successfully subscribed to blocks topic")
except Exception as e:
logger.error(f"Failed to subscribe to blocks: {e}")
return
async def process_blocks():
while True:
try:
block_data = await block_sub.queue.get()
logger.info(f"Received block from gossip")
if isinstance(block_data, str):
import json
block_data = json.loads(block_data)
chain_id = block_data.get("chain_id", settings.chain_id)
logger.info(f"Importing block for chain {chain_id}: {block_data.get('height')}")
sync = ChainSync(session_factory=session_scope, chain_id=chain_id)
res = sync.import_block(block_data, transactions=block_data.get("transactions"))
logger.info(f"Import result: accepted={res.accepted}, reason={res.reason}")
except Exception as exc:
logger.error(f"Error processing block from gossip: {exc}")
asyncio.create_task(process_blocks())
logger.info("Gossip subscribers setup completed")
async def start(self) -> None:
logger.info("Starting blockchain node", extra={"supported_chains": getattr(settings, 'supported_chains', settings.chain_id)})
# Initialize Gossip Backend
backend = create_backend(
settings.gossip_backend,
broadcast_url=settings.gossip_broadcast_url,
)
logger.info(f"Initializing gossip backend: {settings.gossip_backend}, url: {settings.gossip_broadcast_url}")
await gossip_broker.set_backend(backend)
logger.info("Gossip backend initialized successfully")
init_db()
init_mempool(
backend=settings.mempool_backend,
db_path=str(settings.db_path.parent / "mempool.db"),
max_size=settings.mempool_max_size,
min_fee=settings.min_fee,
)
# Start proposers only if enabled (followers set enable_block_production=False)
if getattr(settings, "enable_block_production", True):
self._start_proposers()
else:
logger.info("Block production disabled on this node", extra={"proposer_id": settings.proposer_id})
await self._setup_gossip_subscribers()
try:
await self._stop_event.wait()
finally:
await self._shutdown()
async def stop(self) -> None:
logger.info("Stopping blockchain node")
self._stop_event.set()
await self._shutdown()
def _start_proposers(self) -> None:
chains_str = getattr(settings, 'supported_chains', settings.chain_id)
chains = [c.strip() for c in chains_str.split(",") if c.strip()]
for chain_id in chains:
if chain_id in self._proposers:
continue
proposer_config = ProposerConfig(
chain_id=chain_id,
proposer_id=settings.proposer_id,
interval_seconds=settings.block_time_seconds,
max_block_size_bytes=settings.max_block_size_bytes,
max_txs_per_block=settings.max_txs_per_block,
)
proposer = PoAProposer(config=proposer_config, session_factory=session_scope)
self._proposers[chain_id] = proposer
asyncio.create_task(proposer.start())
async def _shutdown(self) -> None:
for chain_id, proposer in list(self._proposers.items()):
await proposer.stop()
self._proposers.clear()
await gossip_broker.shutdown()
@asynccontextmanager
async def node_app() -> asyncio.AbstractAsyncContextManager[BlockchainNode]: # type: ignore[override]
node = BlockchainNode()
try:
yield node
finally:
await node.stop()
def run() -> None:
asyncio.run(_run())
async def _run() -> None:
async with node_app() as node:
await node.start()
if __name__ == "__main__": # pragma: no cover
run()