Files
aitbc/scripts/ci/test_api_endpoints.py
aitbc 316dc8e894
All checks were successful
API Endpoint Tests / test-api-endpoints (push) Successful in 15s
Deploy to Testnet / deploy-testnet (push) Successful in 1m12s
Integration Tests / test-service-integration (push) Successful in 2m37s
Production Tests / Production Integration Tests (push) Successful in 21s
Python Tests / test-python (push) Successful in 41s
Security Scanning / security-scan (push) Successful in 36s
Deploy to Testnet / notify-deployment (push) Successful in 3s
Node Failover Simulation / failover-test (push) Successful in 2s
Multi-Node Stress Testing / stress-test (push) Successful in 4s
Cross-Node Transaction Testing / transaction-test (push) Successful in 10s
Update workflows for agent coordinator and app deps
2026-05-02 13:34:08 +02:00

98 lines
3.4 KiB
Python
Executable File

#!/usr/bin/env python3
"""CI test script for AITBC API endpoints."""
import os
import requests
import json
import time
import statistics
import sys
# Host and ports of the services under test. The ports must match the
# systemd config; the host may be overridden through AITBC_API_HOST.
API_HOST = os.environ.get("AITBC_API_HOST", "localhost")

# Service name -> base URL plus the endpoint paths probed on that service.
SERVICES = {
    "agent_coordinator": {
        "url": f"http://{API_HOST}:9001",
        "endpoints": ["/", "/health"],
    },
    "exchange": {
        "url": f"http://{API_HOST}:8001",
        "endpoints": ["/", "/api/health", "/health", "/info"],
    },
    "wallet": {
        "url": f"http://{API_HOST}:8003",
        "endpoints": ["/", "/health", "/wallets"],
    },
    "blockchain_rpc": {
        "url": f"http://{API_HOST}:8006",
        "endpoints": [
            "/health",
            "/rpc/head",
            "/rpc/head?chain_id=ait-mainnet",
            "/rpc/head?chain_id=ait-testnet",
            "/rpc/mempool",
        ],
    },
}
def test_service_endpoints(name, base_url, endpoints, timeout=5):
    """Probe each endpoint of one service with a GET and collect the outcome.

    A response counts as a success when its status code is 200, 404 or 405.
    Any other status, or any exception raised while requesting, marks both
    that endpoint and the service as a whole as failed.

    Args:
        name: Service label stored under the "service" key of the result.
        base_url: URL prefix that every endpoint path is appended to.
        endpoints: Iterable of path strings to request.
        timeout: Per-request timeout in seconds, passed to ``requests.get``.

    Returns:
        A dict with "service", "endpoints" (one entry per probed URL holding
        "url", "success" and either "status" or "error") and an overall
        "success" flag that is True only if every endpoint succeeded.
    """
    results = {"service": name, "endpoints": [], "success": True}
    for ep in endpoints:
        url = f"{base_url}{ep}"
        try:
            response = requests.get(url, timeout=timeout)
        except Exception as exc:
            # Deliberately broad: whatever goes wrong while probing is
            # recorded as a failed endpoint instead of aborting the run.
            results["endpoints"].append({"url": url, "error": str(exc), "success": False})
            print(f" ❌ {url}: {exc}")
            results["success"] = False
            continue

        # NOTE(review): 404 and 405 are accepted alongside 200, presumably
        # because they still show the service is answering HTTP - confirm.
        ok = response.status_code in (200, 404, 405)
        results["endpoints"].append({"url": url, "status": response.status_code, "success": ok})
        print(f" {'✅' if ok else '❌'} {url}: {response.status_code}")
        if not ok:
            results["success"] = False
    return results


# This is a CI helper, not a pytest test. Without this flag the ``test_``
# prefix makes pytest try to collect it and fail resolving its parameters
# as fixtures.
test_service_endpoints.__test__ = False
def test_performance(apis, rounds=10, timeout=5):
    """Measure GET latency for each API over a number of rounds.

    Every request that returns a response contributes a latency sample,
    whatever its status code. Only responses with status 200, 404 or 405
    count towards the success rate. Requests that raise contribute no sample.

    Args:
        apis: Iterable of ``(name, url)`` pairs to measure.
        rounds: Number of requests issued per API.
        timeout: Per-request timeout in seconds, passed to ``requests.get``.

    Returns:
        A dict keyed by API name. Each value holds "avg_ms", "min_ms",
        "max_ms" and "success_rate" (``"<ok>/<rounds>"``), or
        ``{"error": "all requests failed"}`` when no request produced a
        response.
    """
    results = {}
    for name, url in apis:
        times = []
        ok_count = 0
        for _ in range(rounds):
            try:
                # perf_counter is monotonic, so system clock adjustments
                # cannot skew (or make negative) the measured interval.
                started = time.perf_counter()
                response = requests.get(url, timeout=timeout)
                elapsed = time.perf_counter() - started
            except Exception:
                # Best effort: a failed request yields no latency sample and
                # simply counts against the success rate.
                continue
            times.append(elapsed)
            if response.status_code in (200, 404, 405):
                ok_count += 1

        if times:
            results[name] = {
                "avg_ms": round(statistics.mean(times) * 1000, 1),
                "min_ms": round(min(times) * 1000, 1),
                "max_ms": round(max(times) * 1000, 1),
                "success_rate": f"{ok_count}/{rounds}",
            }
            print(f" 📊 {name}: avg={results[name]['avg_ms']}ms ok={ok_count}/{rounds}")
        else:
            results[name] = {"error": "all requests failed"}
            print(f" ❌ {name}: all requests failed")
    return results


# This is a CI helper, not a pytest test. Without this flag the ``test_``
# prefix makes pytest try to collect it and fail resolving its parameters
# as fixtures.
test_performance.__test__ = False
def main():
    """Run the endpoint and performance checks and write the JSON report.

    Endpoint results for every entry in ``SERVICES`` and the performance
    results (under the "performance" key) are written to
    ``api-test-results.json`` in the current working directory.

    Returns:
        0 when every service's endpoint check succeeded, otherwise 1.
        Performance results are recorded in the report but do not affect
        the return value.
    """
    all_results = {}
    overall_ok = True
    for name, cfg in SERVICES.items():
        print(f"\n🧪 Testing {name}...")
        outcome = test_service_endpoints(name, cfg["url"], cfg["endpoints"])
        all_results[name] = outcome
        if not outcome["success"]:
            overall_ok = False

    print("\n⚡ Performance tests...")
    # Build the health URLs from SERVICES so host and ports are defined in
    # exactly one place.
    perf = test_performance([
        ("Agent Coordinator", f"{SERVICES['agent_coordinator']['url']}/health"),
        ("Exchange", f"{SERVICES['exchange']['url']}/api/health"),
        ("Wallet", f"{SERVICES['wallet']['url']}/health"),
        ("Blockchain RPC", f"{SERVICES['blockchain_rpc']['url']}/health"),
    ])
    all_results["performance"] = perf

    with open("api-test-results.json", "w", encoding="utf-8") as f:
        json.dump(all_results, f, indent=2)

    print(f"\n{'✅' if overall_ok else '⚠️'} API endpoint tests completed")
    return 0 if overall_ok else 1
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())