diff --git a/apps/blockchain-node/tests/test_guardian_contract.py b/apps/blockchain-node/tests/test_guardian_contract.py new file mode 100644 index 00000000..4bfc9c86 --- /dev/null +++ b/apps/blockchain-node/tests/test_guardian_contract.py @@ -0,0 +1,441 @@ +"""Test suite for Guardian Contract - Agent wallet security and spending limits.""" + +from __future__ import annotations + +import pytest +import tempfile +import shutil +from datetime import datetime, timedelta +from pathlib import Path +from unittest.mock import patch, Mock +from typing import Generator + +from aitbc_chain.contracts.guardian_contract import ( + GuardianContract, + GuardianConfig, + SpendingLimit, + TimeLockConfig +) + + +@pytest.fixture +def temp_storage_dir() -> Generator[Path, None, None]: + """Create a temporary directory for contract storage.""" + temp_dir = Path(tempfile.mkdtemp()) + yield temp_dir + shutil.rmtree(temp_dir) + + +@pytest.fixture +def guardian_config() -> GuardianConfig: + """Create a test guardian configuration.""" + return GuardianConfig( + limits=SpendingLimit( + per_transaction=1000, + per_hour=5000, + per_day=20000, + per_week=100000 + ), + time_lock=TimeLockConfig( + threshold=5000, + delay_hours=24, + max_delay_hours=168 # 1 week max + ), + guardians=["0xguardian1", "0xguardian2", "0xguardian3"] + ) + + +@pytest.fixture +def agent_address() -> str: + """Test agent address.""" + return "0x1234567890123456789012345678901234567890" + + +@pytest.fixture +def guardian_contract( + agent_address: str, + guardian_config: GuardianConfig, + temp_storage_dir: Path +) -> Generator[GuardianContract, None, None]: + """Create a guardian contract instance.""" + contract = GuardianContract( + agent_address=agent_address, + config=guardian_config, + storage_path=str(temp_storage_dir) + ) + yield contract + # Cleanup is handled by temp_storage_dir fixture + + +class TestGuardianContract: + """Test Guardian Contract functionality.""" + + def test_contract_initialization( + self, + agent_address: str, + guardian_config: GuardianConfig, + temp_storage_dir: Path + ) -> None: + """Test contract initialization.""" + contract = GuardianContract( + agent_address=agent_address, + config=guardian_config, + storage_path=str(temp_storage_dir) + ) + + assert contract.agent_address == agent_address.lower() + assert contract.config == guardian_config + assert contract.storage_dir == temp_storage_dir + assert contract.paused is False + assert contract.emergency_mode is False + assert contract.nonce == 0 + assert len(contract.spending_history) == 0 + assert len(contract.pending_operations) == 0 + + def test_storage_initialization(self, guardian_contract: GuardianContract) -> None: + """Test that storage is properly initialized.""" + assert guardian_contract.db_path.exists() + assert guardian_contract.db_path.is_file() + + def test_spending_limit_check_per_transaction(self, guardian_contract: GuardianContract) -> None: + """Test per-transaction spending limit.""" + # Should pass for amount within limit + allowed, message = guardian_contract._check_spending_limits(500) + assert allowed is True + assert "passed" in message.lower() + + # Should fail for amount exceeding limit + allowed, message = guardian_contract._check_spending_limits(1500) + assert allowed is False + assert "per-transaction limit" in message + + def test_spending_limit_check_hourly(self, guardian_contract: GuardianContract) -> None: + """Test hourly spending limit.""" + # Add some spending history + base_time = datetime.utcnow() + guardian_contract.spending_history = [ + { + "operation_id": "op1", + "to": "0xrecipient", + "amount": 3000, + "data": "", + "timestamp": base_time.isoformat(), + "executed_at": base_time.isoformat(), + "status": "completed", + "nonce": 1 + } + ] + + # Should fail when exceeding hourly limit + allowed, message = guardian_contract._check_spending_limits(2500, base_time) + assert allowed is False + assert "hourly spending" in message + + # Should pass for smaller amount + allowed, message = guardian_contract._check_spending_limits(1500, base_time) + assert allowed is True + + def test_spending_limit_check_daily(self, guardian_contract: GuardianContract) -> None: + """Test daily spending limit.""" + # Add spending history across the day + base_time = datetime.utcnow() + guardian_contract.spending_history = [ + { + "operation_id": "op1", + "to": "0xrecipient", + "amount": 15000, + "data": "", + "timestamp": base_time.isoformat(), + "executed_at": base_time.isoformat(), + "status": "completed", + "nonce": 1 + } + ] + + # Should fail when exceeding daily limit + allowed, message = guardian_contract._check_spending_limits(6000, base_time) + assert allowed is False + assert "daily spending" in message + + # Should pass for smaller amount + allowed, message = guardian_contract._check_spending_limits(4000, base_time) + assert allowed is True + + def test_spending_limit_check_weekly(self, guardian_contract: GuardianContract) -> None: + """Test weekly spending limit.""" + # Add spending history across the week + base_time = datetime.utcnow() + guardian_contract.spending_history = [ + { + "operation_id": "op1", + "to": "0xrecipient", + "amount": 80000, + "data": "", + "timestamp": base_time.isoformat(), + "executed_at": base_time.isoformat(), + "status": "completed", + "nonce": 1 + } + ] + + # Should fail when exceeding weekly limit + allowed, message = guardian_contract._check_spending_limits(25000, base_time) + assert allowed is False + assert "weekly spending" in message + + # Should pass for smaller amount + allowed, message = guardian_contract._check_spending_limits(15000, base_time) + assert allowed is True + + def test_time_lock_requirement(self, guardian_contract: GuardianContract) -> None: + """Test time lock requirement for large amounts.""" + # Should require time lock for amounts >= threshold + assert guardian_contract._requires_time_lock(5000) is True + assert guardian_contract._requires_time_lock(10000) is True + + # Should not require time lock for amounts < threshold + assert guardian_contract._requires_time_lock(4000) is False + assert guardian_contract._requires_time_lock(1000) is False + + def test_initiate_transaction_small_amount(self, guardian_contract: GuardianContract) -> None: + """Test initiating transaction with small amount (no time lock).""" + result = guardian_contract.initiate_transaction( + to_address="0xrecipient", + amount=1000, + data="test transaction" + ) + + assert result["status"] == "approved" + assert "operation_id" in result + assert "approved for execution" in result["message"] + + # Check operation is stored + operation_id = result["operation_id"] + assert operation_id in guardian_contract.pending_operations + assert guardian_contract.pending_operations[operation_id]["status"] == "pending" + + def test_initiate_transaction_large_amount(self, guardian_contract: GuardianContract) -> None: + """Test initiating transaction with large amount (time lock required).""" + result = guardian_contract.initiate_transaction( + to_address="0xrecipient", + amount=6000, + data="large transaction" + ) + + assert result["status"] == "time_locked" + assert "operation_id" in result + assert "unlock_time" in result + assert "delay_hours" in result + assert result["delay_hours"] == 24 + assert "time lock" in result["message"].lower() + + # Check operation is stored with time lock + operation_id = result["operation_id"] + assert operation_id in guardian_contract.pending_operations + assert guardian_contract.pending_operations[operation_id]["status"] == "time_locked" + assert "unlock_time" in guardian_contract.pending_operations[operation_id] + + def test_initiate_transaction_exceeds_limit(self, guardian_contract: GuardianContract) -> None: + """Test initiating transaction that exceeds spending limits.""" + result = guardian_contract.initiate_transaction( + to_address="0xrecipient", + amount=1500, # Exceeds per-transaction limit + data="excessive transaction" + ) + + assert result["status"] == "rejected" + assert "exceeds per-transaction limit" in result["message"] + assert "operation_id" not in result + + def test_execute_transaction_success(self, guardian_contract: GuardianContract) -> None: + """Test successful transaction execution.""" + # First initiate a transaction + init_result = guardian_contract.initiate_transaction( + to_address="0xrecipient", + amount=1000, + data="test transaction" + ) + + operation_id = init_result["operation_id"] + signature = "0xsignature" + + # Execute the transaction + result = guardian_contract.execute_transaction(operation_id, signature) + + assert result["status"] == "executed" + assert operation_id in result + assert "executed successfully" in result["message"] + + # Check operation is no longer pending + assert operation_id not in guardian_contract.pending_operations + + # Check it's in spending history + assert len(guardian_contract.spending_history) > 0 + executed_tx = next(tx for tx in guardian_contract.spending_history if tx["operation_id"] == operation_id) + assert executed_tx["status"] == "completed" + assert executed_tx["to"] == "0xrecipient" + assert executed_tx["amount"] == 1000 + + def test_execute_transaction_not_found(self, guardian_contract: GuardianContract) -> None: + """Test executing transaction that doesn't exist.""" + result = guardian_contract.execute_transaction("nonexistent_id", "0xsignature") + + assert result["status"] == "error" + assert "not found" in result["message"].lower() + + def test_execute_transaction_time_locked(self, guardian_contract: GuardianContract) -> None: + """Test executing transaction that is still time locked.""" + # Initiate a large transaction that gets time locked + init_result = guardian_contract.initiate_transaction( + to_address="0xrecipient", + amount=6000, + data="large transaction" + ) + + operation_id = init_result["operation_id"] + signature = "0xsignature" + + # Try to execute before time lock expires + result = guardian_contract.execute_transaction(operation_id, signature) + + assert result["status"] == "error" + assert "time locked" in result["message"].lower() + + def test_emergency_pause(self, guardian_contract: GuardianContract) -> None: + """Test emergency pause functionality.""" + # Emergency pause + result = guardian_contract.emergency_pause("0xguardian1") + + assert result["status"] == "paused" + assert "Emergency pause activated" in result["message"] + assert guardian_contract.paused is True + + def test_emergency_unpause(self, guardian_contract: GuardianContract) -> None: + """Test emergency unpause functionality.""" + # First pause + guardian_contract.emergency_pause("0xguardian1") + + # Then unpause with signatures + result = guardian_contract.emergency_unpause(["0xguardian1", "0xguardian2"]) + + assert result["status"] == "active" + assert "Emergency pause lifted" in result["message"] + assert guardian_contract.paused is False + + def test_get_spending_status(self, guardian_contract: GuardianContract) -> None: + """Test getting spending status.""" + status = guardian_contract.get_spending_status() + + assert "spent_hour" in status + assert "spent_day" in status + assert "spent_week" in status + assert "limits" in status + assert "paused" in status + assert "emergency_mode" in status + assert "nonce" in status + + def test_period_key_generation(self, guardian_contract: GuardianContract) -> None: + """Test period key generation for different time periods.""" + base_time = datetime(2023, 6, 15, 14, 30, 0) # Thursday 2:30 PM + + # Hour key should be YYYY-MM-DD-HH + hour_key = guardian_contract._get_period_key(base_time, "hour") + assert hour_key == "2023-06-15-14" + + # Day key should be YYYY-MM-DD + day_key = guardian_contract._get_period_key(base_time, "day") + assert day_key == "2023-06-15" + + # Week key should be YYYY-WW (ISO week) + week_key = guardian_contract._get_period_key(base_time, "week") + assert week_key.startswith("2023-") # Should be 2023-W24 for this date + + def test_operation_hash_creation(self, guardian_contract: GuardianContract) -> None: + """Test operation hash creation.""" + operation = { + "to": "0xrecipient", + "amount": 1000, + "nonce": 1 + } + + hash1 = guardian_contract._create_operation_hash(operation) + hash2 = guardian_contract._create_operation_hash(operation) + + # Same operation should produce same hash + assert hash1 == hash2 + assert len(hash1) == 64 # 64 hex chars (no 0x prefix) + + def test_persistence_across_instances( + self, + agent_address: str, + guardian_config: GuardianConfig, + temp_storage_dir: Path + ) -> None: + """Test that contract state persists across instances.""" + # Create first instance and add data + contract1 = GuardianContract( + agent_address=agent_address, + config=guardian_config, + storage_path=str(temp_storage_dir) + ) + + contract1.initiate_transaction( + to_address="0xrecipient", + amount=1000, + data="persistence test" + ) + + # Create second instance and check data is loaded + contract2 = GuardianContract( + agent_address=agent_address, + config=guardian_config, + storage_path=str(temp_storage_dir) + ) + + assert len(contract2.pending_operations) == 1 + assert contract2.nonce == 1 + + def test_config_properties(self, guardian_contract: GuardianContract) -> None: + """Test that configuration properties are properly set.""" + assert guardian_contract.config.limits.per_transaction == 1000 + assert guardian_contract.config.limits.per_hour == 5000 + assert guardian_contract.config.limits.per_day == 20000 + assert guardian_contract.config.limits.per_week == 100000 + assert guardian_contract.config.time_lock.threshold == 5000 + assert guardian_contract.config.time_lock.delay_hours == 24 + assert guardian_contract.config.time_lock.max_delay_hours == 168 + assert guardian_contract.config.guardians == ["0xguardian1", "0xguardian2", "0xguardian3"] + + def test_nonce_increment(self, guardian_contract: GuardianContract) -> None: + """Test that nonce increments properly.""" + initial_nonce = guardian_contract.nonce + + # Initiate transaction + guardian_contract.initiate_transaction( + to_address="0xrecipient", + amount=1000, + data="nonce test" + ) + + assert guardian_contract.nonce == initial_nonce + 1 + + def test_get_pending_operations(self, guardian_contract: GuardianContract) -> None: + """Test getting list of pending operations.""" + # Add some pending operations + result1 = guardian_contract.initiate_transaction( + to_address="0xrecipient1", + amount=1000, + data="pending 1" + ) + + result2 = guardian_contract.initiate_transaction( + to_address="0xrecipient2", + amount=2000, + data="pending 2" + ) + + pending = guardian_contract.get_pending_operations() + + assert len(pending) == 2 + assert result1["operation_id"] in pending + assert result2["operation_id"] in pending