diff --git a/apps/agent-coordinator/scripts/agent_daemon.py b/apps/agent-coordinator/scripts/agent_daemon.py index a7b40b08..89f463bd 100755 --- a/apps/agent-coordinator/scripts/agent_daemon.py +++ b/apps/agent-coordinator/scripts/agent_daemon.py @@ -83,7 +83,7 @@ def decrypt_wallet(keystore_path: Path, password: str) -> bytes: raise ValueError(f"Unsupported cipher: {cipher}") -def create_tx(private_bytes: bytes, from_addr: str, to_addr: str, amount: float, fee: float, payload: str) -> dict: +def create_tx(private_bytes: bytes, from_addr: str, to_addr: str, amount: float, fee: float, payload: str, chain_id: str = "ait-mainnet") -> dict: """Create and sign a transaction""" priv_key = ed25519.Ed25519PrivateKey.from_private_bytes(private_bytes) pub_hex = priv_key.public_key().public_bytes( @@ -99,7 +99,7 @@ def create_tx(private_bytes: bytes, from_addr: str, to_addr: str, amount: float, "fee": fee, "nonce": int(time.time() * 1000), "payload": payload, - "chain_id": "ait-mainnet" + "chain_id": chain_id } tx_string = json.dumps(tx, sort_keys=True) @@ -120,6 +120,7 @@ def main(): parser.add_argument("--poll-interval", type=int, default=DEFAULT_POLL_INTERVAL, help="Poll interval in seconds") parser.add_argument("--reply-message", default="pong", help="Message to send as reply") parser.add_argument("--trigger-message", default="ping", help="Message that triggers reply") + parser.add_argument("--chain-id", default="ait-mainnet", help="Chain ID for transactions (default: ait-mainnet)") args = parser.parse_args() @@ -172,7 +173,10 @@ def main(): try: with Session(engine) as session: txs = session.exec( - select(Transaction).where(Transaction.recipient == args.address) + select(Transaction).where( + Transaction.recipient == args.address, + Transaction.chain_id == args.chain_id + ) ).all() for tx in txs: @@ -200,7 +204,7 @@ def main(): # Check if message matches trigger if sender != args.address and args.trigger_message in str(data): print(f"Received '{data}' from {sender}! Sending '{args.reply_message}'...") - reply_tx = create_tx(priv_bytes, args.address, sender, 0, 10, args.reply_message) + reply_tx = create_tx(priv_bytes, args.address, sender, 0, 10, args.reply_message, args.chain_id) try: res = requests.post(f"{args.rpc_url}/rpc/transaction", json=reply_tx, timeout=10) diff --git a/apps/blockchain-node/src/aitbc_chain/sync.py b/apps/blockchain-node/src/aitbc_chain/sync.py index d8879964..02cd0307 100755 --- a/apps/blockchain-node/src/aitbc_chain/sync.py +++ b/apps/blockchain-node/src/aitbc_chain/sync.py @@ -295,7 +295,7 @@ class ChainSync: if result.accepted: imported += 1 else: - logger.warning("Block import failed during bulk", extra={"height": block_data.get("height"), "reason": result.reason}) + logger.warning(f"Block import failed during bulk at height {block_data.get('height')}: {result.reason}", extra={"height": block_data.get("height"), "reason": result.reason}) return imported start_height = end_height + 1 @@ -518,15 +518,26 @@ class ChainSync: """ fork_height = block_data.get("height", -1) our_height = our_head.height + fork_chain_id = block_data.get("chain_id", "") metrics_registry.increment("sync_forks_detected_total") - logger.warning("Fork detected", extra={ + logger.warning(f"Fork detected at height {fork_height} (our height: {our_height}, fork hash: {block_data.get('hash')[:16]}..., our hash: {our_head.hash[:16]}...)", extra={ "fork_height": fork_height, "our_height": our_height, "fork_hash": block_data.get("hash"), "our_hash": our_head.hash, + "fork_chain_id": fork_chain_id, + "our_chain_id": self._chain_id, }) + # Check if chains are incompatible (different chain_id) + if fork_chain_id and fork_chain_id != self._chain_id: + return ImportResult( + accepted=False, height=fork_height, + block_hash=block_data.get("hash", ""), + reason=f"Incompatible chain: block from chain '{fork_chain_id}' does not match our chain '{self._chain_id}' (heights: {fork_height} vs {our_height})" + ) + # Simple longest-chain: only reorg if incoming chain is strictly longer # and within max reorg depth if fork_height <= our_height: diff --git a/scripts/wrappers/aitbc-agent-daemon-wrapper.py b/scripts/wrappers/aitbc-agent-daemon-wrapper.py index b6148b71..9d7552d3 100755 --- a/scripts/wrappers/aitbc-agent-daemon-wrapper.py +++ b/scripts/wrappers/aitbc-agent-daemon-wrapper.py @@ -1,11 +1,12 @@ #!/usr/bin/env python3 """ Wrapper script for aitbc-agent-daemon service -Uses centralized aitbc utilities for path configuration +Supports multichain by spawning daemon instances for each configured chain """ import sys import os +import subprocess from pathlib import Path # Add aitbc to path @@ -21,18 +22,62 @@ os.environ["PYTHONPATH"] = f"{REPO_DIR}:{REPO_DIR}/packages/py/aitbc-agent-sdk/s os.environ["DATA_DIR"] = str(DATA_DIR) os.environ["LOG_DIR"] = str(LOG_DIR) -# Execute the actual service -exec_cmd = [ - "/opt/aitbc/venv/bin/python", - f"{REPO_DIR}/apps/agent-coordinator/scripts/agent_daemon.py", +# Get chain configuration from environment +# Support both single chain (CHAIN_ID) and multiple chains (AGENT_DAEMON_CHAINS) +chains_str = os.getenv("AGENT_DAEMON_CHAINS", "") +if chains_str: + chains = [c.strip() for c in chains_str.split(",")] +else: + chains = [os.getenv("CHAIN_ID", "ait-mainnet")] + +# Spawn daemon processes for each chain +daemon_script = f"{REPO_DIR}/apps/agent-coordinator/scripts/agent_daemon.py" +base_args = [ "--wallet", "temp-agent", "--address", "ait1d18e286fc0c12888aca94732b5507c8787af71a5", "--password-file", str(KEYSTORE_DIR / ".agent_daemon_password"), "--keystore-dir", str(KEYSTORE_DIR), - "--db-path", "/var/lib/aitbc/data/chain.db", "--rpc-url", "http://localhost:8006", "--poll-interval", "2", "--reply-message", "pong", "--trigger-message", "ping" ] -os.execvp(exec_cmd[0], exec_cmd) + +if len(chains) == 1: + # Single chain: exec directly (replaces wrapper process) + chain_id = chains[0] + db_path = f"/var/lib/aitbc/data/{chain_id}/chain.db" + exec_cmd = [ + "/opt/aitbc/venv/bin/python", + daemon_script, + *base_args, + "--db-path", db_path, + "--chain-id", chain_id + ] + os.execvp(exec_cmd[0], exec_cmd) +else: + # Multiple chains: spawn subprocesses and wait + processes = [] + for chain_id in chains: + db_path = f"/var/lib/aitbc/data/{chain_id}/chain.db" + cmd = [ + "/opt/aitbc/venv/bin/python", + daemon_script, + *base_args, + "--db-path", db_path, + "--chain-id", chain_id + ] + print(f"Starting agent daemon for chain: {chain_id}") + proc = subprocess.Popen(cmd) + processes.append(proc) + + # Wait for all processes + try: + for proc in processes: + proc.wait() + except KeyboardInterrupt: + print("Shutting down agent daemons...") + for proc in processes: + proc.terminate() + for proc in processes: + proc.wait()