refactor: flatten CLI directory structure - remove 'box in a box'

BEFORE:
/opt/aitbc/cli/
├── aitbc_cli/                    # Python package (box in a box)
│   ├── commands/
│   ├── main.py
│   └── ...
├── setup.py

AFTER:
/opt/aitbc/cli/                    # Flat structure
├── commands/                      # Direct access
├── main.py                        # Direct access
├── auth/
├── config/
├── core/
├── models/
├── utils/
├── plugins.py
└── setup.py

CHANGES MADE:
- Moved all files from aitbc_cli/ to cli/ root
- Fixed all relative imports (from . to absolute imports)
- Updated setup.py entry point: aitbc_cli.main → main
- Added CLI directory to Python path in entry script
- Simplified deployment.py to remove dependency on deleted core.deployment
- Fixed import paths in all command files
- Recreated virtual environment with new structure

BENEFITS:
- Eliminated 'box in a box' nesting
- Simpler directory structure
- Direct access to all modules
- Cleaner imports
- Easier maintenance and development
- CLI works with both 'python main.py' and 'aitbc' commands
This commit is contained in:
2026-03-26 09:12:02 +01:00
parent b3cf7384ce
commit c0952c2525
89 changed files with 265 additions and 4605 deletions

368
cli/utils/__init__.py Executable file
View File

@@ -0,0 +1,368 @@
"""Utility functions for AITBC CLI"""
import time
import logging
import sys
import os
from pathlib import Path
from typing import Tuple, List, Dict, Optional, Any
from contextlib import contextmanager
from rich.console import Console
from rich.logging import RichHandler
from rich.table import Table
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn
import json
import yaml
from tabulate import tabulate
console = Console()
@contextmanager
def progress_bar(description: str = "Working...", total: Optional[int] = None):
"""Context manager for progress bar display"""
with Progress(
SpinnerColumn(),
TextColumn("[bold blue]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TimeElapsedColumn(),
console=console,
) as progress:
task = progress.add_task(description, total=total)
yield progress, task
def progress_spinner(description: str = "Working..."):
"""Simple spinner for indeterminate operations"""
return console.status(f"[bold blue]{description}")
class AuditLogger:
"""Tamper-evident audit logging for CLI operations"""
def __init__(self, log_dir: Optional[Path] = None):
# Import secure audit logger
from .secure_audit import SecureAuditLogger
self._secure_logger = SecureAuditLogger(log_dir)
def log(self, action: str, details: dict = None, user: str = None):
"""Log an audit event with cryptographic integrity"""
self._secure_logger.log(action, details, user)
def get_logs(self, limit: int = 50, action_filter: str = None) -> list:
"""Read audit log entries with integrity verification"""
return self._secure_logger.get_logs(limit, action_filter)
def verify_integrity(self) -> Tuple[bool, List[str]]:
"""Verify audit log integrity"""
return self._secure_logger.verify_integrity()
def export_report(self, output_file: Optional[Path] = None) -> Dict:
"""Export comprehensive audit report"""
return self._secure_logger.export_audit_report(output_file)
def search_logs(self, query: str, limit: int = 50) -> List[Dict]:
"""Search audit logs"""
return self._secure_logger.search_logs(query, limit)
def _get_fernet_key(key: str = None) -> bytes:
"""Derive a Fernet key from a password using Argon2 KDF"""
from cryptography.fernet import Fernet
import base64
import secrets
import getpass
if key is None:
# CRITICAL SECURITY FIX: Never use hardcoded keys
# Always require user to provide a password or generate a secure random key
error("❌ CRITICAL: No encryption key provided. This is a security vulnerability.")
error("Please provide a password for encryption.")
key = getpass.getpass("Enter encryption password: ")
if not key:
error("❌ Password cannot be empty for encryption operations.")
raise ValueError("Encryption password is required")
# Use Argon2 for secure key derivation (replaces insecure SHA-256)
try:
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
# Generate a secure salt
salt = secrets.token_bytes(16)
# Derive key using Argon2
ph = PasswordHasher(
time_cost=3, # Number of iterations
memory_cost=65536, # Memory usage in KB
parallelism=4, # Number of parallel threads
hash_len=32, # Output hash length
salt_len=16 # Salt length
)
# Hash the password to get a 32-byte key
hashed_key = ph.hash(key + salt.decode('utf-8'))
# Extract the hash part and convert to bytes suitable for Fernet
key_bytes = hashed_key.encode('utf-8')[:32]
# Ensure we have exactly 32 bytes for Fernet
if len(key_bytes) < 32:
key_bytes += secrets.token_bytes(32 - len(key_bytes))
elif len(key_bytes) > 32:
key_bytes = key_bytes[:32]
return base64.urlsafe_b64encode(key_bytes)
except ImportError:
# Fallback to PBKDF2 if Argon2 is not available
import hashlib
import hmac
warning("⚠️ Argon2 not available, falling back to PBKDF2 (less secure)")
# Generate a secure salt
salt = secrets.token_bytes(16)
# Use PBKDF2 with SHA-256 (better than plain SHA-256)
key_bytes = hashlib.pbkdf2_hmac(
'sha256',
key.encode('utf-8'),
salt,
100000, # 100k iterations
32 # 32-byte key
)
return base64.urlsafe_b64encode(key_bytes)
def encrypt_value(value: str, key: str = None) -> str:
"""Encrypt a value using Fernet symmetric encryption"""
from cryptography.fernet import Fernet
import base64
fernet_key = _get_fernet_key(key)
f = Fernet(fernet_key)
encrypted = f.encrypt(value.encode())
return base64.b64encode(encrypted).decode()
def decrypt_value(encrypted: str, key: str = None) -> str:
"""Decrypt a Fernet-encrypted value"""
from cryptography.fernet import Fernet
import base64
fernet_key = _get_fernet_key(key)
f = Fernet(fernet_key)
data = base64.b64decode(encrypted)
return f.decrypt(data).decode()
def setup_logging(verbosity: int, debug: bool = False) -> str:
"""Setup logging with Rich"""
log_level = "WARNING"
if verbosity >= 3 or debug:
log_level = "DEBUG"
elif verbosity == 2:
log_level = "INFO"
elif verbosity == 1:
log_level = "WARNING"
logging.basicConfig(
level=log_level,
format="%(message)s",
datefmt="[%X]",
handlers=[RichHandler(console=console, rich_tracebacks=True)]
)
return log_level
def render(data: Any, format_type: str = "table", title: str = None):
"""Format and output data"""
if format_type == "json":
console.print(json.dumps(data, indent=2, default=str))
elif format_type == "yaml":
console.print(yaml.dump(data, default_flow_style=False, sort_keys=False))
elif format_type == "table":
if isinstance(data, dict) and not isinstance(data, list):
# Simple key-value table
table = Table(show_header=False, box=None, title=title)
table.add_column("Key", style="cyan")
table.add_column("Value", style="green")
for key, value in data.items():
if isinstance(value, (dict, list)):
value = json.dumps(value, default=str)
table.add_row(str(key), str(value))
console.print(table)
elif isinstance(data, list) and data:
if all(isinstance(item, dict) for item in data):
# Table from list of dicts
headers = list(data[0].keys())
table = Table()
for header in headers:
table.add_column(header, style="cyan")
for item in data:
row = [str(item.get(h, "")) for h in headers]
table.add_row(*row)
console.print(table)
else:
# Simple list
for item in data:
console.print(f"{item}")
else:
console.print(data)
else:
console.print(data)
# Backward compatibility alias
def output(data: Any, format_type: str = "table", title: str = None):
"""Deprecated: use render() instead - kept for backward compatibility"""
return render(data, format_type, title)
def error(message: str):
"""Print error message"""
console.print(Panel(f"[red]Error: {message}[/red]", title=""))
def success(message: str):
"""Print success message"""
console.print(Panel(f"[green]{message}[/green]", title=""))
def warning(message: str):
"""Print warning message"""
console.print(Panel(f"[yellow]{message}[/yellow]", title="⚠️"))
def retry_with_backoff(
func,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
backoff_factor: float = 2.0,
exceptions: tuple = (Exception,)
):
"""
Retry function with exponential backoff
Args:
func: Function to retry
max_retries: Maximum number of retries
base_delay: Initial delay in seconds
max_delay: Maximum delay in seconds
backoff_factor: Multiplier for delay after each retry
exceptions: Tuple of exceptions to catch and retry on
Returns:
Result of function call
"""
last_exception = None
for attempt in range(max_retries + 1):
try:
return func()
except exceptions as e:
last_exception = e
if attempt == max_retries:
error(f"Max retries ({max_retries}) exceeded. Last error: {e}")
raise
# Calculate delay with exponential backoff
delay = min(base_delay * (backoff_factor ** attempt), max_delay)
warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s...")
time.sleep(delay)
raise last_exception
def create_http_client_with_retry(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
timeout: float = 30.0
):
"""
Create an HTTP client with retry capabilities
Args:
max_retries: Maximum number of retries
base_delay: Initial delay in seconds
max_delay: Maximum delay in seconds
timeout: Request timeout in seconds
Returns:
httpx.Client with retry transport
"""
import httpx
class RetryTransport(httpx.Transport):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self.backoff_factor = 2.0
def handle_request(self, request):
last_exception = None
for attempt in range(self.max_retries + 1):
try:
response = super().handle_request(request)
# Check for retryable HTTP status codes
if hasattr(response, 'status_code'):
retryable_codes = {429, 502, 503, 504}
if response.status_code in retryable_codes:
last_exception = httpx.HTTPStatusError(
f"Retryable status code {response.status_code}",
request=request,
response=response
)
if attempt == self.max_retries:
break
delay = min(
self.base_delay * (self.backoff_factor ** attempt),
self.max_delay
)
time.sleep(delay)
continue
return response
except (httpx.NetworkError, httpx.TimeoutException) as e:
last_exception = e
if attempt == self.max_retries:
break
delay = min(
self.base_delay * (self.backoff_factor ** attempt),
self.max_delay
)
time.sleep(delay)
raise last_exception
return httpx.Client(
transport=RetryTransport(),
timeout=timeout
)
from .subprocess import run_subprocess

233
cli/utils/crypto_utils.py Executable file
View File

@@ -0,0 +1,233 @@
"""
Cryptographic Utilities for CLI Security
Provides real signature verification for multisig operations
"""
import hashlib
import secrets
from typing import Dict, Optional, Tuple
from eth_account import Account
from eth_utils import to_checksum_address, keccak
import json
def create_signature_challenge(tx_data: Dict, nonce: str) -> str:
"""
Create a cryptographic challenge for transaction signing
Args:
tx_data: Transaction data to sign
nonce: Unique nonce to prevent replay attacks
Returns:
Challenge string to be signed
"""
# Create deterministic challenge from transaction data
challenge_data = {
"tx_id": tx_data.get("tx_id"),
"to": tx_data.get("to"),
"amount": tx_data.get("amount"),
"nonce": nonce,
"timestamp": tx_data.get("timestamp")
}
# Sort keys for deterministic ordering
challenge_str = json.dumps(challenge_data, sort_keys=True, separators=(',', ':'))
challenge_hash = keccak(challenge_str.encode())
return f"AITBC_MULTISIG_CHALLENGE:{challenge_hash.hex()}"
def verify_signature(
challenge: str,
signature: str,
signer_address: str
) -> bool:
"""
Verify that a signature was created by the specified signer
Args:
challenge: Challenge string that was signed
signature: Hex signature string
signer_address: Expected signer address
Returns:
True if signature is valid
"""
try:
# Remove 0x prefix if present
if signature.startswith("0x"):
signature = signature[2:]
# Convert to bytes
signature_bytes = bytes.fromhex(signature)
# Recover address from signature
message_hash = keccak(challenge.encode())
recovered_address = Account.recover_message(
signable_hash=message_hash,
signature=signature_bytes
)
# Compare with expected signer
return to_checksum_address(recovered_address) == to_checksum_address(signer_address)
except Exception:
return False
def sign_challenge(challenge: str, private_key: str) -> str:
"""
Sign a challenge with a private key
Args:
challenge: Challenge string to sign
private_key: Private key in hex format
Returns:
Signature as hex string
"""
try:
# Remove 0x prefix if present
if private_key.startswith("0x"):
private_key = private_key[2:]
account = Account.from_key("0x" + private_key)
message_hash = keccak(challenge.encode())
signature = account.sign_message(message_hash)
return "0x" + signature.signature.hex()
except Exception as e:
raise ValueError(f"Failed to sign challenge: {e}")
def generate_nonce() -> str:
"""Generate a secure nonce for transaction challenges"""
return secrets.token_hex(16)
def validate_multisig_transaction(tx_data: Dict) -> Tuple[bool, str]:
"""
Validate multisig transaction structure
Args:
tx_data: Transaction data to validate
Returns:
Tuple of (is_valid, error_message)
"""
required_fields = ["tx_id", "to", "amount", "timestamp", "nonce"]
for field in required_fields:
if field not in tx_data:
return False, f"Missing required field: {field}"
# Validate address format
try:
to_checksum_address(tx_data["to"])
except Exception:
return False, "Invalid recipient address format"
# Validate amount
try:
amount = float(tx_data["amount"])
if amount <= 0:
return False, "Amount must be positive"
except Exception:
return False, "Invalid amount format"
return True, ""
class MultisigSecurityManager:
"""Security manager for multisig operations"""
def __init__(self):
self.pending_challenges: Dict[str, Dict] = {}
def create_signing_request(
self,
tx_data: Dict,
multisig_wallet: str
) -> Dict[str, str]:
"""
Create a signing request with cryptographic challenge
Args:
tx_data: Transaction data
multisig_wallet: Multisig wallet identifier
Returns:
Signing request with challenge
"""
# Validate transaction
is_valid, error = validate_multisig_transaction(tx_data)
if not is_valid:
raise ValueError(f"Invalid transaction: {error}")
# Generate nonce and challenge
nonce = generate_nonce()
challenge = create_signature_challenge(tx_data, nonce)
# Store challenge for verification
self.pending_challenges[tx_data["tx_id"]] = {
"challenge": challenge,
"tx_data": tx_data,
"multisig_wallet": multisig_wallet,
"nonce": nonce,
"created_at": secrets.token_hex(8)
}
return {
"tx_id": tx_data["tx_id"],
"challenge": challenge,
"nonce": nonce,
"signers_required": len(tx_data.get("required_signers", [])),
"message": f"Please sign this challenge to authorize transaction {tx_data['tx_id']}"
}
def verify_and_add_signature(
self,
tx_id: str,
signature: str,
signer_address: str
) -> Tuple[bool, str]:
"""
Verify signature and add to transaction
Args:
tx_id: Transaction ID
signature: Signature to verify
signer_address: Address of signer
Returns:
Tuple of (success, message)
"""
if tx_id not in self.pending_challenges:
return False, "Transaction not found or expired"
challenge_data = self.pending_challenges[tx_id]
challenge = challenge_data["challenge"]
# Verify signature
if not verify_signature(challenge, signature, signer_address):
return False, f"Invalid signature for signer {signer_address}"
# Check if signer is authorized
tx_data = challenge_data["tx_data"]
authorized_signers = tx_data.get("required_signers", [])
if signer_address not in authorized_signers:
return False, f"Signer {signer_address} is not authorized"
return True, "Signature verified successfully"
def cleanup_challenge(self, tx_id: str):
"""Clean up challenge after transaction completion"""
if tx_id in self.pending_challenges:
del self.pending_challenges[tx_id]
# Global security manager instance
multisig_security = MultisigSecurityManager()

335
cli/utils/secure_audit.py Executable file
View File

@@ -0,0 +1,335 @@
"""
Tamper-Evident Audit Logger
Provides cryptographic integrity for audit logs
"""
import json
import hashlib
import secrets
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from eth_utils import keccak
class SecureAuditLogger:
"""
Tamper-evident audit logger with cryptographic integrity
Each entry includes hash of previous entry for chain integrity
"""
def __init__(self, log_dir: Optional[Path] = None):
self.log_dir = log_dir or Path.home() / ".aitbc" / "audit"
self.log_dir.mkdir(parents=True, exist_ok=True)
self.log_file = self.log_dir / "audit_secure.jsonl"
self.integrity_file = self.log_dir / "integrity.json"
# Initialize integrity tracking
self._init_integrity()
def _init_integrity(self):
"""Initialize integrity tracking"""
if not self.integrity_file.exists():
integrity_data = {
"genesis_hash": None,
"last_hash": None,
"entry_count": 0,
"created_at": datetime.utcnow().isoformat(),
"version": "1.0"
}
with open(self.integrity_file, "w") as f:
json.dump(integrity_data, f, indent=2)
def _get_integrity_data(self) -> Dict:
"""Get current integrity data"""
with open(self.integrity_file, "r") as f:
return json.load(f)
def _update_integrity(self, entry_hash: str):
"""Update integrity tracking"""
integrity_data = self._get_integrity_data()
if integrity_data["genesis_hash"] is None:
integrity_data["genesis_hash"] = entry_hash
integrity_data["last_hash"] = entry_hash
integrity_data["entry_count"] += 1
integrity_data["last_updated"] = datetime.utcnow().isoformat()
with open(self.integrity_file, "w") as f:
json.dump(integrity_data, f, indent=2)
def _create_entry_hash(self, entry: Dict, previous_hash: Optional[str] = None) -> str:
"""
Create cryptographic hash for audit entry
Args:
entry: Audit entry data
previous_hash: Hash of previous entry for chain integrity
Returns:
Entry hash
"""
# Create canonical representation
entry_data = {
"timestamp": entry["timestamp"],
"action": entry["action"],
"user": entry["user"],
"details": entry["details"],
"previous_hash": previous_hash,
"nonce": entry.get("nonce", "")
}
# Sort keys for deterministic ordering
entry_str = json.dumps(entry_data, sort_keys=True, separators=(',', ':'))
return keccak(entry_str.encode()).hex()
def log(self, action: str, details: dict = None, user: str = None):
"""
Log an audit event with cryptographic integrity
Args:
action: Action being logged
details: Additional details
user: User performing action
"""
# Get previous hash for chain integrity
integrity_data = self._get_integrity_data()
previous_hash = integrity_data["last_hash"]
# Create audit entry
entry = {
"timestamp": datetime.utcnow().isoformat(),
"action": action,
"user": user or "unknown",
"details": details or {},
"nonce": secrets.token_hex(16)
}
# Create entry hash
entry_hash = self._create_entry_hash(entry, previous_hash)
entry["entry_hash"] = entry_hash
# Write to log file
with open(self.log_file, "a") as f:
f.write(json.dumps(entry) + "\n")
# Update integrity tracking
self._update_integrity(entry_hash)
def verify_integrity(self) -> Tuple[bool, List[str]]:
"""
Verify the integrity of the entire audit log
Returns:
Tuple of (is_valid, issues)
"""
if not self.log_file.exists():
return True, ["No audit log exists"]
issues = []
previous_hash = None
entry_count = 0
try:
with open(self.log_file, "r") as f:
for line_num, line in enumerate(f, 1):
if not line.strip():
continue
entry = json.loads(line)
entry_count += 1
# Verify entry hash
expected_hash = self._create_entry_hash(entry, previous_hash)
actual_hash = entry.get("entry_hash")
if actual_hash != expected_hash:
issues.append(f"Line {line_num}: Hash mismatch - entry may be tampered")
# Verify chain integrity
if previous_hash and entry.get("previous_hash") != previous_hash:
issues.append(f"Line {line_num}: Chain integrity broken")
previous_hash = actual_hash
# Verify against integrity file
integrity_data = self._get_integrity_data()
if integrity_data["entry_count"] != entry_count:
issues.append(f"Entry count mismatch: log has {entry_count}, integrity says {integrity_data['entry_count']}")
if integrity_data["last_hash"] != previous_hash:
issues.append("Final hash mismatch with integrity file")
return len(issues) == 0, issues
except Exception as e:
return False, [f"Verification failed: {str(e)}"]
def get_logs(self, limit: int = 50, action_filter: str = None, verify: bool = True) -> List[Dict]:
"""
Read audit log entries with optional integrity verification
Args:
limit: Maximum number of entries
action_filter: Filter by action type
verify: Whether to verify integrity
Returns:
List of audit entries
"""
if verify:
is_valid, issues = self.verify_integrity()
if not is_valid:
raise ValueError(f"Audit log integrity compromised: {issues}")
if not self.log_file.exists():
return []
entries = []
with open(self.log_file) as f:
for line in f:
line = line.strip()
if line:
entry = json.loads(line)
if action_filter and entry.get("action") != action_filter:
continue
entries.append(entry)
return entries[-limit:]
def export_audit_report(self, output_file: Optional[Path] = None) -> Dict:
"""
Export comprehensive audit report with integrity verification
Args:
output_file: Optional file to write report
Returns:
Audit report data
"""
# Verify integrity
is_valid, issues = self.verify_integrity()
# Get statistics
all_entries = self.get_logs(limit=10000, verify=False) # Don't double-verify
# Action statistics
action_counts = {}
user_counts = {}
hourly_counts = {}
for entry in all_entries:
# Action counts
action = entry.get("action", "unknown")
action_counts[action] = action_counts.get(action, 0) + 1
# User counts
user = entry.get("user", "unknown")
user_counts[user] = user_counts.get(user, 0) + 1
# Hourly counts
try:
hour = entry["timestamp"][:13] # YYYY-MM-DDTHH
hourly_counts[hour] = hourly_counts.get(hour, 0) + 1
except:
pass
# Create report
report = {
"audit_report": {
"generated_at": datetime.utcnow().isoformat(),
"integrity": {
"is_valid": is_valid,
"issues": issues
},
"statistics": {
"total_entries": len(all_entries),
"unique_actions": len(action_counts),
"unique_users": len(user_counts),
"date_range": {
"first_entry": all_entries[0]["timestamp"] if all_entries else None,
"last_entry": all_entries[-1]["timestamp"] if all_entries else None
}
},
"action_breakdown": action_counts,
"user_breakdown": user_counts,
"recent_activity": hourly_counts
},
"sample_entries": all_entries[-10:] # Last 10 entries
}
# Write to file if specified
if output_file:
with open(output_file, "w") as f:
json.dump(report, f, indent=2)
return report
def search_logs(self, query: str, limit: int = 50) -> List[Dict]:
"""
Search audit logs for specific content
Args:
query: Search query
limit: Maximum results
Returns:
Matching entries
"""
entries = self.get_logs(limit=1000, verify=False) # Get more for search
matches = []
query_lower = query.lower()
for entry in entries:
# Search in action, user, and details
searchable_text = f"{entry.get('action', '')} {entry.get('user', '')} {json.dumps(entry.get('details', {}))}"
if query_lower in searchable_text.lower():
matches.append(entry)
if len(matches) >= limit:
break
return matches
def get_chain_info(self) -> Dict:
"""
Get information about the audit chain
Returns:
Chain information
"""
integrity_data = self._get_integrity_data()
return {
"genesis_hash": integrity_data["genesis_hash"],
"last_hash": integrity_data["last_hash"],
"entry_count": integrity_data["entry_count"],
"created_at": integrity_data["created_at"],
"last_updated": integrity_data.get("last_updated"),
"version": integrity_data["version"],
"log_file": str(self.log_file),
"integrity_file": str(self.integrity_file)
}
# Global secure audit logger instance
secure_audit_logger = SecureAuditLogger()
# Convenience functions for backward compatibility
def log_action(action: str, details: dict = None, user: str = None):
"""Log an action with secure audit logger"""
secure_audit_logger.log(action, details, user)
def verify_audit_integrity() -> Tuple[bool, List[str]]:
"""Verify audit log integrity"""
return secure_audit_logger.verify_integrity()
def get_audit_logs(limit: int = 50, action_filter: str = None) -> List[Dict]:
"""Get audit logs with integrity verification"""
return secure_audit_logger.get_logs(limit, action_filter)

280
cli/utils/security.py Executable file
View File

@@ -0,0 +1,280 @@
"""
Secure Encryption Utilities - Fixed Version
Replaces the broken encryption in utils/__init__.py
"""
import base64
import hashlib
import secrets
from typing import Optional, Dict, Any
from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
def derive_secure_key(password: str, salt: bytes = None) -> tuple[bytes, bytes]:
"""
Derive secure encryption key using PBKDF2 with SHA-256
Args:
password: User password (required - no defaults)
salt: Optional salt (generated if not provided)
Returns:
Tuple of (fernet_key, salt)
Raises:
ValueError: If password is empty or too weak
"""
if not password or len(password) < 8:
raise ValueError("Password must be at least 8 characters long")
if salt is None:
salt = secrets.token_bytes(32)
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=600_000, # OWASP recommended minimum
)
key = kdf.derive(password.encode())
fernet_key = base64.urlsafe_b64encode(key)
return fernet_key, salt
def encrypt_value(value: str, password: str) -> Dict[str, str]:
"""
Encrypt a value using PBKDF2 + Fernet (no more hardcoded keys)
Args:
value: Value to encrypt
password: Strong password (required)
Returns:
Dict with encrypted data and metadata
Raises:
ValueError: If password is too weak
"""
if not value:
raise ValueError("Cannot encrypt empty value")
# Derive secure key
fernet_key, salt = derive_secure_key(password)
# Encrypt
f = Fernet(fernet_key)
encrypted = f.encrypt(value.encode())
# Fernet already returns base64, no double encoding
return {
"encrypted_data": encrypted.decode(),
"salt": base64.b64encode(salt).decode(),
"algorithm": "PBKDF2-SHA256-Fernet",
"iterations": 600_000,
"version": "1.0"
}
def decrypt_value(encrypted_data: Dict[str, str] | str, password: str) -> str:
"""
Decrypt a PBKDF2 + Fernet encrypted value
Args:
encrypted_data: Dict with encrypted data or legacy string
password: Password used for encryption
Returns:
Decrypted value
Raises:
ValueError: If decryption fails or password is wrong
InvalidToken: If the encrypted data is corrupted
"""
# Handle legacy format (backward compatibility)
if isinstance(encrypted_data, str):
# This is the old broken format - we can't decrypt it securely
raise ValueError(
"Legacy encrypted format detected. "
"This data was encrypted with a broken implementation and cannot be securely recovered. "
"Please recreate the wallet with proper encryption."
)
try:
# Extract salt and encrypted data
salt = base64.b64decode(encrypted_data["salt"])
encrypted = encrypted_data["encrypted_data"].encode()
# Derive same key
fernet_key, _ = derive_secure_key(password, salt)
# Decrypt
f = Fernet(fernet_key)
decrypted = f.decrypt(encrypted)
return decrypted.decode()
except InvalidToken:
raise ValueError("Invalid password or corrupted encrypted data")
except Exception as e:
raise ValueError(f"Decryption failed: {str(e)}")
def validate_password_strength(password: str) -> Dict[str, Any]:
"""
Validate password strength
Args:
password: Password to validate
Returns:
Dict with validation results
"""
issues = []
score = 0
if len(password) < 8:
issues.append("Password must be at least 8 characters")
else:
score += 1
if len(password) < 12:
issues.append("Consider using 12+ characters for better security")
else:
score += 1
if not any(c.isupper() for c in password):
issues.append("Include uppercase letters")
else:
score += 1
if not any(c.islower() for c in password):
issues.append("Include lowercase letters")
else:
score += 1
if not any(c.isdigit() for c in password):
issues.append("Include numbers")
else:
score += 1
if not any(c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in password):
issues.append("Include special characters")
else:
score += 1
# Check for common patterns
if password.lower() in ["password", "123456", "qwerty", "admin"]:
issues.append("Avoid common passwords")
score = 0
strength_levels = {
0: "Very Weak",
1: "Weak",
2: "Fair",
3: "Good",
4: "Strong",
5: "Very Strong",
6: "Excellent"
}
return {
"score": score,
"strength": strength_levels.get(score, "Unknown"),
"issues": issues,
"is_acceptable": score >= 3
}
def generate_secure_password(length: int = 16) -> str:
"""
Generate a secure random password
Args:
length: Password length
Returns:
Secure random password
"""
alphabet = (
"abcdefghijklmnopqrstuvwxyz"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"0123456789"
"!@#$%^&*()_+-=[]{}|;:,.<>?"
)
password = ''.join(secrets.choice(alphabet) for _ in range(length))
# Ensure it meets minimum requirements
while not validate_password_strength(password)["is_acceptable"]:
password = ''.join(secrets.choice(alphabet) for _ in range(length))
return password
# Migration helper for existing wallets
def migrate_legacy_wallet(legacy_data: Dict[str, Any], new_password: str) -> Dict[str, Any]:
"""
Migrate a wallet from broken encryption to secure encryption
Args:
legacy_data: Legacy wallet data with broken encryption
new_password: New strong password
Returns:
Migrated wallet data
Raises:
ValueError: If migration cannot be performed safely
"""
# Check if this is legacy format
if "encrypted" not in legacy_data or not legacy_data.get("encrypted"):
raise ValueError("Not a legacy encrypted wallet")
if "private_key" not in legacy_data:
raise ValueError("Cannot migrate wallet without private key")
# The legacy wallet might have a plaintext private key
# If it's truly encrypted with the broken method, we cannot recover it
private_key = legacy_data["private_key"]
if private_key.startswith("[ENCRYPTED_MOCK]") or private_key.startswith("["):
# This was never actually encrypted - it's a mock
raise ValueError(
"Cannot migrate mock wallet. "
"Please create a new wallet with proper key generation."
)
# If we get here, we have a plaintext private key (security issue!)
# Re-encrypt it properly
try:
encrypted_data = encrypt_value(private_key, new_password)
return {
**legacy_data,
"private_key": encrypted_data,
"encryption_version": "1.0",
"migration_timestamp": secrets.token_hex(16)
}
except Exception as e:
raise ValueError(f"Migration failed: {str(e)}")
# Security constants
class EncryptionConfig:
"""Encryption configuration constants"""
PBKDF2_ITERATIONS = 600_000
SALT_LENGTH = 32
MIN_PASSWORD_LENGTH = 8
RECOMMENDED_PASSWORD_LENGTH = 16
# Algorithm identifiers
ALGORITHM_PBKDF2_FERNET = "PBKDF2-SHA256-Fernet"
ALGORITHM_LEGACY = "LEGACY-BROKEN"
# Version tracking
CURRENT_VERSION = "1.0"
LEGACY_VERSIONS = ["0.9", "legacy", "broken"]

31
cli/utils/subprocess.py Normal file
View File

@@ -0,0 +1,31 @@
import subprocess
import sys
from typing import List, Optional, Union, Any
from . import error, output
def run_subprocess(cmd: List[str], check: bool = True, capture_output: bool = True, shell: bool = False, **kwargs: Any) -> Optional[Union[str, subprocess.CompletedProcess]]:
"""Run a subprocess command safely with logging"""
try:
if shell:
# When shell=True, cmd should be a string
cmd_str = " ".join(cmd) if isinstance(cmd, list) else cmd
result = subprocess.run(cmd_str, shell=True, check=check, capture_output=capture_output, text=True, **kwargs)
else:
result = subprocess.run(cmd, check=check, capture_output=capture_output, text=True, **kwargs)
if capture_output:
return result.stdout.strip()
return result
except subprocess.CalledProcessError as e:
error(f"Command failed with exit code {e.returncode}")
if capture_output and getattr(e, 'stderr', None):
print(e.stderr, file=sys.stderr)
if check:
sys.exit(e.returncode)
return getattr(e, 'stdout', None) if capture_output else None
except Exception as e:
error(f"Failed to execute command: {e}")
if check:
sys.exit(1)
return None