- Changed bare except clauses to specific exception types in web3_utils.py, testing.py, messages.py, and message_storage.py - Replaced print() calls with logger in testing.py, agent_discovery.py, compliance_agent.py, coordinator.py, trading_agent.py, keys.py, escrow.py, persistent_spending_tracker.py, sync_cli.py, and client.py - Added logger initialization using get_logger(__name__) in compliance_agent.py, coordinator.py, trading_agent.py, keys.py, escrow.py, persistent_spending_tracker.py, and client.py - Removed hardcoded secret
402 lines
13 KiB
Python
402 lines
13 KiB
Python
"""
|
|
Testing utilities for AITBC
|
|
Provides mock factories, test data generators, and test helpers
|
|
"""
|
|
|
|
import secrets
|
|
import json
|
|
from typing import Any, Dict, List, Optional, Type, TypeVar, Callable
|
|
from datetime import datetime, timezone, timedelta
|
|
from dataclasses import dataclass, field
|
|
from decimal import Decimal
|
|
import uuid
|
|
|
|
|
|
T = TypeVar('T')
|
|
|
|
|
|
class MockFactory:
|
|
"""Factory for creating mock objects for testing"""
|
|
|
|
@staticmethod
|
|
def generate_string(length: int = 10, prefix: str = "") -> str:
|
|
"""Generate a random string"""
|
|
random_part = secrets.token_urlsafe(length)[:length]
|
|
return f"{prefix}{random_part}"
|
|
|
|
@staticmethod
|
|
def generate_email() -> str:
|
|
"""Generate a random email address"""
|
|
return f"{MockFactory.generate_string(8)}@example.com"
|
|
|
|
@staticmethod
|
|
def generate_url() -> str:
|
|
"""Generate a random URL"""
|
|
return f"https://example.com/{MockFactory.generate_string(8)}"
|
|
|
|
@staticmethod
|
|
def generate_ip_address() -> str:
|
|
"""Generate a random IP address"""
|
|
return f"192.168.{secrets.randbelow(256)}.{secrets.randbelow(256)}"
|
|
|
|
@staticmethod
|
|
def generate_ethereum_address() -> str:
|
|
"""Generate a random Ethereum address"""
|
|
return f"0x{''.join(secrets.choice('0123456789abcdef') for _ in range(40))}"
|
|
|
|
@staticmethod
|
|
def generate_bitcoin_address() -> str:
|
|
"""Generate a random Bitcoin-like address"""
|
|
return f"1{''.join(secrets.choice('123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz') for _ in range(33))}"
|
|
|
|
@staticmethod
|
|
def generate_uuid() -> str:
|
|
"""Generate a UUID"""
|
|
return str(uuid.uuid4())
|
|
|
|
@staticmethod
|
|
def generate_hash(length: int = 64) -> str:
|
|
"""Generate a random hash string"""
|
|
return secrets.token_hex(length)[:length]
|
|
|
|
|
|
class TestDataGenerator:
|
|
"""Generate test data for various use cases"""
|
|
|
|
@staticmethod
|
|
def generate_user_data(**overrides) -> Dict[str, Any]:
|
|
"""Generate mock user data"""
|
|
data = {
|
|
"id": MockFactory.generate_uuid(),
|
|
"email": MockFactory.generate_email(),
|
|
"username": MockFactory.generate_string(8),
|
|
"first_name": MockFactory.generate_string(6),
|
|
"last_name": MockFactory.generate_string(6),
|
|
"created_at": datetime.now(timezone.utc).isoformat(),
|
|
"updated_at": datetime.now(timezone.utc).isoformat(),
|
|
"is_active": True,
|
|
"role": "user"
|
|
}
|
|
data.update(overrides)
|
|
return data
|
|
|
|
@staticmethod
|
|
def generate_transaction_data(**overrides) -> Dict[str, Any]:
|
|
"""Generate mock transaction data"""
|
|
data = {
|
|
"id": MockFactory.generate_uuid(),
|
|
"from_address": MockFactory.generate_ethereum_address(),
|
|
"to_address": MockFactory.generate_ethereum_address(),
|
|
"amount": str(secrets.randbelow(1000000000000000000)),
|
|
"gas_price": str(secrets.randbelow(100000000000)),
|
|
"gas_limit": secrets.randbelow(100000),
|
|
"nonce": secrets.randbelow(1000),
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
"status": "pending"
|
|
}
|
|
data.update(overrides)
|
|
return data
|
|
|
|
@staticmethod
|
|
def generate_block_data(**overrides) -> Dict[str, Any]:
|
|
"""Generate mock block data"""
|
|
data = {
|
|
"number": secrets.randbelow(10000000),
|
|
"hash": MockFactory.generate_hash(),
|
|
"parent_hash": MockFactory.generate_hash(),
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
"transactions": [],
|
|
"gas_used": str(secrets.randbelow(10000000)),
|
|
"gas_limit": str(15000000),
|
|
"miner": MockFactory.generate_ethereum_address()
|
|
}
|
|
data.update(overrides)
|
|
return data
|
|
|
|
@staticmethod
|
|
def generate_api_key_data(**overrides) -> Dict[str, Any]:
|
|
"""Generate mock API key data"""
|
|
data = {
|
|
"id": MockFactory.generate_uuid(),
|
|
"api_key": f"aitbc_{secrets.token_urlsafe(32)}",
|
|
"user_id": MockFactory.generate_uuid(),
|
|
"name": MockFactory.generate_string(10),
|
|
"scopes": ["read", "write"],
|
|
"created_at": datetime.now(timezone.utc).isoformat(),
|
|
"last_used": None,
|
|
"is_active": True
|
|
}
|
|
data.update(overrides)
|
|
return data
|
|
|
|
@staticmethod
|
|
def generate_wallet_data(**overrides) -> Dict[str, Any]:
|
|
"""Generate mock wallet data"""
|
|
data = {
|
|
"id": MockFactory.generate_uuid(),
|
|
"address": MockFactory.generate_ethereum_address(),
|
|
"chain_id": 1,
|
|
"balance": str(secrets.randbelow(1000000000000000000)),
|
|
"created_at": datetime.now(timezone.utc).isoformat(),
|
|
"is_active": True
|
|
}
|
|
data.update(overrides)
|
|
return data
|
|
|
|
|
|
class TestHelpers:
|
|
"""Helper functions for testing"""
|
|
|
|
@staticmethod
|
|
def assert_dict_contains(subset: Dict[str, Any], superset: Dict[str, Any]) -> bool:
|
|
"""Check if superset contains all key-value pairs from subset"""
|
|
for key, value in subset.items():
|
|
if key not in superset:
|
|
return False
|
|
if superset[key] != value:
|
|
return False
|
|
return True
|
|
|
|
@staticmethod
|
|
def assert_lists_equal_unordered(list1: List[Any], list2: List[Any]) -> bool:
|
|
"""Check if two lists contain the same elements regardless of order"""
|
|
return sorted(list1) == sorted(list2)
|
|
|
|
@staticmethod
|
|
def compare_json_objects(obj1: Any, obj2: Any) -> bool:
|
|
"""Compare two JSON-serializable objects"""
|
|
return json.dumps(obj1, sort_keys=True) == json.dumps(obj2, sort_keys=True)
|
|
|
|
@staticmethod
|
|
def wait_for_condition(
|
|
condition: Callable[[], bool],
|
|
timeout: float = 10.0,
|
|
interval: float = 0.1
|
|
) -> bool:
|
|
"""Wait for a condition to become true"""
|
|
import time
|
|
start = time.time()
|
|
while time.time() - start < timeout:
|
|
if condition():
|
|
return True
|
|
time.sleep(interval)
|
|
return False
|
|
|
|
@staticmethod
|
|
def measure_execution_time(func: Callable, *args, **kwargs) -> tuple[Any, float]:
|
|
"""Measure execution time of a function"""
|
|
import time
|
|
start = time.time()
|
|
result = func(*args, **kwargs)
|
|
elapsed = time.time() - start
|
|
return result, elapsed
|
|
|
|
@staticmethod
|
|
def generate_test_file_path(extension: str = ".tmp") -> str:
|
|
"""Generate a unique test file path"""
|
|
return f"/tmp/test_{secrets.token_hex(8)}{extension}"
|
|
|
|
@staticmethod
|
|
def cleanup_test_files(prefix: str = "test_") -> int:
|
|
"""Clean up test files in /tmp"""
|
|
import os
|
|
import glob
|
|
count = 0
|
|
for file_path in glob.glob(f"/tmp/{prefix}*"):
|
|
try:
|
|
os.remove(file_path)
|
|
count += 1
|
|
except (OSError, IOError):
|
|
pass
|
|
return count
|
|
|
|
|
|
class MockResponse:
|
|
"""Mock HTTP response for testing"""
|
|
|
|
def __init__(
|
|
self,
|
|
status_code: int = 200,
|
|
json_data: Optional[Dict[str, Any]] = None,
|
|
text: Optional[str] = None,
|
|
headers: Optional[Dict[str, str]] = None
|
|
):
|
|
"""Initialize mock response"""
|
|
self.status_code = status_code
|
|
self._json_data = json_data
|
|
self._text = text
|
|
self.headers = headers or {}
|
|
|
|
def json(self) -> Dict[str, Any]:
|
|
"""Return JSON data"""
|
|
if self._json_data is None:
|
|
raise ValueError("No JSON data available")
|
|
return self._json_data
|
|
|
|
def text(self) -> str:
|
|
"""Return text data"""
|
|
if self._text is None:
|
|
return ""
|
|
return self._text
|
|
|
|
def raise_for_status(self) -> None:
|
|
"""Raise exception if status code indicates error"""
|
|
if self.status_code >= 400:
|
|
raise Exception(f"HTTP Error: {self.status_code}")
|
|
|
|
|
|
class MockDatabase:
|
|
"""Mock database for testing"""
|
|
|
|
def __init__(self):
|
|
"""Initialize mock database"""
|
|
self.data: Dict[str, List[Dict[str, Any]]] = {}
|
|
self.tables: List[str] = []
|
|
|
|
def create_table(self, table_name: str) -> None:
|
|
"""Create a table"""
|
|
if table_name not in self.tables:
|
|
self.tables.append(table_name)
|
|
self.data[table_name] = []
|
|
|
|
def insert(self, table_name: str, record: Dict[str, Any]) -> None:
|
|
"""Insert a record"""
|
|
if table_name not in self.tables:
|
|
self.create_table(table_name)
|
|
record['id'] = record.get('id', MockFactory.generate_uuid())
|
|
self.data[table_name].append(record)
|
|
|
|
def select(self, table_name: str, **filters) -> List[Dict[str, Any]]:
|
|
"""Select records with optional filters"""
|
|
if table_name not in self.tables:
|
|
return []
|
|
|
|
records = self.data[table_name]
|
|
if not filters:
|
|
return records
|
|
|
|
filtered = []
|
|
for record in records:
|
|
match = True
|
|
for key, value in filters.items():
|
|
if record.get(key) != value:
|
|
match = False
|
|
break
|
|
if match:
|
|
filtered.append(record)
|
|
|
|
return filtered
|
|
|
|
def update(self, table_name: str, record_id: str, updates: Dict[str, Any]) -> bool:
|
|
"""Update a record"""
|
|
if table_name not in self.tables:
|
|
return False
|
|
|
|
for record in self.data[table_name]:
|
|
if record.get('id') == record_id:
|
|
record.update(updates)
|
|
return True
|
|
return False
|
|
|
|
def delete(self, table_name: str, record_id: str) -> bool:
|
|
"""Delete a record"""
|
|
if table_name not in self.tables:
|
|
return False
|
|
|
|
for i, record in enumerate(self.data[table_name]):
|
|
if record.get('id') == record_id:
|
|
del self.data[table_name][i]
|
|
return True
|
|
return False
|
|
|
|
def clear(self) -> None:
|
|
"""Clear all data"""
|
|
self.data.clear()
|
|
self.tables.clear()
|
|
|
|
|
|
class MockCache:
|
|
"""Mock cache for testing"""
|
|
|
|
def __init__(self, ttl: int = 3600):
|
|
"""Initialize mock cache"""
|
|
self.cache: Dict[str, tuple[Any, float]] = {}
|
|
self.ttl = ttl
|
|
|
|
def get(self, key: str) -> Optional[Any]:
|
|
"""Get value from cache"""
|
|
if key not in self.cache:
|
|
return None
|
|
|
|
value, timestamp = self.cache[key]
|
|
if time.time() - timestamp > self.ttl:
|
|
del self.cache[key]
|
|
return None
|
|
|
|
return value
|
|
|
|
def set(self, key: str, value: Any) -> None:
|
|
"""Set value in cache"""
|
|
self.cache[key] = (value, time.time())
|
|
|
|
def delete(self, key: str) -> bool:
|
|
"""Delete value from cache"""
|
|
if key in self.cache:
|
|
del self.cache[key]
|
|
return True
|
|
return False
|
|
|
|
def clear(self) -> None:
|
|
"""Clear cache"""
|
|
self.cache.clear()
|
|
|
|
def size(self) -> int:
|
|
"""Get cache size"""
|
|
return len(self.cache)
|
|
|
|
|
|
def mock_async_call(return_value: Any = None, delay: float = 0):
|
|
"""Decorator to mock async calls with optional delay"""
|
|
def decorator(func: Callable) -> Callable:
|
|
async def wrapper(*args, **kwargs):
|
|
if delay > 0:
|
|
await asyncio.sleep(delay)
|
|
return return_value
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
def create_mock_config(**overrides) -> Dict[str, Any]:
|
|
"""Create mock configuration"""
|
|
config = {
|
|
"debug": False,
|
|
"log_level": "INFO",
|
|
"database_url": "sqlite:///test.db",
|
|
"redis_url": "redis://localhost:6379",
|
|
"api_host": "localhost",
|
|
"api_port": 8080,
|
|
"secret_key": MockFactory.generate_string(32),
|
|
"max_workers": 4,
|
|
"timeout": 30
|
|
}
|
|
config.update(overrides)
|
|
return config
|
|
|
|
|
|
import time
|
|
|
|
|
|
def create_test_scenario(name: str, steps: List[Callable]) -> Callable:
|
|
"""Create a test scenario with multiple steps"""
|
|
def scenario():
|
|
logger.info("Running test scenario", name=name)
|
|
results = []
|
|
for i, step in enumerate(steps):
|
|
try:
|
|
result = step()
|
|
results.append({"step": i + 1, "status": "passed", "result": result})
|
|
except Exception as e:
|
|
results.append({"step": i + 1, "status": "failed", "error": str(e)})
|
|
return results
|
|
return scenario
|