Files
aitbc/cli/aitbc_cli/security/translation_policy.py
oib f353e00172 chore(security): enhance environment configuration, CI workflows, and wallet daemon with security improvements
- Restructure .env.example with security-focused documentation, service-specific environment file references, and AWS Secrets Manager integration
- Update CLI tests workflow to single Python 3.13 version, add pytest-mock dependency, and consolidate test execution with coverage
- Add comprehensive security validation to package publishing workflow with manual approval gates, secret scanning, and release
2026-03-03 10:33:46 +01:00

421 lines
17 KiB
Python

"""
AITBC CLI Translation Security Policy
This module implements strict security controls for CLI translation functionality,
ensuring that translation services never compromise security-sensitive operations.
"""
import os
import logging
from typing import Dict, List, Optional, Union
from dataclasses import dataclass
from enum import Enum
import asyncio
from pathlib import Path
logger = logging.getLogger(__name__)
class SecurityLevel(Enum):
"""Security levels for CLI operations"""
CRITICAL = "critical" # Security-sensitive commands (agent strategy, wallet operations)
HIGH = "high" # Important operations (deployment, configuration)
MEDIUM = "medium" # Standard operations (monitoring, reporting)
LOW = "low" # Informational operations (help, status)
class TranslationMode(Enum):
"""Translation operation modes"""
DISABLED = "disabled" # No translation allowed
LOCAL_ONLY = "local_only" # Only local translation (no external APIs)
FALLBACK = "fallback" # External APIs with local fallback
FULL = "full" # Full translation capabilities
@dataclass
class SecurityPolicy:
"""Security policy for translation usage"""
security_level: SecurityLevel
translation_mode: TranslationMode
allow_external_apis: bool
require_explicit_consent: bool
timeout_seconds: int
max_retries: int
cache_translations: bool
@dataclass
class TranslationRequest:
"""Translation request with security context"""
text: str
target_language: str
source_language: str = "en"
command_name: Optional[str] = None
security_level: SecurityLevel = SecurityLevel.MEDIUM
user_consent: bool = False
@dataclass
class TranslationResponse:
"""Translation response with security metadata"""
translated_text: str
success: bool
method_used: str
security_compliant: bool
warning_messages: List[str]
fallback_used: bool
class CLITranslationSecurityManager:
"""
Security manager for CLI translation operations
Enforces strict policies to ensure translation never compromises
security-sensitive operations.
"""
def __init__(self, config_path: Optional[Path] = None):
self.config_path = config_path or Path.home() / ".aitbc" / "translation_security.json"
self.policies = self._load_default_policies()
self.security_log = []
def _load_default_policies(self) -> Dict[SecurityLevel, SecurityPolicy]:
"""Load default security policies"""
return {
SecurityLevel.CRITICAL: SecurityPolicy(
security_level=SecurityLevel.CRITICAL,
translation_mode=TranslationMode.DISABLED,
allow_external_apis=False,
require_explicit_consent=True,
timeout_seconds=0,
max_retries=0,
cache_translations=False
),
SecurityLevel.HIGH: SecurityPolicy(
security_level=SecurityLevel.HIGH,
translation_mode=TranslationMode.LOCAL_ONLY,
allow_external_apis=False,
require_explicit_consent=True,
timeout_seconds=5,
max_retries=1,
cache_translations=True
),
SecurityLevel.MEDIUM: SecurityPolicy(
security_level=SecurityLevel.MEDIUM,
translation_mode=TranslationMode.FALLBACK,
allow_external_apis=True,
require_explicit_consent=False,
timeout_seconds=10,
max_retries=2,
cache_translations=True
),
SecurityLevel.LOW: SecurityPolicy(
security_level=SecurityLevel.LOW,
translation_mode=TranslationMode.FULL,
allow_external_apis=True,
require_explicit_consent=False,
timeout_seconds=15,
max_retries=3,
cache_translations=True
)
}
def get_command_security_level(self, command_name: str) -> SecurityLevel:
"""Determine security level for a command"""
# Critical security-sensitive commands
critical_commands = {
'agent', 'strategy', 'wallet', 'sign', 'deploy', 'genesis',
'transfer', 'send', 'approve', 'mint', 'burn', 'stake'
}
# High importance commands
high_commands = {
'config', 'node', 'chain', 'marketplace', 'swap', 'liquidity',
'governance', 'vote', 'proposal'
}
# Medium importance commands
medium_commands = {
'balance', 'status', 'monitor', 'analytics', 'logs', 'history',
'simulate', 'test'
}
# Low importance commands (informational)
low_commands = {
'help', 'version', 'info', 'list', 'show', 'explain'
}
command_base = command_name.split()[0].lower()
if command_base in critical_commands:
return SecurityLevel.CRITICAL
elif command_base in high_commands:
return SecurityLevel.HIGH
elif command_base in medium_commands:
return SecurityLevel.MEDIUM
elif command_base in low_commands:
return SecurityLevel.LOW
else:
# Default to medium for unknown commands
return SecurityLevel.MEDIUM
async def translate_with_security(self, request: TranslationRequest) -> TranslationResponse:
"""
Translate text with security enforcement
Args:
request: Translation request with security context
Returns:
Translation response with security metadata
"""
# Determine security level if not provided
if request.security_level == SecurityLevel.MEDIUM and request.command_name:
request.security_level = self.get_command_security_level(request.command_name)
policy = self.policies[request.security_level]
warnings = []
# Log security check
self._log_security_check(request, policy)
# Check if translation is allowed
if policy.translation_mode == TranslationMode.DISABLED:
return TranslationResponse(
translated_text=request.text, # Return original
success=True,
method_used="disabled",
security_compliant=True,
warning_messages=["Translation disabled for security-sensitive operation"],
fallback_used=False
)
# Check user consent for high-security operations
if policy.require_explicit_consent and not request.user_consent:
return TranslationResponse(
translated_text=request.text, # Return original
success=True,
method_used="consent_required",
security_compliant=True,
warning_messages=["User consent required for translation"],
fallback_used=False
)
# Attempt translation based on policy
try:
if policy.translation_mode == TranslationMode.LOCAL_ONLY:
result = await self._local_translate(request)
method_used = "local"
elif policy.translation_mode == TranslationMode.FALLBACK:
# Try external first, fallback to local
result, fallback_used = await self._external_translate_with_fallback(request, policy)
method_used = "external_fallback"
else: # FULL
result = await self._external_translate(request, policy)
method_used = "external"
fallback_used = False
return TranslationResponse(
translated_text=result,
success=True,
method_used=method_used,
security_compliant=True,
warning_messages=warnings,
fallback_used=fallback_used if method_used == "external_fallback" else False
)
except Exception as e:
logger.error(f"Translation failed: {e}")
warnings.append(f"Translation failed: {str(e)}")
# Always fallback to original text for security
return TranslationResponse(
translated_text=request.text,
success=False,
method_used="error_fallback",
security_compliant=True,
warning_messages=warnings + ["Falling back to original text for security"],
fallback_used=True
)
async def _local_translate(self, request: TranslationRequest) -> str:
"""Local translation without external APIs"""
# Simple local translation dictionary for common terms
local_translations = {
# Help messages
"help": {"es": "ayuda", "fr": "aide", "de": "hilfe", "zh": "帮助"},
"error": {"es": "error", "fr": "erreur", "de": "fehler", "zh": "错误"},
"success": {"es": "éxito", "fr": "succès", "de": "erfolg", "zh": "成功"},
"warning": {"es": "advertencia", "fr": "avertissement", "de": "warnung", "zh": "警告"},
"status": {"es": "estado", "fr": "statut", "de": "status", "zh": "状态"},
"balance": {"es": "saldo", "fr": "solde", "de": "guthaben", "zh": "余额"},
"wallet": {"es": "cartera", "fr": "portefeuille", "de": "börse", "zh": "钱包"},
"transaction": {"es": "transacción", "fr": "transaction", "de": "transaktion", "zh": "交易"},
"blockchain": {"es": "cadena de bloques", "fr": "chaîne de blocs", "de": "blockchain", "zh": "区块链"},
"agent": {"es": "agente", "fr": "agent", "de": "agent", "zh": "代理"},
}
# Simple word-by-word translation
words = request.text.lower().split()
translated_words = []
for word in words:
if word in local_translations and request.target_language in local_translations[word]:
translated_words.append(local_translations[word][request.target_language])
else:
translated_words.append(word) # Keep original if no translation
return " ".join(translated_words)
async def _external_translate_with_fallback(self, request: TranslationRequest, policy: SecurityPolicy) -> tuple[str, bool]:
"""External translation with local fallback"""
try:
# Try external translation first
result = await self._external_translate(request, policy)
return result, False
except Exception as e:
logger.warning(f"External translation failed, using local fallback: {e}")
result = await self._local_translate(request)
return result, True
async def _external_translate(self, request: TranslationRequest, policy: SecurityPolicy) -> str:
"""External translation with timeout and retry logic"""
if not policy.allow_external_apis:
raise Exception("External APIs not allowed for this security level")
# This would integrate with external translation services
# For security, we'll implement a mock that demonstrates the pattern
await asyncio.sleep(0.1) # Simulate API call
# Mock translation - in reality, this would call external APIs
return f"[Translated to {request.target_language}: {request.text}]"
def _log_security_check(self, request: TranslationRequest, policy: SecurityPolicy):
"""Log security check for audit trail"""
log_entry = {
"timestamp": asyncio.get_event_loop().time(),
"command": request.command_name,
"security_level": request.security_level.value,
"translation_mode": policy.translation_mode.value,
"target_language": request.target_language,
"user_consent": request.user_consent,
"text_length": len(request.text)
}
self.security_log.append(log_entry)
# Keep only last 1000 entries
if len(self.security_log) > 1000:
self.security_log = self.security_log[-1000:]
def get_security_summary(self) -> Dict:
"""Get summary of security checks"""
if not self.security_log:
return {"total_checks": 0, "message": "No security checks performed"}
total_checks = len(self.security_log)
by_level = {}
by_language = {}
for entry in self.security_log:
level = entry["security_level"]
lang = entry["target_language"]
by_level[level] = by_level.get(level, 0) + 1
by_language[lang] = by_language.get(lang, 0) + 1
return {
"total_checks": total_checks,
"by_security_level": by_level,
"by_target_language": by_language,
"recent_checks": self.security_log[-10:] # Last 10 checks
}
def is_translation_allowed(self, command_name: str, target_language: str) -> bool:
"""Quick check if translation is allowed for a command"""
security_level = self.get_command_security_level(command_name)
policy = self.policies[security_level]
return policy.translation_mode != TranslationMode.DISABLED
def get_security_policy_for_command(self, command_name: str) -> SecurityPolicy:
"""Get security policy for a specific command"""
security_level = self.get_command_security_level(command_name)
return self.policies[security_level]
# Global security manager instance
cli_translation_security = CLITranslationSecurityManager()
# Decorator for CLI commands to enforce translation security
def secure_translation(allowed_languages: Optional[List[str]] = None, require_consent: bool = False):
"""
Decorator to enforce translation security on CLI commands
Args:
allowed_languages: List of allowed target languages
require_consent: Whether to require explicit user consent
"""
def decorator(func):
async def wrapper(*args, **kwargs):
# This would integrate with the CLI command framework
# to enforce translation policies
return await func(*args, **kwargs)
return wrapper
return decorator
# Security policy configuration functions
def configure_translation_security(
critical_level: str = "disabled",
high_level: str = "local_only",
medium_level: str = "fallback",
low_level: str = "full"
):
"""Configure translation security policies"""
mode_mapping = {
"disabled": TranslationMode.DISABLED,
"local_only": TranslationMode.LOCAL_ONLY,
"fallback": TranslationMode.FALLBACK,
"full": TranslationMode.FULL
}
cli_translation_security.policies[SecurityLevel.CRITICAL].translation_mode = mode_mapping[critical_level]
cli_translation_security.policies[SecurityLevel.HIGH].translation_mode = mode_mapping[high_level]
cli_translation_security.policies[SecurityLevel.MEDIUM].translation_mode = mode_mapping[medium_level]
cli_translation_security.policies[SecurityLevel.LOW].translation_mode = mode_mapping[low_level]
def get_translation_security_report() -> Dict:
"""Get comprehensive translation security report"""
return {
"security_policies": {
level.value: policy.translation_mode.value
for level, policy in cli_translation_security.policies.items()
},
"security_summary": cli_translation_security.get_security_summary(),
"critical_commands": [
cmd for cmd in ['agent', 'strategy', 'wallet', 'sign', 'deploy']
if cli_translation_security.get_command_security_level(cmd) == SecurityLevel.CRITICAL
],
"recommendations": _get_security_recommendations()
}
def _get_security_recommendations() -> List[str]:
"""Get security recommendations"""
recommendations = []
# Check if critical commands have proper restrictions
for cmd in ['agent', 'strategy', 'wallet', 'sign']:
if cli_translation_security.is_translation_allowed(cmd, 'es'):
recommendations.append(f"Consider disabling translation for '{cmd}' command")
# Check for external API usage in sensitive operations
critical_policy = cli_translation_security.policies[SecurityLevel.CRITICAL]
if critical_policy.allow_external_apis:
recommendations.append("External APIs should be disabled for critical operations")
return recommendations