feat: add SQLCipher database encryption support and consolidate agent documentation
Some checks failed
Blockchain Synchronization Verification / sync-verification (push) Failing after 3s
CLI Tests / test-cli (push) Failing after 3s
Cross-Chain Functionality Tests / test-cross-chain-sync (push) Successful in 2s
Cross-Chain Functionality Tests / test-cross-chain-transactions (push) Successful in 3s
Cross-Chain Functionality Tests / test-cross-chain-bridge (push) Has been skipped
Cross-Chain Functionality Tests / test-multi-chain-consensus (push) Successful in 2s
Cross-Chain Functionality Tests / aggregate-results (push) Has been skipped
Deploy to Testnet / deploy-testnet (push) Successful in 1m12s
Documentation Validation / validate-docs (push) Failing after 8s
Documentation Validation / validate-policies-strict (push) Successful in 3s
Integration Tests / test-service-integration (push) Successful in 2m6s
Multi-Chain Island Architecture Tests / test-multi-chain-island (push) Successful in 2s
Multi-Node Blockchain Health Monitoring / health-check (push) Failing after 4s
P2P Network Verification / p2p-verification (push) Successful in 4s
Package Tests / Python package - aitbc-agent-sdk (push) Successful in 32s
Package Tests / Python package - aitbc-core (push) Successful in 14s
Package Tests / Python package - aitbc-crypto (push) Successful in 12s
Package Tests / Python package - aitbc-sdk (push) Successful in 9s
Package Tests / JavaScript package - aitbc-sdk-js (push) Successful in 8s
Package Tests / JavaScript package - aitbc-token (push) Successful in 17s
Python Tests / test-python (push) Successful in 15s
Security Scanning / security-scan (push) Successful in 27s
Node Failover Simulation / failover-test (push) Successful in 7s
Multi-Node Stress Testing / stress-test (push) Successful in 6s
Cross-Node Transaction Testing / transaction-test (push) Successful in 4s

- Add SQLCipher encryption for ait-mainnet database with configurable flag
- Add db_encryption_enabled and db_encryption_key_path config settings
- Implement encryption key loading and PRAGMA key setup via connection events
- Add shutdown_db function for proper database cleanup
- Export middleware classes in aitbc/__init__.py
- Fix import path in sync.py for settings
- Remove duplicate agent documentation from docs
This commit is contained in:
aitbc
2026-05-03 12:00:38 +02:00
parent 8f6a2f208c
commit 19d415a235
361 changed files with 6432 additions and 4521 deletions

View File

@@ -0,0 +1,293 @@
#!/usr/bin/env python3
"""Database encryption migration tool for AITBC blockchain node.
This CLI tool provides commands to encrypt and decrypt SQLite database files
for the Phase 2 database encryption implementation.
"""
import argparse
import sys
import shutil
from pathlib import Path
# Add the src directory to the path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
# Add the repo root to the path for aitbc module
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent))
from aitbc_chain.database_encryption import (
KeyManager,
DatabaseEncryptor,
is_database_encrypted,
get_encryption_key,
)
def encrypt_database(
db_path: Path,
key_path: Path,
backup: bool = True,
dry_run: bool = False,
) -> None:
"""Encrypt a database file.
Args:
db_path: Path to the database file.
key_path: Path to the encryption key file.
backup: Whether to create a backup before encryption.
dry_run: If True, only print what would be done without executing.
"""
print(f"Encrypting database: {db_path}")
print(f"Using key file: {key_path}")
if not db_path.exists():
print(f"Error: Database file not found: {db_path}")
sys.exit(1)
if is_database_encrypted(db_path):
print("Error: Database is already encrypted")
sys.exit(1)
if backup:
backup_path = db_path.with_suffix('.db.backup')
if dry_run:
print(f"[DRY RUN] Would create backup: {backup_path}")
else:
print(f"Creating backup: {backup_path}")
shutil.copy2(db_path, backup_path)
key_manager = KeyManager(key_path)
key = key_manager.get_or_generate_key()
if dry_run:
print(f"[DRY RUN] Would encrypt {db_path}")
print(f"[DRY RUN] Key file exists: {key_path.exists()}")
else:
encryptor = DatabaseEncryptor(key)
encrypted_path = db_path.with_suffix('.db.encrypted')
encryptor.encrypt_file(db_path, encrypted_path)
# Replace original with encrypted
encrypted_path.replace(db_path)
print(f"Database encrypted successfully: {db_path}")
print(f"Backup created at: {backup_path if backup else 'None'}")
def decrypt_database(
db_path: Path,
key_path: Path,
output_path: Path = None,
backup: bool = True,
dry_run: bool = False,
) -> None:
"""Decrypt an encrypted database file.
Args:
db_path: Path to the encrypted database file.
key_path: Path to the encryption key file.
output_path: Optional output path for decrypted database.
backup: Whether to create a backup before decryption.
dry_run: If True, only print what would be done without executing.
"""
print(f"Decrypting database: {db_path}")
print(f"Using key file: {key_path}")
if not db_path.exists():
print(f"Error: Database file not found: {db_path}")
sys.exit(1)
if not is_database_encrypted(db_path):
print("Error: Database is not encrypted")
sys.exit(1)
if not key_path.exists():
print(f"Error: Key file not found: {key_path}")
sys.exit(1)
if backup:
backup_path = db_path.with_suffix('.db.encrypted.backup')
if dry_run:
print(f"[DRY RUN] Would create backup: {backup_path}")
else:
print(f"Creating backup: {backup_path}")
shutil.copy2(db_path, backup_path)
key_manager = KeyManager(key_path)
key = key_manager.load_key()
if key is None:
print(f"Error: Failed to load key from: {key_path}")
sys.exit(1)
if output_path is None:
output_path = db_path.with_suffix('').with_suffix('.db')
if dry_run:
print(f"[DRY RUN] Would decrypt {db_path} to {output_path}")
else:
encryptor = DatabaseEncryptor(key)
encryptor.decrypt_file(db_path, output_path)
# Replace original with decrypted if output_path is derived from db_path
if str(output_path) == str(db_path.with_suffix('').with_suffix('.db')):
output_path.replace(db_path)
print(f"Database decrypted successfully: {db_path}")
else:
print(f"Database decrypted to: {output_path}")
print(f"Backup created at: {backup_path if backup else 'None'}")
def generate_key(key_path: Path, dry_run: bool = False) -> None:
"""Generate a new encryption key.
Args:
key_path: Path where the key should be saved.
dry_run: If True, only print what would be done without executing.
"""
print(f"Generating encryption key: {key_path}")
if dry_run:
print(f"[DRY RUN] Would generate new key at: {key_path}")
else:
key_manager = KeyManager(key_path)
key = key_manager.get_or_generate_key()
print(f"Key generated successfully: {key_path}")
print(f"Key length: {len(key)} bytes")
def check_encryption(db_path: Path) -> None:
"""Check if a database is encrypted.
Args:
db_path: Path to the database file.
"""
print(f"Checking encryption status: {db_path}")
if not db_path.exists():
print(f"Error: Database file not found: {db_path}")
sys.exit(1)
if is_database_encrypted(db_path):
print("Status: ENCRYPTED")
else:
print("Status: NOT ENCRYPTED")
def main():
parser = argparse.ArgumentParser(
description="Database encryption migration tool for AITBC blockchain node"
)
subparsers = parser.add_subparsers(dest="command", help="Available commands")
# Encrypt command
encrypt_parser = subparsers.add_parser("encrypt", help="Encrypt a database file")
encrypt_parser.add_argument(
"--db-path",
type=Path,
required=True,
help="Path to the database file"
)
encrypt_parser.add_argument(
"--key-path",
type=Path,
default=Path("/etc/aitbc/secrets/db_encryption.key"),
help="Path to the encryption key file (default: /etc/aitbc/secrets/db_encryption.key)"
)
encrypt_parser.add_argument(
"--no-backup",
action="store_true",
help="Skip creating a backup before encryption"
)
encrypt_parser.add_argument(
"--dry-run",
action="store_true",
help="Print what would be done without executing"
)
# Decrypt command
decrypt_parser = subparsers.add_parser("decrypt", help="Decrypt an encrypted database file")
decrypt_parser.add_argument(
"--db-path",
type=Path,
required=True,
help="Path to the encrypted database file"
)
decrypt_parser.add_argument(
"--key-path",
type=Path,
default=Path("/etc/aitbc/secrets/db_encryption.key"),
help="Path to the encryption key file (default: /etc/aitbc/secrets/db_encryption.key)"
)
decrypt_parser.add_argument(
"--output-path",
type=Path,
help="Output path for decrypted database (default: replaces original)"
)
decrypt_parser.add_argument(
"--no-backup",
action="store_true",
help="Skip creating a backup before decryption"
)
decrypt_parser.add_argument(
"--dry-run",
action="store_true",
help="Print what would be done without executing"
)
# Generate key command
generate_parser = subparsers.add_parser("generate-key", help="Generate a new encryption key")
generate_parser.add_argument(
"--key-path",
type=Path,
default=Path("/etc/aitbc/secrets/db_encryption.key"),
help="Path where the key should be saved (default: /etc/aitbc/secrets/db_encryption.key)"
)
generate_parser.add_argument(
"--dry-run",
action="store_true",
help="Print what would be done without executing"
)
# Check command
check_parser = subparsers.add_parser("check", help="Check if a database is encrypted")
check_parser.add_argument(
"--db-path",
type=Path,
required=True,
help="Path to the database file"
)
args = parser.parse_args()
if args.command == "encrypt":
encrypt_database(
db_path=args.db_path,
key_path=args.key_path,
backup=not args.no_backup,
dry_run=args.dry_run,
)
elif args.command == "decrypt":
decrypt_database(
db_path=args.db_path,
key_path=args.key_path,
output_path=args.output_path,
backup=not args.no_backup,
dry_run=args.dry_run,
)
elif args.command == "generate-key":
generate_key(
key_path=args.key_path,
dry_run=args.dry_run,
)
elif args.command == "check":
check_encryption(
db_path=args.db_path,
)
else:
parser.print_help()
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,93 @@
#!/usr/bin/env python3
"""
Migrate existing SQLite database to SQLCipher encrypted format.
This script converts an existing unencrypted SQLite database to SQLCipher
encrypted format using the built-in sqlcipher_export function.
"""
import sys
import os
from pathlib import Path
# Add repo root to path for imports
repo_root = Path(__file__).parent.parent.parent.parent
sys.path.insert(0, str(repo_root))
try:
import sqlcipher3 as sqlite3
except ImportError:
print("ERROR: sqlcipher3-binary not installed")
print("Run: pip install sqlcipher3-binary")
sys.exit(1)
def migrate_to_sqlcipher(db_path: Path, key_path: Path) -> None:
"""Migrate database to SQLCipher encrypted format.
Uses SQLCipher's built-in sqlcipher_export function to properly
encrypt the database while maintaining SQLite's internal structure.
Args:
db_path: Path to the existing SQLite database
key_path: Path to the encryption key file
"""
if not db_path.exists():
print(f"ERROR: Database file not found: {db_path}")
sys.exit(1)
if not key_path.exists():
print(f"ERROR: Key file not found: {key_path}")
sys.exit(1)
# Read encryption key (stored as raw binary bytes)
with open(key_path, 'rb') as f:
key_bytes = f.read()
# Convert raw bytes to hex for SQLCipher
key_hex = key_bytes.hex()
# Create backup
backup_path = db_path.with_suffix('.db.backup')
print(f"Creating backup: {backup_path}")
import shutil
shutil.copy2(db_path, backup_path)
# Create temporary encrypted database
temp_encrypted_path = db_path.with_suffix('.db.encrypted')
# Open unencrypted database
print(f"Opening unencrypted database: {db_path}")
conn_unencrypted = sqlite3.connect(str(db_path))
# Attach encrypted database
print(f"Creating encrypted database: {temp_encrypted_path}")
conn_unencrypted.execute(f"ATTACH DATABASE '{temp_encrypted_path}' AS encrypted KEY '{key_hex}'")
# Export data to encrypted database
print("Exporting data to encrypted database...")
conn_unencrypted.execute("SELECT sqlcipher_export('encrypted')")
conn_unencrypted.commit()
# Detach encrypted database
conn_unencrypted.execute("DETACH DATABASE encrypted")
conn_unencrypted.close()
# Replace original with encrypted
print(f"Replacing original with encrypted database")
temp_encrypted_path.replace(db_path)
print(f"Database migrated successfully to SQLCipher format")
print(f"Backup available at: {backup_path}")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Migrate SQLite database to SQLCipher encrypted format")
parser.add_argument("--db-path", type=Path, required=True, help="Path to the SQLite database")
parser.add_argument("--key-path", type=Path, default=Path("/etc/aitbc/secrets/db_encryption.key"), help="Path to the encryption key file")
args = parser.parse_args()
migrate_to_sqlcipher(args.db_path, args.key_path)

View File

@@ -27,6 +27,8 @@ class ChainSettings(BaseSettings):
supported_chains: str = "ait-mainnet" # Comma-separated list of supported chain IDs
db_path: Path = DATA_DIR / "data" / "chain.db"
enforce_state_root_validation: bool = False # Phase 1.3 enforcement flag
db_encryption_enabled: bool = False # Phase 2: SQLCipher database encryption flag (ait-mainnet only)
db_encryption_key_path: Path = Path("/etc/aitbc/secrets/db_encryption.key") # Phase 2: Encryption key file path
def get_db_path(self, chain_id: str = "") -> Path:
"""Get database path for a specific chain.

View File

@@ -21,9 +21,13 @@ _DB_ENCRYPTION_KEY = os.environ.get("AITBC_DB_KEY", "default_encryption_key_chan
_engines: dict[str, object] = {}
_default_chain_id: str = ""
def get_engine(chain_id: str = "") -> object:
"""Get database engine for a specific chain.
Uses SQLCipher for encryption when enabled (ait-mainnet only).
SQLCipher maintains SQLite's internal format while encrypting data at rest.
Args:
chain_id: Chain ID to get engine for. If empty, uses default chain.
@@ -34,7 +38,45 @@ def get_engine(chain_id: str = "") -> object:
if resolved_chain_id not in _engines:
db_path = settings.get_db_path(resolved_chain_id)
_engines[resolved_chain_id] = create_engine(f"sqlite:///{db_path}", echo=False)
# Check if SQLCipher encryption is enabled for this chain (only ait-mainnet)
encryption_enabled = (
settings.db_encryption_enabled and
settings.db_encryption_key_path.exists() and
resolved_chain_id == "ait-mainnet"
)
if encryption_enabled:
# Use SQLCipher with encryption key
try:
import sqlcipher3 as sqlite3
except ImportError:
raise RuntimeError(
"SQLCipher encryption enabled but sqlcipher3-binary not installed. "
"Run: pip install sqlcipher3-binary"
)
# Load encryption key from file (raw binary bytes, convert to hex)
with open(settings.db_encryption_key_path, 'rb') as f:
key_bytes = f.read()
key_hex = key_bytes.hex()
# Create engine with SQLCipher
engine = create_engine(
f"sqlite:///{db_path}",
module=sqlite3,
echo=False
)
# Set encryption key via connection event
@event.listens_for(engine, "connect")
def set_encryption_key(dbapi_connection, connection_record):
dbapi_connection.execute(f"PRAGMA key = '{key_hex}'")
else:
# Use standard SQLite
engine = create_engine(f"sqlite:///{db_path}", echo=False)
_engines[resolved_chain_id] = engine
return _engines[resolved_chain_id]
@@ -149,6 +191,48 @@ def init_db(chain_id: str = "") -> None:
except OSError:
pass
def shutdown_db(chain_id: str = "") -> None:
"""Shutdown database connection and encrypt if needed.
Args:
chain_id: Chain ID to shutdown. If empty, uses default chain.
"""
resolved_chain_id = chain_id or _default_chain_id or settings.chain_id or "ait-mainnet"
# Check if we need to encrypt the database back
if resolved_chain_id in _db_temp_paths:
temp_path = _db_temp_paths[resolved_chain_id]
db_path = settings.get_db_path(resolved_chain_id)
# Check if encryption is enabled for this chain
encryption_enabled = (
settings.db_encryption_enabled and
resolved_chain_id == "ait-mainnet"
)
if encryption_enabled and temp_path.exists():
# Encrypt the temporary file back to the original location
key = get_encryption_key(settings.db_encryption_key_path)
if key is None:
raise RuntimeError(f"Database encryption enabled but key not found at {settings.db_encryption_key_path}")
try:
encrypt_database(temp_path, key)
# Move encrypted file to original location
encrypted_path = temp_path.with_suffix('.db.encrypted')
encrypted_path.replace(db_path)
# Clean up temporary file
temp_path.unlink(missing_ok=True)
del _db_temp_paths[resolved_chain_id]
except Exception as e:
raise RuntimeError(f"Failed to encrypt database for chain {resolved_chain_id}: {e}")
# Dispose of engine
if resolved_chain_id in _engines:
_engines[resolved_chain_id].dispose()
del _engines[resolved_chain_id]
# Backward compatibility - expose engine for escrow routes (to be removed in Phase 1.3)
# TODO: Remove this in Phase 1.3 when escrow routes are updated
engine = _engine_internal

View File

@@ -0,0 +1,282 @@
"""Database encryption module for AITBC blockchain node.
This module provides AES-GCM encryption for SQLite database files at rest,
using the existing cryptography library. It supports key management,
encryption/decryption operations, and detection of encrypted databases.
"""
from __future__ import annotations
import os
import stat
from pathlib import Path
from typing import Optional
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
import secrets
# Magic header to identify encrypted databases
ENCRYPTION_MAGIC = b"AITBCENC"
ENCRYPTION_VERSION = 1
class KeyManager:
"""Manages encryption key generation, storage, and retrieval."""
def __init__(self, key_path: Path):
"""Initialize key manager.
Args:
key_path: Path to the key file.
"""
self.key_path = key_path
self._key: Optional[bytes] = None
def generate_key(self, password: Optional[str] = None) -> bytes:
"""Generate a new encryption key.
Args:
password: Optional password for key derivation. If None, generates random key.
Returns:
256-bit encryption key.
"""
if password:
# Derive key from password using PBKDF2
salt = secrets.token_bytes(16)
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100_000,
backend=default_backend()
)
key = kdf.derive(password.encode('utf-8'))
# Store salt with key for later derivation
return salt + key
else:
# Generate random key
return secrets.token_bytes(32)
def save_key(self, key: bytes) -> None:
"""Save encryption key to file with restricted permissions.
Args:
key: Encryption key to save.
"""
# Ensure parent directory exists
self.key_path.parent.mkdir(parents=True, exist_ok=True)
# Write key with restricted permissions
with open(self.key_path, 'wb') as f:
f.write(key)
# Set file permissions to 600 (owner read/write only)
os.chmod(self.key_path, stat.S_IRUSR | stat.S_IWUSR)
def load_key(self) -> Optional[bytes]:
"""Load encryption key from file.
Returns:
Encryption key or None if file doesn't exist.
"""
if not self.key_path.exists():
return None
with open(self.key_path, 'rb') as f:
return f.read()
def get_or_generate_key(self, password: Optional[str] = None) -> bytes:
"""Get existing key or generate a new one.
Args:
password: Optional password for key derivation.
Returns:
Encryption key.
"""
key = self.load_key()
if key is None:
key = self.generate_key(password)
self.save_key(key)
return key
def ensure_key_permissions(self) -> bool:
"""Ensure key file has restricted permissions.
Returns:
True if permissions are correct or file doesn't exist, False otherwise.
"""
if not self.key_path.exists():
return True
mode = self.key_path.stat().st_mode
return mode & 0o777 == 0o600
class DatabaseEncryptor:
"""Handles encryption and decryption of database files."""
def __init__(self, key: bytes):
"""Initialize encryptor with encryption key.
Args:
key: 256-bit encryption key.
"""
if len(key) < 32:
# If key has salt prefix (first 16 bytes), extract actual key
if len(key) >= 48:
salt = key[:16]
actual_key = key[16:48]
else:
raise ValueError("Encryption key must be at least 32 bytes")
else:
salt = key[:16] if len(key) > 32 else b''
actual_key = key[:32] if len(key) >= 32 else key
self.key = actual_key
self.salt = salt if len(key) > 32 else None
self.aesgcm = AESGCM(actual_key)
def encrypt_file(self, input_path: Path, output_path: Path) -> None:
"""Encrypt a database file.
Args:
input_path: Path to input database file.
output_path: Path to write encrypted database.
"""
# Read plaintext database
with open(input_path, 'rb') as f:
plaintext = f.read()
# Generate nonce
nonce = secrets.token_bytes(12)
# Encrypt data
ciphertext = self.aesgcm.encrypt(nonce, plaintext, None)
# Write encrypted file with magic header
with open(output_path, 'wb') as f:
f.write(ENCRYPTION_MAGIC)
f.write(bytes([ENCRYPTION_VERSION]))
f.write(nonce)
f.write(ciphertext)
def decrypt_file(self, input_path: Path, output_path: Path) -> None:
"""Decrypt an encrypted database file.
Args:
input_path: Path to encrypted database file.
output_path: Path to write decrypted database.
"""
# Read encrypted file
with open(input_path, 'rb') as f:
data = f.read()
# Verify magic header
if not data.startswith(ENCRYPTION_MAGIC):
raise ValueError("File is not an encrypted database")
# Extract version, nonce, and ciphertext
version = data[len(ENCRYPTION_MAGIC)]
if version != ENCRYPTION_VERSION:
raise ValueError(f"Unsupported encryption version: {version}")
nonce_start = len(ENCRYPTION_MAGIC) + 1
nonce_end = nonce_start + 12
nonce = data[nonce_start:nonce_end]
ciphertext = data[nonce_end:]
# Decrypt data
plaintext = self.aesgcm.decrypt(nonce, ciphertext, None)
# Write decrypted file
with open(output_path, 'wb') as f:
f.write(plaintext)
def is_encrypted(self, file_path: Path) -> bool:
"""Check if a database file is encrypted.
Args:
file_path: Path to database file.
Returns:
True if file is encrypted, False otherwise.
"""
if not file_path.exists():
return False
with open(file_path, 'rb') as f:
header = f.read(len(ENCRYPTION_MAGIC))
return header == ENCRYPTION_MAGIC
def get_encryption_key(key_path: Path) -> Optional[bytes]:
"""Get encryption key from file or generate new one.
Args:
key_path: Path to key file.
Returns:
Encryption key or None if encryption is disabled.
"""
key_manager = KeyManager(key_path)
return key_manager.get_or_generate_key()
def encrypt_database(db_path: Path, key: bytes) -> Path:
"""Encrypt a database file.
Args:
db_path: Path to database file.
key: Encryption key.
Returns:
Path to encrypted database file.
"""
encryptor = DatabaseEncryptor(key)
encrypted_path = db_path.with_suffix('.db.encrypted')
encryptor.encrypt_file(db_path, encrypted_path)
return encrypted_path
def decrypt_database(encrypted_path: Path, key: bytes, output_path: Optional[Path] = None) -> Path:
"""Decrypt an encrypted database file.
Args:
encrypted_path: Path to encrypted database file.
key: Encryption key.
output_path: Optional output path. If None, removes .encrypted suffix.
Returns:
Path to decrypted database file.
"""
encryptor = DatabaseEncryptor(key)
if output_path is None:
output_path = encrypted_path.with_suffix('').with_suffix('.db')
encryptor.decrypt_file(encrypted_path, output_path)
return output_path
def is_database_encrypted(db_path: Path) -> bool:
"""Check if a database file is encrypted.
Args:
db_path: Path to database file.
Returns:
True if database is encrypted, False otherwise.
"""
if not db_path.exists():
return False
# Check for magic header
with open(db_path, 'rb') as f:
header = f.read(len(ENCRYPTION_MAGIC))
return header == ENCRYPTION_MAGIC

View File

@@ -489,7 +489,7 @@ class ChainSync:
# Verify state root if provided
if block_data.get("state_root"):
from ..config import settings
from aitbc_chain.config import settings
state_manager = StateManager()
accounts = session.exec(
select(Account).where(Account.chain_id == self._chain_id)

View File

@@ -0,0 +1,272 @@
"""Unit tests for database encryption module."""
import os
import stat
import tempfile
from pathlib import Path
import pytest
from aitbc_chain.database_encryption import (
KeyManager,
DatabaseEncryptor,
is_database_encrypted,
encrypt_database,
decrypt_database,
get_encryption_key,
ENCRYPTION_MAGIC,
ENCRYPTION_VERSION,
)
class TestKeyManager:
"""Tests for KeyManager class."""
def test_generate_key_without_password(self, tmp_path: Path):
"""Test key generation without password."""
key_manager = KeyManager(tmp_path / "test.key")
key = key_manager.generate_key()
assert len(key) == 32
assert isinstance(key, bytes)
def test_generate_key_with_password(self, tmp_path: Path):
"""Test key generation with password."""
key_manager = KeyManager(tmp_path / "test.key")
key = key_manager.generate_key(password="test_password")
# Key with salt should be longer (16 bytes salt + 32 bytes key)
assert len(key) == 48
assert isinstance(key, bytes)
def test_save_and_load_key(self, tmp_path: Path):
"""Test saving and loading key."""
key_manager = KeyManager(tmp_path / "test.key")
key = key_manager.generate_key()
key_manager.save_key(key)
loaded_key = key_manager.load_key()
assert loaded_key == key
def test_get_or_generate_key_new(self, tmp_path: Path):
"""Test get_or_generate_key when key doesn't exist."""
key_manager = KeyManager(tmp_path / "test.key")
key = key_manager.get_or_generate_key()
assert len(key) == 32
assert key_manager.load_key() == key
def test_get_or_generate_key_existing(self, tmp_path: Path):
"""Test get_or_generate_key when key already exists."""
key_manager = KeyManager(tmp_path / "test.key")
original_key = key_manager.generate_key()
key_manager.save_key(original_key)
retrieved_key = key_manager.get_or_generate_key()
assert retrieved_key == original_key
def test_ensure_key_permissions_correct(self, tmp_path: Path):
"""Test ensure_key_permissions with correct permissions."""
key_manager = KeyManager(tmp_path / "test.key")
key = key_manager.generate_key()
key_manager.save_key(key)
assert key_manager.ensure_key_permissions() is True
def test_ensure_key_permissions_incorrect(self, tmp_path: Path):
"""Test ensure_key_permissions with incorrect permissions."""
key_manager = KeyManager(tmp_path / "test.key")
key = key_manager.generate_key()
key_manager.save_key(key)
# Set incorrect permissions
os.chmod(tmp_path / "test.key", 0o644)
assert key_manager.ensure_key_permissions() is False
def test_ensure_key_permissions_nonexistent(self, tmp_path: Path):
"""Test ensure_key_permissions when file doesn't exist."""
key_manager = KeyManager(tmp_path / "nonexistent.key")
assert key_manager.ensure_key_permissions() is True
class TestDatabaseEncryptor:
"""Tests for DatabaseEncryptor class."""
def test_encrypt_decrypt_file(self, tmp_path: Path):
"""Test encrypting and decrypting a file."""
# Create test file
test_file = tmp_path / "test.db"
test_file.write_bytes(b"test database content")
# Generate key
key_manager = KeyManager(tmp_path / "test.key")
key = key_manager.generate_key()
# Encrypt
encryptor = DatabaseEncryptor(key)
encrypted_file = tmp_path / "test.db.encrypted"
encryptor.encrypt_file(test_file, encrypted_file)
assert encrypted_file.exists()
assert is_database_encrypted(encrypted_file)
# Decrypt
decrypted_file = tmp_path / "test_decrypted.db"
encryptor.decrypt_file(encrypted_file, decrypted_file)
assert decrypted_file.read_bytes() == b"test database content"
def test_is_encrypted_true(self, tmp_path: Path):
"""Test is_encrypted with encrypted file."""
test_file = tmp_path / "test.db"
test_file.write_bytes(b"test content")
key_manager = KeyManager(tmp_path / "test.key")
key = key_manager.generate_key()
encryptor = DatabaseEncryptor(key)
encrypted_file = tmp_path / "test.db.encrypted"
encryptor.encrypt_file(test_file, encrypted_file)
assert encryptor.is_encrypted(encrypted_file) is True
def test_is_encrypted_false(self, tmp_path: Path):
"""Test is_encrypted with unencrypted file."""
test_file = tmp_path / "test.db"
test_file.write_bytes(b"test content")
key_manager = KeyManager(tmp_path / "test.key")
key = key_manager.generate_key()
encryptor = DatabaseEncryptor(key)
assert encryptor.is_encrypted(test_file) is False
def test_is_encrypted_nonexistent(self, tmp_path: Path):
"""Test is_encrypted with nonexistent file."""
key_manager = KeyManager(tmp_path / "test.key")
key = key_manager.generate_key()
encryptor = DatabaseEncryptor(key)
assert encryptor.is_encrypted(tmp_path / "nonexistent.db") is False
def test_decrypt_with_magic_header_verification(self, tmp_path: Path):
"""Test that decrypt verifies magic header."""
test_file = tmp_path / "test.db"
test_file.write_bytes(b"not encrypted content")
key_manager = KeyManager(tmp_path / "test.key")
key = key_manager.generate_key()
encryptor = DatabaseEncryptor(key)
decrypted_file = tmp_path / "test_decrypted.db"
with pytest.raises(ValueError, match="not an encrypted database"):
encryptor.decrypt_file(test_file, decrypted_file)
def test_key_too_short(self, tmp_path: Path):
"""Test that short keys are rejected."""
with pytest.raises(ValueError, match="at least 32 bytes"):
DatabaseEncryptor(b"short_key")
class TestModuleFunctions:
"""Tests for module-level functions."""
def test_is_database_encrypted_true(self, tmp_path: Path):
"""Test is_database_encrypted with encrypted database."""
test_file = tmp_path / "test.db"
test_file.write_bytes(b"test content")
key_manager = KeyManager(tmp_path / "test.key")
key = key_manager.generate_key()
encryptor = DatabaseEncryptor(key)
encrypted_file = tmp_path / "test.db.encrypted"
encryptor.encrypt_file(test_file, encrypted_file)
assert is_database_encrypted(encrypted_file) is True
def test_is_database_encrypted_false(self, tmp_path: Path):
"""Test is_database_encrypted with unencrypted database."""
test_file = tmp_path / "test.db"
test_file.write_bytes(b"test content")
assert is_database_encrypted(test_file) is False
def test_is_database_encrypted_nonexistent(self, tmp_path: Path):
"""Test is_database_encrypted with nonexistent file."""
assert is_database_encrypted(tmp_path / "nonexistent.db") is False
def test_encrypt_decrypt_database(self, tmp_path: Path):
"""Test encrypt_database and decrypt_database functions."""
# Create test database
test_db = tmp_path / "test.db"
test_db.write_bytes(b"SQLite database content")
# Generate key
key_manager = KeyManager(tmp_path / "test.key")
key = key_manager.generate_key()
# Encrypt
encrypted_db = encrypt_database(test_db, key)
assert encrypted_db.exists()
assert is_database_encrypted(encrypted_db)
# Decrypt
decrypted_db = decrypt_database(encrypted_db, key)
assert decrypted_db.read_bytes() == b"SQLite database content"
def test_get_encryption_key(self, tmp_path: Path):
"""Test get_encryption_key function."""
key_path = tmp_path / "test.key"
# First call should generate key
key = get_encryption_key(key_path)
assert len(key) == 32
assert key_path.exists()
# Second call should load existing key
key2 = get_encryption_key(key_path)
assert key == key2
class TestEncryptionIntegration:
"""Integration tests for encryption with actual database-like content."""
def test_encrypt_decrypt_sqlite_like_content(self, tmp_path: Path):
"""Test encryption/decryption with SQLite-like content."""
# Create a file with SQLite-like content
test_db = tmp_path / "test.db"
sqlite_header = b"SQLite format 3\x00"
test_db.write_bytes(sqlite_header + b"\x00" * 100)
key_manager = KeyManager(tmp_path / "test.key")
key = key_manager.generate_key()
# Encrypt
encrypted_db = encrypt_database(test_db, key)
assert is_database_encrypted(encrypted_db)
# Decrypt
decrypted_db = decrypt_database(encrypted_db, key)
assert decrypted_db.read_bytes() == test_db.read_bytes()
def test_multiple_encrypt_decrypt_cycles(self, tmp_path: Path):
"""Test multiple encryption/decryption cycles."""
test_db = tmp_path / "test.db"
test_db.write_bytes(b"test content" * 1000)
key_manager = KeyManager(tmp_path / "test.key")
key = key_manager.generate_key()
# Multiple cycles
for i in range(3):
encrypted = encrypt_database(test_db, key)
decrypted = decrypt_database(encrypted, key)
test_db = decrypted
assert test_db.read_bytes() == b"test content" * 1000
if __name__ == "__main__":
pytest.main([__file__, "-v"])