Files
aitbc/cli/aitbc_cli/utils/security.py
oib f353e00172 chore(security): enhance environment configuration, CI workflows, and wallet daemon with security improvements
- Restructure .env.example with security-focused documentation, service-specific environment file references, and AWS Secrets Manager integration
- Update CLI tests workflow to single Python 3.13 version, add pytest-mock dependency, and consolidate test execution with coverage
- Add comprehensive security validation to package publishing workflow with manual approval gates, secret scanning, and release
2026-03-03 10:33:46 +01:00

281 lines
7.8 KiB
Python

"""
Secure Encryption Utilities - Fixed Version
Replaces the broken encryption in utils/__init__.py
"""
import base64
import hashlib
import secrets
from datetime import datetime, timezone
from typing import Optional, Dict, Any

from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
def derive_secure_key(password: str, salt: bytes = None) -> tuple[bytes, bytes]:
    """
    Stretch a password into a Fernet-compatible key with PBKDF2-HMAC-SHA256.

    Args:
        password: User password (required - there is no default)
        salt: Salt to reuse; a fresh random 32-byte salt is created when
            this is None

    Returns:
        Tuple of (fernet_key, salt), where fernet_key is the urlsafe-base64
        encoding of the 32-byte derived key and salt is the salt that was
        actually used

    Raises:
        ValueError: If password is empty or shorter than 8 characters
    """
    long_enough = bool(password) and len(password) >= 8
    if not long_enough:
        raise ValueError("Password must be at least 8 characters long")

    # Reuse the caller's salt (decryption) or mint a new one (encryption).
    key_salt = secrets.token_bytes(32) if salt is None else salt

    stretcher = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=key_salt,
        iterations=600_000,  # OWASP recommended minimum
    )
    raw_key = stretcher.derive(password.encode())

    # Fernet expects its 32-byte key in urlsafe base64 form.
    return base64.urlsafe_b64encode(raw_key), key_salt
def encrypt_value(value: str, password: str) -> Dict[str, str]:
    """
    Encrypt a string with a password-derived Fernet key.

    The key comes from derive_secure_key(), so every call uses a freshly
    generated salt, which is returned alongside the ciphertext.

    Args:
        value: Value to encrypt (must be non-empty)
        password: Password the key is derived from

    Returns:
        Dict with the keys "encrypted_data" (Fernet token as text), "salt"
        (standard base64 text), "algorithm", "iterations" (an int) and
        "version"

    Raises:
        ValueError: If value is empty, or if derive_secure_key() rejects
            the password
    """
    if not value:
        raise ValueError("Cannot encrypt empty value")

    key, salt_bytes = derive_secure_key(password)
    token = Fernet(key).encrypt(value.encode())

    # The Fernet token is already base64 text, so it is only decoded to str;
    # the raw salt bytes still need encoding before they can be stored as text.
    envelope = dict(
        encrypted_data=token.decode(),
        salt=base64.b64encode(salt_bytes).decode(),
        algorithm="PBKDF2-SHA256-Fernet",
        iterations=600_000,
        version="1.0",
    )
    return envelope
def decrypt_value(encrypted_data: Dict[str, str] | str, password: str) -> str:
"""
Decrypt a PBKDF2 + Fernet encrypted value
Args:
encrypted_data: Dict with encrypted data or legacy string
password: Password used for encryption
Returns:
Decrypted value
Raises:
ValueError: If decryption fails or password is wrong
InvalidToken: If the encrypted data is corrupted
"""
# Handle legacy format (backward compatibility)
if isinstance(encrypted_data, str):
# This is the old broken format - we can't decrypt it securely
raise ValueError(
"Legacy encrypted format detected. "
"This data was encrypted with a broken implementation and cannot be securely recovered. "
"Please recreate the wallet with proper encryption."
)
try:
# Extract salt and encrypted data
salt = base64.b64decode(encrypted_data["salt"])
encrypted = encrypted_data["encrypted_data"].encode()
# Derive same key
fernet_key, _ = derive_secure_key(password, salt)
# Decrypt
f = Fernet(fernet_key)
decrypted = f.decrypt(encrypted)
return decrypted.decode()
except InvalidToken:
raise ValueError("Invalid password or corrupted encrypted data")
except Exception as e:
raise ValueError(f"Decryption failed: {str(e)}")
def validate_password_strength(password: str) -> Dict[str, Any]:
    """
    Score a password against a fixed set of strength checks.

    One point is awarded for each satisfied check: at least 8 characters,
    at least 12 characters, an uppercase letter, a lowercase letter, a
    digit, and a special character. A password that exactly matches a
    known common password (case-insensitive) has its score reset to 0.

    Args:
        password: Password to validate

    Returns:
        Dict with:
            "score": int from 0 to 6
            "strength": label for the score ("Very Weak" to "Excellent")
            "issues": list of advice strings for the failed checks
            "is_acceptable": True only when the score is at least 3 AND
                the password has at least 8 characters
    """
    special_characters = "!@#$%^&*()_+-=[]{}|;:,.<>?"
    meets_minimum_length = len(password) >= 8

    # (check passed?, advice when it did not) - order fixes the issue order.
    checks = [
        (meets_minimum_length, "Password must be at least 8 characters"),
        (len(password) >= 12, "Consider using 12+ characters for better security"),
        (any(c.isupper() for c in password), "Include uppercase letters"),
        (any(c.islower() for c in password), "Include lowercase letters"),
        (any(c.isdigit() for c in password), "Include numbers"),
        (any(c in special_characters for c in password), "Include special characters"),
    ]

    issues = [advice for passed, advice in checks if not passed]
    score = sum(1 for passed, _ in checks if passed)

    # Check for common patterns
    if password.lower() in ("password", "123456", "qwerty", "admin"):
        issues.append("Avoid common passwords")
        score = 0

    strength_levels = {
        0: "Very Weak",
        1: "Weak",
        2: "Fair",
        3: "Good",
        4: "Strong",
        5: "Very Strong",
        6: "Excellent",
    }

    return {
        "score": score,
        "strength": strength_levels.get(score, "Unknown"),
        "issues": issues,
        # Character variety alone must not make a too-short password pass:
        # passwords under 8 characters are rejected by key derivation.
        "is_acceptable": meets_minimum_length and score >= 3,
    }
def generate_secure_password(length: int = 16) -> str:
    """
    Generate a secure random password.

    Candidates are drawn with the `secrets` module from lowercase letters,
    uppercase letters, digits and special characters, and redrawn until
    validate_password_strength() reports the candidate as acceptable.

    Args:
        length: Password length; must be at least 8

    Returns:
        Secure random password of exactly `length` characters

    Raises:
        ValueError: If length is below 8. Such passwords can never satisfy
            the module's minimum password length, and for very small
            lengths the retry loop below would never terminate.
    """
    if length < 8:
        raise ValueError("Password length must be at least 8 characters")

    alphabet = (
        "abcdefghijklmnopqrstuvwxyz"
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        "0123456789"
        "!@#$%^&*()_+-=[]{}|;:,.<>?"
    )

    # Rejection sampling: redraw until the candidate is acceptable.
    while True:
        candidate = "".join(secrets.choice(alphabet) for _ in range(length))
        if validate_password_strength(candidate)["is_acceptable"]:
            return candidate
# Migration helper for existing wallets
def migrate_legacy_wallet(legacy_data: Dict[str, Any], new_password: str) -> Dict[str, Any]:
    """
    Migrate a wallet from broken encryption to secure encryption.

    Only wallets whose "private_key" is still a plaintext string can be
    migrated: the key is re-encrypted with encrypt_value() under the new
    password. The input dict is not modified.

    Args:
        legacy_data: Legacy wallet data; must have a truthy "encrypted"
            flag and a "private_key" entry
        new_password: New strong password

    Returns:
        Copy of legacy_data with "private_key" replaced by the encrypted
        payload, plus "encryption_version" and "migration_timestamp"
        (timezone-aware UTC time in ISO 8601 format)

    Raises:
        ValueError: If the wallet is not flagged as encrypted, has no
            private key, holds a non-string or mock ("[...") private key,
            or if re-encryption fails
    """
    # Check if this is legacy format
    if not legacy_data.get("encrypted"):
        raise ValueError("Not a legacy encrypted wallet")

    if "private_key" not in legacy_data:
        raise ValueError("Cannot migrate wallet without private key")

    private_key = legacy_data["private_key"]

    # A non-string key (e.g. an already migrated payload) is not plaintext.
    if not isinstance(private_key, str):
        raise ValueError("Cannot migrate wallet: private key is not a plaintext string")

    # Mock keys such as "[ENCRYPTED_MOCK]..." were never real key material.
    if private_key.startswith("["):
        raise ValueError(
            "Cannot migrate mock wallet. "
            "Please create a new wallet with proper key generation."
        )

    # If we get here, we have a plaintext private key (security issue!)
    # Re-encrypt it properly
    try:
        encrypted_data = encrypt_value(private_key, new_password)
    except Exception as exc:
        raise ValueError(f"Migration failed: {exc}") from exc

    return {
        **legacy_data,
        "private_key": encrypted_data,
        "encryption_version": "1.0",
        # A real timestamp, so the time of migration can be audited.
        "migration_timestamp": datetime.now(timezone.utc).isoformat(),
    }
# Security constants
class EncryptionConfig:
    """Named constants describing the encryption scheme and its versions."""

    # --- Key derivation ---
    PBKDF2_ITERATIONS: int = 600_000  # PBKDF2 iteration count
    SALT_LENGTH: int = 32  # presumably bytes - TODO confirm against callers

    # --- Password policy ---
    MIN_PASSWORD_LENGTH: int = 8
    RECOMMENDED_PASSWORD_LENGTH: int = 16

    # --- Algorithm identifiers ---
    ALGORITHM_PBKDF2_FERNET: str = "PBKDF2-SHA256-Fernet"
    ALGORITHM_LEGACY: str = "LEGACY-BROKEN"

    # --- Version tracking ---
    CURRENT_VERSION: str = "1.0"
    LEGACY_VERSIONS: list[str] = ["0.9", "legacy", "broken"]