BEFORE: /opt/aitbc/cli/ ├── aitbc_cli/ # Python package (box in a box) │ ├── commands/ │ ├── main.py │ └── ... ├── setup.py AFTER: /opt/aitbc/cli/ # Flat structure ├── commands/ # Direct access ├── main.py # Direct access ├── auth/ ├── config/ ├── core/ ├── models/ ├── utils/ ├── plugins.py └── setup.py CHANGES MADE: - Moved all files from aitbc_cli/ to cli/ root - Fixed all relative imports (from . to absolute imports) - Updated setup.py entry point: aitbc_cli.main → main - Added CLI directory to Python path in entry script - Simplified deployment.py to remove dependency on deleted core.deployment - Fixed import paths in all command files - Recreated virtual environment with new structure BENEFITS: - Eliminated 'box in a box' nesting - Simpler directory structure - Direct access to all modules - Cleaner imports - Easier maintenance and development - CLI works with both 'python main.py' and 'aitbc' commands
421 lines
17 KiB
Python
Executable File
421 lines
17 KiB
Python
Executable File
"""
|
|
AITBC CLI Translation Security Policy
|
|
|
|
This module implements strict security controls for CLI translation functionality,
|
|
ensuring that translation services never compromise security-sensitive operations.
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
from typing import Dict, List, Optional, Union
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
import asyncio
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SecurityLevel(Enum):
|
|
"""Security levels for CLI operations"""
|
|
CRITICAL = "critical" # Security-sensitive commands (agent strategy, wallet operations)
|
|
HIGH = "high" # Important operations (deployment, configuration)
|
|
MEDIUM = "medium" # Standard operations (monitoring, reporting)
|
|
LOW = "low" # Informational operations (help, status)
|
|
|
|
|
|
class TranslationMode(Enum):
|
|
"""Translation operation modes"""
|
|
DISABLED = "disabled" # No translation allowed
|
|
LOCAL_ONLY = "local_only" # Only local translation (no external APIs)
|
|
FALLBACK = "fallback" # External APIs with local fallback
|
|
FULL = "full" # Full translation capabilities
|
|
|
|
|
|
@dataclass
|
|
class SecurityPolicy:
|
|
"""Security policy for translation usage"""
|
|
security_level: SecurityLevel
|
|
translation_mode: TranslationMode
|
|
allow_external_apis: bool
|
|
require_explicit_consent: bool
|
|
timeout_seconds: int
|
|
max_retries: int
|
|
cache_translations: bool
|
|
|
|
|
|
@dataclass
|
|
class TranslationRequest:
|
|
"""Translation request with security context"""
|
|
text: str
|
|
target_language: str
|
|
source_language: str = "en"
|
|
command_name: Optional[str] = None
|
|
security_level: SecurityLevel = SecurityLevel.MEDIUM
|
|
user_consent: bool = False
|
|
|
|
|
|
@dataclass
|
|
class TranslationResponse:
|
|
"""Translation response with security metadata"""
|
|
translated_text: str
|
|
success: bool
|
|
method_used: str
|
|
security_compliant: bool
|
|
warning_messages: List[str]
|
|
fallback_used: bool
|
|
|
|
|
|
class CLITranslationSecurityManager:
|
|
"""
|
|
Security manager for CLI translation operations
|
|
|
|
Enforces strict policies to ensure translation never compromises
|
|
security-sensitive operations.
|
|
"""
|
|
|
|
def __init__(self, config_path: Optional[Path] = None):
|
|
self.config_path = config_path or Path.home() / ".aitbc" / "translation_security.json"
|
|
self.policies = self._load_default_policies()
|
|
self.security_log = []
|
|
|
|
def _load_default_policies(self) -> Dict[SecurityLevel, SecurityPolicy]:
|
|
"""Load default security policies"""
|
|
return {
|
|
SecurityLevel.CRITICAL: SecurityPolicy(
|
|
security_level=SecurityLevel.CRITICAL,
|
|
translation_mode=TranslationMode.DISABLED,
|
|
allow_external_apis=False,
|
|
require_explicit_consent=True,
|
|
timeout_seconds=0,
|
|
max_retries=0,
|
|
cache_translations=False
|
|
),
|
|
SecurityLevel.HIGH: SecurityPolicy(
|
|
security_level=SecurityLevel.HIGH,
|
|
translation_mode=TranslationMode.LOCAL_ONLY,
|
|
allow_external_apis=False,
|
|
require_explicit_consent=True,
|
|
timeout_seconds=5,
|
|
max_retries=1,
|
|
cache_translations=True
|
|
),
|
|
SecurityLevel.MEDIUM: SecurityPolicy(
|
|
security_level=SecurityLevel.MEDIUM,
|
|
translation_mode=TranslationMode.FALLBACK,
|
|
allow_external_apis=True,
|
|
require_explicit_consent=False,
|
|
timeout_seconds=10,
|
|
max_retries=2,
|
|
cache_translations=True
|
|
),
|
|
SecurityLevel.LOW: SecurityPolicy(
|
|
security_level=SecurityLevel.LOW,
|
|
translation_mode=TranslationMode.FULL,
|
|
allow_external_apis=True,
|
|
require_explicit_consent=False,
|
|
timeout_seconds=15,
|
|
max_retries=3,
|
|
cache_translations=True
|
|
)
|
|
}
|
|
|
|
def get_command_security_level(self, command_name: str) -> SecurityLevel:
|
|
"""Determine security level for a command"""
|
|
# Critical security-sensitive commands
|
|
critical_commands = {
|
|
'agent', 'strategy', 'wallet', 'sign', 'deploy', 'genesis',
|
|
'transfer', 'send', 'approve', 'mint', 'burn', 'stake'
|
|
}
|
|
|
|
# High importance commands
|
|
high_commands = {
|
|
'config', 'node', 'chain', 'marketplace', 'swap', 'liquidity',
|
|
'governance', 'vote', 'proposal'
|
|
}
|
|
|
|
# Medium importance commands
|
|
medium_commands = {
|
|
'balance', 'status', 'monitor', 'analytics', 'logs', 'history',
|
|
'simulate', 'test'
|
|
}
|
|
|
|
# Low importance commands (informational)
|
|
low_commands = {
|
|
'help', 'version', 'info', 'list', 'show', 'explain'
|
|
}
|
|
|
|
command_base = command_name.split()[0].lower()
|
|
|
|
if command_base in critical_commands:
|
|
return SecurityLevel.CRITICAL
|
|
elif command_base in high_commands:
|
|
return SecurityLevel.HIGH
|
|
elif command_base in medium_commands:
|
|
return SecurityLevel.MEDIUM
|
|
elif command_base in low_commands:
|
|
return SecurityLevel.LOW
|
|
else:
|
|
# Default to medium for unknown commands
|
|
return SecurityLevel.MEDIUM
|
|
|
|
async def translate_with_security(self, request: TranslationRequest) -> TranslationResponse:
|
|
"""
|
|
Translate text with security enforcement
|
|
|
|
Args:
|
|
request: Translation request with security context
|
|
|
|
Returns:
|
|
Translation response with security metadata
|
|
"""
|
|
# Determine security level if not provided
|
|
if request.security_level == SecurityLevel.MEDIUM and request.command_name:
|
|
request.security_level = self.get_command_security_level(request.command_name)
|
|
|
|
policy = self.policies[request.security_level]
|
|
warnings = []
|
|
|
|
# Log security check
|
|
self._log_security_check(request, policy)
|
|
|
|
# Check if translation is allowed
|
|
if policy.translation_mode == TranslationMode.DISABLED:
|
|
return TranslationResponse(
|
|
translated_text=request.text, # Return original
|
|
success=True,
|
|
method_used="disabled",
|
|
security_compliant=True,
|
|
warning_messages=["Translation disabled for security-sensitive operation"],
|
|
fallback_used=False
|
|
)
|
|
|
|
# Check user consent for high-security operations
|
|
if policy.require_explicit_consent and not request.user_consent:
|
|
return TranslationResponse(
|
|
translated_text=request.text, # Return original
|
|
success=True,
|
|
method_used="consent_required",
|
|
security_compliant=True,
|
|
warning_messages=["User consent required for translation"],
|
|
fallback_used=False
|
|
)
|
|
|
|
# Attempt translation based on policy
|
|
try:
|
|
if policy.translation_mode == TranslationMode.LOCAL_ONLY:
|
|
result = await self._local_translate(request)
|
|
method_used = "local"
|
|
elif policy.translation_mode == TranslationMode.FALLBACK:
|
|
# Try external first, fallback to local
|
|
result, fallback_used = await self._external_translate_with_fallback(request, policy)
|
|
method_used = "external_fallback"
|
|
else: # FULL
|
|
result = await self._external_translate(request, policy)
|
|
method_used = "external"
|
|
fallback_used = False
|
|
|
|
return TranslationResponse(
|
|
translated_text=result,
|
|
success=True,
|
|
method_used=method_used,
|
|
security_compliant=True,
|
|
warning_messages=warnings,
|
|
fallback_used=fallback_used if method_used == "external_fallback" else False
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Translation failed: {e}")
|
|
warnings.append(f"Translation failed: {str(e)}")
|
|
|
|
# Always fallback to original text for security
|
|
return TranslationResponse(
|
|
translated_text=request.text,
|
|
success=False,
|
|
method_used="error_fallback",
|
|
security_compliant=True,
|
|
warning_messages=warnings + ["Falling back to original text for security"],
|
|
fallback_used=True
|
|
)
|
|
|
|
async def _local_translate(self, request: TranslationRequest) -> str:
|
|
"""Local translation without external APIs"""
|
|
# Simple local translation dictionary for common terms
|
|
local_translations = {
|
|
# Help messages
|
|
"help": {"es": "ayuda", "fr": "aide", "de": "hilfe", "zh": "帮助"},
|
|
"error": {"es": "error", "fr": "erreur", "de": "fehler", "zh": "错误"},
|
|
"success": {"es": "éxito", "fr": "succès", "de": "erfolg", "zh": "成功"},
|
|
"warning": {"es": "advertencia", "fr": "avertissement", "de": "warnung", "zh": "警告"},
|
|
"status": {"es": "estado", "fr": "statut", "de": "status", "zh": "状态"},
|
|
"balance": {"es": "saldo", "fr": "solde", "de": "guthaben", "zh": "余额"},
|
|
"wallet": {"es": "cartera", "fr": "portefeuille", "de": "börse", "zh": "钱包"},
|
|
"transaction": {"es": "transacción", "fr": "transaction", "de": "transaktion", "zh": "交易"},
|
|
"blockchain": {"es": "cadena de bloques", "fr": "chaîne de blocs", "de": "blockchain", "zh": "区块链"},
|
|
"agent": {"es": "agente", "fr": "agent", "de": "agent", "zh": "代理"},
|
|
}
|
|
|
|
# Simple word-by-word translation
|
|
words = request.text.lower().split()
|
|
translated_words = []
|
|
|
|
for word in words:
|
|
if word in local_translations and request.target_language in local_translations[word]:
|
|
translated_words.append(local_translations[word][request.target_language])
|
|
else:
|
|
translated_words.append(word) # Keep original if no translation
|
|
|
|
return " ".join(translated_words)
|
|
|
|
async def _external_translate_with_fallback(self, request: TranslationRequest, policy: SecurityPolicy) -> tuple[str, bool]:
|
|
"""External translation with local fallback"""
|
|
try:
|
|
# Try external translation first
|
|
result = await self._external_translate(request, policy)
|
|
return result, False
|
|
except Exception as e:
|
|
logger.warning(f"External translation failed, using local fallback: {e}")
|
|
result = await self._local_translate(request)
|
|
return result, True
|
|
|
|
async def _external_translate(self, request: TranslationRequest, policy: SecurityPolicy) -> str:
|
|
"""External translation with timeout and retry logic"""
|
|
if not policy.allow_external_apis:
|
|
raise Exception("External APIs not allowed for this security level")
|
|
|
|
# This would integrate with external translation services
|
|
# For security, we'll implement a mock that demonstrates the pattern
|
|
await asyncio.sleep(0.1) # Simulate API call
|
|
|
|
# Mock translation - in reality, this would call external APIs
|
|
return f"[Translated to {request.target_language}: {request.text}]"
|
|
|
|
def _log_security_check(self, request: TranslationRequest, policy: SecurityPolicy):
|
|
"""Log security check for audit trail"""
|
|
log_entry = {
|
|
"timestamp": asyncio.get_event_loop().time(),
|
|
"command": request.command_name,
|
|
"security_level": request.security_level.value,
|
|
"translation_mode": policy.translation_mode.value,
|
|
"target_language": request.target_language,
|
|
"user_consent": request.user_consent,
|
|
"text_length": len(request.text)
|
|
}
|
|
|
|
self.security_log.append(log_entry)
|
|
|
|
# Keep only last 1000 entries
|
|
if len(self.security_log) > 1000:
|
|
self.security_log = self.security_log[-1000:]
|
|
|
|
def get_security_summary(self) -> Dict:
|
|
"""Get summary of security checks"""
|
|
if not self.security_log:
|
|
return {"total_checks": 0, "message": "No security checks performed"}
|
|
|
|
total_checks = len(self.security_log)
|
|
by_level = {}
|
|
by_language = {}
|
|
|
|
for entry in self.security_log:
|
|
level = entry["security_level"]
|
|
lang = entry["target_language"]
|
|
|
|
by_level[level] = by_level.get(level, 0) + 1
|
|
by_language[lang] = by_language.get(lang, 0) + 1
|
|
|
|
return {
|
|
"total_checks": total_checks,
|
|
"by_security_level": by_level,
|
|
"by_target_language": by_language,
|
|
"recent_checks": self.security_log[-10:] # Last 10 checks
|
|
}
|
|
|
|
def is_translation_allowed(self, command_name: str, target_language: str) -> bool:
|
|
"""Quick check if translation is allowed for a command"""
|
|
security_level = self.get_command_security_level(command_name)
|
|
policy = self.policies[security_level]
|
|
|
|
return policy.translation_mode != TranslationMode.DISABLED
|
|
|
|
def get_security_policy_for_command(self, command_name: str) -> SecurityPolicy:
|
|
"""Get security policy for a specific command"""
|
|
security_level = self.get_command_security_level(command_name)
|
|
return self.policies[security_level]
|
|
|
|
|
|
# Global security manager instance
|
|
cli_translation_security = CLITranslationSecurityManager()
|
|
|
|
|
|
# Decorator for CLI commands to enforce translation security
|
|
def secure_translation(allowed_languages: Optional[List[str]] = None, require_consent: bool = False):
|
|
"""
|
|
Decorator to enforce translation security on CLI commands
|
|
|
|
Args:
|
|
allowed_languages: List of allowed target languages
|
|
require_consent: Whether to require explicit user consent
|
|
"""
|
|
def decorator(func):
|
|
async def wrapper(*args, **kwargs):
|
|
# This would integrate with the CLI command framework
|
|
# to enforce translation policies
|
|
return await func(*args, **kwargs)
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
# Security policy configuration functions
|
|
def configure_translation_security(
|
|
critical_level: str = "disabled",
|
|
high_level: str = "local_only",
|
|
medium_level: str = "fallback",
|
|
low_level: str = "full"
|
|
):
|
|
"""Configure translation security policies"""
|
|
mode_mapping = {
|
|
"disabled": TranslationMode.DISABLED,
|
|
"local_only": TranslationMode.LOCAL_ONLY,
|
|
"fallback": TranslationMode.FALLBACK,
|
|
"full": TranslationMode.FULL
|
|
}
|
|
|
|
cli_translation_security.policies[SecurityLevel.CRITICAL].translation_mode = mode_mapping[critical_level]
|
|
cli_translation_security.policies[SecurityLevel.HIGH].translation_mode = mode_mapping[high_level]
|
|
cli_translation_security.policies[SecurityLevel.MEDIUM].translation_mode = mode_mapping[medium_level]
|
|
cli_translation_security.policies[SecurityLevel.LOW].translation_mode = mode_mapping[low_level]
|
|
|
|
|
|
def get_translation_security_report() -> Dict:
|
|
"""Get comprehensive translation security report"""
|
|
return {
|
|
"security_policies": {
|
|
level.value: policy.translation_mode.value
|
|
for level, policy in cli_translation_security.policies.items()
|
|
},
|
|
"security_summary": cli_translation_security.get_security_summary(),
|
|
"critical_commands": [
|
|
cmd for cmd in ['agent', 'strategy', 'wallet', 'sign', 'deploy']
|
|
if cli_translation_security.get_command_security_level(cmd) == SecurityLevel.CRITICAL
|
|
],
|
|
"recommendations": _get_security_recommendations()
|
|
}
|
|
|
|
|
|
def _get_security_recommendations() -> List[str]:
|
|
"""Get security recommendations"""
|
|
recommendations = []
|
|
|
|
# Check if critical commands have proper restrictions
|
|
for cmd in ['agent', 'strategy', 'wallet', 'sign']:
|
|
if cli_translation_security.is_translation_allowed(cmd, 'es'):
|
|
recommendations.append(f"Consider disabling translation for '{cmd}' command")
|
|
|
|
# Check for external API usage in sensitive operations
|
|
critical_policy = cli_translation_security.policies[SecurityLevel.CRITICAL]
|
|
if critical_policy.allow_external_apis:
|
|
recommendations.append("External APIs should be disabled for critical operations")
|
|
|
|
return recommendations
|