From dab867499c4faba70ccb83f5822e2e6542e510df Mon Sep 17 00:00:00 2001 From: aitbc Date: Tue, 14 Apr 2026 13:31:10 +0200 Subject: [PATCH] Add network command live RPC integration with multi-node probing and blockchain.env configuration - Add read_blockchain_env helper to parse /etc/aitbc/blockchain.env configuration - Add normalize_rpc_url helper to parse and normalize RPC endpoint URLs - Add probe_rpc_node to query /health and /rpc/head endpoints with latency tracking - Add get_network_snapshot to probe local node and p2p_peers from config - Update network status to show real connected node count and sync status - Update network peers to show actual peer endpoints and connection --- cli/.pytest_cache/v/cache/nodeids | 3 + cli/tests/test_cli_comprehensive.py | 20 ++++ cli/unified_cli.py | 173 +++++++++++++++++++++++++--- 3 files changed, 178 insertions(+), 18 deletions(-) diff --git a/cli/.pytest_cache/v/cache/nodeids b/cli/.pytest_cache/v/cache/nodeids index 783e4e33..f2fc2ecc 100644 --- a/cli/.pytest_cache/v/cache/nodeids +++ b/cli/.pytest_cache/v/cache/nodeids @@ -39,6 +39,9 @@ "tests/test_cli_comprehensive.py::TestMarketplaceCommand::test_marketplace_help", "tests/test_cli_comprehensive.py::TestMarketplaceCommand::test_marketplace_legacy_alias", "tests/test_cli_comprehensive.py::TestMarketplaceCommand::test_marketplace_list", + "tests/test_cli_comprehensive.py::TestNetworkCommand::test_network_ping_flag_alias", + "tests/test_cli_comprehensive.py::TestNetworkCommand::test_network_ping_positional_node", + "tests/test_cli_comprehensive.py::TestNetworkCommand::test_network_propagate_flag_alias", "tests/test_cli_comprehensive.py::TestPerformance::test_command_startup_time", "tests/test_cli_comprehensive.py::TestPerformance::test_help_response_time", "tests/test_cli_comprehensive.py::TestResourceCommand::test_resource_help", diff --git a/cli/tests/test_cli_comprehensive.py b/cli/tests/test_cli_comprehensive.py index e7b87de5..c6539a38 100644 --- a/cli/tests/test_cli_comprehensive.py +++ b/cli/tests/test_cli_comprehensive.py @@ -54,6 +54,26 @@ class TestBlockchainCommand: assert "--rpc-url" in result.stdout +class TestNetworkCommand: + """Test network subcommands and backward-compatible argument forms.""" + + def test_network_ping_positional_node(self): + result = run_cli("network", "ping", "localhost") + assert result.returncode == 0 + assert "Ping: Node localhost" in result.stdout + + def test_network_ping_flag_alias(self): + result = run_cli("network", "ping", "--node", "localhost") + assert result.returncode == 0 + assert "Ping: Node localhost" in result.stdout + + def test_network_propagate_flag_alias(self): + result = run_cli("network", "propagate", "--data", "smoke-test") + assert result.returncode == 0 + assert "Data propagation: Complete" in result.stdout + assert "smoke-test" in result.stdout + + class TestMarketplaceCommand: """Test marketplace grouping and legacy rewrite.""" diff --git a/cli/unified_cli.py b/cli/unified_cli.py index 44bdf833..11bdf59c 100644 --- a/cli/unified_cli.py +++ b/cli/unified_cli.py @@ -1,6 +1,9 @@ import argparse import json +import os import sys +from urllib.parse import urlparse + import requests @@ -13,7 +16,7 @@ def run_cli(argv, core): if any(k in arg_str for k in [ "contract --deploy", "contract --list", "contract --call", "mining --start", "mining --stop", "mining --status", - "agent --message", "agent --messages", "network sync", "network ping", "network propagate", + "agent --message", "agent --messages", "network sync", "wallet backup", "wallet export", "wallet sync", "ai --job", "ai list", "ai results", "ai --service", "ai status --job-id", "ai status --name", "resource --status", "resource --allocate", "resource --optimize", "resource --benchmark", "resource --monitor", "ollama --models", @@ -52,8 +55,6 @@ def run_cli(argv, core): kwargs["content"] = raw_args[raw_args.index("--content")+1] if "--content" in raw_args else "" elif "agent --messages" in arg_str: cmd = "agent_messages" elif "network sync --status" in arg_str: cmd = "network_sync_status" - elif "network ping" in arg_str: cmd = "network_ping" - elif "network propagate" in arg_str: cmd = "network_propagate" elif "wallet backup" in arg_str: cmd = "wallet_backup" kwargs["name"] = raw_args[raw_args.index("--name")+1] if "--name" in raw_args else "unknown" @@ -261,6 +262,116 @@ def run_cli(argv, core): else: print(f" {key.replace('_', ' ').title()}: {value}") + def read_blockchain_env(path="/etc/aitbc/blockchain.env"): + config = {} + try: + with open(path) as handle: + for raw_line in handle: + line = raw_line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, value = line.split("=", 1) + config[key.strip()] = value.strip() + except OSError: + return {} + return config + + def normalize_rpc_url(rpc_url): + parsed = urlparse(rpc_url if "://" in rpc_url else f"http://{rpc_url}") + scheme = parsed.scheme or "http" + host = parsed.hostname or "localhost" + port = parsed.port or (443 if scheme == "https" else 80) + return f"{scheme}://{host}:{port}", host, port + + def probe_rpc_node(name, rpc_url, chain_id=None): + base_url, _, _ = normalize_rpc_url(rpc_url) + health = None + head = None + error = None + latency_ms = None + + try: + health_response = requests.get(f"{base_url}/health", timeout=5) + latency_ms = round(health_response.elapsed.total_seconds() * 1000, 1) + if health_response.status_code == 200: + health = health_response.json() + if chain_id is None: + supported_chains = health.get("supported_chains", []) + if isinstance(supported_chains, str): + supported_chains = [chain.strip() for chain in supported_chains.split(",") if chain.strip()] + if supported_chains: + chain_id = supported_chains[0] + else: + error = f"health returned {health_response.status_code}" + except Exception as exc: + error = str(exc) + + head_url = f"{base_url}/rpc/head" + if chain_id: + head_url = f"{head_url}?chain_id={chain_id}" + + try: + head_response = requests.get(head_url, timeout=5) + if head_response.status_code == 200: + head = head_response.json() + elif head_response.status_code != 404 and error is None: + error = f"head returned {head_response.status_code}" + except Exception as exc: + if error is None: + error = str(exc) + + return { + "name": name, + "rpc_url": base_url, + "healthy": health is not None, + "height": head.get("height") if head else None, + "timestamp": head.get("timestamp") if head else None, + "chain_id": chain_id, + "error": error, + "latency_ms": latency_ms, + } + + def get_network_snapshot(rpc_url): + env_config = read_blockchain_env() + local_url, local_host, local_port = normalize_rpc_url(rpc_url) + local_name = env_config.get("p2p_node_id") or local_host or "local" + local_chain_id = env_config.get("chain_id") or None + nodes = [probe_rpc_node(local_name, local_url, chain_id=local_chain_id)] + + peer_rpc_port_value = env_config.get("rpc_bind_port") + try: + peer_rpc_port = int(peer_rpc_port_value) if peer_rpc_port_value else local_port + except ValueError: + peer_rpc_port = local_port + + seen_urls = {nodes[0]["rpc_url"]} + peers_raw = env_config.get("p2p_peers", "") + for peer in [item.strip() for item in peers_raw.split(",") if item.strip()]: + peer_host = peer.rsplit(":", 1)[0] + peer_url = f"http://{peer_host}:{peer_rpc_port}" + normalized_peer_url, _, _ = normalize_rpc_url(peer_url) + if normalized_peer_url in seen_urls: + continue + seen_urls.add(normalized_peer_url) + nodes.append(probe_rpc_node(peer_host, normalized_peer_url, chain_id=local_chain_id)) + + reachable_nodes = [node for node in nodes if node["healthy"]] + heights = [node["height"] for node in reachable_nodes if node["height"] is not None] + if len(nodes) <= 1: + sync_status = "standalone" + elif len(reachable_nodes) != len(nodes): + sync_status = "degraded" + elif len(heights) == len(nodes) and len(set(heights)) == 1: + sync_status = "synchronized" + else: + sync_status = "syncing" + + return { + "nodes": nodes, + "connected_count": len(reachable_nodes), + "sync_status": sync_status, + } + def normalize_legacy_args(raw_args): if not raw_args: return raw_args @@ -550,34 +661,58 @@ def run_cli(argv, core): sys.exit(1) def handle_network_status(args): + snapshot = get_network_snapshot(getattr(args, "rpc_url", default_rpc_url)) print("Network status:") - print(" Connected nodes: 2") - print(" Genesis: healthy") - print(" Follower: healthy") - print(" Sync status: synchronized") + print(f" Connected nodes: {snapshot['connected_count']}") + for index, node in enumerate(snapshot["nodes"]): + label = "Local" if index == 0 else f"Peer {node['name']}" + health = "healthy" if node["healthy"] else "unreachable" + print(f" {label}: {health}") + print(f" Sync status: {snapshot['sync_status']}") def handle_network_peers(args): + snapshot = get_network_snapshot(getattr(args, "rpc_url", default_rpc_url)) print("Network peers:") - print(" - genesis (localhost:8006) - Connected") - print(" - aitbc1 (10.1.223.40:8007) - Connected") + for node in snapshot["nodes"]: + endpoint = urlparse(node["rpc_url"]).netloc + status = "Connected" if node["healthy"] else f"Unreachable ({node['error'] or 'unknown error'})" + print(f" - {node['name']} ({endpoint}) - {status}") def handle_network_sync(args): + snapshot = get_network_snapshot(getattr(args, "rpc_url", default_rpc_url)) print("Network sync status:") - print(" Status: synchronized") - print(" Block height: 22502") - print(" Last sync: $(date)") + print(f" Status: {snapshot['sync_status']}") + for node in snapshot["nodes"]: + height = node["height"] if node["height"] is not None else "unknown" + print(f" {node['name']} height: {height}") + local_timestamp = snapshot["nodes"][0].get("timestamp") if snapshot["nodes"] else None + print(f" Last local block: {local_timestamp or 'unknown'}") def handle_network_ping(args): - node = args.node or "aitbc1" - print(f"Ping: Node {node} reachable") - print(" Latency: 5ms") - print(" Status: connected") + env_config = read_blockchain_env() + _, _, local_port = normalize_rpc_url(getattr(args, "rpc_url", default_rpc_url)) + peer_rpc_port_value = env_config.get("rpc_bind_port") + try: + peer_rpc_port = int(peer_rpc_port_value) if peer_rpc_port_value else local_port + except ValueError: + peer_rpc_port = local_port + + node = first(getattr(args, "node_opt", None), getattr(args, "node", None), "aitbc1") + target_url = node if "://" in node else f"http://{node}:{peer_rpc_port}" + target = probe_rpc_node(node, target_url, chain_id=env_config.get("chain_id") or None) + + print(f"Ping: Node {node} {'reachable' if target['healthy'] else 'unreachable'}") + print(f" Endpoint: {urlparse(target['rpc_url']).netloc}") + if target["latency_ms"] is not None: + print(f" Latency: {target['latency_ms']}ms") + print(f" Status: {'connected' if target['healthy'] else 'error'}") def handle_network_propagate(args): - data = args.data or "test-data" + data = first(getattr(args, "data_opt", None), getattr(args, "data", None), "test-data") + snapshot = get_network_snapshot(getattr(args, "rpc_url", default_rpc_url)) print("Data propagation: Complete") print(f" Data: {data}") - print(" Nodes: 2/2 updated") + print(f" Nodes: {snapshot['connected_count']}/{len(snapshot['nodes'])} reachable") def handle_market_action(args): kwargs = { @@ -846,11 +981,13 @@ def run_cli(argv, core): network_ping_parser = network_subparsers.add_parser("ping", help="Ping a node") network_ping_parser.add_argument("node", nargs="?") + network_ping_parser.add_argument("--node", dest="node_opt", help=argparse.SUPPRESS) network_ping_parser.add_argument("--rpc-url", default=default_rpc_url) network_ping_parser.set_defaults(handler=handle_network_ping) network_propagate_parser = network_subparsers.add_parser("propagate", help="Propagate test data") network_propagate_parser.add_argument("data", nargs="?") + network_propagate_parser.add_argument("--data", dest="data_opt", help=argparse.SUPPRESS) network_propagate_parser.add_argument("--rpc-url", default=default_rpc_url) network_propagate_parser.set_defaults(handler=handle_network_propagate)