Files
aitbc/tests/security/test_security.py
AITBC System b033923756 chore: normalize file permissions across repository
- Remove executable permissions from configuration files (.editorconfig, .env.example, .gitignore)
- Remove executable permissions from documentation files (README.md, LICENSE, SECURITY.md)
- Remove executable permissions from web assets (HTML, CSS, JS files)
- Remove executable permissions from data files (JSON, SQL, YAML, requirements.txt)
- Remove executable permissions from source code files across all apps
- Add executable permissions to Python
2026-03-08 11:26:18 +01:00

682 lines
27 KiB
Python
Executable File

"""
Security Tests for AITBC Private Chain Access Control and Encryption
Tests security features, access controls, and encryption mechanisms
"""
import pytest
import json
import hashlib
import hmac
import secrets
import time
from datetime import datetime, timedelta
from pathlib import Path
import subprocess
import requests
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Any, List, Optional
import tempfile
import os
class TestSecurity:
    """Security testing suite for AITBC components.

    Every test drives one of the module-level helper functions defined
    further down in this file (password validation, XOR "encryption"
    simulation, signing, session tokens, API keys, masking, audit logging).
    """

    @pytest.fixture(scope="class")
    def security_config(self):
        """Return the configuration dictionary shared by the tests of this class.

        The encryption key is freshly generated once per class, so values
        derived from it differ between test runs.
        """
        return {
            "test_data_dir": Path("/tmp/aitbc_security_test"),
            "encryption_key": secrets.token_hex(32),
            "test_password": "TestSecurePassword123!",
            "test_wallet_id": "test_security_wallet",
            "test_chain_id": "ait-security-test",
            "security_thresholds": {
                "password_min_length": 8,
                "encryption_strength": 256,
                "session_timeout_minutes": 30,
                "max_login_attempts": 5,
                "lockout_duration_minutes": 15
            }
        }

    def test_password_security(self, security_config):
        """Weak passwords must be rejected and strong ones accepted."""
        weak_passwords = [
            "123",
            "password",
            "abc",
            "test",
            "short",
            "",
            "12345678",
            "password123"
        ]
        strong_passwords = [
            "SecureP@ssw0rd123!",
            "MyStr0ng#P@ssword",
            "AitbcSecur3ty@2026",
            "ComplexP@ssw0rd!#$",
            "VerySecureP@ssw0rd123"
        ]

        # Test weak passwords should be rejected
        for password in weak_passwords:
            is_valid = validate_password_strength(password)
            assert not is_valid, f"Weak password should be rejected: {password}"

        # Test strong passwords should be accepted
        for password in strong_passwords:
            is_valid = validate_password_strength(password)
            assert is_valid, f"Strong password should be accepted: {password}"

        print("✅ Password security validation working correctly")

    def test_encryption_decryption(self, security_config):
        """Round-trip data through encrypt_data/decrypt_data, then try a wrong key."""
        test_data = "Sensitive AITBC blockchain data"
        encryption_key = security_config["encryption_key"]

        # Test encryption
        encrypted_data = encrypt_data(test_data, encryption_key)
        assert encrypted_data != test_data, "Encrypted data should be different from original"
        assert len(encrypted_data) > 0, "Encrypted data should not be empty"

        # Test decryption
        decrypted_data = decrypt_data(encrypted_data, encryption_key)
        assert decrypted_data == test_data, "Decrypted data should match original"

        # Test with wrong key
        wrong_key = secrets.token_hex(32)
        decrypted_with_wrong_key = decrypt_data(encrypted_data, wrong_key)
        assert decrypted_with_wrong_key != test_data, "Decryption with wrong key should fail"

        print("✅ Encryption/decryption working correctly")

    def test_hashing_security(self, security_config):
        """Check determinism and sensitivity of SHA-256 digests and HMACs."""
        test_data = "AITBC blockchain transaction data"

        # Test SHA-256 hashing
        hash1 = hashlib.sha256(test_data.encode()).hexdigest()
        hash2 = hashlib.sha256(test_data.encode()).hexdigest()
        assert hash1 == hash2, "Same data should produce same hash"
        assert len(hash1) == 64, "SHA-256 hash should be 64 characters"
        assert all(c in '0123456789abcdef' for c in hash1), "Hash should only contain hex characters"

        # Test different data produces different hash
        different_data = "Different blockchain data"
        hash3 = hashlib.sha256(different_data.encode()).hexdigest()
        assert hash1 != hash3, "Different data should produce different hash"

        # Test HMAC for message authentication
        secret_key = security_config["encryption_key"]
        hmac1 = hmac.new(secret_key.encode(), test_data.encode(), hashlib.sha256).hexdigest()
        hmac2 = hmac.new(secret_key.encode(), test_data.encode(), hashlib.sha256).hexdigest()
        assert hmac1 == hmac2, "HMAC should be consistent"

        # Test HMAC with different key
        different_key = "different_secret_key"
        hmac3 = hmac.new(different_key.encode(), test_data.encode(), hashlib.sha256).hexdigest()
        assert hmac1 != hmac3, "HMAC with different key should be different"

        print("✅ Cryptographic hashing working correctly")

    def test_wallet_security(self, security_config):
        """Check wallet file permissions and password-based wallet encryption."""
        data_dir = security_config["test_data_dir"]
        data_dir.mkdir(parents=True, exist_ok=True)
        wallet_file = data_dir / "test_wallet.json"

        # Create test wallet
        wallet_data = {
            "wallet_id": security_config["test_wallet_id"],
            "private_key": secrets.token_hex(32),
            "public_key": secrets.token_hex(64),
            "address": f"ait1{secrets.token_hex(40)}",
            "created_at": datetime.utcnow().isoformat()
        }

        try:
            with open(wallet_file, 'w') as f:
                json.dump(wallet_data, f)

            # Set restrictive permissions (600 - read/write for owner only)
            os.chmod(wallet_file, 0o600)

            # Verify permissions: keep only the permission bits of st_mode
            file_permissions = format(wallet_file.stat().st_mode & 0o777, "03o")
            assert file_permissions == "600", f"Wallet file should have 600 permissions, got {file_permissions}"

            # Test wallet encryption
            encrypted_wallet = encrypt_wallet_data(wallet_data, security_config["test_password"])
            assert encrypted_wallet != wallet_data, "Encrypted wallet should be different"

            # Test wallet decryption
            decrypted_wallet = decrypt_wallet_data(encrypted_wallet, security_config["test_password"])
            assert decrypted_wallet["wallet_id"] == wallet_data["wallet_id"], "Decrypted wallet should match original"

            # Test decryption with wrong password. decrypt_wallet_data feeds the
            # decrypted text to json.loads, which raises a ValueError subclass
            # on garbage. pytest.raises (unlike a bare ``except:`` around
            # ``assert False``) actually fails the test when nothing is raised.
            with pytest.raises(ValueError):
                decrypt_wallet_data(encrypted_wallet, "wrong_password")
        finally:
            # Cleanup even when an assertion above fails
            if wallet_file.exists():
                wallet_file.unlink()

        print("✅ Wallet security features working correctly")

    def test_chain_access_control(self, security_config):
        """Verify a role -> permissions table grants exactly what is listed."""
        chain_permissions = {
            "admin": ["read", "write", "delete", "manage"],
            "operator": ["read", "write"],
            "viewer": ["read"],
            "anonymous": []
        }

        # Unknown roles fall back to an empty permission list
        def has_permission(user_role, required_permission):
            return required_permission in chain_permissions.get(user_role, [])

        # Test admin permissions
        assert has_permission("admin", "read"), "Admin should have read permission"
        assert has_permission("admin", "write"), "Admin should have write permission"
        assert has_permission("admin", "delete"), "Admin should have delete permission"
        assert has_permission("admin", "manage"), "Admin should have manage permission"

        # Test operator permissions
        assert has_permission("operator", "read"), "Operator should have read permission"
        assert has_permission("operator", "write"), "Operator should have write permission"
        assert not has_permission("operator", "delete"), "Operator should not have delete permission"
        assert not has_permission("operator", "manage"), "Operator should not have manage permission"

        # Test viewer permissions
        assert has_permission("viewer", "read"), "Viewer should have read permission"
        assert not has_permission("viewer", "write"), "Viewer should not have write permission"
        assert not has_permission("viewer", "delete"), "Viewer should not have delete permission"

        # Test anonymous permissions
        assert not has_permission("anonymous", "read"), "Anonymous should not have read permission"
        assert not has_permission("anonymous", "write"), "Anonymous should not have write permission"

        # Test invalid role
        assert not has_permission("invalid_role", "read"), "Invalid role should have no permissions"

        print("✅ Chain access control working correctly")

    def test_transaction_security(self, security_config):
        """Sign a transaction and check verification against tampering and wrong keys."""
        transaction_data = {
            "from": f"ait1{secrets.token_hex(40)}",
            "to": f"ait1{secrets.token_hex(40)}",
            "amount": "1000",
            "nonce": secrets.token_hex(16),
            "timestamp": int(time.time())
        }
        private_key = secrets.token_hex(32)

        # Sign transaction
        signature = sign_transaction(transaction_data, private_key)
        assert signature != transaction_data, "Signature should be different from transaction data"
        assert len(signature) > 0, "Signature should not be empty"

        # Verify signature (the helper is symmetric: the same key signs and verifies)
        is_valid = verify_transaction_signature(transaction_data, signature, private_key)
        assert is_valid, "Signature verification should pass"

        # Test with tampered data
        tampered_data = transaction_data.copy()
        tampered_data["amount"] = "2000"
        is_valid_tampered = verify_transaction_signature(tampered_data, signature, private_key)
        assert not is_valid_tampered, "Signature verification should fail for tampered data"

        # Test with wrong key
        wrong_key = secrets.token_hex(32)
        is_valid_wrong_key = verify_transaction_signature(transaction_data, signature, wrong_key)
        assert not is_valid_wrong_key, "Signature verification should fail with wrong key"

        print("✅ Transaction security working correctly")

    def test_session_security(self, security_config):
        """Check session token generation, ownership and expiry validation."""
        user_id = "test_user_123"
        session_token = generate_session_token(user_id)
        assert len(session_token) > 20, "Session token should be sufficiently long"
        assert session_token != user_id, "Session token should be different from user ID"

        # Test session validation
        is_valid = validate_session_token(session_token, user_id)
        assert is_valid, "Valid session token should pass validation"

        # Test session with wrong user
        is_valid_wrong_user = validate_session_token(session_token, "wrong_user")
        assert not is_valid_wrong_user, "Session token should fail for wrong user"

        # Test expired session
        expired_token = generate_expired_session_token(user_id)
        is_valid_expired = validate_session_token(expired_token, user_id)
        assert not is_valid_expired, "Expired session token should fail validation"

        # Test session timeout
        session_timeout = security_config["security_thresholds"]["session_timeout_minutes"]
        assert session_timeout == 30, "Session timeout should be 30 minutes"

        print("✅ Session security working correctly")

    def test_api_security(self, security_config):
        """Check API key generation/validation and the simulated rate limiter."""
        api_key = generate_api_key()
        assert len(api_key) >= 32, "API key should be at least 32 characters"
        assert api_key.isalnum(), "API key should be alphanumeric"

        # Test API key validation
        is_valid = validate_api_key(api_key)
        assert is_valid, "Valid API key should pass validation"

        # Test invalid API key
        invalid_keys = [
            "short",
            "invalid@key",
            "key with spaces",
            "key-with-special-chars!",
            ""
        ]
        for invalid_key in invalid_keys:
            accepted = validate_api_key(invalid_key)
            assert not accepted, f"Invalid API key should fail validation: {invalid_key}"

        # Test rate limiting (simulation)
        rate_limiter = RateLimiter(max_requests=5, window_seconds=60)

        # Should allow requests within limit
        for i in range(5):
            assert rate_limiter.is_allowed(), f"Request {i+1} should be allowed"

        # Should block request beyond limit
        assert not rate_limiter.is_allowed(), "Request beyond limit should be blocked"

        print("✅ API security working correctly")

    def test_data_protection(self, security_config):
        """Check masking, anonymization and retention decisions for sensitive data."""
        sensitive_data = {
            "user_id": "user_123",
            "private_key": secrets.token_hex(32),
            "email": "user@example.com",
            "phone": "+1234567890",
            "address": "123 Blockchain Street"
        }

        # Test data masking. The helpers replace sensitive values with
        # placeholders rather than removing the keys, so compare values
        # instead of asserting that the key is absent.
        masked_data = mask_sensitive_data(sensitive_data)
        assert masked_data.get("private_key") != sensitive_data["private_key"], "Private key should be masked"
        assert "email" in masked_data, "Email field should still be present"
        assert masked_data["email"] != sensitive_data["email"], "Email should be partially masked"

        # Test data anonymization
        anonymized_data = anonymize_data(sensitive_data)
        assert anonymized_data.get("user_id") != sensitive_data["user_id"], "User ID should be anonymized"
        assert anonymized_data.get("private_key") != sensitive_data["private_key"], "Private key should be anonymized"
        assert anonymized_data.get("email") != sensitive_data["email"], "Email should be anonymized"

        # Test data retention
        retention_days = 365
        cutoff_date = datetime.utcnow() - timedelta(days=retention_days)
        old_data = {
            "data": "sensitive_info",
            "created_at": (cutoff_date - timedelta(days=1)).isoformat()
        }
        should_delete = should_delete_data(old_data, retention_days)
        assert should_delete, "Data older than retention period should be deleted"

        recent_data = {
            "data": "sensitive_info",
            "created_at": datetime.utcnow().isoformat()
        }
        should_not_delete = should_delete_data(recent_data, retention_days)
        assert not should_not_delete, "Recent data should not be deleted"

        print("✅ Data protection working correctly")

    def test_audit_logging(self, security_config):
        """Check audit entry contents and that tampering changes the log hash."""
        audit_log = []

        # Test audit log entry creation
        log_entry = create_audit_log(
            action="wallet_create",
            user_id="test_user",
            resource_id="wallet_123",
            details={"wallet_type": "multi_signature"},
            ip_address="192.168.1.1"
        )
        assert "action" in log_entry, "Audit log should contain action"
        assert "user_id" in log_entry, "Audit log should contain user ID"
        assert "timestamp" in log_entry, "Audit log should contain timestamp"
        assert "ip_address" in log_entry, "Audit log should contain IP address"
        audit_log.append(log_entry)

        # Test audit log integrity
        log_hash = calculate_audit_log_hash(audit_log)
        assert len(log_hash) == 64, "Audit log hash should be 64 characters"

        # Test audit log tampering detection. Copy each entry so that the
        # tampering does not silently modify the original log as well
        # (list.copy() alone would share the entry dictionaries).
        tampered_log = [dict(entry) for entry in audit_log]
        tampered_log[0]["action"] = "different_action"
        tampered_hash = calculate_audit_log_hash(tampered_log)
        assert log_hash != tampered_hash, "Tampered log should have different hash"
        assert calculate_audit_log_hash(audit_log) == log_hash, "Original log should be unchanged"

        print("✅ Audit logging working correctly")
class TestAuthenticationSecurity:
    """Test authentication and authorization security.

    Covers password + TOTP authentication and failed-login lockout using
    the helper functions and the LoginAttemptLimiter defined in this file.
    """

    def test_multi_factor_authentication(self):
        """Check password authentication followed by TOTP generation/validation."""
        user_credentials = {
            "username": "test_user",
            "password": "SecureP@ssw0rd123!"
        }

        # Test password authentication
        password_valid = authenticate_password(user_credentials["username"], user_credentials["password"])
        assert password_valid, "Valid password should authenticate"

        # Test invalid password
        invalid_password_valid = authenticate_password(user_credentials["username"], "wrong_password")
        assert not invalid_password_valid, "Invalid password should not authenticate"

        # Test 2FA token generation
        totp_secret = generate_totp_secret()
        totp_code = generate_totp_code(totp_secret)
        assert len(totp_code) == 6, "TOTP code should be 6 digits"
        assert totp_code.isdigit(), "TOTP code should be numeric"

        # Test 2FA validation
        totp_valid = validate_totp_code(totp_secret, totp_code)
        assert totp_valid, "Valid TOTP code should pass"

        # Test invalid TOTP code. Pick a code guaranteed to differ from the
        # current one; a fixed literal could coincide with the real code and
        # make the test fail at random.
        wrong_code = "000000" if totp_code != "000000" else "111111"
        invalid_totp_valid = validate_totp_code(totp_secret, wrong_code)
        assert not invalid_totp_valid, "Invalid TOTP code should fail"

        print("✅ Multi-factor authentication working correctly")

    def test_login_attempt_limiting(self):
        """Check that a user is locked out only after max_attempts failures."""
        user_id = "test_user"
        max_attempts = 5
        lockout_duration = 15  # minutes
        login_attempts = LoginAttemptLimiter(max_attempts, lockout_duration)

        # Below the limit the user must stay unlocked; record one failure per
        # iteration so that exactly max_attempts failures exist afterwards.
        for recorded in range(max_attempts):
            assert not login_attempts.is_locked_out(user_id), f"User should not be locked out after {recorded} attempts"
            login_attempts.record_failed_attempt(user_id)

        # Test lockout after max attempts
        assert login_attempts.is_locked_out(user_id), "User should be locked out after max attempts"

        # Test lockout duration
        lockout_remaining = login_attempts.get_lockout_remaining(user_id)
        assert lockout_remaining > 0, "Lockout should have remaining time"
        assert lockout_remaining <= lockout_duration * 60, "Lockout should not exceed max duration"

        print("✅ Login attempt limiting working correctly")
# Security utility functions
def validate_password_strength(password: str, min_length: int = 8) -> bool:
    """Return True when *password* satisfies the strength policy.

    A password is accepted when it is at least ``min_length`` characters
    long and contains an uppercase letter, a lowercase letter, a digit and
    one of the special characters ``!@#$%^&*()_+-=[]{}|;:,.<>?``.

    Args:
        password: Candidate password.
        min_length: Minimum accepted length; defaults to 8, the value that
            was previously hard-coded.

    Returns:
        True if every requirement is met, False otherwise.
    """
    if len(password) < min_length:
        return False

    special_characters = "!@#$%^&*()_+-=[]{}|;:,.<>?"
    has_upper = any(c.isupper() for c in password)
    has_lower = any(c.islower() for c in password)
    has_digit = any(c.isdigit() for c in password)
    has_special = any(c in special_characters for c in password)
    return has_upper and has_lower and has_digit and has_special
def encrypt_data(data: str, key: str) -> str:
    """Simple encryption simulation (in production, use proper encryption).

    XORs the UTF-8 bytes of *data* with the repeating UTF-8 bytes of *key*
    and returns the result base64-encoded. This is NOT real encryption and
    exists only so the tests have a reversible transformation.

    Args:
        data: Plain text to obscure.
        key: Key material; must not be empty.

    Returns:
        Base64 text of the XOR-ed bytes.

    Raises:
        ValueError: If *key* is empty (previously this surfaced as an
            unexplained ZeroDivisionError from the modulo below).
    """
    import base64

    key_bytes = key.encode()
    if not key_bytes:
        raise ValueError("Encryption key must not be empty")

    data_bytes = data.encode()
    key_length = len(key_bytes)
    encrypted = bytes(b ^ key_bytes[i % key_length] for i, b in enumerate(data_bytes))
    return base64.b64encode(encrypted).decode()
def decrypt_data(encrypted_data: str, key: str) -> str:
    """Simple decryption simulation (in production, use proper decryption).

    Base64-decodes *encrypted_data*, XORs the bytes with the repeating
    UTF-8 bytes of *key* and decodes the result as UTF-8.

    Args:
        encrypted_data: Base64 text of XOR-ed bytes.
        key: Key the data was obscured with.

    Returns:
        The recovered text, or an empty string when the input is not valid
        base64, the result is not valid UTF-8, or the key is empty. A wrong
        key may also yield decodable but meaningless text.
    """
    import base64

    try:
        key_bytes = key.encode()
        encrypted_bytes = base64.b64decode(encrypted_data.encode())
        decrypted = bytes(
            b ^ key_bytes[i % len(key_bytes)] for i, b in enumerate(encrypted_bytes)
        )
        return decrypted.decode()
    except (ValueError, ZeroDivisionError):
        # ValueError covers binascii.Error (bad base64) and UnicodeDecodeError;
        # ZeroDivisionError is the empty-key case. Anything else (for example
        # KeyboardInterrupt or a wrong argument type) is no longer swallowed.
        return ""
def encrypt_wallet_data(wallet_data: Dict[str, Any], password: str) -> str:
    """Serialize *wallet_data* to JSON and obscure it with *password*.

    The JSON text is handed to ``encrypt_data`` with the password used as
    the key; the returned string is whatever ``encrypt_data`` produces.
    """
    return encrypt_data(json.dumps(wallet_data), password)
def decrypt_wallet_data(encrypted_wallet: str, password: str) -> Dict[str, Any]:
    """Recover a wallet dictionary from its password-obscured form.

    ``decrypt_data`` is applied with the password as key and its output is
    parsed as JSON; ``json.loads`` raises if that output is not valid JSON.
    """
    return json.loads(decrypt_data(encrypted_wallet, password))
def sign_transaction(transaction: Dict[str, Any], private_key: str) -> str:
    """Sign transaction with private key.

    The transaction is serialized as JSON with sorted keys, so key order
    does not affect the result, and authenticated with HMAC-SHA256 keyed by
    *private_key*. HMAC replaces the earlier hand-rolled
    ``sha256(message + key)`` construction.

    Args:
        transaction: JSON-serializable transaction fields.
        private_key: Secret used as the HMAC key.

    Returns:
        The 64-character hexadecimal HMAC-SHA256 digest.
    """
    transaction_json = json.dumps(transaction, sort_keys=True)
    return hmac.new(
        private_key.encode(), transaction_json.encode(), hashlib.sha256
    ).hexdigest()
def verify_transaction_signature(transaction: Dict[str, Any], signature: str, public_key: str) -> bool:
    """Check *signature* against a freshly computed signature.

    The transaction is re-signed via ``sign_transaction`` using
    *public_key* as the key, and the two signatures are compared with
    ``hmac.compare_digest``.
    """
    recomputed = sign_transaction(transaction, public_key)
    signatures_match = hmac.compare_digest(signature, recomputed)
    return signatures_match
# In-memory registry of issued session tokens:
# token -> (user_id, issued_at as epoch seconds).
_SESSION_REGISTRY: Dict[str, Any] = {}


def _issue_session_token(user_id: str, issued_at: int) -> str:
    """Create a random token for *user_id* and register its issue time."""
    random_data = secrets.token_hex(16)
    token = hashlib.sha256(f"{user_id}:{issued_at}:{random_data}".encode()).hexdigest()
    _SESSION_REGISTRY[token] = (user_id, issued_at)
    return token


def generate_session_token(user_id: str) -> str:
    """Generate session token.

    Returns:
        A 64-character hexadecimal token issued now for *user_id*.
    """
    return _issue_session_token(user_id, int(time.time()))


def generate_expired_session_token(user_id: str) -> str:
    """Generate expired session token for testing.

    The token is registered as having been issued one hour ago, which is
    older than the default maximum age accepted by validate_session_token.
    """
    return _issue_session_token(user_id, int(time.time()) - 3600)


def validate_session_token(token: str, user_id: str, max_age_seconds: int = 1800) -> bool:
    """Validate session token.

    The previous implementation compared the hex digest with a prefix of
    the user id, which could never match, and it never looked at the issue
    time. Tokens are now looked up in the in-memory registry filled by the
    generator functions.

    Args:
        token: Token returned by one of the generator functions.
        user_id: User the token is expected to belong to.
        max_age_seconds: Maximum accepted token age; defaults to 30 minutes.

    Returns:
        True only if the token was issued by this process, belongs to
        *user_id* and is not older than *max_age_seconds*.
    """
    if not isinstance(token, str):
        return False
    record = _SESSION_REGISTRY.get(token)
    if record is None:
        return False
    owner, issued_at = record
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    if not hmac.compare_digest(owner.encode(), user_id.encode()):
        return False
    return time.time() - issued_at <= max_age_seconds
def generate_api_key() -> str:
    """Return a new random API key.

    The key is the hexadecimal form of 32 random bytes from the
    ``secrets`` module, i.e. 64 lowercase hex characters.
    """
    raw_key = secrets.token_bytes(32)
    return raw_key.hex()
def validate_api_key(api_key: str) -> bool:
    """Validate API key format.

    A key is well-formed when it has at least 32 characters, all of them
    ASCII letters or digits. ``str.isalnum`` alone also accepts non-ASCII
    letters and digits, so the ASCII check is applied explicitly.

    Returns:
        True if *api_key* has the expected format, False otherwise.
    """
    return len(api_key) >= 32 and api_key.isascii() and api_key.isalnum()
class RateLimiter:
    """Simple sliding-window rate limiter.

    At most ``max_requests`` calls to ``is_allowed`` succeed within any
    ``window_seconds`` period. Accepted requests are kept as a list of
    timestamps; the earlier dict keyed by timestamp lost requests whenever
    two calls observed the same clock value, letting extra requests through.
    """

    def __init__(self, max_requests: int, window_seconds: int, clock=time.time):
        """Create a limiter.

        Args:
            max_requests: Number of requests allowed per window.
            window_seconds: Length of the window in seconds.
            clock: Zero-argument callable returning the current time in
                seconds; defaults to ``time.time``.
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests: List[float] = []  # timestamps of accepted requests
        self._clock = clock

    def is_allowed(self) -> bool:
        """Record a request and report whether it fits within the limit.

        Returns:
            True (and the request is counted) if fewer than ``max_requests``
            requests were accepted inside the current window, else False.
        """
        current_time = self._clock()
        window_start = current_time - self.window_seconds

        # Clean old requests
        self.requests = [stamp for stamp in self.requests if stamp > window_start]

        if len(self.requests) >= self.max_requests:
            return False

        self.requests.append(current_time)
        return True
def mask_sensitive_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a shallow copy of *data* with sensitive values masked.

    A ``private_key`` value is replaced by a fixed placeholder. An
    ``email`` value containing ``@`` keeps the first two characters of its
    local part and its domain. All other entries are copied unchanged and
    the input dictionary is not modified.
    """
    result = dict(data)

    if "private_key" in result:
        result["private_key"] = "***MASKED***"

    if "email" not in result:
        return result

    address = result["email"]
    if "@" not in address:
        return result

    # Split on the first "@" only, so the domain keeps any later "@".
    local_part, _, domain = address.partition("@")
    result["email"] = f"{local_part[:2]}***@{domain}"
    return result
def anonymize_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """Anonymize sensitive data.

    Returns a new dictionary in which the values of identifying or secret
    fields are replaced by ``"***ANONYMIZED***"``. ``private_key`` is now
    treated as sensitive too; it was previously copied through unchanged.

    Args:
        data: Record to anonymize; it is not modified.

    Returns:
        A new dictionary with the same keys in the same order.
    """
    sensitive_fields = {"user_id", "private_key", "email", "phone", "address"}
    return {
        key: "***ANONYMIZED***" if key in sensitive_fields else value
        for key, value in data.items()
    }
def should_delete_data(data: Dict[str, Any], retention_days: int) -> bool:
    """Check if data should be deleted based on retention policy.

    Args:
        data: Record whose optional ``created_at`` entry is an ISO 8601
            timestamp string. Timestamps without a UTC offset are taken to
            be UTC; a trailing ``Z`` is accepted as UTC.
        retention_days: Number of days a record may be kept.

    Returns:
        True if ``created_at`` is older than *retention_days* days, False
        if it is newer or the record has no ``created_at`` entry.

    Raises:
        ValueError: If ``created_at`` is not a parseable ISO 8601 string.
    """
    from datetime import timezone

    if "created_at" not in data:
        return False

    raw_timestamp = data["created_at"]
    if raw_timestamp.endswith("Z"):
        # datetime.fromisoformat only understands "Z" from Python 3.11 on.
        raw_timestamp = raw_timestamp[:-1] + "+00:00"

    created_at = datetime.fromisoformat(raw_timestamp)
    if created_at.tzinfo is None:
        created_at = created_at.replace(tzinfo=timezone.utc)

    # Compare aware datetimes; mixing naive and aware values raises TypeError.
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=retention_days)
    return created_at < cutoff_date
def create_audit_log(action: str, user_id: str, resource_id: str, details: Dict[str, Any], ip_address: str) -> Dict[str, Any]:
    """Build one audit log entry.

    The entry carries the five arguments under keys of the same name, plus
    a ``timestamp`` (ISO format of ``datetime.utcnow()``) and a random
    32-hex-character ``log_id``.
    """
    entry: Dict[str, Any] = dict(
        action=action,
        user_id=user_id,
        resource_id=resource_id,
        details=details,
        ip_address=ip_address,
    )
    entry["timestamp"] = datetime.utcnow().isoformat()
    entry["log_id"] = secrets.token_hex(16)
    return entry
def calculate_audit_log_hash(audit_log: List[Dict[str, Any]]) -> str:
    """Return the SHA-256 hex digest of the JSON form of *audit_log*.

    Entries are serialized with sorted keys, so the digest does not depend
    on the key order inside each entry.
    """
    serialized = json.dumps(audit_log, sort_keys=True).encode()
    digest = hashlib.sha256()
    digest.update(serialized)
    return digest.hexdigest()
def authenticate_password(username: str, password: str) -> bool:
    """Simulate password authentication.

    In production, this would check against hashed passwords. Here the
    single accepted credential pair is hard-coded. Comparisons use
    ``hmac.compare_digest`` so that timing does not reveal how much of the
    input matched.

    Returns:
        True only for the expected username and password.
    """
    expected_username = "test_user"
    expected_password = "SecureP@ssw0rd123!"

    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    # Both comparisons are always evaluated (no short-circuit).
    username_ok = hmac.compare_digest(username.encode(), expected_username.encode())
    password_ok = hmac.compare_digest(password.encode(), expected_password.encode())
    return username_ok and password_ok
def generate_totp_secret() -> str:
    """Return a new random TOTP secret.

    The secret is the hexadecimal form of 20 random bytes from the
    ``secrets`` module, i.e. 40 lowercase hex characters.
    """
    raw_secret = secrets.token_bytes(20)
    return raw_secret.hex()
def generate_totp_code(secret: str, at_time: Optional[float] = None) -> str:
    """Generate TOTP code (simplified).

    The code is derived from an HMAC-SHA256 of the 30-second time-step
    counter, keyed by the UTF-8 bytes of *secret*, and reduced to six
    decimal digits using dynamic truncation in the style of RFC 4226. The
    previous version returned the first six *hex* characters of a digest,
    so the code was usually not numeric.

    NOTE(review): the secret is used as text rather than as decoded key
    bytes, so codes are presumably not interoperable with authenticator
    apps - confirm if that matters.

    Args:
        secret: Shared secret, as produced by generate_totp_secret.
        at_time: Epoch seconds to generate the code for; defaults to the
            current time. Must not be negative.

    Returns:
        A zero-padded string of exactly six decimal digits.
    """
    now = time.time() if at_time is None else at_time
    timestep = int(now // 30)

    digest = hmac.new(
        secret.encode(), timestep.to_bytes(8, "big"), hashlib.sha256
    ).digest()

    # Dynamic truncation: the low nibble of the last byte selects a 4-byte
    # window; the top bit is cleared to get a non-negative 31-bit integer.
    offset = digest[-1] & 0x0F
    truncated = int.from_bytes(digest[offset:offset + 4], "big") & 0x7FFFFFFF
    return f"{truncated % 1_000_000:06d}"
def validate_totp_code(secret: str, code: str) -> bool:
    """Check *code* against the code currently generated for *secret*.

    The expected code comes from ``generate_totp_code`` and is compared
    with ``hmac.compare_digest``; no neighbouring time steps are tried.
    """
    return hmac.compare_digest(code, generate_totp_code(secret))
class LoginAttemptLimiter:
    """Login attempt limiter.

    Failed attempts are stored per user as timestamps. A user is locked out
    while at least ``max_attempts`` failures lie within the last
    ``lockout_duration_minutes`` minutes.
    """

    def __init__(self, max_attempts: int, lockout_duration_minutes: int, clock=time.time):
        """Create a limiter.

        Args:
            max_attempts: Number of recent failures that triggers a lockout.
            lockout_duration_minutes: How long a failure counts, in minutes.
            clock: Zero-argument callable returning the current time in
                seconds; defaults to ``time.time``.
        """
        self.max_attempts = max_attempts
        self.lockout_duration_minutes = lockout_duration_minutes
        self.attempts = {}  # user_id -> list of failure timestamps
        self._clock = clock

    def record_failed_attempt(self, user_id: str):
        """Record failed login attempt"""
        self.attempts.setdefault(user_id, []).append(self._clock())

    def is_locked_out(self, user_id: str) -> bool:
        """Check if user is locked out.

        As a side effect, attempts older than the lockout period are
        dropped from the stored list.
        """
        if user_id not in self.attempts:
            return False

        # Remove attempts older than lockout period
        cutoff_time = self._clock() - self.lockout_duration_minutes * 60
        self.attempts[user_id] = [
            attempt for attempt in self.attempts[user_id]
            if attempt > cutoff_time
        ]
        return len(self.attempts[user_id]) >= self.max_attempts

    def get_lockout_remaining(self, user_id: str) -> int:
        """Get remaining lockout time in seconds.

        The lockout ends once fewer than ``max_attempts`` failures remain
        in the window, i.e. when the ``max_attempts``-th most recent
        failure expires. Using the oldest failure instead (as before)
        under-reports the remaining time whenever more than
        ``max_attempts`` failures are in the window.

        Returns:
            Whole seconds until the lockout ends, or 0 if the user is not
            locked out.
        """
        if not self.is_locked_out(user_id):
            return 0

        recent_attempts = sorted(self.attempts[user_id])
        deciding_attempt = recent_attempts[-self.max_attempts]
        lockout_end = deciding_attempt + (self.lockout_duration_minutes * 60)
        return max(0, int(lockout_end - self._clock()))
if __name__ == "__main__":
    # Run security tests and propagate pytest's exit code, so that running
    # this file directly reports failure to the shell or CI when a test fails.
    raise SystemExit(pytest.main([__file__, "-v", "--tb=short"]))