#!/usr/bin/env python3 """CI test script for AITBC API endpoints.""" import requests import json import time import statistics import sys # Service ports (must match systemd config) SERVICES = { "coordinator": {"url": "http://localhost:8000", "endpoints": ["/", "/health", "/info"]}, "exchange": {"url": "http://localhost:8001", "endpoints": ["/", "/api/health", "/health", "/info"]}, "wallet": {"url": "http://localhost:8003", "endpoints": ["/", "/health", "/wallets"]}, "blockchain_rpc": {"url": "http://localhost:8006", "endpoints": ["/health", "/rpc/head", "/rpc/mempool"]}, } def test_service_endpoints(name, base_url, endpoints, timeout=5): results = {"service": name, "endpoints": [], "success": True} for ep in endpoints: url = f"{base_url}{ep}" try: r = requests.get(url, timeout=timeout) ok = r.status_code in (200, 404, 405) results["endpoints"].append({"url": url, "status": r.status_code, "success": ok}) print(f" {'โœ…' if ok else 'โŒ'} {url}: {r.status_code}") if not ok: results["success"] = False except Exception as e: results["endpoints"].append({"url": url, "error": str(e), "success": False}) print(f" โŒ {url}: {e}") results["success"] = False return results def test_performance(apis, rounds=10, timeout=5): results = {} for name, url in apis: times = [] ok_count = 0 for i in range(rounds): try: t0 = time.time() r = requests.get(url, timeout=timeout) dt = time.time() - t0 times.append(dt) if r.status_code in (200, 404, 405): ok_count += 1 except Exception: pass if times: results[name] = { "avg_ms": round(statistics.mean(times) * 1000, 1), "min_ms": round(min(times) * 1000, 1), "max_ms": round(max(times) * 1000, 1), "success_rate": f"{ok_count}/{rounds}", } print(f" ๐Ÿ“Š {name}: avg={results[name]['avg_ms']}ms ok={ok_count}/{rounds}") else: results[name] = {"error": "all requests failed"} print(f" โŒ {name}: all requests failed") return results def main(): all_results = {} overall_ok = True for name, cfg in SERVICES.items(): print(f"\n๐Ÿงช Testing {name}...") r = test_service_endpoints(name, cfg["url"], cfg["endpoints"]) all_results[name] = r if not r["success"]: overall_ok = False print("\nโšก Performance tests...") perf = test_performance([ ("Coordinator", "http://localhost:8000/health"), ("Exchange", "http://localhost:8001/api/health"), ("Wallet", "http://localhost:8003/health"), ("Blockchain RPC", "http://localhost:8006/health"), ]) all_results["performance"] = perf with open("api-test-results.json", "w") as f: json.dump(all_results, f, indent=2) print(f"\n{'โœ…' if overall_ok else 'โš ๏ธ'} API endpoint tests completed") return 0 if overall_ok else 1 if __name__ == "__main__": sys.exit(main())