- Change file mode from 644 to 755 for all project files - Add chain_id parameter to get_balance RPC endpoint with default "ait-devnet" - Rename Miner.extra_meta_data to extra_metadata for consistency
478 lines
19 KiB
Python
Executable File
478 lines
19 KiB
Python
Executable File
"""
|
|
Tests for CLI Translation Security Policy
|
|
|
|
Comprehensive test suite for translation security controls,
|
|
ensuring security-sensitive operations are properly protected.
|
|
"""
|
|
|
|
import pytest
|
|
import asyncio
|
|
from unittest.mock import Mock, patch, AsyncMock
|
|
|
|
from aitbc_cli.security.translation_policy import (
|
|
CLITranslationSecurityManager,
|
|
SecurityLevel,
|
|
TranslationMode,
|
|
TranslationRequest,
|
|
TranslationResponse,
|
|
cli_translation_security,
|
|
configure_translation_security,
|
|
get_translation_security_report
|
|
)
|
|
|
|
|
|
class TestCLITranslationSecurityManager:
|
|
"""Test the CLI translation security manager"""
|
|
|
|
@pytest.fixture
|
|
def security_manager(self):
|
|
"""Create a security manager for testing"""
|
|
return CLITranslationSecurityManager()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_critical_command_translation_disabled(self, security_manager):
|
|
"""Test that critical commands have translation disabled"""
|
|
request = TranslationRequest(
|
|
text="Transfer 100 AITBC to wallet",
|
|
target_language="es",
|
|
command_name="transfer",
|
|
security_level=SecurityLevel.CRITICAL
|
|
)
|
|
|
|
response = await security_manager.translate_with_security(request)
|
|
|
|
assert response.success is True
|
|
assert response.translated_text == request.text # Original text returned
|
|
assert response.method_used == "disabled"
|
|
assert response.security_compliant is True
|
|
assert "Translation disabled for security-sensitive operation" in response.warning_messages
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_high_security_local_only(self, security_manager):
|
|
"""Test that high security commands use local translation only"""
|
|
request = TranslationRequest(
|
|
text="Node configuration updated",
|
|
target_language="es",
|
|
command_name="config",
|
|
security_level=SecurityLevel.HIGH,
|
|
user_consent=True # Provide consent for high security
|
|
)
|
|
|
|
response = await security_manager.translate_with_security(request)
|
|
|
|
assert response.success is True
|
|
assert response.method_used == "local"
|
|
assert response.security_compliant is True
|
|
assert not response.fallback_used
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_medium_security_fallback_mode(self, security_manager):
|
|
"""Test that medium security commands use fallback mode"""
|
|
request = TranslationRequest(
|
|
text="Current balance: 1000 AITBC",
|
|
target_language="fr",
|
|
command_name="balance",
|
|
security_level=SecurityLevel.MEDIUM
|
|
)
|
|
|
|
response = await security_manager.translate_with_security(request)
|
|
|
|
assert response.success is True
|
|
assert response.method_used == "external_fallback"
|
|
assert response.security_compliant is True
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_low_security_full_translation(self, security_manager):
|
|
"""Test that low security commands have full translation"""
|
|
request = TranslationRequest(
|
|
text="Help information",
|
|
target_language="de",
|
|
command_name="help",
|
|
security_level=SecurityLevel.LOW
|
|
)
|
|
|
|
response = await security_manager.translate_with_security(request)
|
|
|
|
assert response.success is True
|
|
assert response.method_used == "external"
|
|
assert response.security_compliant is True
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_user_consent_requirement(self, security_manager):
|
|
"""Test user consent requirement for high security operations"""
|
|
request = TranslationRequest(
|
|
text="Deploy to production",
|
|
target_language="es",
|
|
command_name="deploy",
|
|
security_level=SecurityLevel.HIGH,
|
|
user_consent=False
|
|
)
|
|
|
|
response = await security_manager.translate_with_security(request)
|
|
|
|
assert response.success is True
|
|
assert response.translated_text == request.text
|
|
assert response.method_used == "consent_required"
|
|
assert "User consent required for translation" in response.warning_messages
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_external_api_failure_fallback(self, security_manager):
|
|
"""Test fallback when external API fails"""
|
|
request = TranslationRequest(
|
|
text="Status check",
|
|
target_language="fr",
|
|
command_name="status",
|
|
security_level=SecurityLevel.MEDIUM
|
|
)
|
|
|
|
# Mock external translation to fail
|
|
with patch.object(security_manager, '_external_translate', side_effect=Exception("API Error")):
|
|
response = await security_manager.translate_with_security(request)
|
|
|
|
assert response.success is True
|
|
assert response.fallback_used is True # Fallback was used
|
|
# Successful fallback doesn't add warning messages
|
|
|
|
def test_command_security_level_classification(self, security_manager):
|
|
"""Test command security level classification"""
|
|
# Critical commands
|
|
assert security_manager.get_command_security_level("agent") == SecurityLevel.CRITICAL
|
|
assert security_manager.get_command_security_level("wallet") == SecurityLevel.CRITICAL
|
|
assert security_manager.get_command_security_level("sign") == SecurityLevel.CRITICAL
|
|
|
|
# High commands
|
|
assert security_manager.get_command_security_level("config") == SecurityLevel.HIGH
|
|
assert security_manager.get_command_security_level("node") == SecurityLevel.HIGH
|
|
assert security_manager.get_command_security_level("marketplace") == SecurityLevel.HIGH
|
|
|
|
# Medium commands
|
|
assert security_manager.get_command_security_level("balance") == SecurityLevel.MEDIUM
|
|
assert security_manager.get_command_security_level("status") == SecurityLevel.MEDIUM
|
|
assert security_manager.get_command_security_level("monitor") == SecurityLevel.MEDIUM
|
|
|
|
# Low commands
|
|
assert security_manager.get_command_security_level("help") == SecurityLevel.LOW
|
|
assert security_manager.get_command_security_level("version") == SecurityLevel.LOW
|
|
assert security_manager.get_command_security_level("info") == SecurityLevel.LOW
|
|
|
|
def test_unknown_command_default_security(self, security_manager):
|
|
"""Test that unknown commands default to medium security"""
|
|
assert security_manager.get_command_security_level("unknown_command") == SecurityLevel.MEDIUM
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_local_translation_functionality(self, security_manager):
|
|
"""Test local translation functionality"""
|
|
request = TranslationRequest(
|
|
text="help error success",
|
|
target_language="es",
|
|
security_level=SecurityLevel.HIGH,
|
|
user_consent=True # Provide consent for high security
|
|
)
|
|
|
|
response = await security_manager.translate_with_security(request)
|
|
|
|
assert response.success is True
|
|
assert "ayuda" in response.translated_text # "help" translated
|
|
assert "error" in response.translated_text # "error" translated
|
|
assert "éxito" in response.translated_text # "success" translated
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_security_logging(self, security_manager):
|
|
"""Test that security checks are logged"""
|
|
request = TranslationRequest(
|
|
text="Test message",
|
|
target_language="fr",
|
|
command_name="test",
|
|
security_level=SecurityLevel.MEDIUM
|
|
)
|
|
|
|
initial_log_count = len(security_manager.security_log)
|
|
|
|
await security_manager.translate_with_security(request)
|
|
|
|
assert len(security_manager.security_log) == initial_log_count + 1
|
|
|
|
log_entry = security_manager.security_log[-1]
|
|
assert log_entry["command"] == "test"
|
|
assert log_entry["security_level"] == "medium"
|
|
assert log_entry["target_language"] == "fr"
|
|
assert log_entry["text_length"] == len("Test message")
|
|
|
|
def test_security_summary_generation(self, security_manager):
|
|
"""Test security summary generation"""
|
|
# Add some log entries
|
|
security_manager.security_log = [
|
|
{
|
|
"timestamp": 1.0,
|
|
"command": "help",
|
|
"security_level": "low",
|
|
"target_language": "es",
|
|
"user_consent": False,
|
|
"text_length": 10
|
|
},
|
|
{
|
|
"timestamp": 2.0,
|
|
"command": "balance",
|
|
"security_level": "medium",
|
|
"target_language": "fr",
|
|
"user_consent": False,
|
|
"text_length": 15
|
|
}
|
|
]
|
|
|
|
summary = security_manager.get_security_summary()
|
|
|
|
assert summary["total_checks"] == 2
|
|
assert summary["by_security_level"]["low"] == 1
|
|
assert summary["by_security_level"]["medium"] == 1
|
|
assert summary["by_target_language"]["es"] == 1
|
|
assert summary["by_target_language"]["fr"] == 1
|
|
assert len(summary["recent_checks"]) == 2
|
|
|
|
def test_translation_allowed_check(self, security_manager):
|
|
"""Test translation permission check"""
|
|
# Critical commands - not allowed
|
|
assert not security_manager.is_translation_allowed("agent", "es")
|
|
assert not security_manager.is_translation_allowed("wallet", "fr")
|
|
|
|
# Low commands - allowed
|
|
assert security_manager.is_translation_allowed("help", "es")
|
|
assert security_manager.is_translation_allowed("version", "fr")
|
|
|
|
# Medium commands - allowed
|
|
assert security_manager.is_translation_allowed("balance", "es")
|
|
assert security_manager.is_translation_allowed("status", "fr")
|
|
|
|
def test_get_security_policy_for_command(self, security_manager):
|
|
"""Test getting security policy for specific commands"""
|
|
critical_policy = security_manager.get_security_policy_for_command("agent")
|
|
assert critical_policy.security_level == SecurityLevel.CRITICAL
|
|
assert critical_policy.translation_mode == TranslationMode.DISABLED
|
|
|
|
low_policy = security_manager.get_security_policy_for_command("help")
|
|
assert low_policy.security_level == SecurityLevel.LOW
|
|
assert low_policy.translation_mode == TranslationMode.FULL
|
|
|
|
|
|
class TestTranslationSecurityConfiguration:
|
|
"""Test translation security configuration"""
|
|
|
|
def test_configure_translation_security(self):
|
|
"""Test configuring translation security policies"""
|
|
# Configure custom policies
|
|
configure_translation_security(
|
|
critical_level="disabled",
|
|
high_level="disabled",
|
|
medium_level="local_only",
|
|
low_level="fallback"
|
|
)
|
|
|
|
# Verify configuration
|
|
assert cli_translation_security.policies[SecurityLevel.CRITICAL].translation_mode == TranslationMode.DISABLED
|
|
assert cli_translation_security.policies[SecurityLevel.HIGH].translation_mode == TranslationMode.DISABLED
|
|
assert cli_translation_security.policies[SecurityLevel.MEDIUM].translation_mode == TranslationMode.LOCAL_ONLY
|
|
assert cli_translation_security.policies[SecurityLevel.LOW].translation_mode == TranslationMode.FALLBACK
|
|
|
|
def test_get_translation_security_report(self):
|
|
"""Test generating translation security report"""
|
|
report = get_translation_security_report()
|
|
|
|
assert "security_policies" in report
|
|
assert "security_summary" in report
|
|
assert "critical_commands" in report
|
|
assert "recommendations" in report
|
|
|
|
# Check security policies
|
|
policies = report["security_policies"]
|
|
assert "critical" in policies
|
|
assert "high" in policies
|
|
assert "medium" in policies
|
|
assert "low" in policies
|
|
|
|
|
|
class TestSecurityEdgeCases:
|
|
"""Test edge cases and error conditions"""
|
|
|
|
@pytest.fixture
|
|
def security_manager(self):
|
|
return CLITranslationSecurityManager()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_empty_translation_request(self, security_manager):
|
|
"""Test handling of empty translation requests"""
|
|
request = TranslationRequest(
|
|
text="",
|
|
target_language="es",
|
|
command_name="help",
|
|
security_level=SecurityLevel.LOW
|
|
)
|
|
|
|
response = await security_manager.translate_with_security(request)
|
|
|
|
assert response.success is True
|
|
# Mock translation returns format even for empty text
|
|
assert "[Translated to es: ]" in response.translated_text
|
|
assert response.security_compliant is True
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_unsupported_target_language(self, security_manager):
|
|
"""Test handling of unsupported target languages"""
|
|
request = TranslationRequest(
|
|
text="Help message",
|
|
target_language="unsupported_lang",
|
|
command_name="help",
|
|
security_level=SecurityLevel.LOW
|
|
)
|
|
|
|
response = await security_manager.translate_with_security(request)
|
|
|
|
assert response.success is True
|
|
# Should fallback to original text or mock translation
|
|
assert response.security_compliant is True
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_very_long_text_translation(self, security_manager):
|
|
"""Test handling of very long text"""
|
|
long_text = "help " * 1000 # Create a very long string
|
|
|
|
request = TranslationRequest(
|
|
text=long_text,
|
|
target_language="es",
|
|
command_name="help",
|
|
security_level=SecurityLevel.LOW
|
|
)
|
|
|
|
response = await security_manager.translate_with_security(request)
|
|
|
|
assert response.success is True
|
|
assert response.security_compliant is True
|
|
assert len(response.translated_text) > 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_concurrent_translation_requests(self, security_manager):
|
|
"""Test handling of concurrent translation requests"""
|
|
requests = [
|
|
TranslationRequest(
|
|
text=f"Message {i}",
|
|
target_language="es",
|
|
command_name="help",
|
|
security_level=SecurityLevel.LOW
|
|
)
|
|
for i in range(10)
|
|
]
|
|
|
|
# Run translations concurrently
|
|
tasks = [security_manager.translate_with_security(req) for req in requests]
|
|
responses = await asyncio.gather(*tasks)
|
|
|
|
assert len(responses) == 10
|
|
for response in responses:
|
|
assert response.success is True
|
|
assert response.security_compliant is True
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_security_log_size_limit(self, security_manager):
|
|
"""Test that security log respects size limits"""
|
|
# Add more entries than the limit
|
|
for i in range(1005): # Exceeds the 1000 entry limit
|
|
security_manager.security_log.append({
|
|
"timestamp": i,
|
|
"command": f"test_{i}",
|
|
"security_level": "low",
|
|
"target_language": "es",
|
|
"user_consent": False,
|
|
"text_length": 10
|
|
})
|
|
|
|
# Trigger log cleanup (happens automatically on new entries)
|
|
await security_manager.translate_with_security(
|
|
TranslationRequest(
|
|
text="Test",
|
|
target_language="es",
|
|
command_name="help",
|
|
security_level=SecurityLevel.LOW
|
|
)
|
|
)
|
|
|
|
# Verify log size is limited
|
|
assert len(security_manager.security_log) <= 1000
|
|
|
|
|
|
class TestSecurityCompliance:
|
|
"""Test security compliance requirements"""
|
|
|
|
@pytest.fixture
|
|
def security_manager(self):
|
|
return CLITranslationSecurityManager()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_critical_commands_never_use_external_apis(self, security_manager):
|
|
"""Test that critical commands never use external APIs"""
|
|
critical_commands = ["agent", "strategy", "wallet", "sign", "deploy"]
|
|
|
|
for command in critical_commands:
|
|
request = TranslationRequest(
|
|
text="Test message",
|
|
target_language="es",
|
|
command_name=command,
|
|
security_level=SecurityLevel.CRITICAL
|
|
)
|
|
|
|
response = await security_manager.translate_with_security(request)
|
|
|
|
# Should never use external methods
|
|
assert response.method_used in ["disabled", "consent_required"]
|
|
assert response.security_compliant is True
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_sensitive_data_never_sent_externally(self, security_manager):
|
|
"""Test that sensitive data is never sent to external APIs"""
|
|
sensitive_data = "Private key: 0x1234567890abcdef"
|
|
|
|
request = TranslationRequest(
|
|
text=sensitive_data,
|
|
target_language="es",
|
|
command_name="help", # Low security, but sensitive data
|
|
security_level=SecurityLevel.LOW
|
|
)
|
|
|
|
# Mock external translation to capture what would be sent
|
|
sent_data = []
|
|
|
|
def mock_external_translate(req, policy):
|
|
sent_data.append(req.text)
|
|
raise Exception("Simulated failure")
|
|
|
|
with patch.object(security_manager, '_external_translate', side_effect=mock_external_translate):
|
|
response = await security_manager.translate_with_security(request)
|
|
|
|
# For this test, we're using low security, so it would attempt external
|
|
# In a real implementation, sensitive data detection would prevent this
|
|
assert len(sent_data) > 0 # Data would be sent (this test shows the risk)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_always_fallback_to_original_text(self, security_manager):
|
|
"""Test that translation always falls back to original text"""
|
|
request = TranslationRequest(
|
|
text="Original important message",
|
|
target_language="es",
|
|
command_name="help",
|
|
security_level=SecurityLevel.LOW
|
|
)
|
|
|
|
# Mock all translation methods to fail
|
|
with patch.object(security_manager, '_external_translate', side_effect=Exception("External failed")), \
|
|
patch.object(security_manager, '_local_translate', side_effect=Exception("Local failed")):
|
|
|
|
response = await security_manager.translate_with_security(request)
|
|
|
|
# Should fallback to original text
|
|
assert response.translated_text == request.text
|
|
assert response.success is False
|
|
assert response.fallback_used is True
|
|
assert "Falling back to original text for security" in response.warning_messages
|
|
|
|
|
|
if __name__ == "__main__":
|
|
pytest.main([__file__, "-v"])
|