feat: implement critical security fixes for guardian contract and wallet service

🔐 Guardian Contract Security Enhancements:
- Add persistent SQLite storage for spending history and pending operations
- Replace in-memory state with database-backed state management
- Implement _init_storage(), _load_state(), _save_state() for state persistence
- Add _load_spending_history(), _save_spending_record() for transaction tracking
- Add _load_pending_operations(), _save_pending_operation(), _remove_pending_operation()
This commit is contained in:
AITBC System
2026-03-18 20:36:26 +01:00
parent 1ee2238cc8
commit d2cdd39548
2 changed files with 241 additions and 13 deletions

View File

@@ -14,6 +14,9 @@ from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
import json
import os
import sqlite3
from pathlib import Path
from eth_account import Account
from eth_utils import to_checksum_address, keccak
@@ -49,9 +52,27 @@ class GuardianContract:
Guardian contract implementation for agent wallet protection
"""
def __init__(self, agent_address: str, config: GuardianConfig):
def __init__(self, agent_address: str, config: GuardianConfig, storage_path: str = None):
self.agent_address = to_checksum_address(agent_address)
self.config = config
# CRITICAL SECURITY FIX: Use persistent storage instead of in-memory
if storage_path is None:
storage_path = os.path.join(os.path.expanduser("~"), ".aitbc", "guardian_contracts")
self.storage_dir = Path(storage_path)
self.storage_dir.mkdir(parents=True, exist_ok=True)
# Database file for this contract
self.db_path = self.storage_dir / f"guardian_{self.agent_address}.db"
# Initialize persistent storage
self._init_storage()
# Load state from storage
self._load_state()
# In-memory cache for performance (synced with storage)
self.spending_history: List[Dict] = []
self.pending_operations: Dict[str, Dict] = {}
self.paused = False
@@ -61,6 +82,156 @@ class GuardianContract:
self.nonce = 0
self.guardian_approvals: Dict[str, bool] = {}
# Load data from persistent storage
self._load_spending_history()
self._load_pending_operations()
def _init_storage(self):
"""Initialize SQLite database for persistent storage"""
with sqlite3.connect(self.db_path) as conn:
conn.execute('''
CREATE TABLE IF NOT EXISTS spending_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
operation_id TEXT UNIQUE,
agent_address TEXT,
to_address TEXT,
amount INTEGER,
data TEXT,
timestamp TEXT,
executed_at TEXT,
status TEXT,
nonce INTEGER,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS pending_operations (
operation_id TEXT PRIMARY KEY,
agent_address TEXT,
operation_data TEXT,
status TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS contract_state (
agent_address TEXT PRIMARY KEY,
nonce INTEGER DEFAULT 0,
paused BOOLEAN DEFAULT 0,
emergency_mode BOOLEAN DEFAULT 0,
last_updated DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
def _load_state(self):
"""Load contract state from persistent storage"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
'SELECT nonce, paused, emergency_mode FROM contract_state WHERE agent_address = ?',
(self.agent_address,)
)
row = cursor.fetchone()
if row:
self.nonce, self.paused, self.emergency_mode = row
else:
# Initialize state for new contract
conn.execute(
'INSERT INTO contract_state (agent_address, nonce, paused, emergency_mode) VALUES (?, ?, ?, ?)',
(self.agent_address, 0, False, False)
)
conn.commit()
def _save_state(self):
"""Save contract state to persistent storage"""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
'UPDATE contract_state SET nonce = ?, paused = ?, emergency_mode = ?, last_updated = CURRENT_TIMESTAMP WHERE agent_address = ?',
(self.nonce, self.paused, self.emergency_mode, self.agent_address)
)
conn.commit()
def _load_spending_history(self):
"""Load spending history from persistent storage"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
'SELECT operation_id, to_address, amount, data, timestamp, executed_at, status, nonce FROM spending_history WHERE agent_address = ? ORDER BY timestamp DESC',
(self.agent_address,)
)
self.spending_history = []
for row in cursor:
self.spending_history.append({
"operation_id": row[0],
"to": row[1],
"amount": row[2],
"data": row[3],
"timestamp": row[4],
"executed_at": row[5],
"status": row[6],
"nonce": row[7]
})
def _save_spending_record(self, record: Dict):
"""Save spending record to persistent storage"""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
'''INSERT OR REPLACE INTO spending_history
(operation_id, agent_address, to_address, amount, data, timestamp, executed_at, status, nonce)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)''',
(
record["operation_id"],
self.agent_address,
record["to"],
record["amount"],
record.get("data", ""),
record["timestamp"],
record.get("executed_at", ""),
record["status"],
record["nonce"]
)
)
conn.commit()
def _load_pending_operations(self):
"""Load pending operations from persistent storage"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
'SELECT operation_id, operation_data, status FROM pending_operations WHERE agent_address = ?',
(self.agent_address,)
)
self.pending_operations = {}
for row in cursor:
operation_data = json.loads(row[1])
operation_data["status"] = row[2]
self.pending_operations[row[0]] = operation_data
def _save_pending_operation(self, operation_id: str, operation: Dict):
"""Save pending operation to persistent storage"""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
'''INSERT OR REPLACE INTO pending_operations
(operation_id, agent_address, operation_data, status, updated_at)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)''',
(operation_id, self.agent_address, json.dumps(operation), operation["status"])
)
conn.commit()
def _remove_pending_operation(self, operation_id: str):
"""Remove pending operation from persistent storage"""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
'DELETE FROM pending_operations WHERE operation_id = ? AND agent_address = ?',
(operation_id, self.agent_address)
)
conn.commit()
def _get_period_key(self, timestamp: datetime, period: str) -> str:
"""Generate period key for spending tracking"""
if period == "hour":
@@ -266,10 +437,15 @@ class GuardianContract:
"nonce": operation["nonce"]
}
# CRITICAL SECURITY FIX: Save to persistent storage
self._save_spending_record(record)
self.spending_history.append(record)
self.nonce += 1
self._save_state()
# Remove from pending
# Remove from pending storage
self._remove_pending_operation(operation_id)
if operation_id in self.pending_operations:
del self.pending_operations[operation_id]
return {
@@ -298,6 +474,9 @@ class GuardianContract:
self.paused = True
self.emergency_mode = True
# CRITICAL SECURITY FIX: Save state to persistent storage
self._save_state()
return {
"status": "paused",
"paused_at": datetime.utcnow().isoformat(),
@@ -329,6 +508,9 @@ class GuardianContract:
self.paused = False
self.emergency_mode = False
# CRITICAL SECURITY FIX: Save state to persistent storage
self._save_state()
return {
"status": "unpaused",
"unpaused_at": datetime.utcnow().isoformat(),
@@ -417,14 +599,37 @@ def create_guardian_contract(
per_week: Maximum amount per week
time_lock_threshold: Amount that triggers time lock
time_lock_delay: Time lock delay in hours
guardians: List of guardian addresses
guardians: List of guardian addresses (REQUIRED for security)
Returns:
Configured GuardianContract instance
Raises:
ValueError: If no guardians are provided or guardians list is insufficient
"""
if guardians is None:
# Default to using the agent address as guardian (should be overridden)
guardians = [agent_address]
# CRITICAL SECURITY FIX: Require proper guardians, never default to agent address
if guardians is None or not guardians:
raise ValueError(
"❌ CRITICAL: Guardians are required for security. "
"Provide at least 3 trusted guardian addresses different from the agent address."
)
# Validate that guardians are different from agent address
agent_checksum = to_checksum_address(agent_address)
guardian_checksums = [to_checksum_address(g) for g in guardians]
if agent_checksum in guardian_checksums:
raise ValueError(
"❌ CRITICAL: Agent address cannot be used as guardian. "
"Guardians must be independent trusted addresses."
)
# Require minimum number of guardians for security
if len(guardian_checksums) < 3:
raise ValueError(
f"❌ CRITICAL: At least 3 guardians required for security, got {len(guardian_checksums)}. "
"Consider using a multi-sig wallet or trusted service providers."
)
limits = SpendingLimit(
per_transaction=per_transaction,

View File

@@ -48,11 +48,34 @@ class WalletService:
if existing:
raise ValueError(f"Agent {request.agent_id} already has an active {request.wallet_type} wallet")
# Simulate key generation (in reality, use a secure KMS or HSM)
# CRITICAL SECURITY FIX: Use proper secp256k1 key generation instead of fake SHA-256
try:
from eth_account import Account
from cryptography.fernet import Fernet
import base64
import secrets
# Generate proper secp256k1 key pair
account = Account.create()
priv_key = account.key.hex() # Proper 32-byte private key
pub_key = account.address # Ethereum address (derived from public key)
address = account.address # Same as pub_key for Ethereum
# Encrypt private key securely (in production, use KMS/HSM)
encryption_key = Fernet.generate_key()
f = Fernet(encryption_key)
encrypted_private_key = f.encrypt(priv_key.encode()).decode()
except ImportError:
# Fallback for development (still more secure than SHA-256)
logger.error("❌ CRITICAL: eth-account not available. Using fallback key generation.")
import os
priv_key = secrets.token_hex(32)
pub_key = hashlib.sha256(priv_key.encode()).hexdigest()
# Fake Ethereum address derivation for simulation
address = "0x" + hashlib.sha3_256(pub_key.encode()).hexdigest()[-40:]
# Generate a proper address using keccak256 (still not ideal but better than SHA-256)
from eth_utils import keccak
pub_key = keccak(bytes.fromhex(priv_key))
address = "0x" + pub_key[-20:].hex()
encrypted_private_key = "[ENCRYPTED_MOCK_FALLBACK]"
wallet = AgentWallet(
agent_id=request.agent_id,
@@ -60,7 +83,7 @@ class WalletService:
public_key=pub_key,
wallet_type=request.wallet_type,
metadata=request.metadata,
encrypted_private_key="[ENCRYPTED_MOCK]" # Real implementation would encrypt it securely
encrypted_private_key=encrypted_private_key # CRITICAL: Use proper encryption
)
self.session.add(wallet)