Files
aitbc/tests/test_agent_wallet_security.py
oib 15427c96c0 chore: update file permissions to executable across repository
- Change file mode from 644 to 755 for all project files
- Add chain_id parameter to get_balance RPC endpoint with default "ait-devnet"
- Rename Miner.extra_meta_data to extra_metadata for consistency
2026-03-06 22:17:54 +01:00

553 lines
20 KiB
Python
Executable File

"""
Tests for AITBC Agent Wallet Security System
Comprehensive test suite for the guardian contract system that protects
autonomous agent wallets from unlimited spending in case of compromise.
"""
import pytest
from datetime import datetime, timedelta
from unittest.mock import Mock, patch
from eth_account import Account
from eth_utils import to_checksum_address
from aitbc_chain.contracts.guardian_contract import (
GuardianContract,
SpendingLimit,
TimeLockConfig,
GuardianConfig,
create_guardian_contract,
CONSERVATIVE_CONFIG,
AGGRESSIVE_CONFIG,
HIGH_SECURITY_CONFIG
)
from aitbc_chain.contracts.agent_wallet_security import (
AgentWalletSecurity,
AgentSecurityProfile,
register_agent_for_protection,
protect_agent_transaction,
get_agent_security_summary,
generate_security_report,
detect_suspicious_activity
)
class TestGuardianContract:
    """Test the core guardian contract functionality.

    Each test receives a fresh ``GuardianContract`` from the
    ``guardian_contract`` fixture, configured by ``sample_config`` with
    spending limits of 100 per transaction, 500 per hour, 2000 per day and
    10000 per week, plus a time lock (threshold 1000, delay 24 hours).
    """

    @pytest.fixture
    def sample_config(self):
        """Sample guardian configuration for testing"""
        limits = SpendingLimit(
            per_transaction=100,
            per_hour=500,
            per_day=2000,
            per_week=10000,
        )
        time_lock = TimeLockConfig(
            threshold=1000,
            delay_hours=24,
            max_delay_hours=168,
        )
        # Three guardian addresses ending in 00, 01 and 02.
        guardians = [to_checksum_address(f"0x{'0'*38}{i:02d}") for i in range(3)]
        return GuardianConfig(
            limits=limits,
            time_lock=time_lock,
            guardians=guardians,
        )

    @pytest.fixture
    def guardian_contract(self, sample_config):
        """Create a guardian contract for testing"""
        agent_address = to_checksum_address("0x1234567890123456789012345678901234567890")
        return GuardianContract(agent_address, sample_config)

    def test_spending_limit_enforcement(self, guardian_contract):
        """Test that spending limits are properly enforced"""
        # Test per-transaction limit
        result = guardian_contract.initiate_transaction(
            to_address="0xabcdef123456789012345678901234567890abcd",
            amount=150,  # Exceeds per_transaction limit of 100
        )
        assert result["status"] == "rejected"
        assert "per-transaction limit" in result["reason"]

        # Test within limits
        result = guardian_contract.initiate_transaction(
            to_address="0xabcdef123456789012345678901234567890abcd",
            amount=50,  # Within limits
        )
        assert result["status"] == "approved"
        assert "operation_id" in result

    def test_time_lock_functionality(self, guardian_contract):
        """Test time lock for large transactions"""
        # Test time lock threshold.
        # NOTE(review): 1500 also exceeds the per-transaction limit of 100, so
        # this expectation presumes the time-lock check takes precedence over
        # the per-transaction check - confirm against the contract.
        result = guardian_contract.initiate_transaction(
            to_address="0xabcdef123456789012345678901234567890abcd",
            amount=1500,  # Exceeds time lock threshold of 1000
        )
        assert result["status"] == "time_locked"
        assert "unlock_time" in result
        assert result["delay_hours"] == 24

        # Test execution before unlock time
        operation_id = result["operation_id"]
        exec_result = guardian_contract.execute_transaction(
            operation_id=operation_id,
            signature="mock_signature",
        )
        assert exec_result["status"] == "error"
        assert "locked until" in exec_result["reason"]

    def test_hourly_spending_limits(self, guardian_contract):
        """Test hourly spending limit enforcement"""
        # Create multiple transactions within hour limit.
        # NOTE(review): the fifth transaction would bring the total to exactly
        # 500, the configured hourly limit; expecting a rejection presumes the
        # contract treats reaching the limit as exceeding it - confirm.
        for i in range(5):  # 5 transactions of 100 each = 500 (hourly limit)
            result = guardian_contract.initiate_transaction(
                to_address=f"0xabcdef123456789012345678901234567890ab{i:02d}",
                amount=100,
            )
            if i < 4:  # First 4 should be approved
                assert result["status"] == "approved"
                # Execute the transaction
                guardian_contract.execute_transaction(
                    operation_id=result["operation_id"],
                    signature="mock_signature",
                )
            else:  # 5th should be rejected (exceeds hourly limit)
                assert result["status"] == "rejected"
                assert "Hourly spending" in result["reason"]

    def test_emergency_pause(self, guardian_contract):
        """Test emergency pause functionality"""
        guardian_address = guardian_contract.config.guardians[0]

        # Test emergency pause
        result = guardian_contract.emergency_pause(guardian_address)
        assert result["status"] == "paused"
        assert result["guardian"] == guardian_address

        # Test that transactions are rejected during pause
        tx_result = guardian_contract.initiate_transaction(
            to_address="0xabcdef123456789012345678901234567890abcd",
            amount=50,
        )
        assert tx_result["status"] == "rejected"
        assert "paused" in tx_result["reason"]

    def test_unauthorized_operations(self, guardian_contract):
        """Test that unauthorized operations are rejected"""
        # A well-formed address that is neither the agent nor one of the
        # configured guardians. It must be valid hex: to_checksum_address
        # raises ValueError on non-hex input, which would abort the test
        # before any assertion ran.
        unauthorized_address = to_checksum_address(
            "0x9999999999999999999999999999999999999999"
        )

        # Test unauthorized emergency pause
        result = guardian_contract.emergency_pause(unauthorized_address)
        assert result["status"] == "rejected"
        assert "Not authorized" in result["reason"]

        # Test unauthorized limit updates
        new_limits = SpendingLimit(200, 1000, 4000, 20000)
        result = guardian_contract.update_limits(new_limits, unauthorized_address)
        assert result["status"] == "rejected"
        assert "Not authorized" in result["reason"]

    def test_spending_status_tracking(self, guardian_contract):
        """Test spending status tracking and reporting"""
        # Execute some transactions
        for i in range(3):
            result = guardian_contract.initiate_transaction(
                to_address=f"0xabcdef123456789012345678901234567890ab{i:02d}",
                amount=50,
            )
            if result["status"] == "approved":
                guardian_contract.execute_transaction(
                    operation_id=result["operation_id"],
                    signature="mock_signature",
                )

        status = guardian_contract.get_spending_status()
        assert status["agent_address"] == guardian_contract.agent_address
        assert status["spent"]["current_hour"] == 150  # 3 * 50
        assert status["remaining"]["current_hour"] == 350  # 500 - 150
        assert status["nonce"] == 3
class TestAgentWalletSecurity:
    """Test the agent wallet security manager.

    Every test gets its own ``AgentWalletSecurity`` instance from the
    ``security_manager`` fixture, so registrations do not carry over
    between tests.
    """

    @pytest.fixture
    def security_manager(self):
        """Create a security manager for testing"""
        return AgentWalletSecurity()

    @pytest.fixture
    def sample_agent(self):
        """Sample agent address for testing"""
        return to_checksum_address("0x1234567890123456789012345678901234567890")

    @pytest.fixture
    def sample_guardians(self):
        """Sample guardian addresses for testing"""
        return [
            to_checksum_address(f"0x{'0'*38}{i:02d}")
            for i in range(1, 4)  # Guardians 01, 02, 03
        ]

    def test_agent_registration(self, security_manager, sample_agent, sample_guardians):
        """Test agent registration for security protection"""
        result = security_manager.register_agent(
            agent_address=sample_agent,
            security_level="conservative",
            guardian_addresses=sample_guardians,
        )
        assert result["status"] == "registered"
        assert result["agent_address"] == sample_agent
        assert result["security_level"] == "conservative"
        assert len(result["guardian_addresses"]) == 3
        assert "limits" in result

        # Verify agent is in registry
        assert sample_agent in security_manager.agent_profiles
        assert sample_agent in security_manager.guardian_contracts

    def test_duplicate_registration(self, security_manager, sample_agent, sample_guardians):
        """Test that duplicate registrations are rejected"""
        # Register agent once
        security_manager.register_agent(sample_agent, "conservative", sample_guardians)

        # Try to register again
        result = security_manager.register_agent(sample_agent, "aggressive", sample_guardians)
        assert result["status"] == "error"
        assert "already registered" in result["reason"]

    def test_transaction_protection(self, security_manager, sample_agent, sample_guardians):
        """Test transaction protection for registered agents"""
        # Register agent
        security_manager.register_agent(sample_agent, "conservative", sample_guardians)

        # Protect transaction
        result = security_manager.protect_transaction(
            agent_address=sample_agent,
            to_address="0xabcdef123456789012345678901234567890abcd",
            amount=50,
        )
        assert result["status"] == "approved"
        assert "operation_id" in result

        # Test transaction exceeding limits
        result = security_manager.protect_transaction(
            agent_address=sample_agent,
            to_address="0xabcdef123456789012345678901234567890abcd",
            amount=150,  # Exceeds conservative per-transaction limit
        )
        assert result["status"] == "rejected"
        assert "per-transaction limit" in result["reason"]

    def test_unprotected_agent_transactions(self, security_manager, sample_agent):
        """Test transactions from unregistered agents"""
        result = security_manager.protect_transaction(
            agent_address=sample_agent,
            to_address="0xabcdef123456789012345678901234567890abcd",
            amount=50,
        )
        assert result["status"] == "unprotected"
        assert "not registered" in result["reason"]

    def test_emergency_pause_integration(self, security_manager, sample_agent, sample_guardians):
        """Test emergency pause functionality"""
        # Register agent
        security_manager.register_agent(sample_agent, "conservative", sample_guardians)

        # Emergency pause by guardian
        result = security_manager.emergency_pause_agent(
            agent_address=sample_agent,
            guardian_address=sample_guardians[0],
        )
        assert result["status"] == "paused"

        # Verify transactions are blocked.
        # NOTE(review): the manager is expected to answer "unprotected" with a
        # "disabled" reason here, presumably because pausing disables the
        # agent's profile - confirm against AgentWalletSecurity.
        tx_result = security_manager.protect_transaction(
            agent_address=sample_agent,
            to_address="0xabcdef123456789012345678901234567890abcd",
            amount=50,
        )
        assert tx_result["status"] == "unprotected"
        assert "disabled" in tx_result["reason"]

    def test_security_status_reporting(self, security_manager, sample_agent, sample_guardians):
        """Test security status reporting"""
        # Register agent
        security_manager.register_agent(sample_agent, "conservative", sample_guardians)

        # Get security status
        status = security_manager.get_agent_security_status(sample_agent)
        assert status["status"] == "protected"
        assert status["agent_address"] == sample_agent
        assert status["security_level"] == "conservative"
        assert status["enabled"] is True
        assert len(status["guardian_addresses"]) == 3
        assert "spending_status" in status
        assert "pending_operations" in status

    def test_security_level_configurations(self, security_manager, sample_agent, sample_guardians):
        """Test different security level configurations"""
        configurations = [
            ("conservative", CONSERVATIVE_CONFIG),
            ("aggressive", AGGRESSIVE_CONFIG),
            ("high_security", HIGH_SECURITY_CONFIG),
        ]
        for index, (level, config) in enumerate(configurations, start=1):
            # Each level needs its own agent, since registering the same agent
            # twice is rejected. The address is derived by swapping the last
            # two digits of the sample agent so it remains a well-formed
            # 40-hex-digit address; appending a suffix such as "_aggressive"
            # would produce a malformed address.
            agent_address = to_checksum_address(f"{sample_agent[:-2]}{index:02d}")

            # Register with specific security level
            result = security_manager.register_agent(
                agent_address,
                level,
                sample_guardians,
            )
            assert result["status"] == "registered"
            assert result["security_level"] == level

            # Verify limits match configuration
            limits = result["limits"]
            assert limits.per_transaction == config["per_transaction"]
            assert limits.per_hour == config["per_hour"]
            assert limits.per_day == config["per_day"]
            assert limits.per_week == config["per_week"]
class TestSecurityMonitoring:
    """Test security monitoring and detection features"""

    @pytest.fixture
    def security_manager(self):
        """Create a security manager with sample data"""
        manager = AgentWalletSecurity()

        # Register some test agents, one per security level
        agents = [
            ("0x1111111111111111111111111111111111111111", "conservative"),
            ("0x2222222222222222222222222222222222222222", "aggressive"),
            ("0x3333333333333333333333333333333333333333", "high_security"),
        ]
        guardians = [
            to_checksum_address(f"0x{'0'*38}{i:02d}")
            for i in range(1, 4)
        ]
        for agent_addr, level in agents:
            manager.register_agent(agent_addr, level, guardians)
        return manager

    def test_security_report_generation(self, security_manager):
        """Test comprehensive security report generation"""
        # NOTE(review): generate_security_report() takes no manager argument,
        # so it presumably reads shared module state rather than the instance
        # built by the fixture - confirm the fixture's agents are visible here.
        report = generate_security_report()
        assert "generated_at" in report
        assert "summary" in report
        assert "agents" in report
        assert "recent_security_events" in report
        assert "security_levels" in report

        summary = report["summary"]
        assert "total_protected_agents" in summary
        assert "active_agents" in summary
        assert "protection_coverage" in summary

        # Verify all security levels are represented
        levels = report["security_levels"]
        assert "conservative" in levels
        assert "aggressive" in levels
        assert "high_security" in levels

    def test_suspicious_activity_detection(self, security_manager):
        """Test suspicious activity detection"""
        agent_addr = "0x1111111111111111111111111111111111111111"

        # Test normal activity: no transactions have been made for this agent.
        result = detect_suspicious_activity(agent_addr, hours=24)
        assert result["status"] == "analyzed"
        # Identity check rather than ``== False`` so that a falsy non-boolean
        # (0, None, empty container) does not pass unnoticed.
        assert result["suspicious_activity"] is False

        # Simulate high activity by creating many transactions
        # (This would require more complex setup in a real test)

    def test_protected_agents_listing(self, security_manager):
        """Test listing of protected agents"""
        agents = security_manager.list_protected_agents()
        assert len(agents) == 3
        for agent in agents:
            assert "agent_address" in agent
            assert "security_level" in agent
            assert "enabled" in agent
            assert "guardian_count" in agent
            assert "pending_operations" in agent
            assert "paused" in agent
            assert "emergency_mode" in agent
            assert "registered_at" in agent
class TestConvenienceFunctions:
    """Test convenience functions for common operations.

    The convenience functions take no manager handle, yet a registration
    made by one call is visible to the next, so they operate on state that
    outlives a single test. Each test therefore registers its own agent
    address; sharing one address would make the outcome depend on test
    order (presumably a second registration of the same agent is reported
    as an error, as it is for ``AgentWalletSecurity.register_agent``).
    """

    @staticmethod
    def _guardians():
        """Return the three sample guardian addresses (ending 01, 02, 03)."""
        return [
            to_checksum_address(f"0x{'0'*38}{i:02d}")
            for i in range(1, 4)
        ]

    def test_register_agent_for_protection(self):
        """Test the convenience registration function"""
        # Digit-only addresses are unaffected by checksum casing.
        agent_addr = to_checksum_address("0x4444444444444444444444444444444444444444")
        guardians = self._guardians()

        result = register_agent_for_protection(
            agent_address=agent_addr,
            security_level="conservative",
            guardians=guardians,
        )
        assert result["status"] == "registered"
        assert result["agent_address"] == agent_addr
        assert result["security_level"] == "conservative"

    def test_protect_agent_transaction(self):
        """Test the convenience transaction protection function"""
        agent_addr = to_checksum_address("0x5555555555555555555555555555555555555555")
        guardians = self._guardians()

        # Register first
        register_agent_for_protection(agent_addr, "conservative", guardians)

        # Protect transaction
        result = protect_agent_transaction(
            agent_address=agent_addr,
            to_address="0xabcdef123456789012345678901234567890abcd",
            amount=50,
        )
        assert result["status"] == "approved"
        assert "operation_id" in result

    def test_get_agent_security_summary(self):
        """Test the convenience security summary function"""
        agent_addr = to_checksum_address("0x6666666666666666666666666666666666666666")
        guardians = self._guardians()

        # Register first
        register_agent_for_protection(agent_addr, "conservative", guardians)

        # Get summary
        summary = get_agent_security_summary(agent_addr)
        assert summary["status"] == "protected"
        assert summary["agent_address"] == agent_addr
        assert summary["security_level"] == "conservative"
        assert "spending_status" in summary
class TestSecurityEdgeCases:
    """Boundary inputs and failure paths of the wallet security manager."""

    def test_invalid_address_handling(self):
        """Malformed agent or guardian addresses yield an error status."""
        wallet_security = AgentWalletSecurity()

        # Agent address that is not an address at all.
        bad_agent_outcome = wallet_security.register_agent("invalid_address", "conservative")
        assert bad_agent_outcome["status"] == "error"

        # Well-formed agent, but the guardian list holds a malformed entry.
        bad_guardian_outcome = wallet_security.register_agent(
            "0x1234567890123456789012345678901234567890",
            "conservative",
            ["invalid_guardian"],
        )
        assert bad_guardian_outcome["status"] == "error"

    def test_invalid_security_level(self):
        """An unknown security level is reported as an error with a reason."""
        wallet_security = AgentWalletSecurity()
        agent = to_checksum_address("0x1234567890123456789012345678901234567890")

        outcome = wallet_security.register_agent(agent, "invalid_level")

        assert outcome["status"] == "error"
        assert "Invalid security level" in outcome["reason"]

    def test_zero_amount_transactions(self):
        """A transfer of zero is approved because nothing is spent."""
        wallet_security = AgentWalletSecurity()
        agent = to_checksum_address("0x1234567890123456789012345678901234567890")
        guardian_set = [
            to_checksum_address("0x" + "0" * 38 + f"{suffix:02d}")
            for suffix in (1, 2, 3)
        ]

        # The agent must be registered before its transactions are checked.
        wallet_security.register_agent(agent, "conservative", guardian_set)

        outcome = wallet_security.protect_transaction(
            agent_address=agent,
            to_address="0xabcdef123456789012345678901234567890abcd",
            amount=0,
        )

        assert outcome["status"] == "approved"

    def test_negative_amount_transactions(self):
        """A transfer with a negative amount is rejected."""
        wallet_security = AgentWalletSecurity()
        agent = to_checksum_address("0x1234567890123456789012345678901234567890")
        guardian_set = [
            to_checksum_address("0x" + "0" * 38 + f"{suffix:02d}")
            for suffix in (1, 2, 3)
        ]

        # The agent must be registered before its transactions are checked.
        wallet_security.register_agent(agent, "conservative", guardian_set)

        outcome = wallet_security.protect_transaction(
            agent_address=agent,
            to_address="0xabcdef123456789012345678901234567890abcd",
            amount=-100,
        )

        assert outcome["status"] == "rejected"
if __name__ == "__main__":
    # Propagate pytest's exit code so that running this file directly reports
    # test failures to the calling shell or CI job instead of always exiting 0.
    raise SystemExit(pytest.main([__file__, "-v"]))