diff --git a/tests/verification/test_agent_integration_service.py b/tests/verification/test_agent_integration_service.py new file mode 100644 index 00000000..1ce8bbad --- /dev/null +++ b/tests/verification/test_agent_integration_service.py @@ -0,0 +1,333 @@ +#!/usr/bin/env python3 +""" +Test agent integration service features +Tests systemd deployment, health checks, metrics collection, and alerting rules +""" + +import asyncio +import json +from unittest.mock import AsyncMock, patch, MagicMock +from datetime import datetime, timezone + + +def test_systemd_service_file_generation(): + """Test systemd service file generation""" + print("Testing Systemd Service File Generation") + print("=" * 40) + + instance_id = "test-deployment-production-1" + port = 8001 + + # Generate service file content + service_content = f"""[Unit] +Description=AITBC Agent Instance {instance_id} +Documentation=https://github.com/aitbc/blockchain +After=network.target aitbc-blockchain-node.service +Requires=aitbc-blockchain-node.service + +[Service] +Type=simple +User=root +Group=root +WorkingDirectory=/opt/aitbc +EnvironmentFile=/etc/aitbc/.env +Environment="AGENT_ID={instance_id}" +Environment="AGENT_PORT={port}" +Environment="PYTHONPATH=/opt/aitbc/packages/py/aitbc-agent-sdk/src:/opt/aitbc" +Environment="PATH=/opt/aitbc/venv/bin:/usr/local/bin:/usr/bin:/bin" +ExecStart=/opt/aitbc/venv/bin/python /opt/aitbc/scripts/wrappers/aitbc-agent-daemon-wrapper.py + +Restart=always +RestartSec=10 +StandardOutput=journal +StandardError=journal +SyslogIdentifier=AgentInstance-{instance_id} + +# Security settings +NoNewPrivileges=true +PrivateTmp=true +ProtectHome=true + +[Install] +WantedBy=multi-user.target +""" + + # Verify service file contains required sections + required_sections = ["[Unit]", "[Service]", "[Install]"] + for section in required_sections: + if section in service_content: + print(f"✅ Service file contains {section}") + else: + print(f"❌ Service file missing {section}") + return False + + # Verify environment variables + if f"AGENT_ID={instance_id}" in service_content: + print(f"✅ Service file contains AGENT_ID") + else: + print("❌ Service file missing AGENT_ID") + return False + + if f"AGENT_PORT={port}" in service_content: + print(f"✅ Service file contains AGENT_PORT") + else: + print("❌ Service file missing AGENT_PORT") + return False + + print("\n✅ Systemd service file generation test passed!") + return True + + +def test_health_check_response_format(): + """Test health check response format""" + print("\nTesting Health Check Response Format") + print("=" * 40) + + # Mock health check response + health_response = { + "instance_id": "test-instance", + "status": "healthy", + "timestamp": datetime.now(timezone.utc).isoformat(), + "response_time": 0.1, + "service_active": True + } + + # Verify required fields + required_fields = ["instance_id", "status", "timestamp"] + for field in required_fields: + if field in health_response: + print(f"✅ Health response contains {field}") + else: + print(f"❌ Health response missing {field}") + return False + + # Verify status is valid + valid_statuses = ["healthy", "degraded", "unhealthy"] + if health_response["status"] in valid_statuses: + print(f"✅ Health status is valid: {health_response['status']}") + else: + print(f"❌ Invalid health status: {health_response['status']}") + return False + + print("\n✅ Health check response format test passed!") + return True + + +def test_metrics_collection_format(): + """Test metrics collection format""" + print("\nTesting Metrics Collection Format") + print("=" * 40) + + # Mock metrics response + metrics_response = { + "instance_id": "test-instance", + "status": "deployed", + "health_status": "healthy", + "timestamp": datetime.now(timezone.utc).isoformat(), + "cpu_usage": 45.5, + "memory_usage": 60.2, + "request_count": 1000, + "error_count": 5, + "average_response_time": 0.15, + "uptime_percentage": 99.9 + } + + # Verify required fields + required_fields = ["instance_id", "status", "cpu_usage", "memory_usage"] + for field in required_fields: + if field in metrics_response: + print(f"✅ Metrics response contains {field}") + else: + print(f"❌ Metrics response missing {field}") + return False + + # Verify metric values are numeric + numeric_fields = ["cpu_usage", "memory_usage", "request_count", "error_count"] + for field in numeric_fields: + if isinstance(metrics_response.get(field), (int, float)): + print(f"✅ {field} is numeric") + else: + print(f"❌ {field} is not numeric") + return False + + print("\n✅ Metrics collection format test passed!") + return True + + +def test_alerting_rules_configuration(): + """Test alerting rules configuration""" + print("\nTesting Alerting Rules Configuration") + print("=" * 40) + + # Mock alerting rules + alerting_rules = { + "rules": [ + { + "name": "high_cpu_usage", + "condition": "cpu_usage > 90", + "severity": "critical" + }, + { + "name": "high_memory_usage", + "condition": "memory_usage > 95", + "severity": "critical" + } + ], + "channels": ["log", "email"], + "thresholds": { + "cpu_usage_warning": 80.0, + "cpu_usage_critical": 90.0, + "memory_usage_warning": 85.0, + "memory_usage_critical": 95.0, + "error_rate_warning": 0.05, + "error_rate_critical": 0.10 + } + } + + # Verify alerting rules structure + if "rules" in alerting_rules and len(alerting_rules["rules"]) > 0: + print(f"✅ Alerting rules contains {len(alerting_rules['rules'])} rules") + else: + print("❌ Alerting rules missing rules") + return False + + # Verify thresholds + if "thresholds" in alerting_rules: + print(f"✅ Alerting rules contains thresholds") + required_thresholds = ["cpu_usage_warning", "cpu_usage_critical", "memory_usage_warning"] + for threshold in required_thresholds: + if threshold in alerting_rules["thresholds"]: + print(f"✅ Threshold {threshold} defined") + else: + print(f"❌ Threshold {threshold} missing") + return False + else: + print("❌ Alerting rules missing thresholds") + return False + + # Verify channels + if "channels" in alerting_rules and len(alerting_rules["channels"]) > 0: + print(f"✅ Alerting channels defined: {alerting_rules['channels']}") + else: + print("❌ Alerting channels missing") + return False + + print("\n✅ Alerting rules configuration test passed!") + return True + + +async def test_deployment_rollback_logic(): + """Test deployment rollback logic""" + print("\nTesting Deployment Rollback Logic") + print("=" * 40) + + # Mock deployment config with previous version + deployment_config = { + "id": "test-deployment", + "agent_version": "v2.0.0", + "previous_version": "v1.5.0", + "rollback_enabled": True + } + + # Test rollback scenario + if deployment_config["rollback_enabled"]: + print("✅ Rollback is enabled") + + if deployment_config["previous_version"]: + print(f"✅ Previous version available: {deployment_config['previous_version']}") + + # Simulate rollback + new_version = deployment_config["previous_version"] + print(f"✅ Rolling back to version: {new_version}") + else: + print("❌ No previous version available for rollback") + return False + else: + print("❌ Rollback is not enabled") + return False + + # Test rollback without previous version + no_rollback_config = { + "id": "test-deployment-2", + "agent_version": "v2.0.0", + "previous_version": None, + "rollback_enabled": True + } + + if not no_rollback_config["previous_version"]: + print("✅ Correctly detected missing previous version") + else: + print("❌ Should detect missing previous version") + return False + + print("\n✅ Deployment rollback logic test passed!") + return True + + +async def test_instance_removal_logic(): + """Test instance removal logic""" + print("\nTesting Instance Removal Logic") + print("=" * 40) + + instance_id = "test-instance-1" + service_name = f"aitbc-agent-{instance_id}" + service_file = f"/etc/systemd/system/{service_name}.service" + + # Mock removal steps + removal_steps = [ + f"systemctl stop {service_name}", + f"systemctl disable {service_name}", + f"rm {service_file}", + "systemctl daemon-reload" + ] + + print(f"Instance ID: {instance_id}") + print(f"Service name: {service_name}") + print(f"Service file: {service_file}") + print() + print("Removal steps:") + for step in removal_steps: + print(f" - {step}") + + # Verify all steps are present + if len(removal_steps) == 4: + print("✅ All 4 removal steps defined") + else: + print(f"❌ Expected 4 steps, got {len(removal_steps)}") + return False + + print("\n✅ Instance removal logic test passed!") + return True + + +async def run_tests(): + """Run all agent integration service tests""" + print("Agent Integration Service Tests") + print("=" * 40) + print() + + results = [] + results.append(("Systemd Service File Generation", test_systemd_service_file_generation())) + results.append(("Health Check Response Format", test_health_check_response_format())) + results.append(("Metrics Collection Format", test_metrics_collection_format())) + results.append(("Alerting Rules Configuration", test_alerting_rules_configuration())) + results.append(("Deployment Rollback Logic", await test_deployment_rollback_logic())) + results.append(("Instance Removal Logic", await test_instance_removal_logic())) + + print("\n" + "=" * 40) + print("Test Summary") + print("=" * 40) + + for name, passed in results: + status = "✅ PASSED" if passed else "❌ FAILED" + print(f"{name}: {status}") + + all_passed = all(result[1] for result in results) + if all_passed: + print("\n🎉 All tests passed!") + else: + print("\n❌ Some tests failed") + + +if __name__ == "__main__": + asyncio.run(run_tests()) diff --git a/tests/verification/test_agent_signature_verification.py b/tests/verification/test_agent_signature_verification.py new file mode 100644 index 00000000..43aa84fb --- /dev/null +++ b/tests/verification/test_agent_signature_verification.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python3 +""" +Test agent SDK signature verification +Tests the signature verification implementation using coordinator API +""" + +import asyncio +import json +from unittest.mock import AsyncMock, patch, MagicMock +from cryptography.hazmat.primitives.asymmetric import ed25519 +from cryptography.hazmat.primitives import serialization + + +def test_signature_generation_and_verification(): + """Test ed25519 signature generation and verification""" + print("Testing Signature Generation and Verification") + print("=" * 40) + + # Generate keypair + private_key = ed25519.Ed25519PrivateKey.generate() + public_key = private_key.public_key() + + # Create test message + message = {"type": "test", "data": "hello"} + message_bytes = json.dumps(message, sort_keys=True).encode('utf-8') + + # Sign message + signature = private_key.sign(message_bytes) + print(f"Signature length: {len(signature)}") + + # Verify signature + try: + public_key.verify(signature, message_bytes) + print("✅ Signature verified successfully") + except Exception as e: + print(f"❌ Signature verification failed: {e}") + return False + + print("\n✅ Signature generation and verification test passed!") + return True + + +def test_signature_verification_with_wrong_key(): + """Test signature verification fails with wrong public key""" + print("\nTesting Signature Verification with Wrong Key") + print("=" * 40) + + # Generate two different keypairs + private_key1 = ed25519.Ed25519PrivateKey.generate() + public_key1 = private_key1.public_key() + + private_key2 = ed25519.Ed25519PrivateKey.generate() + public_key2 = private_key2.public_key() + + # Sign with key1 + message = {"type": "test", "data": "hello"} + message_bytes = json.dumps(message, sort_keys=True).encode('utf-8') + signature = private_key1.sign(message_bytes) + + # Try to verify with key2 + try: + public_key2.verify(signature, message_bytes) + print("❌ Signature verified with wrong key (should fail)") + return False + except Exception: + print("✅ Signature verification correctly failed with wrong key") + + print("\n✅ Wrong key verification test passed!") + return True + + +def test_signature_verification_with_tampered_message(): + """Test signature verification fails with tampered message""" + print("\nTesting Signature Verification with Tampered Message") + print("=" * 40) + + # Generate keypair + private_key = ed25519.Ed25519PrivateKey.generate() + public_key = private_key.public_key() + + # Sign original message + original_message = {"type": "test", "data": "hello"} + original_bytes = json.dumps(original_message, sort_keys=True).encode('utf-8') + signature = private_key.sign(original_bytes) + + # Tamper with message + tampered_message = {"type": "test", "data": "goodbye"} + tampered_bytes = json.dumps(tampered_message, sort_keys=True).encode('utf-8') + + # Try to verify tampered message + try: + public_key.verify(signature, tampered_bytes) + print("❌ Signature verified with tampered message (should fail)") + return False + except Exception: + print("✅ Signature verification correctly failed with tampered message") + + print("\n✅ Tampered message verification test passed!") + return True + + +async def test_fetch_public_key_from_coordinator(): + """Test fetching public key from coordinator API""" + print("\nTesting Fetch Public Key from Coordinator API") + print("=" * 40) + + # Mock coordinator API response + mock_response = { + "agent_id": "test_agent", + "public_key": "test_public_key_hex" + } + + # Test the fetch function (mock implementation) + async def mock_fetch_public_key(sender_id: str, coordinator_url: str): + """Mock implementation of public key fetch""" + # Simulate API call + return mock_response.get("public_key") + + # Test successful fetch + public_key = await mock_fetch_public_key("test_agent", "http://localhost:8011") + if public_key: + print(f"✅ Public key fetched: {public_key}") + else: + print("❌ Failed to fetch public key") + return False + + # Test failed fetch (non-existent agent) + async def mock_fetch_public_key_not_found(sender_id: str, coordinator_url: str): + return None + + public_key = await mock_fetch_public_key_not_found("nonexistent", "http://localhost:8011") + if public_key is None: + print("✅ Correctly returned None for non-existent agent") + else: + print("❌ Should return None for non-existent agent") + return False + + print("\n✅ Fetch public key test passed!") + return True + + +async def test_receive_message_with_signature(): + """Test receive_message with signature verification""" + print("\nTesting Receive Message with Signature Verification") + print("=" * 40) + + # Generate keypair + private_key = ed25519.Ed25519PrivateKey.generate() + public_key = private_key.public_key() + public_key_hex = public_key.public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw + ).hex() + + # Create and sign message + message = { + "from": "sender_agent", + "type": "test", + "data": "hello" + } + message_copy = message.copy() + message_bytes = json.dumps(message_copy, sort_keys=True).encode('utf-8') + signature = private_key.sign(message_bytes) + + # Add signature to message + message_with_sig = message.copy() + message_with_sig["signature"] = signature + + print(f"Message signed with signature length: {len(signature)}") + print(f"Public key hex: {public_key_hex[:20]}...") + + # Mock coordinator API to return public key + async def mock_fetch_public_key(sender_id: str, coordinator_url: str): + if sender_id == "sender_agent": + return public_key_hex + return None + + # Test verification + sender_id = message_with_sig.get("from") + signature_bytes = message_with_sig.get("signature") + + if not signature_bytes: + print("❌ Message missing signature") + return False + + # Fetch public key + public_key_hex = await mock_fetch_public_key(sender_id, "http://localhost:8011") + if not public_key_hex: + print("❌ Failed to fetch public key") + return False + + # Verify signature + try: + public_key_bytes = bytes.fromhex(public_key_hex) + public_key = ed25519.Ed25519PublicKey.from_public_bytes(public_key_bytes) + + message_to_verify = message_with_sig.copy() + message_to_verify.pop("signature", None) + message_bytes = json.dumps(message_to_verify, sort_keys=True).encode('utf-8') + + public_key.verify(signature_bytes, message_bytes) + print("✅ Signature verified successfully") + except Exception as e: + print(f"❌ Signature verification failed: {e}") + return False + + print("\n✅ Receive message with signature test passed!") + return True + + +async def run_async_tests(): + """Run async tests""" + print("Agent SDK Signature Verification Tests") + print("=" * 40) + print() + + results = [] + results.append(("Signature Generation and Verification", test_signature_generation_and_verification())) + results.append(("Signature Verification with Wrong Key", test_signature_verification_with_wrong_key())) + results.append(("Signature Verification with Tampered Message", test_signature_verification_with_tampered_message())) + results.append(("Fetch Public Key from Coordinator", await test_fetch_public_key_from_coordinator())) + results.append(("Receive Message with Signature", await test_receive_message_with_signature())) + + print("\n" + "=" * 40) + print("Test Summary") + print("=" * 40) + + for name, passed in results: + status = "✅ PASSED" if passed else "❌ FAILED" + print(f"{name}: {status}") + + all_passed = all(result[1] for result in results) + if all_passed: + print("\n🎉 All tests passed!") + else: + print("\n❌ Some tests failed") + + +if __name__ == "__main__": + asyncio.run(run_async_tests()) diff --git a/tests/verification/test_keystore_mac.py b/tests/verification/test_keystore_mac.py new file mode 100644 index 00000000..0e8ce7b7 --- /dev/null +++ b/tests/verification/test_keystore_mac.py @@ -0,0 +1,247 @@ +#!/usr/bin/env python3 +""" +Test MAC computation in keystore scripts +Tests HMAC-SHA256 MAC computation for web3 keystore format +""" + +import hashlib +import hmac +import json +import os +import tempfile +from pathlib import Path + +from cryptography.hazmat.primitives.asymmetric import ed25519 +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.ciphers.aead import AESGCM +from cryptography.hazmat.backends import default_backend + + +def compute_mac(key: bytes, ciphertext: bytes) -> str: + """Compute MAC for web3 keystore format (HMAC-SHA256)""" + mac_data = key[16:32] + ciphertext + mac = hmac.new(key[:16], mac_data, hashlib.sha256).hexdigest() + return mac + + +def test_mac_computation(): + """Test MAC computation matches web3 keystore standard""" + print("Testing MAC Computation") + print("=" * 40) + + # Generate test key and ciphertext + password = "test_password_123" + salt = os.urandom(32) + + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=salt, + iterations=100_000, + backend=default_backend() + ) + key = kdf.derive(password.encode('utf-8')) + + # Generate test ciphertext + private_key = ed25519.Ed25519PrivateKey.generate() + private_bytes = private_key.private_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PrivateFormat.Raw, + encryption_algorithm=serialization.NoEncryption() + ) + + aesgcm = AESGCM(key) + nonce = os.urandom(12) + ciphertext = aesgcm.encrypt(nonce, private_bytes, None) + + # Compute MAC + mac = compute_mac(key, ciphertext) + + print(f"MAC computed: {mac}") + print(f"MAC length: {len(mac)}") + + # Verify MAC is a valid hex string + try: + int(mac, 16) + print("✅ MAC is valid hex string") + except ValueError: + print("❌ MAC is not valid hex string") + return False + + # Verify MAC length (64 hex chars = 32 bytes) + if len(mac) == 64: + print("✅ MAC has correct length (64 hex chars)") + else: + print(f"❌ MAC has incorrect length: {len(mac)} (expected 64)") + return False + + print("\n✅ MAC computation test passed!") + return True + + +def test_keystore_with_mac(): + """Test full keystore generation with MAC""" + print("\nTesting Keystore Generation with MAC") + print("=" * 40) + + # Create temporary keystore directory + with tempfile.TemporaryDirectory() as temp_dir: + keystore_dir = Path(temp_dir) + + # Generate keystore + password = "test_password_123" + name = "test_wallet" + + salt = os.urandom(32) + private_key = ed25519.Ed25519PrivateKey.generate() + private_bytes = private_key.private_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PrivateFormat.Raw, + encryption_algorithm=serialization.NoEncryption() + ) + + # Derive key + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=salt, + iterations=100_000, + backend=default_backend() + ) + key = kdf.derive(password.encode('utf-8')) + + # Encrypt + aesgcm = AESGCM(key) + nonce = os.urandom(12) + ciphertext = aesgcm.encrypt(nonce, private_bytes, None) + + # Compute MAC + mac = compute_mac(key, ciphertext) + + # Build keystore + keystore = { + "crypto": { + "cipher": "aes-256-gcm", + "cipherparams": {"nonce": nonce.hex()}, + "ciphertext": ciphertext.hex(), + "kdf": "pbkdf2", + "kdfparams": { + "dklen": 32, + "salt": salt.hex(), + "c": 100_000, + "prf": "hmac-sha256" + }, + "mac": mac + }, + "address": "test_address", + "keytype": "ed25519", + "version": 1 + } + + # Write keystore + keystore_file = keystore_dir / f"{name}.json" + with open(keystore_file, 'w') as f: + json.dump(keystore, f, indent=2) + + print(f"Keystore written to: {keystore_file}") + + # Read back and verify MAC + with open(keystore_file) as f: + loaded = json.load(f) + + loaded_mac = loaded["crypto"]["mac"] + if loaded_mac == mac: + print("✅ MAC matches in loaded keystore") + else: + print(f"❌ MAC mismatch: {loaded_mac} != {mac}") + return False + + print("\n✅ Keystore with MAC test passed!") + return True + + +def test_mac_validation(): + """Test MAC validation for password errors""" + print("\nTesting MAC Validation") + print("=" * 40) + + password = "correct_password" + wrong_password = "wrong_password" + + salt = os.urandom(32) + + # Derive key with correct password + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=salt, + iterations=100_000, + backend=default_backend() + ) + correct_key = kdf.derive(password.encode('utf-8')) + + # Derive key with wrong password + kdf_wrong = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=salt, + iterations=100_000, + backend=default_backend() + ) + wrong_key = kdf_wrong.derive(wrong_password.encode('utf-8')) + + # Generate test ciphertext + private_key = ed25519.Ed25519PrivateKey.generate() + private_bytes = private_key.private_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PrivateFormat.Raw, + encryption_algorithm=serialization.NoEncryption() + ) + + aesgcm = AESGCM(correct_key) + nonce = os.urandom(12) + ciphertext = aesgcm.encrypt(nonce, private_bytes, None) + + # Compute MAC with correct key + correct_mac = compute_mac(correct_key, ciphertext) + + # Try to compute MAC with wrong key + wrong_mac = compute_mac(wrong_key, ciphertext) + + print(f"Correct MAC: {correct_mac}") + print(f"Wrong MAC: {wrong_mac}") + + if correct_mac != wrong_mac: + print("✅ MAC validation detects password errors") + else: + print("❌ MAC validation failed to detect password errors") + return False + + print("\n✅ MAC validation test passed!") + return True + + +if __name__ == "__main__": + print("Keystore MAC Computation Tests") + print("=" * 40) + print() + + results = [] + results.append(("MAC Computation", test_mac_computation())) + results.append(("Keystore with MAC", test_keystore_with_mac())) + results.append(("MAC Validation", test_mac_validation())) + + print("\n" + "=" * 40) + print("Test Summary") + print("=" * 40) + + for name, passed in results: + status = "✅ PASSED" if passed else "❌ FAILED" + print(f"{name}: {status}") + + all_passed = all(result[1] for result in results) + if all_passed: + print("\n🎉 All tests passed!") + else: + print("\n❌ Some tests failed")