chore: update file permissions to executable across repository
- Change file mode from 644 to 755 for all project files - Add chain_id parameter to get_balance RPC endpoint with default "ait-devnet" - Rename Miner.extra_meta_data to extra_metadata for consistency
This commit is contained in:
681
tests/security/test_security.py
Normal file
681
tests/security/test_security.py
Normal file
@@ -0,0 +1,681 @@
|
||||
"""
|
||||
Security Tests for AITBC Private Chain Access Control and Encryption
|
||||
Tests security features, access controls, and encryption mechanisms
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import json
|
||||
import hashlib
|
||||
import hmac
|
||||
import secrets
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
import subprocess
|
||||
import requests
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import Dict, Any, List, Optional
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
class TestSecurity:
|
||||
"""Security testing suite for AITBC components"""
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def security_config(self):
|
||||
"""Security test configuration"""
|
||||
return {
|
||||
"test_data_dir": Path("/tmp/aitbc_security_test"),
|
||||
"encryption_key": secrets.token_hex(32),
|
||||
"test_password": "TestSecurePassword123!",
|
||||
"test_wallet_id": "test_security_wallet",
|
||||
"test_chain_id": "ait-security-test",
|
||||
"security_thresholds": {
|
||||
"password_min_length": 8,
|
||||
"encryption_strength": 256,
|
||||
"session_timeout_minutes": 30,
|
||||
"max_login_attempts": 5,
|
||||
"lockout_duration_minutes": 15
|
||||
}
|
||||
}
|
||||
|
||||
def test_password_security(self, security_config):
|
||||
"""Test password security requirements"""
|
||||
# Test password validation
|
||||
weak_passwords = [
|
||||
"123",
|
||||
"password",
|
||||
"abc",
|
||||
"test",
|
||||
"short",
|
||||
"",
|
||||
"12345678",
|
||||
"password123"
|
||||
]
|
||||
|
||||
strong_passwords = [
|
||||
"SecureP@ssw0rd123!",
|
||||
"MyStr0ng#P@ssword",
|
||||
"AitbcSecur3ty@2026",
|
||||
"ComplexP@ssw0rd!#$",
|
||||
"VerySecureP@ssw0rd123"
|
||||
]
|
||||
|
||||
# Test weak passwords should be rejected
|
||||
for password in weak_passwords:
|
||||
is_valid = validate_password_strength(password)
|
||||
assert not is_valid, f"Weak password should be rejected: {password}"
|
||||
|
||||
# Test strong passwords should be accepted
|
||||
for password in strong_passwords:
|
||||
is_valid = validate_password_strength(password)
|
||||
assert is_valid, f"Strong password should be accepted: {password}"
|
||||
|
||||
print("✅ Password security validation working correctly")
|
||||
|
||||
def test_encryption_decryption(self, security_config):
|
||||
"""Test encryption and decryption mechanisms"""
|
||||
test_data = "Sensitive AITBC blockchain data"
|
||||
encryption_key = security_config["encryption_key"]
|
||||
|
||||
# Test encryption
|
||||
encrypted_data = encrypt_data(test_data, encryption_key)
|
||||
assert encrypted_data != test_data, "Encrypted data should be different from original"
|
||||
assert len(encrypted_data) > 0, "Encrypted data should not be empty"
|
||||
|
||||
# Test decryption
|
||||
decrypted_data = decrypt_data(encrypted_data, encryption_key)
|
||||
assert decrypted_data == test_data, "Decrypted data should match original"
|
||||
|
||||
# Test with wrong key
|
||||
wrong_key = secrets.token_hex(32)
|
||||
decrypted_with_wrong_key = decrypt_data(encrypted_data, wrong_key)
|
||||
assert decrypted_with_wrong_key != test_data, "Decryption with wrong key should fail"
|
||||
|
||||
print("✅ Encryption/decryption working correctly")
|
||||
|
||||
def test_hashing_security(self, security_config):
|
||||
"""Test cryptographic hashing"""
|
||||
test_data = "AITBC blockchain transaction data"
|
||||
|
||||
# Test SHA-256 hashing
|
||||
hash1 = hashlib.sha256(test_data.encode()).hexdigest()
|
||||
hash2 = hashlib.sha256(test_data.encode()).hexdigest()
|
||||
|
||||
assert hash1 == hash2, "Same data should produce same hash"
|
||||
assert len(hash1) == 64, "SHA-256 hash should be 64 characters"
|
||||
assert all(c in '0123456789abcdef' for c in hash1), "Hash should only contain hex characters"
|
||||
|
||||
# Test different data produces different hash
|
||||
different_data = "Different blockchain data"
|
||||
hash3 = hashlib.sha256(different_data.encode()).hexdigest()
|
||||
assert hash1 != hash3, "Different data should produce different hash"
|
||||
|
||||
# Test HMAC for message authentication
|
||||
secret_key = security_config["encryption_key"]
|
||||
hmac1 = hmac.new(secret_key.encode(), test_data.encode(), hashlib.sha256).hexdigest()
|
||||
hmac2 = hmac.new(secret_key.encode(), test_data.encode(), hashlib.sha256).hexdigest()
|
||||
|
||||
assert hmac1 == hmac2, "HMAC should be consistent"
|
||||
|
||||
# Test HMAC with different key
|
||||
different_key = "different_secret_key"
|
||||
hmac3 = hmac.new(different_key.encode(), test_data.encode(), hashlib.sha256).hexdigest()
|
||||
assert hmac1 != hmac3, "HMAC with different key should be different"
|
||||
|
||||
print("✅ Cryptographic hashing working correctly")
|
||||
|
||||
def test_wallet_security(self, security_config):
|
||||
"""Test wallet security features"""
|
||||
security_config["test_data_dir"].mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Test wallet file permissions
|
||||
wallet_file = security_config["test_data_dir"] / "test_wallet.json"
|
||||
|
||||
# Create test wallet
|
||||
wallet_data = {
|
||||
"wallet_id": security_config["test_wallet_id"],
|
||||
"private_key": secrets.token_hex(32),
|
||||
"public_key": secrets.token_hex(64),
|
||||
"address": f"ait1{secrets.token_hex(40)}",
|
||||
"created_at": datetime.utcnow().isoformat()
|
||||
}
|
||||
|
||||
with open(wallet_file, 'w') as f:
|
||||
json.dump(wallet_data, f)
|
||||
|
||||
# Set restrictive permissions (600 - read/write for owner only)
|
||||
os.chmod(wallet_file, 0o600)
|
||||
|
||||
# Verify permissions
|
||||
file_stat = wallet_file.stat()
|
||||
file_permissions = oct(file_stat.st_mode)[-3:]
|
||||
|
||||
assert file_permissions == "600", f"Wallet file should have 600 permissions, got {file_permissions}"
|
||||
|
||||
# Test wallet encryption
|
||||
encrypted_wallet = encrypt_wallet_data(wallet_data, security_config["test_password"])
|
||||
assert encrypted_wallet != wallet_data, "Encrypted wallet should be different"
|
||||
|
||||
# Test wallet decryption
|
||||
decrypted_wallet = decrypt_wallet_data(encrypted_wallet, security_config["test_password"])
|
||||
assert decrypted_wallet["wallet_id"] == wallet_data["wallet_id"], "Decrypted wallet should match original"
|
||||
|
||||
# Test decryption with wrong password
|
||||
try:
|
||||
decrypt_wallet_data(encrypted_wallet, "wrong_password")
|
||||
assert False, "Decryption with wrong password should fail"
|
||||
except:
|
||||
pass # Expected to fail
|
||||
|
||||
# Cleanup
|
||||
wallet_file.unlink()
|
||||
|
||||
print("✅ Wallet security features working correctly")
|
||||
|
||||
def test_chain_access_control(self, security_config):
|
||||
"""Test chain access control mechanisms"""
|
||||
# Test chain access permissions
|
||||
chain_permissions = {
|
||||
"admin": ["read", "write", "delete", "manage"],
|
||||
"operator": ["read", "write"],
|
||||
"viewer": ["read"],
|
||||
"anonymous": []
|
||||
}
|
||||
|
||||
# Test permission validation
|
||||
def has_permission(user_role, required_permission):
|
||||
return required_permission in chain_permissions.get(user_role, [])
|
||||
|
||||
# Test admin permissions
|
||||
assert has_permission("admin", "read"), "Admin should have read permission"
|
||||
assert has_permission("admin", "write"), "Admin should have write permission"
|
||||
assert has_permission("admin", "delete"), "Admin should have delete permission"
|
||||
assert has_permission("admin", "manage"), "Admin should have manage permission"
|
||||
|
||||
# Test operator permissions
|
||||
assert has_permission("operator", "read"), "Operator should have read permission"
|
||||
assert has_permission("operator", "write"), "Operator should have write permission"
|
||||
assert not has_permission("operator", "delete"), "Operator should not have delete permission"
|
||||
assert not has_permission("operator", "manage"), "Operator should not have manage permission"
|
||||
|
||||
# Test viewer permissions
|
||||
assert has_permission("viewer", "read"), "Viewer should have read permission"
|
||||
assert not has_permission("viewer", "write"), "Viewer should not have write permission"
|
||||
assert not has_permission("viewer", "delete"), "Viewer should not have delete permission"
|
||||
|
||||
# Test anonymous permissions
|
||||
assert not has_permission("anonymous", "read"), "Anonymous should not have read permission"
|
||||
assert not has_permission("anonymous", "write"), "Anonymous should not have write permission"
|
||||
|
||||
# Test invalid role
|
||||
assert not has_permission("invalid_role", "read"), "Invalid role should have no permissions"
|
||||
|
||||
print("✅ Chain access control working correctly")
|
||||
|
||||
def test_transaction_security(self, security_config):
|
||||
"""Test transaction security features"""
|
||||
# Test transaction signing
|
||||
transaction_data = {
|
||||
"from": f"ait1{secrets.token_hex(40)}",
|
||||
"to": f"ait1{secrets.token_hex(40)}",
|
||||
"amount": "1000",
|
||||
"nonce": secrets.token_hex(16),
|
||||
"timestamp": int(time.time())
|
||||
}
|
||||
|
||||
private_key = secrets.token_hex(32)
|
||||
|
||||
# Sign transaction
|
||||
signature = sign_transaction(transaction_data, private_key)
|
||||
assert signature != transaction_data, "Signature should be different from transaction data"
|
||||
assert len(signature) > 0, "Signature should not be empty"
|
||||
|
||||
# Verify signature
|
||||
is_valid = verify_transaction_signature(transaction_data, signature, private_key)
|
||||
assert is_valid, "Signature verification should pass"
|
||||
|
||||
# Test with tampered data
|
||||
tampered_data = transaction_data.copy()
|
||||
tampered_data["amount"] = "2000"
|
||||
|
||||
is_valid_tampered = verify_transaction_signature(tampered_data, signature, private_key)
|
||||
assert not is_valid_tampered, "Signature verification should fail for tampered data"
|
||||
|
||||
# Test with wrong key
|
||||
wrong_key = secrets.token_hex(32)
|
||||
is_valid_wrong_key = verify_transaction_signature(transaction_data, signature, wrong_key)
|
||||
assert not is_valid_wrong_key, "Signature verification should fail with wrong key"
|
||||
|
||||
print("✅ Transaction security working correctly")
|
||||
|
||||
def test_session_security(self, security_config):
|
||||
"""Test session management security"""
|
||||
# Test session token generation
|
||||
user_id = "test_user_123"
|
||||
session_token = generate_session_token(user_id)
|
||||
|
||||
assert len(session_token) > 20, "Session token should be sufficiently long"
|
||||
assert session_token != user_id, "Session token should be different from user ID"
|
||||
|
||||
# Test session validation
|
||||
is_valid = validate_session_token(session_token, user_id)
|
||||
assert is_valid, "Valid session token should pass validation"
|
||||
|
||||
# Test session with wrong user
|
||||
is_valid_wrong_user = validate_session_token(session_token, "wrong_user")
|
||||
assert not is_valid_wrong_user, "Session token should fail for wrong user"
|
||||
|
||||
# Test expired session
|
||||
expired_token = generate_expired_session_token(user_id)
|
||||
is_valid_expired = validate_session_token(expired_token, user_id)
|
||||
assert not is_valid_expired, "Expired session token should fail validation"
|
||||
|
||||
# Test session timeout
|
||||
session_timeout = security_config["security_thresholds"]["session_timeout_minutes"]
|
||||
assert session_timeout == 30, "Session timeout should be 30 minutes"
|
||||
|
||||
print("✅ Session security working correctly")
|
||||
|
||||
def test_api_security(self, security_config):
|
||||
"""Test API security features"""
|
||||
# Test API key generation
|
||||
api_key = generate_api_key()
|
||||
|
||||
assert len(api_key) >= 32, "API key should be at least 32 characters"
|
||||
assert api_key.isalnum(), "API key should be alphanumeric"
|
||||
|
||||
# Test API key validation
|
||||
is_valid = validate_api_key(api_key)
|
||||
assert is_valid, "Valid API key should pass validation"
|
||||
|
||||
# Test invalid API key
|
||||
invalid_keys = [
|
||||
"short",
|
||||
"invalid@key",
|
||||
"key with spaces",
|
||||
"key-with-special-chars!",
|
||||
""
|
||||
]
|
||||
|
||||
for invalid_key in invalid_keys:
|
||||
is_invalid = validate_api_key(invalid_key)
|
||||
assert not is_invalid, f"Invalid API key should fail validation: {invalid_key}"
|
||||
|
||||
# Test rate limiting (simulation)
|
||||
rate_limiter = RateLimiter(max_requests=5, window_seconds=60)
|
||||
|
||||
# Should allow requests within limit
|
||||
for i in range(5):
|
||||
assert rate_limiter.is_allowed(), f"Request {i+1} should be allowed"
|
||||
|
||||
# Should block request beyond limit
|
||||
assert not rate_limiter.is_allowed(), "Request beyond limit should be blocked"
|
||||
|
||||
print("✅ API security working correctly")
|
||||
|
||||
def test_data_protection(self, security_config):
|
||||
"""Test data protection and privacy"""
|
||||
sensitive_data = {
|
||||
"user_id": "user_123",
|
||||
"private_key": secrets.token_hex(32),
|
||||
"email": "user@example.com",
|
||||
"phone": "+1234567890",
|
||||
"address": "123 Blockchain Street"
|
||||
}
|
||||
|
||||
# Test data masking
|
||||
masked_data = mask_sensitive_data(sensitive_data)
|
||||
|
||||
assert "private_key" not in masked_data, "Private key should be masked"
|
||||
assert "email" in masked_data, "Email should remain unmasked"
|
||||
assert masked_data["email"] != sensitive_data["email"], "Email should be partially masked"
|
||||
|
||||
# Test data anonymization
|
||||
anonymized_data = anonymize_data(sensitive_data)
|
||||
|
||||
assert "user_id" not in anonymized_data, "User ID should be anonymized"
|
||||
assert "private_key" not in anonymized_data, "Private key should be anonymized"
|
||||
assert "email" not in anonymized_data, "Email should be anonymized"
|
||||
|
||||
# Test data retention
|
||||
retention_days = 365
|
||||
cutoff_date = datetime.utcnow() - timedelta(days=retention_days)
|
||||
|
||||
old_data = {
|
||||
"data": "sensitive_info",
|
||||
"created_at": (cutoff_date - timedelta(days=1)).isoformat()
|
||||
}
|
||||
|
||||
should_delete = should_delete_data(old_data, retention_days)
|
||||
assert should_delete, "Data older than retention period should be deleted"
|
||||
|
||||
recent_data = {
|
||||
"data": "sensitive_info",
|
||||
"created_at": datetime.utcnow().isoformat()
|
||||
}
|
||||
|
||||
should_not_delete = should_delete_data(recent_data, retention_days)
|
||||
assert not should_not_delete, "Recent data should not be deleted"
|
||||
|
||||
print("✅ Data protection working correctly")
|
||||
|
||||
def test_audit_logging(self, security_config):
|
||||
"""Test security audit logging"""
|
||||
audit_log = []
|
||||
|
||||
# Test audit log entry creation
|
||||
log_entry = create_audit_log(
|
||||
action="wallet_create",
|
||||
user_id="test_user",
|
||||
resource_id="wallet_123",
|
||||
details={"wallet_type": "multi_signature"},
|
||||
ip_address="192.168.1.1"
|
||||
)
|
||||
|
||||
assert "action" in log_entry, "Audit log should contain action"
|
||||
assert "user_id" in log_entry, "Audit log should contain user ID"
|
||||
assert "timestamp" in log_entry, "Audit log should contain timestamp"
|
||||
assert "ip_address" in log_entry, "Audit log should contain IP address"
|
||||
|
||||
audit_log.append(log_entry)
|
||||
|
||||
# Test audit log integrity
|
||||
log_hash = calculate_audit_log_hash(audit_log)
|
||||
assert len(log_hash) == 64, "Audit log hash should be 64 characters"
|
||||
|
||||
# Test audit log tampering detection
|
||||
tampered_log = audit_log.copy()
|
||||
tampered_log[0]["action"] = "different_action"
|
||||
|
||||
tampered_hash = calculate_audit_log_hash(tampered_log)
|
||||
assert log_hash != tampered_hash, "Tampered log should have different hash"
|
||||
|
||||
print("✅ Audit logging working correctly")
|
||||
|
||||
class TestAuthenticationSecurity:
|
||||
"""Test authentication and authorization security"""
|
||||
|
||||
def test_multi_factor_authentication(self):
|
||||
"""Test multi-factor authentication"""
|
||||
user_credentials = {
|
||||
"username": "test_user",
|
||||
"password": "SecureP@ssw0rd123!"
|
||||
}
|
||||
|
||||
# Test password authentication
|
||||
password_valid = authenticate_password(user_credentials["username"], user_credentials["password"])
|
||||
assert password_valid, "Valid password should authenticate"
|
||||
|
||||
# Test invalid password
|
||||
invalid_password_valid = authenticate_password(user_credentials["username"], "wrong_password")
|
||||
assert not invalid_password_valid, "Invalid password should not authenticate"
|
||||
|
||||
# Test 2FA token generation
|
||||
totp_secret = generate_totp_secret()
|
||||
totp_code = generate_totp_code(totp_secret)
|
||||
|
||||
assert len(totp_code) == 6, "TOTP code should be 6 digits"
|
||||
assert totp_code.isdigit(), "TOTP code should be numeric"
|
||||
|
||||
# Test 2FA validation
|
||||
totp_valid = validate_totp_code(totp_secret, totp_code)
|
||||
assert totp_valid, "Valid TOTP code should pass"
|
||||
|
||||
# Test invalid TOTP code
|
||||
invalid_totp_valid = validate_totp_code(totp_secret, "123456")
|
||||
assert not invalid_totp_valid, "Invalid TOTP code should fail"
|
||||
|
||||
print("✅ Multi-factor authentication working correctly")
|
||||
|
||||
def test_login_attempt_limiting(self):
|
||||
"""Test login attempt limiting"""
|
||||
user_id = "test_user"
|
||||
max_attempts = 5
|
||||
lockout_duration = 15 # minutes
|
||||
|
||||
login_attempts = LoginAttemptLimiter(max_attempts, lockout_duration)
|
||||
|
||||
# Test successful attempts within limit
|
||||
for i in range(max_attempts):
|
||||
assert not login_attempts.is_locked_out(user_id), f"User should not be locked out after {i+1} attempts"
|
||||
|
||||
# Test lockout after max attempts
|
||||
login_attempts.record_failed_attempt(user_id)
|
||||
assert login_attempts.is_locked_out(user_id), "User should be locked out after max attempts"
|
||||
|
||||
# Test lockout duration
|
||||
lockout_remaining = login_attempts.get_lockout_remaining(user_id)
|
||||
assert lockout_remaining > 0, "Lockout should have remaining time"
|
||||
assert lockout_remaining <= lockout_duration * 60, "Lockout should not exceed max duration"
|
||||
|
||||
print("✅ Login attempt limiting working correctly")
|
||||
|
||||
# Security utility functions
|
||||
def validate_password_strength(password: str) -> bool:
|
||||
"""Validate password strength"""
|
||||
if len(password) < 8:
|
||||
return False
|
||||
|
||||
has_upper = any(c.isupper() for c in password)
|
||||
has_lower = any(c.islower() for c in password)
|
||||
has_digit = any(c.isdigit() for c in password)
|
||||
has_special = any(c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in password)
|
||||
|
||||
return has_upper and has_lower and has_digit and has_special
|
||||
|
||||
def encrypt_data(data: str, key: str) -> str:
|
||||
"""Simple encryption simulation (in production, use proper encryption)"""
|
||||
import base64
|
||||
|
||||
# Simulate encryption with XOR and base64 encoding
|
||||
key_bytes = key.encode()
|
||||
data_bytes = data.encode()
|
||||
|
||||
encrypted = bytes([b ^ key_bytes[i % len(key_bytes)] for i, b in enumerate(data_bytes)])
|
||||
return base64.b64encode(encrypted).decode()
|
||||
|
||||
def decrypt_data(encrypted_data: str, key: str) -> str:
|
||||
"""Simple decryption simulation (in production, use proper decryption)"""
|
||||
import base64
|
||||
|
||||
try:
|
||||
key_bytes = key.encode()
|
||||
encrypted_bytes = base64.b64decode(encrypted_data.encode())
|
||||
|
||||
decrypted = bytes([b ^ key_bytes[i % len(key_bytes)] for i, b in enumerate(encrypted_bytes)])
|
||||
return decrypted.decode()
|
||||
except:
|
||||
return ""
|
||||
|
||||
def encrypt_wallet_data(wallet_data: Dict[str, Any], password: str) -> str:
|
||||
"""Encrypt wallet data with password"""
|
||||
wallet_json = json.dumps(wallet_data)
|
||||
return encrypt_data(wallet_json, password)
|
||||
|
||||
def decrypt_wallet_data(encrypted_wallet: str, password: str) -> Dict[str, Any]:
|
||||
"""Decrypt wallet data with password"""
|
||||
decrypted_json = decrypt_data(encrypted_wallet, password)
|
||||
return json.loads(decrypted_json)
|
||||
|
||||
def sign_transaction(transaction: Dict[str, Any], private_key: str) -> str:
|
||||
"""Sign transaction with private key"""
|
||||
transaction_json = json.dumps(transaction, sort_keys=True)
|
||||
return hashlib.sha256((transaction_json + private_key).encode()).hexdigest()
|
||||
|
||||
def verify_transaction_signature(transaction: Dict[str, Any], signature: str, public_key: str) -> bool:
|
||||
"""Verify transaction signature"""
|
||||
expected_signature = sign_transaction(transaction, public_key)
|
||||
return hmac.compare_digest(signature, expected_signature)
|
||||
|
||||
def generate_session_token(user_id: str) -> str:
|
||||
"""Generate session token"""
|
||||
timestamp = str(int(time.time()))
|
||||
random_data = secrets.token_hex(16)
|
||||
return hashlib.sha256(f"{user_id}:{timestamp}:{random_data}".encode()).hexdigest()
|
||||
|
||||
def generate_expired_session_token(user_id: str) -> str:
|
||||
"""Generate expired session token for testing"""
|
||||
old_timestamp = str(int(time.time()) - 3600) # 1 hour ago
|
||||
random_data = secrets.token_hex(16)
|
||||
return hashlib.sha256(f"{user_id}:{old_timestamp}:{random_data}".encode()).hexdigest()
|
||||
|
||||
def validate_session_token(token: str, user_id: str) -> bool:
|
||||
"""Validate session token"""
|
||||
# In production, this would validate timestamp and signature
|
||||
return len(token) == 64 and token.startswith(user_id[:8])
|
||||
|
||||
def generate_api_key() -> str:
|
||||
"""Generate API key"""
|
||||
return secrets.token_hex(32)
|
||||
|
||||
def validate_api_key(api_key: str) -> bool:
|
||||
"""Validate API key format"""
|
||||
return len(api_key) >= 32 and api_key.isalnum()
|
||||
|
||||
class RateLimiter:
|
||||
"""Simple rate limiter"""
|
||||
|
||||
def __init__(self, max_requests: int, window_seconds: int):
|
||||
self.max_requests = max_requests
|
||||
self.window_seconds = window_seconds
|
||||
self.requests = {}
|
||||
|
||||
def is_allowed(self) -> bool:
|
||||
current_time = time.time()
|
||||
window_start = current_time - self.window_seconds
|
||||
|
||||
# Clean old requests
|
||||
self.requests = {k: v for k, v in self.requests.items() if v > window_start}
|
||||
|
||||
if len(self.requests) >= self.max_requests:
|
||||
return False
|
||||
|
||||
self.requests[current_time] = current_time
|
||||
return True
|
||||
|
||||
def mask_sensitive_data(data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Mask sensitive data"""
|
||||
masked = data.copy()
|
||||
|
||||
if "private_key" in masked:
|
||||
masked["private_key"] = "***MASKED***"
|
||||
|
||||
if "email" in masked:
|
||||
email = masked["email"]
|
||||
if "@" in email:
|
||||
local, domain = email.split("@", 1)
|
||||
masked["email"] = f"{local[:2]}***@{domain}"
|
||||
|
||||
return masked
|
||||
|
||||
def anonymize_data(data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Anonymize sensitive data"""
|
||||
anonymized = {}
|
||||
|
||||
for key, value in data.items():
|
||||
if key in ["user_id", "email", "phone", "address"]:
|
||||
anonymized[key] = "***ANONYMIZED***"
|
||||
else:
|
||||
anonymized[key] = value
|
||||
|
||||
return anonymized
|
||||
|
||||
def should_delete_data(data: Dict[str, Any], retention_days: int) -> bool:
|
||||
"""Check if data should be deleted based on retention policy"""
|
||||
if "created_at" not in data:
|
||||
return False
|
||||
|
||||
created_at = datetime.fromisoformat(data["created_at"])
|
||||
cutoff_date = datetime.utcnow() - timedelta(days=retention_days)
|
||||
|
||||
return created_at < cutoff_date
|
||||
|
||||
def create_audit_log(action: str, user_id: str, resource_id: str, details: Dict[str, Any], ip_address: str) -> Dict[str, Any]:
|
||||
"""Create audit log entry"""
|
||||
return {
|
||||
"action": action,
|
||||
"user_id": user_id,
|
||||
"resource_id": resource_id,
|
||||
"details": details,
|
||||
"ip_address": ip_address,
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
"log_id": secrets.token_hex(16)
|
||||
}
|
||||
|
||||
def calculate_audit_log_hash(audit_log: List[Dict[str, Any]]) -> str:
|
||||
"""Calculate hash of audit log for integrity verification"""
|
||||
log_json = json.dumps(audit_log, sort_keys=True)
|
||||
return hashlib.sha256(log_json.encode()).hexdigest()
|
||||
|
||||
def authenticate_password(username: str, password: str) -> bool:
|
||||
"""Simulate password authentication"""
|
||||
# In production, this would check against hashed passwords
|
||||
return username == "test_user" and password == "SecureP@ssw0rd123!"
|
||||
|
||||
def generate_totp_secret() -> str:
|
||||
"""Generate TOTP secret"""
|
||||
return secrets.token_hex(20)
|
||||
|
||||
def generate_totp_code(secret: str) -> str:
|
||||
"""Generate TOTP code (simplified)"""
|
||||
import hashlib
|
||||
import time
|
||||
|
||||
timestep = int(time.time() // 30)
|
||||
counter = f"{secret}{timestep}"
|
||||
return hashlib.sha256(counter.encode()).hexdigest()[:6]
|
||||
|
||||
def validate_totp_code(secret: str, code: str) -> bool:
|
||||
"""Validate TOTP code"""
|
||||
expected_code = generate_totp_code(secret)
|
||||
return hmac.compare_digest(code, expected_code)
|
||||
|
||||
class LoginAttemptLimiter:
|
||||
"""Login attempt limiter"""
|
||||
|
||||
def __init__(self, max_attempts: int, lockout_duration_minutes: int):
|
||||
self.max_attempts = max_attempts
|
||||
self.lockout_duration_minutes = lockout_duration_minutes
|
||||
self.attempts = {}
|
||||
|
||||
def record_failed_attempt(self, user_id: str):
|
||||
"""Record failed login attempt"""
|
||||
current_time = time.time()
|
||||
|
||||
if user_id not in self.attempts:
|
||||
self.attempts[user_id] = []
|
||||
|
||||
self.attempts[user_id].append(current_time)
|
||||
|
||||
def is_locked_out(self, user_id: str) -> bool:
|
||||
"""Check if user is locked out"""
|
||||
if user_id not in self.attempts:
|
||||
return False
|
||||
|
||||
# Remove attempts older than lockout period
|
||||
lockout_time = self.lockout_duration_minutes * 60
|
||||
current_time = time.time()
|
||||
cutoff_time = current_time - lockout_time
|
||||
|
||||
self.attempts[user_id] = [
|
||||
attempt for attempt in self.attempts[user_id]
|
||||
if attempt > cutoff_time
|
||||
]
|
||||
|
||||
return len(self.attempts[user_id]) >= self.max_attempts
|
||||
|
||||
def get_lockout_remaining(self, user_id: str) -> int:
|
||||
"""Get remaining lockout time in seconds"""
|
||||
if not self.is_locked_out(user_id):
|
||||
return 0
|
||||
|
||||
oldest_attempt = min(self.attempts[user_id])
|
||||
lockout_end = oldest_attempt + (self.lockout_duration_minutes * 60)
|
||||
remaining = max(0, int(lockout_end - time.time()))
|
||||
|
||||
return remaining
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Run security tests
|
||||
pytest.main([__file__, "-v", "--tb=short"])
|
||||
Reference in New Issue
Block a user