Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Node Failover Simulation / failover-test (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
- test_keystore_mac.py: Tests HMAC-SHA256 MAC computation for web3 keystore format - MAC computation validation - Keystore generation with MAC - MAC validation for password error detection - test_agent_signature_verification.py: Tests agent SDK signature verification - ed25519 signature generation and verification - Wrong public key detection - Tampered message detection - Public key fetch from coordinator API - Receive message with signature verification - test_agent_integration_service.py: Tests agent integration service features - Systemd service file generation - Health check response format - Metrics collection format - Alerting rules configuration - Deployment rollback logic - Instance removal logic
334 lines
10 KiB
Python
334 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test agent integration service features
|
|
Tests systemd deployment, health checks, metrics collection, and alerting rules
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
from unittest.mock import AsyncMock, patch, MagicMock
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
def test_systemd_service_file_generation():
|
|
"""Test systemd service file generation"""
|
|
print("Testing Systemd Service File Generation")
|
|
print("=" * 40)
|
|
|
|
instance_id = "test-deployment-production-1"
|
|
port = 8001
|
|
|
|
# Generate service file content
|
|
service_content = f"""[Unit]
|
|
Description=AITBC Agent Instance {instance_id}
|
|
Documentation=https://github.com/aitbc/blockchain
|
|
After=network.target aitbc-blockchain-node.service
|
|
Requires=aitbc-blockchain-node.service
|
|
|
|
[Service]
|
|
Type=simple
|
|
User=root
|
|
Group=root
|
|
WorkingDirectory=/opt/aitbc
|
|
EnvironmentFile=/etc/aitbc/.env
|
|
Environment="AGENT_ID={instance_id}"
|
|
Environment="AGENT_PORT={port}"
|
|
Environment="PYTHONPATH=/opt/aitbc/packages/py/aitbc-agent-sdk/src:/opt/aitbc"
|
|
Environment="PATH=/opt/aitbc/venv/bin:/usr/local/bin:/usr/bin:/bin"
|
|
ExecStart=/opt/aitbc/venv/bin/python /opt/aitbc/scripts/wrappers/aitbc-agent-daemon-wrapper.py
|
|
|
|
Restart=always
|
|
RestartSec=10
|
|
StandardOutput=journal
|
|
StandardError=journal
|
|
SyslogIdentifier=AgentInstance-{instance_id}
|
|
|
|
# Security settings
|
|
NoNewPrivileges=true
|
|
PrivateTmp=true
|
|
ProtectHome=true
|
|
|
|
[Install]
|
|
WantedBy=multi-user.target
|
|
"""
|
|
|
|
# Verify service file contains required sections
|
|
required_sections = ["[Unit]", "[Service]", "[Install]"]
|
|
for section in required_sections:
|
|
if section in service_content:
|
|
print(f"✅ Service file contains {section}")
|
|
else:
|
|
print(f"❌ Service file missing {section}")
|
|
return False
|
|
|
|
# Verify environment variables
|
|
if f"AGENT_ID={instance_id}" in service_content:
|
|
print(f"✅ Service file contains AGENT_ID")
|
|
else:
|
|
print("❌ Service file missing AGENT_ID")
|
|
return False
|
|
|
|
if f"AGENT_PORT={port}" in service_content:
|
|
print(f"✅ Service file contains AGENT_PORT")
|
|
else:
|
|
print("❌ Service file missing AGENT_PORT")
|
|
return False
|
|
|
|
print("\n✅ Systemd service file generation test passed!")
|
|
return True
|
|
|
|
|
|
def test_health_check_response_format():
|
|
"""Test health check response format"""
|
|
print("\nTesting Health Check Response Format")
|
|
print("=" * 40)
|
|
|
|
# Mock health check response
|
|
health_response = {
|
|
"instance_id": "test-instance",
|
|
"status": "healthy",
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
"response_time": 0.1,
|
|
"service_active": True
|
|
}
|
|
|
|
# Verify required fields
|
|
required_fields = ["instance_id", "status", "timestamp"]
|
|
for field in required_fields:
|
|
if field in health_response:
|
|
print(f"✅ Health response contains {field}")
|
|
else:
|
|
print(f"❌ Health response missing {field}")
|
|
return False
|
|
|
|
# Verify status is valid
|
|
valid_statuses = ["healthy", "degraded", "unhealthy"]
|
|
if health_response["status"] in valid_statuses:
|
|
print(f"✅ Health status is valid: {health_response['status']}")
|
|
else:
|
|
print(f"❌ Invalid health status: {health_response['status']}")
|
|
return False
|
|
|
|
print("\n✅ Health check response format test passed!")
|
|
return True
|
|
|
|
|
|
def test_metrics_collection_format():
|
|
"""Test metrics collection format"""
|
|
print("\nTesting Metrics Collection Format")
|
|
print("=" * 40)
|
|
|
|
# Mock metrics response
|
|
metrics_response = {
|
|
"instance_id": "test-instance",
|
|
"status": "deployed",
|
|
"health_status": "healthy",
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
"cpu_usage": 45.5,
|
|
"memory_usage": 60.2,
|
|
"request_count": 1000,
|
|
"error_count": 5,
|
|
"average_response_time": 0.15,
|
|
"uptime_percentage": 99.9
|
|
}
|
|
|
|
# Verify required fields
|
|
required_fields = ["instance_id", "status", "cpu_usage", "memory_usage"]
|
|
for field in required_fields:
|
|
if field in metrics_response:
|
|
print(f"✅ Metrics response contains {field}")
|
|
else:
|
|
print(f"❌ Metrics response missing {field}")
|
|
return False
|
|
|
|
# Verify metric values are numeric
|
|
numeric_fields = ["cpu_usage", "memory_usage", "request_count", "error_count"]
|
|
for field in numeric_fields:
|
|
if isinstance(metrics_response.get(field), (int, float)):
|
|
print(f"✅ {field} is numeric")
|
|
else:
|
|
print(f"❌ {field} is not numeric")
|
|
return False
|
|
|
|
print("\n✅ Metrics collection format test passed!")
|
|
return True
|
|
|
|
|
|
def test_alerting_rules_configuration():
|
|
"""Test alerting rules configuration"""
|
|
print("\nTesting Alerting Rules Configuration")
|
|
print("=" * 40)
|
|
|
|
# Mock alerting rules
|
|
alerting_rules = {
|
|
"rules": [
|
|
{
|
|
"name": "high_cpu_usage",
|
|
"condition": "cpu_usage > 90",
|
|
"severity": "critical"
|
|
},
|
|
{
|
|
"name": "high_memory_usage",
|
|
"condition": "memory_usage > 95",
|
|
"severity": "critical"
|
|
}
|
|
],
|
|
"channels": ["log", "email"],
|
|
"thresholds": {
|
|
"cpu_usage_warning": 80.0,
|
|
"cpu_usage_critical": 90.0,
|
|
"memory_usage_warning": 85.0,
|
|
"memory_usage_critical": 95.0,
|
|
"error_rate_warning": 0.05,
|
|
"error_rate_critical": 0.10
|
|
}
|
|
}
|
|
|
|
# Verify alerting rules structure
|
|
if "rules" in alerting_rules and len(alerting_rules["rules"]) > 0:
|
|
print(f"✅ Alerting rules contains {len(alerting_rules['rules'])} rules")
|
|
else:
|
|
print("❌ Alerting rules missing rules")
|
|
return False
|
|
|
|
# Verify thresholds
|
|
if "thresholds" in alerting_rules:
|
|
print(f"✅ Alerting rules contains thresholds")
|
|
required_thresholds = ["cpu_usage_warning", "cpu_usage_critical", "memory_usage_warning"]
|
|
for threshold in required_thresholds:
|
|
if threshold in alerting_rules["thresholds"]:
|
|
print(f"✅ Threshold {threshold} defined")
|
|
else:
|
|
print(f"❌ Threshold {threshold} missing")
|
|
return False
|
|
else:
|
|
print("❌ Alerting rules missing thresholds")
|
|
return False
|
|
|
|
# Verify channels
|
|
if "channels" in alerting_rules and len(alerting_rules["channels"]) > 0:
|
|
print(f"✅ Alerting channels defined: {alerting_rules['channels']}")
|
|
else:
|
|
print("❌ Alerting channels missing")
|
|
return False
|
|
|
|
print("\n✅ Alerting rules configuration test passed!")
|
|
return True
|
|
|
|
|
|
async def test_deployment_rollback_logic():
|
|
"""Test deployment rollback logic"""
|
|
print("\nTesting Deployment Rollback Logic")
|
|
print("=" * 40)
|
|
|
|
# Mock deployment config with previous version
|
|
deployment_config = {
|
|
"id": "test-deployment",
|
|
"agent_version": "v2.0.0",
|
|
"previous_version": "v1.5.0",
|
|
"rollback_enabled": True
|
|
}
|
|
|
|
# Test rollback scenario
|
|
if deployment_config["rollback_enabled"]:
|
|
print("✅ Rollback is enabled")
|
|
|
|
if deployment_config["previous_version"]:
|
|
print(f"✅ Previous version available: {deployment_config['previous_version']}")
|
|
|
|
# Simulate rollback
|
|
new_version = deployment_config["previous_version"]
|
|
print(f"✅ Rolling back to version: {new_version}")
|
|
else:
|
|
print("❌ No previous version available for rollback")
|
|
return False
|
|
else:
|
|
print("❌ Rollback is not enabled")
|
|
return False
|
|
|
|
# Test rollback without previous version
|
|
no_rollback_config = {
|
|
"id": "test-deployment-2",
|
|
"agent_version": "v2.0.0",
|
|
"previous_version": None,
|
|
"rollback_enabled": True
|
|
}
|
|
|
|
if not no_rollback_config["previous_version"]:
|
|
print("✅ Correctly detected missing previous version")
|
|
else:
|
|
print("❌ Should detect missing previous version")
|
|
return False
|
|
|
|
print("\n✅ Deployment rollback logic test passed!")
|
|
return True
|
|
|
|
|
|
async def test_instance_removal_logic():
|
|
"""Test instance removal logic"""
|
|
print("\nTesting Instance Removal Logic")
|
|
print("=" * 40)
|
|
|
|
instance_id = "test-instance-1"
|
|
service_name = f"aitbc-agent-{instance_id}"
|
|
service_file = f"/etc/systemd/system/{service_name}.service"
|
|
|
|
# Mock removal steps
|
|
removal_steps = [
|
|
f"systemctl stop {service_name}",
|
|
f"systemctl disable {service_name}",
|
|
f"rm {service_file}",
|
|
"systemctl daemon-reload"
|
|
]
|
|
|
|
print(f"Instance ID: {instance_id}")
|
|
print(f"Service name: {service_name}")
|
|
print(f"Service file: {service_file}")
|
|
print()
|
|
print("Removal steps:")
|
|
for step in removal_steps:
|
|
print(f" - {step}")
|
|
|
|
# Verify all steps are present
|
|
if len(removal_steps) == 4:
|
|
print("✅ All 4 removal steps defined")
|
|
else:
|
|
print(f"❌ Expected 4 steps, got {len(removal_steps)}")
|
|
return False
|
|
|
|
print("\n✅ Instance removal logic test passed!")
|
|
return True
|
|
|
|
|
|
async def run_tests():
|
|
"""Run all agent integration service tests"""
|
|
print("Agent Integration Service Tests")
|
|
print("=" * 40)
|
|
print()
|
|
|
|
results = []
|
|
results.append(("Systemd Service File Generation", test_systemd_service_file_generation()))
|
|
results.append(("Health Check Response Format", test_health_check_response_format()))
|
|
results.append(("Metrics Collection Format", test_metrics_collection_format()))
|
|
results.append(("Alerting Rules Configuration", test_alerting_rules_configuration()))
|
|
results.append(("Deployment Rollback Logic", await test_deployment_rollback_logic()))
|
|
results.append(("Instance Removal Logic", await test_instance_removal_logic()))
|
|
|
|
print("\n" + "=" * 40)
|
|
print("Test Summary")
|
|
print("=" * 40)
|
|
|
|
for name, passed in results:
|
|
status = "✅ PASSED" if passed else "❌ FAILED"
|
|
print(f"{name}: {status}")
|
|
|
|
all_passed = all(result[1] for result in results)
|
|
if all_passed:
|
|
print("\n🎉 All tests passed!")
|
|
else:
|
|
print("\n❌ Some tests failed")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(run_tests())
|