Files
aitbc/aitbc/database_service.py
aitbc ce57b1b1dc
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Node Failover Simulation / failover-test (push) Has been cancelled
refactor: add service layer pattern for blockchain and database interactions
- Create BlockchainService abstract base class with RPC implementation
- Create DatabaseService abstract base class with SQLite implementation
- Add connection pooling for SQLite database service
- Implement factory pattern for service creation
- Provide high-level abstractions over RPC calls and database operations
- Improve testability with interface definitions
2026-05-09 12:28:08 +02:00

187 lines
5.6 KiB
Python

"""
Database service layer for AITBC
Provides high-level database interaction services with connection pooling
"""
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, List
from contextlib import contextmanager
import sqlite3
from pathlib import Path
from aitbc.aitbc_logging import get_logger
# Module-level logger, named after this module, used by the service classes below.
logger = get_logger(__name__)
class DatabaseService(ABC):
    """Interface that every database service backend has to implement.

    Concrete subclasses must provide all three operations below; the class
    cannot be instantiated until they do.
    """

    @abstractmethod
    def execute_query(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
        """Run a read-only (SELECT) statement.

        Args:
            query: SQL text to execute.
            params: Values bound to the placeholders in ``query``.

        Returns:
            The result rows, one dictionary per row.
        """

    @abstractmethod
    def execute_update(self, query: str, params: tuple = ()) -> int:
        """Run a data-modifying (INSERT/UPDATE/DELETE) statement.

        Args:
            query: SQL text to execute.
            params: Values bound to the placeholders in ``query``.

        Returns:
            Number of rows affected by the statement.
        """

    @abstractmethod
    def execute_transaction(self, queries: List[tuple]) -> bool:
        """Run several statements as a single transaction.

        Args:
            queries: The statements to run, in order.

        Returns:
            True when the transaction succeeded.
        """
class SQLiteDatabaseService(DatabaseService):
    """SQLite database service with a small round-robin connection pool.

    Connections are opened lazily, up to ``pool_size`` of them, and are then
    handed out in rotation. Every connection uses ``sqlite3.Row`` as its row
    factory so that ``execute_query`` can return plain dictionaries.

    NOTE(review): connections are opened with sqlite3's defaults, which
    restrict a connection to the thread that created it -- confirm that
    callers use one service instance per thread.
    """

    def __init__(self, db_path: Path, pool_size: int = 5):
        """
        Initialize SQLite database service

        Args:
            db_path: Path to SQLite database file (a plain string is accepted
                and converted to ``Path``)
            pool_size: Maximum number of pooled connections; values below 1
                are treated as 1 when connections are handed out
        """
        # Coerce so that string paths (e.g. forwarded through the factory's
        # **kwargs) work with the pathlib calls in _ensure_database().
        self.db_path = Path(db_path)
        self.pool_size = pool_size
        self._connections: List[sqlite3.Connection] = []
        self._current_connection_index = 0

        # Ensure database exists
        self._ensure_database()
        logger.info(f"Initialized SQLite database service for {db_path}")

    def _ensure_database(self) -> None:
        """Ensure database file and directory exist"""
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        if not self.db_path.exists():
            self.db_path.touch()

    def _get_connection(self) -> sqlite3.Connection:
        """
        Get a connection from the pool

        A new connection is opened while the pool holds fewer than
        ``pool_size`` connections (at least one is always allowed); once the
        pool is full, existing connections are returned in rotation.

        Returns:
            An open connection whose row factory is ``sqlite3.Row``
        """
        capacity = max(1, self.pool_size)
        if len(self._connections) < capacity:
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row
            self._connections.append(conn)
            return conn

        # Pool is full: rotate. The modulo on read guards against an index
        # left over from a previously larger pool.
        index = self._current_connection_index % len(self._connections)
        self._current_connection_index = (index + 1) % len(self._connections)
        return self._connections[index]

    @contextmanager
    def get_connection(self):
        """
        Context manager for database connections

        Yields a pooled connection. The connection is committed when the
        ``with`` body finishes normally; if the body raises, the connection
        is rolled back, the error is logged and the exception is re-raised.
        The connection stays in the pool afterwards (it is not closed).
        """
        conn = self._get_connection()
        try:
            yield conn
            conn.commit()
        except Exception as e:
            conn.rollback()
            logger.error(f"Database error: {e}")
            raise

    def execute_query(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
        """
        Execute a SELECT query

        Args:
            query: SQL query string
            params: Query parameters

        Returns:
            List of dictionaries with query results
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            # sqlite3.Row supports the mapping protocol, so dict(row) yields
            # a column-name -> value dictionary.
            return [dict(row) for row in cursor.fetchall()]

    def execute_update(self, query: str, params: tuple = ()) -> int:
        """
        Execute an INSERT/UPDATE/DELETE query

        Args:
            query: SQL query string
            params: Query parameters

        Returns:
            Number of rows affected
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            # The commit happens when the ``with`` block exits, after the
            # return value has been computed.
            return cursor.rowcount

    def execute_transaction(self, queries: List[tuple]) -> bool:
        """
        Execute multiple queries in a transaction

        All statements run on the same connection; they are committed
        together on success and rolled back together if any of them raises.

        Args:
            queries: List of (query, params) tuples

        Returns:
            True if transaction succeeded

        Raises:
            Exception: If transaction fails
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            try:
                for query, params in queries:
                    cursor.execute(query, params)
                return True
            except Exception as e:
                # Logged here for transaction context; get_connection() then
                # performs the rollback and re-raises to the caller.
                logger.error(f"Transaction failed: {e}")
                raise

    def close(self) -> None:
        """Close all database connections"""
        for conn in self._connections:
            conn.close()
        self._connections.clear()
        # Start the rotation from scratch if the service is used again.
        self._current_connection_index = 0
        logger.info("Closed all database connections")
class DatabaseServiceFactory:
    """Builds database service instances for the supported backends."""

    @staticmethod
    def create_sqlite_service(db_path: Path, pool_size: int = 5) -> SQLiteDatabaseService:
        """
        Build a SQLite-backed service

        Args:
            db_path: Location of the SQLite database file
            pool_size: Size of the connection pool

        Returns:
            A new SQLiteDatabaseService
        """
        service = SQLiteDatabaseService(db_path, pool_size)
        return service

    @staticmethod
    def create_service(db_type: str = "sqlite", **kwargs) -> DatabaseService:
        """
        Build a service for the requested backend

        Args:
            db_type: Backend name; only "sqlite" is recognised
            **kwargs: Backend-specific settings, forwarded unchanged to the
                backend's creation method

        Returns:
            The newly created DatabaseService

        Raises:
            ValueError: If ``db_type`` is not a known backend
        """
        # Reject unsupported backends up front.
        if db_type != "sqlite":
            raise ValueError(f"Unknown database type: {db_type}")
        return DatabaseServiceFactory.create_sqlite_service(**kwargs)