"""Test suite for Guardian Contract - Agent wallet security and spending limits.""" from __future__ import annotations import pytest import tempfile import shutil from datetime import datetime, timedelta from pathlib import Path from unittest.mock import patch, Mock from typing import Generator from aitbc_chain.contracts.guardian_contract import ( GuardianContract, GuardianConfig, SpendingLimit, TimeLockConfig ) @pytest.fixture def temp_storage_dir() -> Generator[Path, None, None]: """Create a temporary directory for contract storage.""" temp_dir = Path(tempfile.mkdtemp()) yield temp_dir shutil.rmtree(temp_dir) @pytest.fixture def guardian_config() -> GuardianConfig: """Create a test guardian configuration.""" return GuardianConfig( limits=SpendingLimit( per_transaction=1000, per_hour=5000, per_day=20000, per_week=100000 ), time_lock=TimeLockConfig( threshold=5000, delay_hours=24, max_delay_hours=168 # 1 week max ), guardians=["0xguardian1", "0xguardian2", "0xguardian3"] ) @pytest.fixture def agent_address() -> str: """Test agent address.""" return "0x1234567890123456789012345678901234567890" @pytest.fixture def guardian_contract( agent_address: str, guardian_config: GuardianConfig, temp_storage_dir: Path ) -> Generator[GuardianContract, None, None]: """Create a guardian contract instance.""" contract = GuardianContract( agent_address=agent_address, config=guardian_config, storage_path=str(temp_storage_dir) ) yield contract # Cleanup is handled by temp_storage_dir fixture class TestGuardianContract: """Test Guardian Contract functionality.""" def test_contract_initialization( self, agent_address: str, guardian_config: GuardianConfig, temp_storage_dir: Path ) -> None: """Test contract initialization.""" contract = GuardianContract( agent_address=agent_address, config=guardian_config, storage_path=str(temp_storage_dir) ) assert contract.agent_address == agent_address.lower() assert contract.config == guardian_config assert contract.storage_dir == temp_storage_dir assert contract.paused is False assert contract.emergency_mode is False assert contract.nonce == 0 assert len(contract.spending_history) == 0 assert len(contract.pending_operations) == 0 def test_storage_initialization(self, guardian_contract: GuardianContract) -> None: """Test that storage is properly initialized.""" assert guardian_contract.db_path.exists() assert guardian_contract.db_path.is_file() def test_spending_limit_check_per_transaction(self, guardian_contract: GuardianContract) -> None: """Test per-transaction spending limit.""" # Should pass for amount within limit allowed, message = guardian_contract._check_spending_limits(500) assert allowed is True assert "passed" in message.lower() # Should fail for amount exceeding limit allowed, message = guardian_contract._check_spending_limits(1500) assert allowed is False assert "per-transaction limit" in message def test_spending_limit_check_hourly(self, guardian_contract: GuardianContract) -> None: """Test hourly spending limit.""" # Add some spending history base_time = datetime.utcnow() guardian_contract.spending_history = [ { "operation_id": "op1", "to": "0xrecipient", "amount": 3000, "data": "", "timestamp": base_time.isoformat(), "executed_at": base_time.isoformat(), "status": "completed", "nonce": 1 } ] # Should fail when exceeding hourly limit allowed, message = guardian_contract._check_spending_limits(2500, base_time) assert allowed is False assert "hourly spending" in message # Should pass for smaller amount allowed, message = guardian_contract._check_spending_limits(1500, base_time) assert allowed is True def test_spending_limit_check_daily(self, guardian_contract: GuardianContract) -> None: """Test daily spending limit.""" # Add spending history across the day base_time = datetime.utcnow() guardian_contract.spending_history = [ { "operation_id": "op1", "to": "0xrecipient", "amount": 15000, "data": "", "timestamp": base_time.isoformat(), "executed_at": base_time.isoformat(), "status": "completed", "nonce": 1 } ] # Should fail when exceeding daily limit allowed, message = guardian_contract._check_spending_limits(6000, base_time) assert allowed is False assert "daily spending" in message # Should pass for smaller amount allowed, message = guardian_contract._check_spending_limits(4000, base_time) assert allowed is True def test_spending_limit_check_weekly(self, guardian_contract: GuardianContract) -> None: """Test weekly spending limit.""" # Add spending history across the week base_time = datetime.utcnow() guardian_contract.spending_history = [ { "operation_id": "op1", "to": "0xrecipient", "amount": 80000, "data": "", "timestamp": base_time.isoformat(), "executed_at": base_time.isoformat(), "status": "completed", "nonce": 1 } ] # Should fail when exceeding weekly limit allowed, message = guardian_contract._check_spending_limits(25000, base_time) assert allowed is False assert "weekly spending" in message # Should pass for smaller amount allowed, message = guardian_contract._check_spending_limits(15000, base_time) assert allowed is True def test_time_lock_requirement(self, guardian_contract: GuardianContract) -> None: """Test time lock requirement for large amounts.""" # Should require time lock for amounts >= threshold assert guardian_contract._requires_time_lock(5000) is True assert guardian_contract._requires_time_lock(10000) is True # Should not require time lock for amounts < threshold assert guardian_contract._requires_time_lock(4000) is False assert guardian_contract._requires_time_lock(1000) is False def test_initiate_transaction_small_amount(self, guardian_contract: GuardianContract) -> None: """Test initiating transaction with small amount (no time lock).""" result = guardian_contract.initiate_transaction( to_address="0xrecipient", amount=1000, data="test transaction" ) assert result["status"] == "approved" assert "operation_id" in result assert "approved for execution" in result["message"] # Check operation is stored operation_id = result["operation_id"] assert operation_id in guardian_contract.pending_operations assert guardian_contract.pending_operations[operation_id]["status"] == "pending" def test_initiate_transaction_large_amount(self, guardian_contract: GuardianContract) -> None: """Test initiating transaction with large amount (time lock required).""" result = guardian_contract.initiate_transaction( to_address="0xrecipient", amount=6000, data="large transaction" ) assert result["status"] == "time_locked" assert "operation_id" in result assert "unlock_time" in result assert "delay_hours" in result assert result["delay_hours"] == 24 assert "time lock" in result["message"].lower() # Check operation is stored with time lock operation_id = result["operation_id"] assert operation_id in guardian_contract.pending_operations assert guardian_contract.pending_operations[operation_id]["status"] == "time_locked" assert "unlock_time" in guardian_contract.pending_operations[operation_id] def test_initiate_transaction_exceeds_limit(self, guardian_contract: GuardianContract) -> None: """Test initiating transaction that exceeds spending limits.""" result = guardian_contract.initiate_transaction( to_address="0xrecipient", amount=1500, # Exceeds per-transaction limit data="excessive transaction" ) assert result["status"] == "rejected" assert "exceeds per-transaction limit" in result["message"] assert "operation_id" not in result def test_execute_transaction_success(self, guardian_contract: GuardianContract) -> None: """Test successful transaction execution.""" # First initiate a transaction init_result = guardian_contract.initiate_transaction( to_address="0xrecipient", amount=1000, data="test transaction" ) operation_id = init_result["operation_id"] signature = "0xsignature" # Execute the transaction result = guardian_contract.execute_transaction(operation_id, signature) assert result["status"] == "executed" assert operation_id in result assert "executed successfully" in result["message"] # Check operation is no longer pending assert operation_id not in guardian_contract.pending_operations # Check it's in spending history assert len(guardian_contract.spending_history) > 0 executed_tx = next(tx for tx in guardian_contract.spending_history if tx["operation_id"] == operation_id) assert executed_tx["status"] == "completed" assert executed_tx["to"] == "0xrecipient" assert executed_tx["amount"] == 1000 def test_execute_transaction_not_found(self, guardian_contract: GuardianContract) -> None: """Test executing transaction that doesn't exist.""" result = guardian_contract.execute_transaction("nonexistent_id", "0xsignature") assert result["status"] == "error" assert "not found" in result["message"].lower() def test_execute_transaction_time_locked(self, guardian_contract: GuardianContract) -> None: """Test executing transaction that is still time locked.""" # Initiate a large transaction that gets time locked init_result = guardian_contract.initiate_transaction( to_address="0xrecipient", amount=6000, data="large transaction" ) operation_id = init_result["operation_id"] signature = "0xsignature" # Try to execute before time lock expires result = guardian_contract.execute_transaction(operation_id, signature) assert result["status"] == "error" assert "time locked" in result["message"].lower() def test_emergency_pause(self, guardian_contract: GuardianContract) -> None: """Test emergency pause functionality.""" # Emergency pause result = guardian_contract.emergency_pause("0xguardian1") assert result["status"] == "paused" assert "Emergency pause activated" in result["message"] assert guardian_contract.paused is True def test_emergency_unpause(self, guardian_contract: GuardianContract) -> None: """Test emergency unpause functionality.""" # First pause guardian_contract.emergency_pause("0xguardian1") # Then unpause with signatures result = guardian_contract.emergency_unpause(["0xguardian1", "0xguardian2"]) assert result["status"] == "active" assert "Emergency pause lifted" in result["message"] assert guardian_contract.paused is False def test_get_spending_status(self, guardian_contract: GuardianContract) -> None: """Test getting spending status.""" status = guardian_contract.get_spending_status() assert "spent_hour" in status assert "spent_day" in status assert "spent_week" in status assert "limits" in status assert "paused" in status assert "emergency_mode" in status assert "nonce" in status def test_period_key_generation(self, guardian_contract: GuardianContract) -> None: """Test period key generation for different time periods.""" base_time = datetime(2023, 6, 15, 14, 30, 0) # Thursday 2:30 PM # Hour key should be YYYY-MM-DD-HH hour_key = guardian_contract._get_period_key(base_time, "hour") assert hour_key == "2023-06-15-14" # Day key should be YYYY-MM-DD day_key = guardian_contract._get_period_key(base_time, "day") assert day_key == "2023-06-15" # Week key should be YYYY-WW (ISO week) week_key = guardian_contract._get_period_key(base_time, "week") assert week_key.startswith("2023-") # Should be 2023-W24 for this date def test_operation_hash_creation(self, guardian_contract: GuardianContract) -> None: """Test operation hash creation.""" operation = { "to": "0xrecipient", "amount": 1000, "nonce": 1 } hash1 = guardian_contract._create_operation_hash(operation) hash2 = guardian_contract._create_operation_hash(operation) # Same operation should produce same hash assert hash1 == hash2 assert len(hash1) == 64 # 64 hex chars (no 0x prefix) def test_persistence_across_instances( self, agent_address: str, guardian_config: GuardianConfig, temp_storage_dir: Path ) -> None: """Test that contract state persists across instances.""" # Create first instance and add data contract1 = GuardianContract( agent_address=agent_address, config=guardian_config, storage_path=str(temp_storage_dir) ) contract1.initiate_transaction( to_address="0xrecipient", amount=1000, data="persistence test" ) # Create second instance and check data is loaded contract2 = GuardianContract( agent_address=agent_address, config=guardian_config, storage_path=str(temp_storage_dir) ) assert len(contract2.pending_operations) == 1 assert contract2.nonce == 1 def test_config_properties(self, guardian_contract: GuardianContract) -> None: """Test that configuration properties are properly set.""" assert guardian_contract.config.limits.per_transaction == 1000 assert guardian_contract.config.limits.per_hour == 5000 assert guardian_contract.config.limits.per_day == 20000 assert guardian_contract.config.limits.per_week == 100000 assert guardian_contract.config.time_lock.threshold == 5000 assert guardian_contract.config.time_lock.delay_hours == 24 assert guardian_contract.config.time_lock.max_delay_hours == 168 assert guardian_contract.config.guardians == ["0xguardian1", "0xguardian2", "0xguardian3"] def test_nonce_increment(self, guardian_contract: GuardianContract) -> None: """Test that nonce increments properly.""" initial_nonce = guardian_contract.nonce # Initiate transaction guardian_contract.initiate_transaction( to_address="0xrecipient", amount=1000, data="nonce test" ) assert guardian_contract.nonce == initial_nonce + 1 def test_get_pending_operations(self, guardian_contract: GuardianContract) -> None: """Test getting list of pending operations.""" # Add some pending operations result1 = guardian_contract.initiate_transaction( to_address="0xrecipient1", amount=1000, data="pending 1" ) result2 = guardian_contract.initiate_transaction( to_address="0xrecipient2", amount=2000, data="pending 2" ) pending = guardian_contract.get_pending_operations() assert len(pending) == 2 assert result1["operation_id"] in pending assert result2["operation_id"] in pending