"""
Security Tests for AITBC Private Chain Access Control and Encryption

Tests security features, access controls, and encryption mechanisms.

The helper functions at the bottom of this module are deliberately simple
simulations (XOR "encryption", symmetric "signatures"); they exist so the
tests are self-contained and must not be used as production cryptography.
"""

import base64
import copy
import hashlib
import hmac
import json
import os
import secrets
import subprocess
import tempfile
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from itertools import cycle
from pathlib import Path
from typing import Any, Dict, List, Optional

import pytest

try:
    # Not used by any test in this module; kept for compatibility but made
    # optional so the suite can still be collected when it is not installed.
    import requests
except ImportError:
    requests = None


# Secret used to authenticate session tokens; regenerated on every test run.
_SESSION_SECRET = secrets.token_bytes(32)
# Maximum age of a session token, matching "session_timeout_minutes" below.
_SESSION_TIMEOUT_SECONDS = 30 * 60

# TOTP parameters (RFC 6238 defaults).
_TOTP_STEP_SECONDS = 30
_TOTP_DIGITS = 6

# Fields replaced by anonymize_data().
_ANONYMIZED_FIELDS = frozenset(
    {"user_id", "private_key", "email", "phone", "address"}
)


class TestSecurity:
    """Security testing suite for AITBC components"""

    @pytest.fixture(scope="class")
    def security_config(self):
        """Security test configuration shared by all tests in this class."""
        return {
            # Use the platform temp dir instead of a hard-coded /tmp path.
            "test_data_dir": Path(tempfile.gettempdir()) / "aitbc_security_test",
            "encryption_key": secrets.token_hex(32),
            "test_password": "TestSecurePassword123!",
            "test_wallet_id": "test_security_wallet",
            "test_chain_id": "ait-security-test",
            "security_thresholds": {
                "password_min_length": 8,
                "encryption_strength": 256,
                "session_timeout_minutes": 30,
                "max_login_attempts": 5,
                "lockout_duration_minutes": 15,
            },
        }

    def test_password_security(self, security_config):
        """Weak passwords are rejected and strong passwords are accepted."""
        weak_passwords = [
            "123",
            "password",
            "abc",
            "test",
            "short",
            "",
            "12345678",
            "password123",
        ]
        strong_passwords = [
            "SecureP@ssw0rd123!",
            "MyStr0ng#P@ssword",
            "AitbcSecur3ty@2026",
            "ComplexP@ssw0rd!#$",
            "VerySecureP@ssw0rd123",
        ]

        # Test weak passwords should be rejected
        for password in weak_passwords:
            is_valid = validate_password_strength(password)
            assert not is_valid, f"Weak password should be rejected: {password}"

        # Test strong passwords should be accepted
        for password in strong_passwords:
            is_valid = validate_password_strength(password)
            assert is_valid, f"Strong password should be accepted: {password}"

        print("✅ Password security validation working correctly")

    def test_encryption_decryption(self, security_config):
        """Encrypted data round-trips with the right key and not with a wrong one."""
        test_data = "Sensitive AITBC blockchain data"
        encryption_key = security_config["encryption_key"]

        # Test encryption
        encrypted_data = encrypt_data(test_data, encryption_key)
        assert encrypted_data != test_data, "Encrypted data should be different from original"
        assert len(encrypted_data) > 0, "Encrypted data should not be empty"

        # Test decryption
        decrypted_data = decrypt_data(encrypted_data, encryption_key)
        assert decrypted_data == test_data, "Decrypted data should match original"

        # Test with wrong key
        wrong_key = secrets.token_hex(32)
        decrypted_with_wrong_key = decrypt_data(encrypted_data, wrong_key)
        assert decrypted_with_wrong_key != test_data, "Decryption with wrong key should fail"

        print("✅ Encryption/decryption working correctly")

    def test_hashing_security(self, security_config):
        """SHA-256 and HMAC-SHA-256 are deterministic and input/key sensitive."""
        test_data = "AITBC blockchain transaction data"

        # Test SHA-256 hashing
        hash1 = hashlib.sha256(test_data.encode()).hexdigest()
        hash2 = hashlib.sha256(test_data.encode()).hexdigest()
        assert hash1 == hash2, "Same data should produce same hash"
        assert len(hash1) == 64, "SHA-256 hash should be 64 characters"
        assert all(c in '0123456789abcdef' for c in hash1), "Hash should only contain hex characters"

        # Test different data produces different hash
        different_data = "Different blockchain data"
        hash3 = hashlib.sha256(different_data.encode()).hexdigest()
        assert hash1 != hash3, "Different data should produce different hash"

        # Test HMAC for message authentication
        secret_key = security_config["encryption_key"]
        hmac1 = hmac.new(secret_key.encode(), test_data.encode(), hashlib.sha256).hexdigest()
        hmac2 = hmac.new(secret_key.encode(), test_data.encode(), hashlib.sha256).hexdigest()
        assert hmac1 == hmac2, "HMAC should be consistent"

        # Test HMAC with different key
        different_key = "different_secret_key"
        hmac3 = hmac.new(different_key.encode(), test_data.encode(), hashlib.sha256).hexdigest()
        assert hmac1 != hmac3, "HMAC with different key should be different"

        print("✅ Cryptographic hashing working correctly")

    def test_wallet_security(self, security_config):
        """Wallet files get 0600 permissions and wallet data encrypts/decrypts."""
        data_dir = security_config["test_data_dir"]
        data_dir.mkdir(parents=True, exist_ok=True)
        wallet_file = data_dir / "test_wallet.json"

        # Create test wallet
        wallet_data = {
            "wallet_id": security_config["test_wallet_id"],
            "private_key": secrets.token_hex(32),
            "public_key": secrets.token_hex(64),
            "address": f"ait1{secrets.token_hex(40)}",
            "created_at": datetime.now(timezone.utc).isoformat(),
        }

        try:
            with open(wallet_file, 'w', encoding="utf-8") as f:
                json.dump(wallet_data, f)

            # Set restrictive permissions (600 - read/write for owner only)
            os.chmod(wallet_file, 0o600)

            # Verify permissions
            file_stat = wallet_file.stat()
            file_permissions = oct(file_stat.st_mode)[-3:]
            assert file_permissions == "600", f"Wallet file should have 600 permissions, got {file_permissions}"

            # Test wallet encryption
            encrypted_wallet = encrypt_wallet_data(wallet_data, security_config["test_password"])
            assert encrypted_wallet != wallet_data, "Encrypted wallet should be different"

            # Test wallet decryption
            decrypted_wallet = decrypt_wallet_data(encrypted_wallet, security_config["test_password"])
            assert decrypted_wallet["wallet_id"] == wallet_data["wallet_id"], "Decrypted wallet should match original"

            # Test decryption with wrong password. json.JSONDecodeError is a
            # ValueError subclass; pytest.raises fails the test if nothing is
            # raised (the previous bare ``except`` swallowed that failure).
            with pytest.raises(ValueError):
                decrypt_wallet_data(encrypted_wallet, "wrong_password")
        finally:
            # Cleanup even when an assertion above fails.
            if wallet_file.exists():
                wallet_file.unlink()

        print("✅ Wallet security features working correctly")

    def test_chain_access_control(self, security_config):
        """Role-based permissions grant exactly the configured actions."""
        # Test chain access permissions
        chain_permissions = {
            "admin": ["read", "write", "delete", "manage"],
            "operator": ["read", "write"],
            "viewer": ["read"],
            "anonymous": [],
        }

        # Test permission validation
        def has_permission(user_role, required_permission):
            return required_permission in chain_permissions.get(user_role, [])

        # Test admin permissions
        assert has_permission("admin", "read"), "Admin should have read permission"
        assert has_permission("admin", "write"), "Admin should have write permission"
        assert has_permission("admin", "delete"), "Admin should have delete permission"
        assert has_permission("admin", "manage"), "Admin should have manage permission"

        # Test operator permissions
        assert has_permission("operator", "read"), "Operator should have read permission"
        assert has_permission("operator", "write"), "Operator should have write permission"
        assert not has_permission("operator", "delete"), "Operator should not have delete permission"
        assert not has_permission("operator", "manage"), "Operator should not have manage permission"

        # Test viewer permissions
        assert has_permission("viewer", "read"), "Viewer should have read permission"
        assert not has_permission("viewer", "write"), "Viewer should not have write permission"
        assert not has_permission("viewer", "delete"), "Viewer should not have delete permission"

        # Test anonymous permissions
        assert not has_permission("anonymous", "read"), "Anonymous should not have read permission"
        assert not has_permission("anonymous", "write"), "Anonymous should not have write permission"

        # Test invalid role
        assert not has_permission("invalid_role", "read"), "Invalid role should have no permissions"

        print("✅ Chain access control working correctly")

    def test_transaction_security(self, security_config):
        """Signatures verify for the original data/key and fail otherwise."""
        # Test transaction signing
        transaction_data = {
            "from": f"ait1{secrets.token_hex(40)}",
            "to": f"ait1{secrets.token_hex(40)}",
            "amount": "1000",
            "nonce": secrets.token_hex(16),
            "timestamp": int(time.time()),
        }
        private_key = secrets.token_hex(32)

        # Sign transaction
        signature = sign_transaction(transaction_data, private_key)
        assert signature != transaction_data, "Signature should be different from transaction data"
        assert len(signature) > 0, "Signature should not be empty"

        # Verify signature
        is_valid = verify_transaction_signature(transaction_data, signature, private_key)
        assert is_valid, "Signature verification should pass"

        # Test with tampered data
        tampered_data = transaction_data.copy()
        tampered_data["amount"] = "2000"
        is_valid_tampered = verify_transaction_signature(tampered_data, signature, private_key)
        assert not is_valid_tampered, "Signature verification should fail for tampered data"

        # Test with wrong key
        wrong_key = secrets.token_hex(32)
        is_valid_wrong_key = verify_transaction_signature(transaction_data, signature, wrong_key)
        assert not is_valid_wrong_key, "Signature verification should fail with wrong key"

        print("✅ Transaction security working correctly")

    def test_session_security(self, security_config):
        """Session tokens validate only for their user and only while fresh."""
        # Test session token generation
        user_id = "test_user_123"
        session_token = generate_session_token(user_id)
        assert len(session_token) > 20, "Session token should be sufficiently long"
        assert session_token != user_id, "Session token should be different from user ID"

        # Test session validation
        is_valid = validate_session_token(session_token, user_id)
        assert is_valid, "Valid session token should pass validation"

        # Test session with wrong user
        is_valid_wrong_user = validate_session_token(session_token, "wrong_user")
        assert not is_valid_wrong_user, "Session token should fail for wrong user"

        # Test expired session
        expired_token = generate_expired_session_token(user_id)
        is_valid_expired = validate_session_token(expired_token, user_id)
        assert not is_valid_expired, "Expired session token should fail validation"

        # Test session timeout
        session_timeout = security_config["security_thresholds"]["session_timeout_minutes"]
        assert session_timeout == 30, "Session timeout should be 30 minutes"

        print("✅ Session security working correctly")

    def test_api_security(self, security_config):
        """API keys are well-formed and the rate limiter enforces its cap."""
        # Test API key generation
        api_key = generate_api_key()
        assert len(api_key) >= 32, "API key should be at least 32 characters"
        assert api_key.isalnum(), "API key should be alphanumeric"

        # Test API key validation
        is_valid = validate_api_key(api_key)
        assert is_valid, "Valid API key should pass validation"

        # Test invalid API key
        invalid_keys = [
            "short",
            "invalid@key",
            "key with spaces",
            "key-with-special-chars!",
            "",
        ]
        for invalid_key in invalid_keys:
            is_valid = validate_api_key(invalid_key)
            assert not is_valid, f"Invalid API key should fail validation: {invalid_key}"

        # Test rate limiting (simulation)
        rate_limiter = RateLimiter(max_requests=5, window_seconds=60)

        # Should allow requests within limit
        for i in range(5):
            assert rate_limiter.is_allowed(), f"Request {i+1} should be allowed"

        # Should block request beyond limit
        assert not rate_limiter.is_allowed(), "Request beyond limit should be blocked"

        print("✅ API security working correctly")

    def test_data_protection(self, security_config):
        """Masking, anonymization and retention helpers hide/expire data."""
        sensitive_data = {
            "user_id": "user_123",
            "private_key": secrets.token_hex(32),
            "email": "user@example.com",
            "phone": "+1234567890",
            "address": "123 Blockchain Street",
        }

        # Test data masking. The helpers keep the keys and replace the values,
        # so compare values rather than asserting the key is absent.
        masked_data = mask_sensitive_data(sensitive_data)
        assert masked_data["private_key"] != sensitive_data["private_key"], "Private key should be masked"
        assert "email" in masked_data, "Email field should still be present"
        assert masked_data["email"] != sensitive_data["email"], "Email should be partially masked"

        # Test data anonymization
        anonymized_data = anonymize_data(sensitive_data)
        anonymization_checks = {
            "user_id": "User ID should be anonymized",
            "private_key": "Private key should be anonymized",
            "email": "Email should be anonymized",
        }
        for field, message in anonymization_checks.items():
            assert anonymized_data[field] != sensitive_data[field], message

        # Test data retention
        retention_days = 365
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=retention_days)

        old_data = {
            "data": "sensitive_info",
            "created_at": (cutoff_date - timedelta(days=1)).isoformat(),
        }
        should_delete = should_delete_data(old_data, retention_days)
        assert should_delete, "Data older than retention period should be deleted"

        recent_data = {
            "data": "sensitive_info",
            "created_at": datetime.now(timezone.utc).isoformat(),
        }
        should_not_delete = should_delete_data(recent_data, retention_days)
        assert not should_not_delete, "Recent data should not be deleted"

        print("✅ Data protection working correctly")

    def test_audit_logging(self, security_config):
        """Audit entries carry required fields and tampering changes the hash."""
        audit_log = []

        # Test audit log entry creation
        log_entry = create_audit_log(
            action="wallet_create",
            user_id="test_user",
            resource_id="wallet_123",
            details={"wallet_type": "multi_signature"},
            ip_address="192.168.1.1",
        )
        assert "action" in log_entry, "Audit log should contain action"
        assert "user_id" in log_entry, "Audit log should contain user ID"
        assert "timestamp" in log_entry, "Audit log should contain timestamp"
        assert "ip_address" in log_entry, "Audit log should contain IP address"

        audit_log.append(log_entry)

        # Test audit log integrity
        log_hash = calculate_audit_log_hash(audit_log)
        assert len(log_hash) == 64, "Audit log hash should be 64 characters"

        # Test audit log tampering detection. A deep copy keeps the original
        # entries untouched (list.copy() would share the entry dicts).
        tampered_log = copy.deepcopy(audit_log)
        tampered_log[0]["action"] = "different_action"
        tampered_hash = calculate_audit_log_hash(tampered_log)
        assert log_hash != tampered_hash, "Tampered log should have different hash"

        print("✅ Audit logging working correctly")


class TestAuthenticationSecurity:
    """Test authentication and authorization security"""

    def test_multi_factor_authentication(self):
        """Password check plus TOTP generation/validation."""
        user_credentials = {
            "username": "test_user",
            "password": "SecureP@ssw0rd123!",
        }

        # Test password authentication
        password_valid = authenticate_password(user_credentials["username"], user_credentials["password"])
        assert password_valid, "Valid password should authenticate"

        # Test invalid password
        invalid_password_valid = authenticate_password(user_credentials["username"], "wrong_password")
        assert not invalid_password_valid, "Invalid password should not authenticate"

        # Test 2FA token generation
        totp_secret = generate_totp_secret()
        totp_code = generate_totp_code(totp_secret)
        assert len(totp_code) == 6, "TOTP code should be 6 digits"
        assert totp_code.isdigit(), "TOTP code should be numeric"

        # Test 2FA validation
        totp_valid = validate_totp_code(totp_secret, totp_code)
        assert totp_valid, "Valid TOTP code should pass"

        # Test invalid TOTP code
        invalid_totp_valid = validate_totp_code(totp_secret, "123456")
        assert not invalid_totp_valid, "Invalid TOTP code should fail"

        print("✅ Multi-factor authentication working correctly")

    def test_login_attempt_limiting(self):
        """A user is locked out only after ``max_attempts`` failed logins."""
        user_id = "test_user"
        max_attempts = 5
        lockout_duration = 15  # minutes

        login_attempts = LoginAttemptLimiter(max_attempts, lockout_duration)

        # Before each failed attempt the user must still be allowed in; the
        # failure is then recorded so the limit is actually reached.
        for i in range(max_attempts):
            assert not login_attempts.is_locked_out(user_id), f"User should not be locked out after {i+1} attempts"
            login_attempts.record_failed_attempt(user_id)

        # Test lockout after max attempts
        assert login_attempts.is_locked_out(user_id), "User should be locked out after max attempts"

        # Test lockout duration
        lockout_remaining = login_attempts.get_lockout_remaining(user_id)
        assert lockout_remaining > 0, "Lockout should have remaining time"
        assert lockout_remaining <= lockout_duration * 60, "Lockout should not exceed max duration"

        print("✅ Login attempt limiting working correctly")


# Security utility functions

def validate_password_strength(password: str) -> bool:
    """Return True if *password* has 8+ chars with upper, lower, digit and special."""
    if len(password) < 8:
        return False

    has_upper = any(c.isupper() for c in password)
    has_lower = any(c.islower() for c in password)
    has_digit = any(c.isdigit() for c in password)
    has_special = any(c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in password)

    return has_upper and has_lower and has_digit and has_special


def _xor_bytes(data: bytes, key: bytes) -> bytes:
    """XOR *data* with *key*, repeating the key as needed (key must be non-empty)."""
    return bytes(b ^ k for b, k in zip(data, cycle(key)))


def encrypt_data(data: str, key: str) -> str:
    """Simple encryption simulation (in production, use proper encryption).

    XORs the UTF-8 bytes of *data* with the repeating bytes of *key* and
    returns the result base64-encoded.

    Raises:
        ValueError: if *key* is empty.
    """
    if not key:
        raise ValueError("encryption key must not be empty")
    return base64.b64encode(_xor_bytes(data.encode(), key.encode())).decode()


def decrypt_data(encrypted_data: str, key: str) -> str:
    """Simple decryption simulation (in production, use proper decryption).

    Returns the decrypted text, or an empty string when the input is not valid
    base64, the key is empty, or the decrypted bytes are not valid UTF-8.
    """
    if not key:
        return ""
    try:
        encrypted_bytes = base64.b64decode(encrypted_data.encode())
        return _xor_bytes(encrypted_bytes, key.encode()).decode()
    except ValueError:
        # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
        return ""


def encrypt_wallet_data(wallet_data: Dict[str, Any], password: str) -> str:
    """Encrypt wallet data with password"""
    wallet_json = json.dumps(wallet_data)
    return encrypt_data(wallet_json, password)


def decrypt_wallet_data(encrypted_wallet: str, password: str) -> Dict[str, Any]:
    """Decrypt wallet data with password.

    Raises:
        json.JSONDecodeError: (a ValueError) if the password is wrong and the
            decrypted text is therefore not valid JSON.
    """
    decrypted_json = decrypt_data(encrypted_wallet, password)
    return json.loads(decrypted_json)


def sign_transaction(transaction: Dict[str, Any], private_key: str) -> str:
    """Sign transaction with private key.

    This is a symmetric stand-in for a real signature: an HMAC-SHA-256 over
    the canonical (key-sorted) JSON of *transaction*, returned as 64 hex chars.
    """
    transaction_json = json.dumps(transaction, sort_keys=True)
    return hmac.new(
        private_key.encode(), transaction_json.encode(), hashlib.sha256
    ).hexdigest()


def verify_transaction_signature(transaction: Dict[str, Any], signature: str, public_key: str) -> bool:
    """Verify transaction signature.

    Because the simulated scheme is symmetric, *public_key* must be the same
    key that was passed to ``sign_transaction``.
    """
    expected_signature = sign_transaction(transaction, public_key)
    # Compare as bytes: compare_digest rejects non-ASCII str input.
    return hmac.compare_digest(signature.encode(), expected_signature.encode())


def _session_mac(user_id: str, issued_at: str, nonce: str) -> str:
    """Return the hex HMAC binding a token's timestamp and nonce to *user_id*."""
    message = f"{user_id}:{issued_at}:{nonce}".encode()
    return hmac.new(_SESSION_SECRET, message, hashlib.sha256).hexdigest()


def _build_session_token(user_id: str, issued_at: int) -> str:
    """Build a ``<issued_at>:<nonce>:<mac>`` session token for *user_id*."""
    nonce = secrets.token_hex(16)
    return f"{issued_at}:{nonce}:{_session_mac(user_id, str(issued_at), nonce)}"


def generate_session_token(user_id: str) -> str:
    """Generate a session token for *user_id* issued at the current time."""
    return _build_session_token(user_id, int(time.time()))


def generate_expired_session_token(user_id: str) -> str:
    """Generate expired session token for testing"""
    return _build_session_token(user_id, int(time.time()) - 3600)  # 1 hour ago


def validate_session_token(token: str, user_id: str) -> bool:
    """Validate session token.

    A token is valid when it is well-formed, its HMAC matches *user_id*, and
    it was issued no more than the session timeout ago.
    """
    parts = token.split(":")
    if len(parts) != 3:
        return False
    issued_raw, nonce, mac = parts

    expected_mac = _session_mac(user_id, issued_raw, nonce)
    if not hmac.compare_digest(mac.encode(), expected_mac.encode()):
        return False

    try:
        issued_at = int(issued_raw)
    except ValueError:
        return False

    age = time.time() - issued_at
    return 0 <= age <= _SESSION_TIMEOUT_SECONDS


def generate_api_key() -> str:
    """Generate API key"""
    return secrets.token_hex(32)


def validate_api_key(api_key: str) -> bool:
    """Validate API key format"""
    return len(api_key) >= 32 and api_key.isalnum()


class RateLimiter:
    """Simple sliding-window rate limiter"""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # Timestamps of accepted requests, oldest first. A deque (rather than
        # a dict keyed by timestamp) keeps requests that share a clock tick.
        self.requests = deque()

    def is_allowed(self) -> bool:
        """Record and allow a request unless the window is already full."""
        current_time = time.monotonic()
        window_start = current_time - self.window_seconds

        # Clean old requests
        while self.requests and self.requests[0] <= window_start:
            self.requests.popleft()

        if len(self.requests) >= self.max_requests:
            return False

        self.requests.append(current_time)
        return True


def mask_sensitive_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """Mask sensitive data.

    Returns a shallow copy in which ``private_key`` is replaced by a
    placeholder and ``email`` keeps only the first two characters of its
    local part. Keys are preserved.
    """
    masked = data.copy()

    if "private_key" in masked:
        masked["private_key"] = "***MASKED***"

    email = masked.get("email")
    if isinstance(email, str) and "@" in email:
        local, domain = email.split("@", 1)
        masked["email"] = f"{local[:2]}***@{domain}"

    return masked


def anonymize_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """Anonymize sensitive data.

    Returns a new dict with identifying fields (including ``private_key``)
    replaced by a placeholder; all other fields are copied unchanged.
    """
    return {
        key: "***ANONYMIZED***" if key in _ANONYMIZED_FIELDS else value
        for key, value in data.items()
    }


def should_delete_data(data: Dict[str, Any], retention_days: int) -> bool:
    """Check if data should be deleted based on retention policy.

    ``data["created_at"]`` is an ISO-8601 timestamp; a value without a UTC
    offset is interpreted as UTC. Records without ``created_at`` are kept.
    """
    if "created_at" not in data:
        return False

    created_at = datetime.fromisoformat(data["created_at"])
    if created_at.tzinfo is None:
        created_at = created_at.replace(tzinfo=timezone.utc)
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=retention_days)

    return created_at < cutoff_date


def create_audit_log(action: str, user_id: str, resource_id: str, details: Dict[str, Any], ip_address: str) -> Dict[str, Any]:
    """Create audit log entry with a UTC timestamp and a random ``log_id``."""
    return {
        "action": action,
        "user_id": user_id,
        "resource_id": resource_id,
        "details": details,
        "ip_address": ip_address,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "log_id": secrets.token_hex(16),
    }


def calculate_audit_log_hash(audit_log: List[Dict[str, Any]]) -> str:
    """Calculate hash of audit log for integrity verification"""
    log_json = json.dumps(audit_log, sort_keys=True)
    return hashlib.sha256(log_json.encode()).hexdigest()


def authenticate_password(username: str, password: str) -> bool:
    """Simulate password authentication"""
    # In production, this would check against hashed passwords.
    # compare_digest avoids leaking the match position through timing.
    password_matches = hmac.compare_digest(
        password.encode(), "SecureP@ssw0rd123!".encode()
    )
    return username == "test_user" and password_matches


def generate_totp_secret() -> str:
    """Generate TOTP secret"""
    return secrets.token_hex(20)


def _totp_at(secret: str, timestep: int) -> str:
    """Return the zero-padded decimal TOTP code for *secret* at *timestep*."""
    digest = hmac.new(
        secret.encode(), timestep.to_bytes(8, "big"), hashlib.sha1
    ).digest()
    # Dynamic truncation (RFC 4226): low nibble of the last byte picks a
    # 4-byte window, whose top bit is cleared.
    offset = digest[-1] & 0x0F
    truncated = int.from_bytes(digest[offset:offset + 4], "big") & 0x7FFFFFFF
    return str(truncated % 10 ** _TOTP_DIGITS).zfill(_TOTP_DIGITS)


def generate_totp_code(secret: str) -> str:
    """Generate the 6-digit TOTP code for the current 30-second time step."""
    return _totp_at(secret, int(time.time() // _TOTP_STEP_SECONDS))


def validate_totp_code(secret: str, code: str) -> bool:
    """Validate TOTP code.

    Accepts the code for the current time step or the one immediately before
    it, so a code generated just before a step boundary still validates.
    """
    current_step = int(time.time() // _TOTP_STEP_SECONDS)
    supplied = code.encode()
    # Evaluate both comparisons (no short-circuit) to keep timing uniform.
    matches = [
        hmac.compare_digest(supplied, _totp_at(secret, step).encode())
        for step in (current_step, current_step - 1)
    ]
    return any(matches)


class LoginAttemptLimiter:
    """Login attempt limiter"""

    def __init__(self, max_attempts: int, lockout_duration_minutes: int):
        self.max_attempts = max_attempts
        self.lockout_duration_minutes = lockout_duration_minutes
        # Maps user_id -> list of failed-attempt timestamps (time.time()).
        self.attempts = {}

    def record_failed_attempt(self, user_id: str):
        """Record failed login attempt"""
        self.attempts.setdefault(user_id, []).append(time.time())

    def is_locked_out(self, user_id: str) -> bool:
        """Check if user is locked out.

        Also discards this user's attempts older than the lockout period.
        """
        if user_id not in self.attempts:
            return False

        # Remove attempts older than lockout period
        cutoff_time = time.time() - self.lockout_duration_minutes * 60
        self.attempts[user_id] = [
            attempt for attempt in self.attempts[user_id] if attempt > cutoff_time
        ]

        return len(self.attempts[user_id]) >= self.max_attempts

    def get_lockout_remaining(self, user_id: str) -> int:
        """Get remaining lockout time in seconds (0 when not locked out)."""
        if not self.is_locked_out(user_id):
            return 0

        oldest_attempt = min(self.attempts[user_id])
        lockout_end = oldest_attempt + (self.lockout_duration_minutes * 60)
        return max(0, int(lockout_end - time.time()))


if __name__ == "__main__":
    # Run security tests and propagate pytest's exit code to the shell.
    raise SystemExit(pytest.main([__file__, "-v", "--tb=short"]))