- Remove executable permissions from configuration files (.editorconfig, .env.example, .gitignore) - Remove executable permissions from documentation files (README.md, LICENSE, SECURITY.md) - Remove executable permissions from web assets (HTML, CSS, JS files) - Remove executable permissions from data files (JSON, SQL, YAML, requirements.txt) - Remove executable permissions from source code files across all apps - Add executable permissions to Python
682 lines
27 KiB
Python
Executable File
682 lines
27 KiB
Python
Executable File
"""
|
|
Security Tests for AITBC Private Chain Access Control and Encryption
|
|
Tests security features, access controls, and encryption mechanisms
|
|
"""
|
|
|
|
import pytest
|
|
import json
|
|
import hashlib
|
|
import hmac
|
|
import secrets
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
import subprocess
|
|
import requests
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from typing import Dict, Any, List, Optional
|
|
import tempfile
|
|
import os
|
|
|
|
class TestSecurity:
|
|
"""Security testing suite for AITBC components"""
|
|
|
|
@pytest.fixture(scope="class")
|
|
def security_config(self):
|
|
"""Security test configuration"""
|
|
return {
|
|
"test_data_dir": Path("/tmp/aitbc_security_test"),
|
|
"encryption_key": secrets.token_hex(32),
|
|
"test_password": "TestSecurePassword123!",
|
|
"test_wallet_id": "test_security_wallet",
|
|
"test_chain_id": "ait-security-test",
|
|
"security_thresholds": {
|
|
"password_min_length": 8,
|
|
"encryption_strength": 256,
|
|
"session_timeout_minutes": 30,
|
|
"max_login_attempts": 5,
|
|
"lockout_duration_minutes": 15
|
|
}
|
|
}
|
|
|
|
def test_password_security(self, security_config):
|
|
"""Test password security requirements"""
|
|
# Test password validation
|
|
weak_passwords = [
|
|
"123",
|
|
"password",
|
|
"abc",
|
|
"test",
|
|
"short",
|
|
"",
|
|
"12345678",
|
|
"password123"
|
|
]
|
|
|
|
strong_passwords = [
|
|
"SecureP@ssw0rd123!",
|
|
"MyStr0ng#P@ssword",
|
|
"AitbcSecur3ty@2026",
|
|
"ComplexP@ssw0rd!#$",
|
|
"VerySecureP@ssw0rd123"
|
|
]
|
|
|
|
# Test weak passwords should be rejected
|
|
for password in weak_passwords:
|
|
is_valid = validate_password_strength(password)
|
|
assert not is_valid, f"Weak password should be rejected: {password}"
|
|
|
|
# Test strong passwords should be accepted
|
|
for password in strong_passwords:
|
|
is_valid = validate_password_strength(password)
|
|
assert is_valid, f"Strong password should be accepted: {password}"
|
|
|
|
print("✅ Password security validation working correctly")
|
|
|
|
def test_encryption_decryption(self, security_config):
|
|
"""Test encryption and decryption mechanisms"""
|
|
test_data = "Sensitive AITBC blockchain data"
|
|
encryption_key = security_config["encryption_key"]
|
|
|
|
# Test encryption
|
|
encrypted_data = encrypt_data(test_data, encryption_key)
|
|
assert encrypted_data != test_data, "Encrypted data should be different from original"
|
|
assert len(encrypted_data) > 0, "Encrypted data should not be empty"
|
|
|
|
# Test decryption
|
|
decrypted_data = decrypt_data(encrypted_data, encryption_key)
|
|
assert decrypted_data == test_data, "Decrypted data should match original"
|
|
|
|
# Test with wrong key
|
|
wrong_key = secrets.token_hex(32)
|
|
decrypted_with_wrong_key = decrypt_data(encrypted_data, wrong_key)
|
|
assert decrypted_with_wrong_key != test_data, "Decryption with wrong key should fail"
|
|
|
|
print("✅ Encryption/decryption working correctly")
|
|
|
|
def test_hashing_security(self, security_config):
|
|
"""Test cryptographic hashing"""
|
|
test_data = "AITBC blockchain transaction data"
|
|
|
|
# Test SHA-256 hashing
|
|
hash1 = hashlib.sha256(test_data.encode()).hexdigest()
|
|
hash2 = hashlib.sha256(test_data.encode()).hexdigest()
|
|
|
|
assert hash1 == hash2, "Same data should produce same hash"
|
|
assert len(hash1) == 64, "SHA-256 hash should be 64 characters"
|
|
assert all(c in '0123456789abcdef' for c in hash1), "Hash should only contain hex characters"
|
|
|
|
# Test different data produces different hash
|
|
different_data = "Different blockchain data"
|
|
hash3 = hashlib.sha256(different_data.encode()).hexdigest()
|
|
assert hash1 != hash3, "Different data should produce different hash"
|
|
|
|
# Test HMAC for message authentication
|
|
secret_key = security_config["encryption_key"]
|
|
hmac1 = hmac.new(secret_key.encode(), test_data.encode(), hashlib.sha256).hexdigest()
|
|
hmac2 = hmac.new(secret_key.encode(), test_data.encode(), hashlib.sha256).hexdigest()
|
|
|
|
assert hmac1 == hmac2, "HMAC should be consistent"
|
|
|
|
# Test HMAC with different key
|
|
different_key = "different_secret_key"
|
|
hmac3 = hmac.new(different_key.encode(), test_data.encode(), hashlib.sha256).hexdigest()
|
|
assert hmac1 != hmac3, "HMAC with different key should be different"
|
|
|
|
print("✅ Cryptographic hashing working correctly")
|
|
|
|
def test_wallet_security(self, security_config):
|
|
"""Test wallet security features"""
|
|
security_config["test_data_dir"].mkdir(parents=True, exist_ok=True)
|
|
|
|
# Test wallet file permissions
|
|
wallet_file = security_config["test_data_dir"] / "test_wallet.json"
|
|
|
|
# Create test wallet
|
|
wallet_data = {
|
|
"wallet_id": security_config["test_wallet_id"],
|
|
"private_key": secrets.token_hex(32),
|
|
"public_key": secrets.token_hex(64),
|
|
"address": f"ait1{secrets.token_hex(40)}",
|
|
"created_at": datetime.utcnow().isoformat()
|
|
}
|
|
|
|
with open(wallet_file, 'w') as f:
|
|
json.dump(wallet_data, f)
|
|
|
|
# Set restrictive permissions (600 - read/write for owner only)
|
|
os.chmod(wallet_file, 0o600)
|
|
|
|
# Verify permissions
|
|
file_stat = wallet_file.stat()
|
|
file_permissions = oct(file_stat.st_mode)[-3:]
|
|
|
|
assert file_permissions == "600", f"Wallet file should have 600 permissions, got {file_permissions}"
|
|
|
|
# Test wallet encryption
|
|
encrypted_wallet = encrypt_wallet_data(wallet_data, security_config["test_password"])
|
|
assert encrypted_wallet != wallet_data, "Encrypted wallet should be different"
|
|
|
|
# Test wallet decryption
|
|
decrypted_wallet = decrypt_wallet_data(encrypted_wallet, security_config["test_password"])
|
|
assert decrypted_wallet["wallet_id"] == wallet_data["wallet_id"], "Decrypted wallet should match original"
|
|
|
|
# Test decryption with wrong password
|
|
try:
|
|
decrypt_wallet_data(encrypted_wallet, "wrong_password")
|
|
assert False, "Decryption with wrong password should fail"
|
|
except:
|
|
pass # Expected to fail
|
|
|
|
# Cleanup
|
|
wallet_file.unlink()
|
|
|
|
print("✅ Wallet security features working correctly")
|
|
|
|
def test_chain_access_control(self, security_config):
|
|
"""Test chain access control mechanisms"""
|
|
# Test chain access permissions
|
|
chain_permissions = {
|
|
"admin": ["read", "write", "delete", "manage"],
|
|
"operator": ["read", "write"],
|
|
"viewer": ["read"],
|
|
"anonymous": []
|
|
}
|
|
|
|
# Test permission validation
|
|
def has_permission(user_role, required_permission):
|
|
return required_permission in chain_permissions.get(user_role, [])
|
|
|
|
# Test admin permissions
|
|
assert has_permission("admin", "read"), "Admin should have read permission"
|
|
assert has_permission("admin", "write"), "Admin should have write permission"
|
|
assert has_permission("admin", "delete"), "Admin should have delete permission"
|
|
assert has_permission("admin", "manage"), "Admin should have manage permission"
|
|
|
|
# Test operator permissions
|
|
assert has_permission("operator", "read"), "Operator should have read permission"
|
|
assert has_permission("operator", "write"), "Operator should have write permission"
|
|
assert not has_permission("operator", "delete"), "Operator should not have delete permission"
|
|
assert not has_permission("operator", "manage"), "Operator should not have manage permission"
|
|
|
|
# Test viewer permissions
|
|
assert has_permission("viewer", "read"), "Viewer should have read permission"
|
|
assert not has_permission("viewer", "write"), "Viewer should not have write permission"
|
|
assert not has_permission("viewer", "delete"), "Viewer should not have delete permission"
|
|
|
|
# Test anonymous permissions
|
|
assert not has_permission("anonymous", "read"), "Anonymous should not have read permission"
|
|
assert not has_permission("anonymous", "write"), "Anonymous should not have write permission"
|
|
|
|
# Test invalid role
|
|
assert not has_permission("invalid_role", "read"), "Invalid role should have no permissions"
|
|
|
|
print("✅ Chain access control working correctly")
|
|
|
|
def test_transaction_security(self, security_config):
|
|
"""Test transaction security features"""
|
|
# Test transaction signing
|
|
transaction_data = {
|
|
"from": f"ait1{secrets.token_hex(40)}",
|
|
"to": f"ait1{secrets.token_hex(40)}",
|
|
"amount": "1000",
|
|
"nonce": secrets.token_hex(16),
|
|
"timestamp": int(time.time())
|
|
}
|
|
|
|
private_key = secrets.token_hex(32)
|
|
|
|
# Sign transaction
|
|
signature = sign_transaction(transaction_data, private_key)
|
|
assert signature != transaction_data, "Signature should be different from transaction data"
|
|
assert len(signature) > 0, "Signature should not be empty"
|
|
|
|
# Verify signature
|
|
is_valid = verify_transaction_signature(transaction_data, signature, private_key)
|
|
assert is_valid, "Signature verification should pass"
|
|
|
|
# Test with tampered data
|
|
tampered_data = transaction_data.copy()
|
|
tampered_data["amount"] = "2000"
|
|
|
|
is_valid_tampered = verify_transaction_signature(tampered_data, signature, private_key)
|
|
assert not is_valid_tampered, "Signature verification should fail for tampered data"
|
|
|
|
# Test with wrong key
|
|
wrong_key = secrets.token_hex(32)
|
|
is_valid_wrong_key = verify_transaction_signature(transaction_data, signature, wrong_key)
|
|
assert not is_valid_wrong_key, "Signature verification should fail with wrong key"
|
|
|
|
print("✅ Transaction security working correctly")
|
|
|
|
def test_session_security(self, security_config):
|
|
"""Test session management security"""
|
|
# Test session token generation
|
|
user_id = "test_user_123"
|
|
session_token = generate_session_token(user_id)
|
|
|
|
assert len(session_token) > 20, "Session token should be sufficiently long"
|
|
assert session_token != user_id, "Session token should be different from user ID"
|
|
|
|
# Test session validation
|
|
is_valid = validate_session_token(session_token, user_id)
|
|
assert is_valid, "Valid session token should pass validation"
|
|
|
|
# Test session with wrong user
|
|
is_valid_wrong_user = validate_session_token(session_token, "wrong_user")
|
|
assert not is_valid_wrong_user, "Session token should fail for wrong user"
|
|
|
|
# Test expired session
|
|
expired_token = generate_expired_session_token(user_id)
|
|
is_valid_expired = validate_session_token(expired_token, user_id)
|
|
assert not is_valid_expired, "Expired session token should fail validation"
|
|
|
|
# Test session timeout
|
|
session_timeout = security_config["security_thresholds"]["session_timeout_minutes"]
|
|
assert session_timeout == 30, "Session timeout should be 30 minutes"
|
|
|
|
print("✅ Session security working correctly")
|
|
|
|
def test_api_security(self, security_config):
|
|
"""Test API security features"""
|
|
# Test API key generation
|
|
api_key = generate_api_key()
|
|
|
|
assert len(api_key) >= 32, "API key should be at least 32 characters"
|
|
assert api_key.isalnum(), "API key should be alphanumeric"
|
|
|
|
# Test API key validation
|
|
is_valid = validate_api_key(api_key)
|
|
assert is_valid, "Valid API key should pass validation"
|
|
|
|
# Test invalid API key
|
|
invalid_keys = [
|
|
"short",
|
|
"invalid@key",
|
|
"key with spaces",
|
|
"key-with-special-chars!",
|
|
""
|
|
]
|
|
|
|
for invalid_key in invalid_keys:
|
|
is_invalid = validate_api_key(invalid_key)
|
|
assert not is_invalid, f"Invalid API key should fail validation: {invalid_key}"
|
|
|
|
# Test rate limiting (simulation)
|
|
rate_limiter = RateLimiter(max_requests=5, window_seconds=60)
|
|
|
|
# Should allow requests within limit
|
|
for i in range(5):
|
|
assert rate_limiter.is_allowed(), f"Request {i+1} should be allowed"
|
|
|
|
# Should block request beyond limit
|
|
assert not rate_limiter.is_allowed(), "Request beyond limit should be blocked"
|
|
|
|
print("✅ API security working correctly")
|
|
|
|
def test_data_protection(self, security_config):
|
|
"""Test data protection and privacy"""
|
|
sensitive_data = {
|
|
"user_id": "user_123",
|
|
"private_key": secrets.token_hex(32),
|
|
"email": "user@example.com",
|
|
"phone": "+1234567890",
|
|
"address": "123 Blockchain Street"
|
|
}
|
|
|
|
# Test data masking
|
|
masked_data = mask_sensitive_data(sensitive_data)
|
|
|
|
assert "private_key" not in masked_data, "Private key should be masked"
|
|
assert "email" in masked_data, "Email should remain unmasked"
|
|
assert masked_data["email"] != sensitive_data["email"], "Email should be partially masked"
|
|
|
|
# Test data anonymization
|
|
anonymized_data = anonymize_data(sensitive_data)
|
|
|
|
assert "user_id" not in anonymized_data, "User ID should be anonymized"
|
|
assert "private_key" not in anonymized_data, "Private key should be anonymized"
|
|
assert "email" not in anonymized_data, "Email should be anonymized"
|
|
|
|
# Test data retention
|
|
retention_days = 365
|
|
cutoff_date = datetime.utcnow() - timedelta(days=retention_days)
|
|
|
|
old_data = {
|
|
"data": "sensitive_info",
|
|
"created_at": (cutoff_date - timedelta(days=1)).isoformat()
|
|
}
|
|
|
|
should_delete = should_delete_data(old_data, retention_days)
|
|
assert should_delete, "Data older than retention period should be deleted"
|
|
|
|
recent_data = {
|
|
"data": "sensitive_info",
|
|
"created_at": datetime.utcnow().isoformat()
|
|
}
|
|
|
|
should_not_delete = should_delete_data(recent_data, retention_days)
|
|
assert not should_not_delete, "Recent data should not be deleted"
|
|
|
|
print("✅ Data protection working correctly")
|
|
|
|
def test_audit_logging(self, security_config):
|
|
"""Test security audit logging"""
|
|
audit_log = []
|
|
|
|
# Test audit log entry creation
|
|
log_entry = create_audit_log(
|
|
action="wallet_create",
|
|
user_id="test_user",
|
|
resource_id="wallet_123",
|
|
details={"wallet_type": "multi_signature"},
|
|
ip_address="192.168.1.1"
|
|
)
|
|
|
|
assert "action" in log_entry, "Audit log should contain action"
|
|
assert "user_id" in log_entry, "Audit log should contain user ID"
|
|
assert "timestamp" in log_entry, "Audit log should contain timestamp"
|
|
assert "ip_address" in log_entry, "Audit log should contain IP address"
|
|
|
|
audit_log.append(log_entry)
|
|
|
|
# Test audit log integrity
|
|
log_hash = calculate_audit_log_hash(audit_log)
|
|
assert len(log_hash) == 64, "Audit log hash should be 64 characters"
|
|
|
|
# Test audit log tampering detection
|
|
tampered_log = audit_log.copy()
|
|
tampered_log[0]["action"] = "different_action"
|
|
|
|
tampered_hash = calculate_audit_log_hash(tampered_log)
|
|
assert log_hash != tampered_hash, "Tampered log should have different hash"
|
|
|
|
print("✅ Audit logging working correctly")
|
|
|
|
class TestAuthenticationSecurity:
|
|
"""Test authentication and authorization security"""
|
|
|
|
def test_multi_factor_authentication(self):
|
|
"""Test multi-factor authentication"""
|
|
user_credentials = {
|
|
"username": "test_user",
|
|
"password": "SecureP@ssw0rd123!"
|
|
}
|
|
|
|
# Test password authentication
|
|
password_valid = authenticate_password(user_credentials["username"], user_credentials["password"])
|
|
assert password_valid, "Valid password should authenticate"
|
|
|
|
# Test invalid password
|
|
invalid_password_valid = authenticate_password(user_credentials["username"], "wrong_password")
|
|
assert not invalid_password_valid, "Invalid password should not authenticate"
|
|
|
|
# Test 2FA token generation
|
|
totp_secret = generate_totp_secret()
|
|
totp_code = generate_totp_code(totp_secret)
|
|
|
|
assert len(totp_code) == 6, "TOTP code should be 6 digits"
|
|
assert totp_code.isdigit(), "TOTP code should be numeric"
|
|
|
|
# Test 2FA validation
|
|
totp_valid = validate_totp_code(totp_secret, totp_code)
|
|
assert totp_valid, "Valid TOTP code should pass"
|
|
|
|
# Test invalid TOTP code
|
|
invalid_totp_valid = validate_totp_code(totp_secret, "123456")
|
|
assert not invalid_totp_valid, "Invalid TOTP code should fail"
|
|
|
|
print("✅ Multi-factor authentication working correctly")
|
|
|
|
def test_login_attempt_limiting(self):
|
|
"""Test login attempt limiting"""
|
|
user_id = "test_user"
|
|
max_attempts = 5
|
|
lockout_duration = 15 # minutes
|
|
|
|
login_attempts = LoginAttemptLimiter(max_attempts, lockout_duration)
|
|
|
|
# Test successful attempts within limit
|
|
for i in range(max_attempts):
|
|
assert not login_attempts.is_locked_out(user_id), f"User should not be locked out after {i+1} attempts"
|
|
|
|
# Test lockout after max attempts
|
|
login_attempts.record_failed_attempt(user_id)
|
|
assert login_attempts.is_locked_out(user_id), "User should be locked out after max attempts"
|
|
|
|
# Test lockout duration
|
|
lockout_remaining = login_attempts.get_lockout_remaining(user_id)
|
|
assert lockout_remaining > 0, "Lockout should have remaining time"
|
|
assert lockout_remaining <= lockout_duration * 60, "Lockout should not exceed max duration"
|
|
|
|
print("✅ Login attempt limiting working correctly")
|
|
|
|
# Security utility functions
|
|
def validate_password_strength(password: str) -> bool:
|
|
"""Validate password strength"""
|
|
if len(password) < 8:
|
|
return False
|
|
|
|
has_upper = any(c.isupper() for c in password)
|
|
has_lower = any(c.islower() for c in password)
|
|
has_digit = any(c.isdigit() for c in password)
|
|
has_special = any(c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in password)
|
|
|
|
return has_upper and has_lower and has_digit and has_special
|
|
|
|
def encrypt_data(data: str, key: str) -> str:
|
|
"""Simple encryption simulation (in production, use proper encryption)"""
|
|
import base64
|
|
|
|
# Simulate encryption with XOR and base64 encoding
|
|
key_bytes = key.encode()
|
|
data_bytes = data.encode()
|
|
|
|
encrypted = bytes([b ^ key_bytes[i % len(key_bytes)] for i, b in enumerate(data_bytes)])
|
|
return base64.b64encode(encrypted).decode()
|
|
|
|
def decrypt_data(encrypted_data: str, key: str) -> str:
|
|
"""Simple decryption simulation (in production, use proper decryption)"""
|
|
import base64
|
|
|
|
try:
|
|
key_bytes = key.encode()
|
|
encrypted_bytes = base64.b64decode(encrypted_data.encode())
|
|
|
|
decrypted = bytes([b ^ key_bytes[i % len(key_bytes)] for i, b in enumerate(encrypted_bytes)])
|
|
return decrypted.decode()
|
|
except:
|
|
return ""
|
|
|
|
def encrypt_wallet_data(wallet_data: Dict[str, Any], password: str) -> str:
|
|
"""Encrypt wallet data with password"""
|
|
wallet_json = json.dumps(wallet_data)
|
|
return encrypt_data(wallet_json, password)
|
|
|
|
def decrypt_wallet_data(encrypted_wallet: str, password: str) -> Dict[str, Any]:
|
|
"""Decrypt wallet data with password"""
|
|
decrypted_json = decrypt_data(encrypted_wallet, password)
|
|
return json.loads(decrypted_json)
|
|
|
|
def sign_transaction(transaction: Dict[str, Any], private_key: str) -> str:
|
|
"""Sign transaction with private key"""
|
|
transaction_json = json.dumps(transaction, sort_keys=True)
|
|
return hashlib.sha256((transaction_json + private_key).encode()).hexdigest()
|
|
|
|
def verify_transaction_signature(transaction: Dict[str, Any], signature: str, public_key: str) -> bool:
|
|
"""Verify transaction signature"""
|
|
expected_signature = sign_transaction(transaction, public_key)
|
|
return hmac.compare_digest(signature, expected_signature)
|
|
|
|
def generate_session_token(user_id: str) -> str:
|
|
"""Generate session token"""
|
|
timestamp = str(int(time.time()))
|
|
random_data = secrets.token_hex(16)
|
|
return hashlib.sha256(f"{user_id}:{timestamp}:{random_data}".encode()).hexdigest()
|
|
|
|
def generate_expired_session_token(user_id: str) -> str:
|
|
"""Generate expired session token for testing"""
|
|
old_timestamp = str(int(time.time()) - 3600) # 1 hour ago
|
|
random_data = secrets.token_hex(16)
|
|
return hashlib.sha256(f"{user_id}:{old_timestamp}:{random_data}".encode()).hexdigest()
|
|
|
|
def validate_session_token(token: str, user_id: str) -> bool:
|
|
"""Validate session token"""
|
|
# In production, this would validate timestamp and signature
|
|
return len(token) == 64 and token.startswith(user_id[:8])
|
|
|
|
def generate_api_key() -> str:
|
|
"""Generate API key"""
|
|
return secrets.token_hex(32)
|
|
|
|
def validate_api_key(api_key: str) -> bool:
|
|
"""Validate API key format"""
|
|
return len(api_key) >= 32 and api_key.isalnum()
|
|
|
|
class RateLimiter:
|
|
"""Simple rate limiter"""
|
|
|
|
def __init__(self, max_requests: int, window_seconds: int):
|
|
self.max_requests = max_requests
|
|
self.window_seconds = window_seconds
|
|
self.requests = {}
|
|
|
|
def is_allowed(self) -> bool:
|
|
current_time = time.time()
|
|
window_start = current_time - self.window_seconds
|
|
|
|
# Clean old requests
|
|
self.requests = {k: v for k, v in self.requests.items() if v > window_start}
|
|
|
|
if len(self.requests) >= self.max_requests:
|
|
return False
|
|
|
|
self.requests[current_time] = current_time
|
|
return True
|
|
|
|
def mask_sensitive_data(data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Mask sensitive data"""
|
|
masked = data.copy()
|
|
|
|
if "private_key" in masked:
|
|
masked["private_key"] = "***MASKED***"
|
|
|
|
if "email" in masked:
|
|
email = masked["email"]
|
|
if "@" in email:
|
|
local, domain = email.split("@", 1)
|
|
masked["email"] = f"{local[:2]}***@{domain}"
|
|
|
|
return masked
|
|
|
|
def anonymize_data(data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Anonymize sensitive data"""
|
|
anonymized = {}
|
|
|
|
for key, value in data.items():
|
|
if key in ["user_id", "email", "phone", "address"]:
|
|
anonymized[key] = "***ANONYMIZED***"
|
|
else:
|
|
anonymized[key] = value
|
|
|
|
return anonymized
|
|
|
|
def should_delete_data(data: Dict[str, Any], retention_days: int) -> bool:
|
|
"""Check if data should be deleted based on retention policy"""
|
|
if "created_at" not in data:
|
|
return False
|
|
|
|
created_at = datetime.fromisoformat(data["created_at"])
|
|
cutoff_date = datetime.utcnow() - timedelta(days=retention_days)
|
|
|
|
return created_at < cutoff_date
|
|
|
|
def create_audit_log(action: str, user_id: str, resource_id: str, details: Dict[str, Any], ip_address: str) -> Dict[str, Any]:
|
|
"""Create audit log entry"""
|
|
return {
|
|
"action": action,
|
|
"user_id": user_id,
|
|
"resource_id": resource_id,
|
|
"details": details,
|
|
"ip_address": ip_address,
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"log_id": secrets.token_hex(16)
|
|
}
|
|
|
|
def calculate_audit_log_hash(audit_log: List[Dict[str, Any]]) -> str:
|
|
"""Calculate hash of audit log for integrity verification"""
|
|
log_json = json.dumps(audit_log, sort_keys=True)
|
|
return hashlib.sha256(log_json.encode()).hexdigest()
|
|
|
|
def authenticate_password(username: str, password: str) -> bool:
|
|
"""Simulate password authentication"""
|
|
# In production, this would check against hashed passwords
|
|
return username == "test_user" and password == "SecureP@ssw0rd123!"
|
|
|
|
def generate_totp_secret() -> str:
|
|
"""Generate TOTP secret"""
|
|
return secrets.token_hex(20)
|
|
|
|
def generate_totp_code(secret: str) -> str:
|
|
"""Generate TOTP code (simplified)"""
|
|
import hashlib
|
|
import time
|
|
|
|
timestep = int(time.time() // 30)
|
|
counter = f"{secret}{timestep}"
|
|
return hashlib.sha256(counter.encode()).hexdigest()[:6]
|
|
|
|
def validate_totp_code(secret: str, code: str) -> bool:
|
|
"""Validate TOTP code"""
|
|
expected_code = generate_totp_code(secret)
|
|
return hmac.compare_digest(code, expected_code)
|
|
|
|
class LoginAttemptLimiter:
|
|
"""Login attempt limiter"""
|
|
|
|
def __init__(self, max_attempts: int, lockout_duration_minutes: int):
|
|
self.max_attempts = max_attempts
|
|
self.lockout_duration_minutes = lockout_duration_minutes
|
|
self.attempts = {}
|
|
|
|
def record_failed_attempt(self, user_id: str):
|
|
"""Record failed login attempt"""
|
|
current_time = time.time()
|
|
|
|
if user_id not in self.attempts:
|
|
self.attempts[user_id] = []
|
|
|
|
self.attempts[user_id].append(current_time)
|
|
|
|
def is_locked_out(self, user_id: str) -> bool:
|
|
"""Check if user is locked out"""
|
|
if user_id not in self.attempts:
|
|
return False
|
|
|
|
# Remove attempts older than lockout period
|
|
lockout_time = self.lockout_duration_minutes * 60
|
|
current_time = time.time()
|
|
cutoff_time = current_time - lockout_time
|
|
|
|
self.attempts[user_id] = [
|
|
attempt for attempt in self.attempts[user_id]
|
|
if attempt > cutoff_time
|
|
]
|
|
|
|
return len(self.attempts[user_id]) >= self.max_attempts
|
|
|
|
def get_lockout_remaining(self, user_id: str) -> int:
|
|
"""Get remaining lockout time in seconds"""
|
|
if not self.is_locked_out(user_id):
|
|
return 0
|
|
|
|
oldest_attempt = min(self.attempts[user_id])
|
|
lockout_end = oldest_attempt + (self.lockout_duration_minutes * 60)
|
|
remaining = max(0, int(lockout_end - time.time()))
|
|
|
|
return remaining
|
|
|
|
if __name__ == "__main__":
|
|
# Run security tests
|
|
pytest.main([__file__, "-v", "--tb=short"])
|