- Change file mode from 644 to 755 for all project files - Add chain_id parameter to get_balance RPC endpoint with default "ait-devnet" - Rename Miner.extra_meta_data to extra_metadata for consistency
281 lines
7.8 KiB
Python
Executable File
281 lines
7.8 KiB
Python
Executable File
"""
|
|
Secure Encryption Utilities - Fixed Version
|
|
Replaces the broken encryption in utils/__init__.py
|
|
"""
|
|
|
|
import base64
|
|
import hashlib
|
|
import secrets
|
|
from typing import Optional, Dict, Any
|
|
from cryptography.fernet import Fernet, InvalidToken
|
|
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
|
from cryptography.hazmat.primitives import hashes
|
|
|
|
|
|
def derive_secure_key(password: str, salt: bytes = None) -> tuple[bytes, bytes]:
|
|
"""
|
|
Derive secure encryption key using PBKDF2 with SHA-256
|
|
|
|
Args:
|
|
password: User password (required - no defaults)
|
|
salt: Optional salt (generated if not provided)
|
|
|
|
Returns:
|
|
Tuple of (fernet_key, salt)
|
|
|
|
Raises:
|
|
ValueError: If password is empty or too weak
|
|
"""
|
|
if not password or len(password) < 8:
|
|
raise ValueError("Password must be at least 8 characters long")
|
|
|
|
if salt is None:
|
|
salt = secrets.token_bytes(32)
|
|
|
|
kdf = PBKDF2HMAC(
|
|
algorithm=hashes.SHA256(),
|
|
length=32,
|
|
salt=salt,
|
|
iterations=600_000, # OWASP recommended minimum
|
|
)
|
|
|
|
key = kdf.derive(password.encode())
|
|
fernet_key = base64.urlsafe_b64encode(key)
|
|
|
|
return fernet_key, salt
|
|
|
|
|
|
def encrypt_value(value: str, password: str) -> Dict[str, str]:
|
|
"""
|
|
Encrypt a value using PBKDF2 + Fernet (no more hardcoded keys)
|
|
|
|
Args:
|
|
value: Value to encrypt
|
|
password: Strong password (required)
|
|
|
|
Returns:
|
|
Dict with encrypted data and metadata
|
|
|
|
Raises:
|
|
ValueError: If password is too weak
|
|
"""
|
|
if not value:
|
|
raise ValueError("Cannot encrypt empty value")
|
|
|
|
# Derive secure key
|
|
fernet_key, salt = derive_secure_key(password)
|
|
|
|
# Encrypt
|
|
f = Fernet(fernet_key)
|
|
encrypted = f.encrypt(value.encode())
|
|
|
|
# Fernet already returns base64, no double encoding
|
|
return {
|
|
"encrypted_data": encrypted.decode(),
|
|
"salt": base64.b64encode(salt).decode(),
|
|
"algorithm": "PBKDF2-SHA256-Fernet",
|
|
"iterations": 600_000,
|
|
"version": "1.0"
|
|
}
|
|
|
|
|
|
def decrypt_value(encrypted_data: Dict[str, str] | str, password: str) -> str:
|
|
"""
|
|
Decrypt a PBKDF2 + Fernet encrypted value
|
|
|
|
Args:
|
|
encrypted_data: Dict with encrypted data or legacy string
|
|
password: Password used for encryption
|
|
|
|
Returns:
|
|
Decrypted value
|
|
|
|
Raises:
|
|
ValueError: If decryption fails or password is wrong
|
|
InvalidToken: If the encrypted data is corrupted
|
|
"""
|
|
# Handle legacy format (backward compatibility)
|
|
if isinstance(encrypted_data, str):
|
|
# This is the old broken format - we can't decrypt it securely
|
|
raise ValueError(
|
|
"Legacy encrypted format detected. "
|
|
"This data was encrypted with a broken implementation and cannot be securely recovered. "
|
|
"Please recreate the wallet with proper encryption."
|
|
)
|
|
|
|
try:
|
|
# Extract salt and encrypted data
|
|
salt = base64.b64decode(encrypted_data["salt"])
|
|
encrypted = encrypted_data["encrypted_data"].encode()
|
|
|
|
# Derive same key
|
|
fernet_key, _ = derive_secure_key(password, salt)
|
|
|
|
# Decrypt
|
|
f = Fernet(fernet_key)
|
|
decrypted = f.decrypt(encrypted)
|
|
|
|
return decrypted.decode()
|
|
except InvalidToken:
|
|
raise ValueError("Invalid password or corrupted encrypted data")
|
|
except Exception as e:
|
|
raise ValueError(f"Decryption failed: {str(e)}")
|
|
|
|
|
|
def validate_password_strength(password: str) -> Dict[str, Any]:
|
|
"""
|
|
Validate password strength
|
|
|
|
Args:
|
|
password: Password to validate
|
|
|
|
Returns:
|
|
Dict with validation results
|
|
"""
|
|
issues = []
|
|
score = 0
|
|
|
|
if len(password) < 8:
|
|
issues.append("Password must be at least 8 characters")
|
|
else:
|
|
score += 1
|
|
|
|
if len(password) < 12:
|
|
issues.append("Consider using 12+ characters for better security")
|
|
else:
|
|
score += 1
|
|
|
|
if not any(c.isupper() for c in password):
|
|
issues.append("Include uppercase letters")
|
|
else:
|
|
score += 1
|
|
|
|
if not any(c.islower() for c in password):
|
|
issues.append("Include lowercase letters")
|
|
else:
|
|
score += 1
|
|
|
|
if not any(c.isdigit() for c in password):
|
|
issues.append("Include numbers")
|
|
else:
|
|
score += 1
|
|
|
|
if not any(c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in password):
|
|
issues.append("Include special characters")
|
|
else:
|
|
score += 1
|
|
|
|
# Check for common patterns
|
|
if password.lower() in ["password", "123456", "qwerty", "admin"]:
|
|
issues.append("Avoid common passwords")
|
|
score = 0
|
|
|
|
strength_levels = {
|
|
0: "Very Weak",
|
|
1: "Weak",
|
|
2: "Fair",
|
|
3: "Good",
|
|
4: "Strong",
|
|
5: "Very Strong",
|
|
6: "Excellent"
|
|
}
|
|
|
|
return {
|
|
"score": score,
|
|
"strength": strength_levels.get(score, "Unknown"),
|
|
"issues": issues,
|
|
"is_acceptable": score >= 3
|
|
}
|
|
|
|
|
|
def generate_secure_password(length: int = 16) -> str:
|
|
"""
|
|
Generate a secure random password
|
|
|
|
Args:
|
|
length: Password length
|
|
|
|
Returns:
|
|
Secure random password
|
|
"""
|
|
alphabet = (
|
|
"abcdefghijklmnopqrstuvwxyz"
|
|
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
|
"0123456789"
|
|
"!@#$%^&*()_+-=[]{}|;:,.<>?"
|
|
)
|
|
|
|
password = ''.join(secrets.choice(alphabet) for _ in range(length))
|
|
|
|
# Ensure it meets minimum requirements
|
|
while not validate_password_strength(password)["is_acceptable"]:
|
|
password = ''.join(secrets.choice(alphabet) for _ in range(length))
|
|
|
|
return password
|
|
|
|
|
|
# Migration helper for existing wallets
|
|
def migrate_legacy_wallet(legacy_data: Dict[str, Any], new_password: str) -> Dict[str, Any]:
|
|
"""
|
|
Migrate a wallet from broken encryption to secure encryption
|
|
|
|
Args:
|
|
legacy_data: Legacy wallet data with broken encryption
|
|
new_password: New strong password
|
|
|
|
Returns:
|
|
Migrated wallet data
|
|
|
|
Raises:
|
|
ValueError: If migration cannot be performed safely
|
|
"""
|
|
# Check if this is legacy format
|
|
if "encrypted" not in legacy_data or not legacy_data.get("encrypted"):
|
|
raise ValueError("Not a legacy encrypted wallet")
|
|
|
|
if "private_key" not in legacy_data:
|
|
raise ValueError("Cannot migrate wallet without private key")
|
|
|
|
# The legacy wallet might have a plaintext private key
|
|
# If it's truly encrypted with the broken method, we cannot recover it
|
|
private_key = legacy_data["private_key"]
|
|
|
|
if private_key.startswith("[ENCRYPTED_MOCK]") or private_key.startswith("["):
|
|
# This was never actually encrypted - it's a mock
|
|
raise ValueError(
|
|
"Cannot migrate mock wallet. "
|
|
"Please create a new wallet with proper key generation."
|
|
)
|
|
|
|
# If we get here, we have a plaintext private key (security issue!)
|
|
# Re-encrypt it properly
|
|
try:
|
|
encrypted_data = encrypt_value(private_key, new_password)
|
|
|
|
return {
|
|
**legacy_data,
|
|
"private_key": encrypted_data,
|
|
"encryption_version": "1.0",
|
|
"migration_timestamp": secrets.token_hex(16)
|
|
}
|
|
except Exception as e:
|
|
raise ValueError(f"Migration failed: {str(e)}")
|
|
|
|
|
|
# Security constants
|
|
class EncryptionConfig:
|
|
"""Encryption configuration constants"""
|
|
|
|
PBKDF2_ITERATIONS = 600_000
|
|
SALT_LENGTH = 32
|
|
MIN_PASSWORD_LENGTH = 8
|
|
RECOMMENDED_PASSWORD_LENGTH = 16
|
|
|
|
# Algorithm identifiers
|
|
ALGORITHM_PBKDF2_FERNET = "PBKDF2-SHA256-Fernet"
|
|
ALGORITHM_LEGACY = "LEGACY-BROKEN"
|
|
|
|
# Version tracking
|
|
CURRENT_VERSION = "1.0"
|
|
LEGACY_VERSIONS = ["0.9", "legacy", "broken"]
|