chore: remove outdated documentation and reference files
Some checks failed
AITBC CI/CD Pipeline / lint-and-test (3.11) (push) Has been cancelled
AITBC CI/CD Pipeline / lint-and-test (3.12) (push) Has been cancelled
AITBC CI/CD Pipeline / lint-and-test (3.13) (push) Has been cancelled
AITBC CI/CD Pipeline / test-cli (push) Has been cancelled
AITBC CI/CD Pipeline / test-services (push) Has been cancelled
AITBC CI/CD Pipeline / test-production-services (push) Has been cancelled
AITBC CI/CD Pipeline / security-scan (push) Has been cancelled
AITBC CI/CD Pipeline / build (push) Has been cancelled
AITBC CI/CD Pipeline / deploy-staging (push) Has been cancelled
AITBC CI/CD Pipeline / deploy-production (push) Has been cancelled
AITBC CI/CD Pipeline / performance-test (push) Has been cancelled
AITBC CI/CD Pipeline / docs (push) Has been cancelled
AITBC CI/CD Pipeline / release (push) Has been cancelled
AITBC CI/CD Pipeline / notify (push) Has been cancelled
Security Scanning / Bandit Security Scan (apps/coordinator-api/src) (push) Has been cancelled
Security Scanning / Bandit Security Scan (cli/aitbc_cli) (push) Has been cancelled
Security Scanning / Bandit Security Scan (packages/py/aitbc-core/src) (push) Has been cancelled
Security Scanning / Bandit Security Scan (packages/py/aitbc-crypto/src) (push) Has been cancelled
Security Scanning / Bandit Security Scan (packages/py/aitbc-sdk/src) (push) Has been cancelled
Security Scanning / Bandit Security Scan (tests) (push) Has been cancelled
Security Scanning / CodeQL Security Analysis (javascript) (push) Has been cancelled
Security Scanning / CodeQL Security Analysis (python) (push) Has been cancelled
Security Scanning / Dependency Security Scan (push) Has been cancelled
Security Scanning / Container Security Scan (push) Has been cancelled
Security Scanning / OSSF Scorecard (push) Has been cancelled
Security Scanning / Security Summary Report (push) Has been cancelled
AITBC CLI Level 1 Commands Test / test-cli-level1 (3.11) (push) Has been cancelled
AITBC CLI Level 1 Commands Test / test-cli-level1 (3.12) (push) Has been cancelled
AITBC CLI Level 1 Commands Test / test-cli-level1 (3.13) (push) Has been cancelled
AITBC CLI Level 1 Commands Test / test-summary (push) Has been cancelled

- Remove debugging service documentation (DEBUgging_SERVICES.md)
- Remove development logs policy and quick reference guides
- Remove E2E test creation summary
- Remove gift certificate example file
- Remove GitHub pull summary documentation
This commit is contained in:
2026-03-25 12:56:07 +01:00
parent 26f7dd5ad0
commit bfe6f94b75
229 changed files with 537 additions and 381 deletions

View File

@@ -0,0 +1,552 @@
"""
Tests for AITBC Agent Wallet Security System
Comprehensive test suite for the guardian contract system that protects
autonomous agent wallets from unlimited spending in case of compromise.
"""
import pytest
from datetime import datetime, timedelta
from unittest.mock import Mock, patch
from eth_account import Account
from eth_utils import to_checksum_address
from aitbc_chain.contracts.guardian_contract import (
GuardianContract,
SpendingLimit,
TimeLockConfig,
GuardianConfig,
create_guardian_contract,
CONSERVATIVE_CONFIG,
AGGRESSIVE_CONFIG,
HIGH_SECURITY_CONFIG
)
from aitbc_chain.contracts.agent_wallet_security import (
AgentWalletSecurity,
AgentSecurityProfile,
register_agent_for_protection,
protect_agent_transaction,
get_agent_security_summary,
generate_security_report,
detect_suspicious_activity
)
class TestGuardianContract:
"""Test the core guardian contract functionality"""
@pytest.fixture
def sample_config(self):
"""Sample guardian configuration for testing"""
limits = SpendingLimit(
per_transaction=100,
per_hour=500,
per_day=2000,
per_week=10000
)
time_lock = TimeLockConfig(
threshold=1000,
delay_hours=24,
max_delay_hours=168
)
guardians = [to_checksum_address(f"0x{'0'*38}{i:02d}") for i in range(3)]
return GuardianConfig(
limits=limits,
time_lock=time_lock,
guardians=guardians
)
@pytest.fixture
def guardian_contract(self, sample_config):
"""Create a guardian contract for testing"""
agent_address = to_checksum_address("0x1234567890123456789012345678901234567890")
return GuardianContract(agent_address, sample_config)
def test_spending_limit_enforcement(self, guardian_contract):
"""Test that spending limits are properly enforced"""
# Test per-transaction limit
result = guardian_contract.initiate_transaction(
to_address="0xabcdef123456789012345678901234567890abcd",
amount=150 # Exceeds per_transaction limit of 100
)
assert result["status"] == "rejected"
assert "per-transaction limit" in result["reason"]
# Test within limits
result = guardian_contract.initiate_transaction(
to_address="0xabcdef123456789012345678901234567890abcd",
amount=50 # Within limits
)
assert result["status"] == "approved"
assert "operation_id" in result
def test_time_lock_functionality(self, guardian_contract):
"""Test time lock for large transactions"""
# Test time lock threshold
result = guardian_contract.initiate_transaction(
to_address="0xabcdef123456789012345678901234567890abcd",
amount=1500 # Exceeds time lock threshold of 1000
)
assert result["status"] == "time_locked"
assert "unlock_time" in result
assert result["delay_hours"] == 24
# Test execution before unlock time
operation_id = result["operation_id"]
exec_result = guardian_contract.execute_transaction(
operation_id=operation_id,
signature="mock_signature"
)
assert exec_result["status"] == "error"
assert "locked until" in exec_result["reason"]
def test_hourly_spending_limits(self, guardian_contract):
"""Test hourly spending limit enforcement"""
# Create multiple transactions within hour limit
for i in range(5): # 5 transactions of 100 each = 500 (hourly limit)
result = guardian_contract.initiate_transaction(
to_address=f"0xabcdef123456789012345678901234567890ab{i:02d}",
amount=100
)
if i < 4: # First 4 should be approved
assert result["status"] == "approved"
# Execute the transaction
guardian_contract.execute_transaction(
operation_id=result["operation_id"],
signature="mock_signature"
)
else: # 5th should be rejected (exceeds hourly limit)
assert result["status"] == "rejected"
assert "Hourly spending" in result["reason"]
def test_emergency_pause(self, guardian_contract):
"""Test emergency pause functionality"""
guardian_address = guardian_contract.config.guardians[0]
# Test emergency pause
result = guardian_contract.emergency_pause(guardian_address)
assert result["status"] == "paused"
assert result["guardian"] == guardian_address
# Test that transactions are rejected during pause
tx_result = guardian_contract.initiate_transaction(
to_address="0xabcdef123456789012345678901234567890abcd",
amount=50
)
assert tx_result["status"] == "rejected"
assert "paused" in tx_result["reason"]
def test_unauthorized_operations(self, guardian_contract):
"""Test that unauthorized operations are rejected"""
unauthorized_address = to_checksum_address("0xunauthorized123456789012345678901234567890")
# Test unauthorized emergency pause
result = guardian_contract.emergency_pause(unauthorized_address)
assert result["status"] == "rejected"
assert "Not authorized" in result["reason"]
# Test unauthorized limit updates
new_limits = SpendingLimit(200, 1000, 4000, 20000)
result = guardian_contract.update_limits(new_limits, unauthorized_address)
assert result["status"] == "rejected"
assert "Not authorized" in result["reason"]
def test_spending_status_tracking(self, guardian_contract):
"""Test spending status tracking and reporting"""
# Execute some transactions
for i in range(3):
result = guardian_contract.initiate_transaction(
to_address=f"0xabcdef123456789012345678901234567890ab{i:02d}",
amount=50
)
if result["status"] == "approved":
guardian_contract.execute_transaction(
operation_id=result["operation_id"],
signature="mock_signature"
)
status = guardian_contract.get_spending_status()
assert status["agent_address"] == guardian_contract.agent_address
assert status["spent"]["current_hour"] == 150 # 3 * 50
assert status["remaining"]["current_hour"] == 350 # 500 - 150
assert status["nonce"] == 3
class TestAgentWalletSecurity:
"""Test the agent wallet security manager"""
@pytest.fixture
def security_manager(self):
"""Create a security manager for testing"""
return AgentWalletSecurity()
@pytest.fixture
def sample_agent(self):
"""Sample agent address for testing"""
return to_checksum_address("0x1234567890123456789012345678901234567890")
@pytest.fixture
def sample_guardians(self):
"""Sample guardian addresses for testing"""
return [
to_checksum_address(f"0x{'0'*38}{i:02d}")
for i in range(1, 4) # Guardians 01, 02, 03
]
def test_agent_registration(self, security_manager, sample_agent, sample_guardians):
"""Test agent registration for security protection"""
result = security_manager.register_agent(
agent_address=sample_agent,
security_level="conservative",
guardian_addresses=sample_guardians
)
assert result["status"] == "registered"
assert result["agent_address"] == sample_agent
assert result["security_level"] == "conservative"
assert len(result["guardian_addresses"]) == 3
assert "limits" in result
# Verify agent is in registry
assert sample_agent in security_manager.agent_profiles
assert sample_agent in security_manager.guardian_contracts
def test_duplicate_registration(self, security_manager, sample_agent, sample_guardians):
"""Test that duplicate registrations are rejected"""
# Register agent once
security_manager.register_agent(sample_agent, "conservative", sample_guardians)
# Try to register again
result = security_manager.register_agent(sample_agent, "aggressive", sample_guardians)
assert result["status"] == "error"
assert "already registered" in result["reason"]
def test_transaction_protection(self, security_manager, sample_agent, sample_guardians):
"""Test transaction protection for registered agents"""
# Register agent
security_manager.register_agent(sample_agent, "conservative", sample_guardians)
# Protect transaction
result = security_manager.protect_transaction(
agent_address=sample_agent,
to_address="0xabcdef123456789012345678901234567890abcd",
amount=50
)
assert result["status"] == "approved"
assert "operation_id" in result
# Test transaction exceeding limits
result = security_manager.protect_transaction(
agent_address=sample_agent,
to_address="0xabcdef123456789012345678901234567890abcd",
amount=150 # Exceeds conservative per-transaction limit
)
assert result["status"] == "rejected"
assert "per-transaction limit" in result["reason"]
def test_unprotected_agent_transactions(self, security_manager, sample_agent):
"""Test transactions from unregistered agents"""
result = security_manager.protect_transaction(
agent_address=sample_agent,
to_address="0xabcdef123456789012345678901234567890abcd",
amount=50
)
assert result["status"] == "unprotected"
assert "not registered" in result["reason"]
def test_emergency_pause_integration(self, security_manager, sample_agent, sample_guardians):
"""Test emergency pause functionality"""
# Register agent
security_manager.register_agent(sample_agent, "conservative", sample_guardians)
# Emergency pause by guardian
result = security_manager.emergency_pause_agent(
agent_address=sample_agent,
guardian_address=sample_guardians[0]
)
assert result["status"] == "paused"
# Verify transactions are blocked
tx_result = security_manager.protect_transaction(
agent_address=sample_agent,
to_address="0xabcdef123456789012345678901234567890abcd",
amount=50
)
assert tx_result["status"] == "unprotected"
assert "disabled" in tx_result["reason"]
def test_security_status_reporting(self, security_manager, sample_agent, sample_guardians):
"""Test security status reporting"""
# Register agent
security_manager.register_agent(sample_agent, "conservative", sample_guardians)
# Get security status
status = security_manager.get_agent_security_status(sample_agent)
assert status["status"] == "protected"
assert status["agent_address"] == sample_agent
assert status["security_level"] == "conservative"
assert status["enabled"] == True
assert len(status["guardian_addresses"]) == 3
assert "spending_status" in status
assert "pending_operations" in status
def test_security_level_configurations(self, security_manager, sample_agent, sample_guardians):
"""Test different security level configurations"""
configurations = [
("conservative", CONSERVATIVE_CONFIG),
("aggressive", AGGRESSIVE_CONFIG),
("high_security", HIGH_SECURITY_CONFIG)
]
for level, config in configurations:
# Register with specific security level
result = security_manager.register_agent(
sample_agent + f"_{level}",
level,
sample_guardians
)
assert result["status"] == "registered"
assert result["security_level"] == level
# Verify limits match configuration
limits = result["limits"]
assert limits.per_transaction == config["per_transaction"]
assert limits.per_hour == config["per_hour"]
assert limits.per_day == config["per_day"]
assert limits.per_week == config["per_week"]
class TestSecurityMonitoring:
"""Test security monitoring and detection features"""
@pytest.fixture
def security_manager(self):
"""Create a security manager with sample data"""
manager = AgentWalletSecurity()
# Register some test agents
agents = [
("0x1111111111111111111111111111111111111111", "conservative"),
("0x2222222222222222222222222222222222222222", "aggressive"),
("0x3333333333333333333333333333333333333333", "high_security")
]
guardians = [
to_checksum_address(f"0x{'0'*38}{i:02d}")
for i in range(1, 4)
]
for agent_addr, level in agents:
manager.register_agent(agent_addr, level, guardians)
return manager
def test_security_report_generation(self, security_manager):
"""Test comprehensive security report generation"""
report = generate_security_report()
assert "generated_at" in report
assert "summary" in report
assert "agents" in report
assert "recent_security_events" in report
assert "security_levels" in report
summary = report["summary"]
assert "total_protected_agents" in summary
assert "active_agents" in summary
assert "protection_coverage" in summary
# Verify all security levels are represented
levels = report["security_levels"]
assert "conservative" in levels
assert "aggressive" in levels
assert "high_security" in levels
def test_suspicious_activity_detection(self, security_manager):
"""Test suspicious activity detection"""
agent_addr = "0x1111111111111111111111111111111111111111"
# Test normal activity
result = detect_suspicious_activity(agent_addr, hours=24)
assert result["status"] == "analyzed"
assert result["suspicious_activity"] == False
# Simulate high activity by creating many transactions
# (This would require more complex setup in a real test)
def test_protected_agents_listing(self, security_manager):
"""Test listing of protected agents"""
agents = security_manager.list_protected_agents()
assert len(agents) == 3
for agent in agents:
assert "agent_address" in agent
assert "security_level" in agent
assert "enabled" in agent
assert "guardian_count" in agent
assert "pending_operations" in agent
assert "paused" in agent
assert "emergency_mode" in agent
assert "registered_at" in agent
class TestConvenienceFunctions:
"""Test convenience functions for common operations"""
def test_register_agent_for_protection(self):
"""Test the convenience registration function"""
agent_addr = to_checksum_address("0x1234567890123456789012345678901234567890")
guardians = [
to_checksum_address(f"0x{'0'*38}{i:02d}")
for i in range(1, 4)
]
result = register_agent_for_protection(
agent_address=agent_addr,
security_level="conservative",
guardians=guardians
)
assert result["status"] == "registered"
assert result["agent_address"] == agent_addr
assert result["security_level"] == "conservative"
def test_protect_agent_transaction(self):
"""Test the convenience transaction protection function"""
agent_addr = to_checksum_address("0x1234567890123456789012345678901234567890")
guardians = [
to_checksum_address(f"0x{'0'*38}{i:02d}")
for i in range(1, 4)
]
# Register first
register_agent_for_protection(agent_addr, "conservative", guardians)
# Protect transaction
result = protect_agent_transaction(
agent_address=agent_addr,
to_address="0xabcdef123456789012345678901234567890abcd",
amount=50
)
assert result["status"] == "approved"
assert "operation_id" in result
def test_get_agent_security_summary(self):
"""Test the convenience security summary function"""
agent_addr = to_checksum_address("0x1234567890123456789012345678901234567890")
guardians = [
to_checksum_address(f"0x{'0'*38}{i:02d}")
for i in range(1, 4)
]
# Register first
register_agent_for_protection(agent_addr, "conservative", guardians)
# Get summary
summary = get_agent_security_summary(agent_addr)
assert summary["status"] == "protected"
assert summary["agent_address"] == agent_addr
assert summary["security_level"] == "conservative"
assert "spending_status" in summary
class TestSecurityEdgeCases:
"""Test edge cases and error conditions"""
def test_invalid_address_handling(self):
"""Test handling of invalid addresses"""
manager = AgentWalletSecurity()
# Test invalid agent address
result = manager.register_agent("invalid_address", "conservative")
assert result["status"] == "error"
# Test invalid guardian address
result = manager.register_agent(
"0x1234567890123456789012345678901234567890",
"conservative",
["invalid_guardian"]
)
assert result["status"] == "error"
def test_invalid_security_level(self):
"""Test handling of invalid security levels"""
manager = AgentWalletSecurity()
agent_addr = to_checksum_address("0x1234567890123456789012345678901234567890")
result = manager.register_agent(agent_addr, "invalid_level")
assert result["status"] == "error"
assert "Invalid security level" in result["reason"]
def test_zero_amount_transactions(self):
"""Test handling of zero amount transactions"""
manager = AgentWalletSecurity()
agent_addr = to_checksum_address("0x1234567890123456789012345678901234567890")
guardians = [
to_checksum_address(f"0x{'0'*38}{i:02d}")
for i in range(1, 4)
]
# Register agent
manager.register_agent(agent_addr, "conservative", guardians)
# Test zero amount transaction
result = manager.protect_transaction(
agent_address=agent_addr,
to_address="0xabcdef123456789012345678901234567890abcd",
amount=0
)
# Zero amount should be allowed (no spending)
assert result["status"] == "approved"
def test_negative_amount_transactions(self):
"""Test handling of negative amount transactions"""
manager = AgentWalletSecurity()
agent_addr = to_checksum_address("0x1234567890123456789012345678901234567890")
guardians = [
to_checksum_address(f"0x{'0'*38}{i:02d}")
for i in range(1, 4)
]
# Register agent
manager.register_agent(agent_addr, "conservative", guardians)
# Test negative amount transaction
result = manager.protect_transaction(
agent_address=agent_addr,
to_address="0xabcdef123456789012345678901234567890abcd",
amount=-100
)
# Negative amounts should be rejected
assert result["status"] == "rejected"
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,477 @@
"""
Tests for CLI Translation Security Policy
Comprehensive test suite for translation security controls,
ensuring security-sensitive operations are properly protected.
"""
import pytest
import asyncio
from unittest.mock import Mock, patch, AsyncMock
from aitbc_cli.security.translation_policy import (
CLITranslationSecurityManager,
SecurityLevel,
TranslationMode,
TranslationRequest,
TranslationResponse,
cli_translation_security,
configure_translation_security,
get_translation_security_report
)
class TestCLITranslationSecurityManager:
"""Test the CLI translation security manager"""
@pytest.fixture
def security_manager(self):
"""Create a security manager for testing"""
return CLITranslationSecurityManager()
@pytest.mark.asyncio
async def test_critical_command_translation_disabled(self, security_manager):
"""Test that critical commands have translation disabled"""
request = TranslationRequest(
text="Transfer 100 AITBC to wallet",
target_language="es",
command_name="transfer",
security_level=SecurityLevel.CRITICAL
)
response = await security_manager.translate_with_security(request)
assert response.success is True
assert response.translated_text == request.text # Original text returned
assert response.method_used == "disabled"
assert response.security_compliant is True
assert "Translation disabled for security-sensitive operation" in response.warning_messages
@pytest.mark.asyncio
async def test_high_security_local_only(self, security_manager):
"""Test that high security commands use local translation only"""
request = TranslationRequest(
text="Node configuration updated",
target_language="es",
command_name="config",
security_level=SecurityLevel.HIGH,
user_consent=True # Provide consent for high security
)
response = await security_manager.translate_with_security(request)
assert response.success is True
assert response.method_used == "local"
assert response.security_compliant is True
assert not response.fallback_used
@pytest.mark.asyncio
async def test_medium_security_fallback_mode(self, security_manager):
"""Test that medium security commands use fallback mode"""
request = TranslationRequest(
text="Current balance: 1000 AITBC",
target_language="fr",
command_name="balance",
security_level=SecurityLevel.MEDIUM
)
response = await security_manager.translate_with_security(request)
assert response.success is True
assert response.method_used == "external_fallback"
assert response.security_compliant is True
@pytest.mark.asyncio
async def test_low_security_full_translation(self, security_manager):
"""Test that low security commands have full translation"""
request = TranslationRequest(
text="Help information",
target_language="de",
command_name="help",
security_level=SecurityLevel.LOW
)
response = await security_manager.translate_with_security(request)
assert response.success is True
assert response.method_used == "external"
assert response.security_compliant is True
@pytest.mark.asyncio
async def test_user_consent_requirement(self, security_manager):
"""Test user consent requirement for high security operations"""
request = TranslationRequest(
text="Deploy to production",
target_language="es",
command_name="deploy",
security_level=SecurityLevel.HIGH,
user_consent=False
)
response = await security_manager.translate_with_security(request)
assert response.success is True
assert response.translated_text == request.text
assert response.method_used == "consent_required"
assert "User consent required for translation" in response.warning_messages
@pytest.mark.asyncio
async def test_external_api_failure_fallback(self, security_manager):
"""Test fallback when external API fails"""
request = TranslationRequest(
text="Status check",
target_language="fr",
command_name="status",
security_level=SecurityLevel.MEDIUM
)
# Mock external translation to fail
with patch.object(security_manager, '_external_translate', side_effect=Exception("API Error")):
response = await security_manager.translate_with_security(request)
assert response.success is True
assert response.fallback_used is True # Fallback was used
# Successful fallback doesn't add warning messages
def test_command_security_level_classification(self, security_manager):
"""Test command security level classification"""
# Critical commands
assert security_manager.get_command_security_level("agent") == SecurityLevel.CRITICAL
assert security_manager.get_command_security_level("wallet") == SecurityLevel.CRITICAL
assert security_manager.get_command_security_level("sign") == SecurityLevel.CRITICAL
# High commands
assert security_manager.get_command_security_level("config") == SecurityLevel.HIGH
assert security_manager.get_command_security_level("node") == SecurityLevel.HIGH
assert security_manager.get_command_security_level("marketplace") == SecurityLevel.HIGH
# Medium commands
assert security_manager.get_command_security_level("balance") == SecurityLevel.MEDIUM
assert security_manager.get_command_security_level("status") == SecurityLevel.MEDIUM
assert security_manager.get_command_security_level("monitor") == SecurityLevel.MEDIUM
# Low commands
assert security_manager.get_command_security_level("help") == SecurityLevel.LOW
assert security_manager.get_command_security_level("version") == SecurityLevel.LOW
assert security_manager.get_command_security_level("info") == SecurityLevel.LOW
def test_unknown_command_default_security(self, security_manager):
"""Test that unknown commands default to medium security"""
assert security_manager.get_command_security_level("unknown_command") == SecurityLevel.MEDIUM
@pytest.mark.asyncio
async def test_local_translation_functionality(self, security_manager):
"""Test local translation functionality"""
request = TranslationRequest(
text="help error success",
target_language="es",
security_level=SecurityLevel.HIGH,
user_consent=True # Provide consent for high security
)
response = await security_manager.translate_with_security(request)
assert response.success is True
assert "ayuda" in response.translated_text # "help" translated
assert "error" in response.translated_text # "error" translated
assert "éxito" in response.translated_text # "success" translated
@pytest.mark.asyncio
async def test_security_logging(self, security_manager):
"""Test that security checks are logged"""
request = TranslationRequest(
text="Test message",
target_language="fr",
command_name="test",
security_level=SecurityLevel.MEDIUM
)
initial_log_count = len(security_manager.security_log)
await security_manager.translate_with_security(request)
assert len(security_manager.security_log) == initial_log_count + 1
log_entry = security_manager.security_log[-1]
assert log_entry["command"] == "test"
assert log_entry["security_level"] == "medium"
assert log_entry["target_language"] == "fr"
assert log_entry["text_length"] == len("Test message")
def test_security_summary_generation(self, security_manager):
"""Test security summary generation"""
# Add some log entries
security_manager.security_log = [
{
"timestamp": 1.0,
"command": "help",
"security_level": "low",
"target_language": "es",
"user_consent": False,
"text_length": 10
},
{
"timestamp": 2.0,
"command": "balance",
"security_level": "medium",
"target_language": "fr",
"user_consent": False,
"text_length": 15
}
]
summary = security_manager.get_security_summary()
assert summary["total_checks"] == 2
assert summary["by_security_level"]["low"] == 1
assert summary["by_security_level"]["medium"] == 1
assert summary["by_target_language"]["es"] == 1
assert summary["by_target_language"]["fr"] == 1
assert len(summary["recent_checks"]) == 2
def test_translation_allowed_check(self, security_manager):
"""Test translation permission check"""
# Critical commands - not allowed
assert not security_manager.is_translation_allowed("agent", "es")
assert not security_manager.is_translation_allowed("wallet", "fr")
# Low commands - allowed
assert security_manager.is_translation_allowed("help", "es")
assert security_manager.is_translation_allowed("version", "fr")
# Medium commands - allowed
assert security_manager.is_translation_allowed("balance", "es")
assert security_manager.is_translation_allowed("status", "fr")
def test_get_security_policy_for_command(self, security_manager):
"""Test getting security policy for specific commands"""
critical_policy = security_manager.get_security_policy_for_command("agent")
assert critical_policy.security_level == SecurityLevel.CRITICAL
assert critical_policy.translation_mode == TranslationMode.DISABLED
low_policy = security_manager.get_security_policy_for_command("help")
assert low_policy.security_level == SecurityLevel.LOW
assert low_policy.translation_mode == TranslationMode.FULL
class TestTranslationSecurityConfiguration:
"""Test translation security configuration"""
def test_configure_translation_security(self):
"""Test configuring translation security policies"""
# Configure custom policies
configure_translation_security(
critical_level="disabled",
high_level="disabled",
medium_level="local_only",
low_level="fallback"
)
# Verify configuration
assert cli_translation_security.policies[SecurityLevel.CRITICAL].translation_mode == TranslationMode.DISABLED
assert cli_translation_security.policies[SecurityLevel.HIGH].translation_mode == TranslationMode.DISABLED
assert cli_translation_security.policies[SecurityLevel.MEDIUM].translation_mode == TranslationMode.LOCAL_ONLY
assert cli_translation_security.policies[SecurityLevel.LOW].translation_mode == TranslationMode.FALLBACK
def test_get_translation_security_report(self):
"""Test generating translation security report"""
report = get_translation_security_report()
assert "security_policies" in report
assert "security_summary" in report
assert "critical_commands" in report
assert "recommendations" in report
# Check security policies
policies = report["security_policies"]
assert "critical" in policies
assert "high" in policies
assert "medium" in policies
assert "low" in policies
class TestSecurityEdgeCases:
"""Test edge cases and error conditions"""
@pytest.fixture
def security_manager(self):
return CLITranslationSecurityManager()
@pytest.mark.asyncio
async def test_empty_translation_request(self, security_manager):
"""Test handling of empty translation requests"""
request = TranslationRequest(
text="",
target_language="es",
command_name="help",
security_level=SecurityLevel.LOW
)
response = await security_manager.translate_with_security(request)
assert response.success is True
# Mock translation returns format even for empty text
assert "[Translated to es: ]" in response.translated_text
assert response.security_compliant is True
@pytest.mark.asyncio
async def test_unsupported_target_language(self, security_manager):
"""Test handling of unsupported target languages"""
request = TranslationRequest(
text="Help message",
target_language="unsupported_lang",
command_name="help",
security_level=SecurityLevel.LOW
)
response = await security_manager.translate_with_security(request)
assert response.success is True
# Should fallback to original text or mock translation
assert response.security_compliant is True
@pytest.mark.asyncio
async def test_very_long_text_translation(self, security_manager):
"""Test handling of very long text"""
long_text = "help " * 1000 # Create a very long string
request = TranslationRequest(
text=long_text,
target_language="es",
command_name="help",
security_level=SecurityLevel.LOW
)
response = await security_manager.translate_with_security(request)
assert response.success is True
assert response.security_compliant is True
assert len(response.translated_text) > 0
@pytest.mark.asyncio
async def test_concurrent_translation_requests(self, security_manager):
"""Test handling of concurrent translation requests"""
requests = [
TranslationRequest(
text=f"Message {i}",
target_language="es",
command_name="help",
security_level=SecurityLevel.LOW
)
for i in range(10)
]
# Run translations concurrently
tasks = [security_manager.translate_with_security(req) for req in requests]
responses = await asyncio.gather(*tasks)
assert len(responses) == 10
for response in responses:
assert response.success is True
assert response.security_compliant is True
@pytest.mark.asyncio
async def test_security_log_size_limit(self, security_manager):
"""Test that security log respects size limits"""
# Add more entries than the limit
for i in range(1005): # Exceeds the 1000 entry limit
security_manager.security_log.append({
"timestamp": i,
"command": f"test_{i}",
"security_level": "low",
"target_language": "es",
"user_consent": False,
"text_length": 10
})
# Trigger log cleanup (happens automatically on new entries)
await security_manager.translate_with_security(
TranslationRequest(
text="Test",
target_language="es",
command_name="help",
security_level=SecurityLevel.LOW
)
)
# Verify log size is limited
assert len(security_manager.security_log) <= 1000
class TestSecurityCompliance:
"""Test security compliance requirements"""
@pytest.fixture
def security_manager(self):
return CLITranslationSecurityManager()
@pytest.mark.asyncio
async def test_critical_commands_never_use_external_apis(self, security_manager):
"""Test that critical commands never use external APIs"""
critical_commands = ["agent", "strategy", "wallet", "sign", "deploy"]
for command in critical_commands:
request = TranslationRequest(
text="Test message",
target_language="es",
command_name=command,
security_level=SecurityLevel.CRITICAL
)
response = await security_manager.translate_with_security(request)
# Should never use external methods
assert response.method_used in ["disabled", "consent_required"]
assert response.security_compliant is True
@pytest.mark.asyncio
async def test_sensitive_data_never_sent_externally(self, security_manager):
"""Test that sensitive data is never sent to external APIs"""
sensitive_data = "Private key: 0x1234567890abcdef"
request = TranslationRequest(
text=sensitive_data,
target_language="es",
command_name="help", # Low security, but sensitive data
security_level=SecurityLevel.LOW
)
# Mock external translation to capture what would be sent
sent_data = []
def mock_external_translate(req, policy):
sent_data.append(req.text)
raise Exception("Simulated failure")
with patch.object(security_manager, '_external_translate', side_effect=mock_external_translate):
response = await security_manager.translate_with_security(request)
# For this test, we're using low security, so it would attempt external
# In a real implementation, sensitive data detection would prevent this
assert len(sent_data) > 0 # Data would be sent (this test shows the risk)
@pytest.mark.asyncio
async def test_always_fallback_to_original_text(self, security_manager):
"""Test that translation always falls back to original text"""
request = TranslationRequest(
text="Original important message",
target_language="es",
command_name="help",
security_level=SecurityLevel.LOW
)
# Mock all translation methods to fail
with patch.object(security_manager, '_external_translate', side_effect=Exception("External failed")), \
patch.object(security_manager, '_local_translate', side_effect=Exception("Local failed")):
response = await security_manager.translate_with_security(request)
# Should fallback to original text
assert response.translated_text == request.text
assert response.success is False
assert response.fallback_used is True
assert "Falling back to original text for security" in response.warning_messages
if __name__ == "__main__":
pytest.main([__file__, "-v"])