refactor: add service layer pattern for blockchain and database interactions
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Node Failover Simulation / failover-test (push) Has been cancelled

- Create BlockchainService abstract base class with RPC implementation
- Create DatabaseService abstract base class with SQLite implementation
- Add connection pooling for SQLite database service
- Implement factory pattern for service creation
- Provide high-level abstractions over RPC calls and database operations
- Improve testability with interface definitions
This commit is contained in:
aitbc
2026-05-09 12:28:08 +02:00
parent 2713951a1b
commit ce57b1b1dc
2 changed files with 497 additions and 0 deletions

311
aitbc/blockchain_service.py Normal file
View File

@@ -0,0 +1,311 @@
"""
Blockchain service layer for AITBC
Provides high-level blockchain interaction services with abstraction over RPC calls
"""
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
from datetime import datetime
from aitbc.network.http_client import AITBCHTTPClient
from aitbc.aitbc_logging import get_logger
logger = get_logger(__name__)
@dataclass
class Block:
"""Block data structure"""
height: int
hash: str
parent_hash: str
timestamp: int
transactions: List[Dict[str, Any]]
miner: Optional[str] = None
gas_used: Optional[int] = None
gas_limit: Optional[int] = None
@dataclass
class Transaction:
"""Transaction data structure"""
hash: str
from_address: str
to_address: str
value: str
nonce: int
gas: int
gas_price: Optional[str] = None
input_data: Optional[str] = None
block_hash: Optional[str] = None
block_number: Optional[int] = None
status: Optional[str] = None
@dataclass
class Account:
"""Account data structure"""
address: str
balance: int
nonce: int
class BlockchainService(ABC):
"""Abstract base class for blockchain service implementations"""
@abstractmethod
def get_block(self, block_identifier: int | str) -> Block:
"""Get block by height or hash"""
pass
@abstractmethod
def get_head_block(self) -> Block:
"""Get current head block"""
pass
@abstractmethod
def get_transaction(self, tx_hash: str) -> Transaction:
"""Get transaction by hash"""
pass
@abstractmethod
def get_account_balance(self, address: str) -> Account:
"""Get account information including balance"""
pass
@abstractmethod
def send_transaction(self, tx_data: Dict[str, Any]) -> str:
"""Send transaction and return transaction hash"""
pass
@abstractmethod
def get_status(self) -> Dict[str, Any]:
"""Get blockchain node status"""
pass
class RPCBlockchainService(BlockchainService):
"""RPC-based blockchain service implementation"""
def __init__(self, rpc_url: str, timeout: int = 30):
"""
Initialize RPC blockchain service
Args:
rpc_url: Blockchain RPC endpoint URL
timeout: Request timeout in seconds
"""
self.rpc_url = rpc_url
self.client = AITBCHTTPClient(base_url=rpc_url, timeout=timeout)
logger.info(f"Initialized RPC blockchain service for {rpc_url}")
def get_block(self, block_identifier: int | str) -> Block:
"""
Get block by height or hash
Args:
block_identifier: Block height (int) or hash (str)
Returns:
Block object with block data
Raises:
ValueError: If block not found
NetworkError: If RPC call fails
"""
try:
if isinstance(block_identifier, int):
endpoint = f"/rpc/blocks/{block_identifier}"
else:
endpoint = f"/rpc/block/{block_identifier}"
response = self.client.get(endpoint)
data = response.json()
return Block(
height=data.get("height", 0),
hash=data.get("hash", ""),
parent_hash=data.get("parent_hash", ""),
timestamp=data.get("timestamp", 0),
transactions=data.get("transactions", []),
miner=data.get("miner"),
gas_used=data.get("gas_used"),
gas_limit=data.get("gas_limit")
)
except Exception as e:
logger.error(f"Failed to get block {block_identifier}: {e}")
raise
def get_head_block(self) -> Block:
"""
Get current head block
Returns:
Block object with head block data
Raises:
NetworkError: If RPC call fails
"""
try:
response = self.client.get("/rpc/head")
data = response.json()
return Block(
height=data.get("height", 0),
hash=data.get("hash", ""),
parent_hash=data.get("parent_hash", ""),
timestamp=data.get("timestamp", 0),
transactions=data.get("transactions", []),
miner=data.get("miner"),
gas_used=data.get("gas_used"),
gas_limit=data.get("gas_limit")
)
except Exception as e:
logger.error(f"Failed to get head block: {e}")
raise
def get_transaction(self, tx_hash: str) -> Transaction:
"""
Get transaction by hash
Args:
tx_hash: Transaction hash
Returns:
Transaction object with transaction data
Raises:
ValueError: If transaction not found
NetworkError: If RPC call fails
"""
try:
response = self.client.get(f"/rpc/transaction/{tx_hash}")
data = response.json()
return Transaction(
hash=data.get("hash", ""),
from_address=data.get("from", ""),
to_address=data.get("to", ""),
value=data.get("value", "0"),
nonce=data.get("nonce", 0),
gas=data.get("gas", 0),
gas_price=data.get("gas_price"),
input_data=data.get("input"),
block_hash=data.get("block_hash"),
block_number=data.get("block_number"),
status=data.get("status")
)
except Exception as e:
logger.error(f"Failed to get transaction {tx_hash}: {e}")
raise
def get_account_balance(self, address: str) -> Account:
"""
Get account information including balance
Args:
address: Account address
Returns:
Account object with account data
Raises:
ValueError: If address is invalid
NetworkError: If RPC call fails
"""
try:
response = self.client.get(f"/rpc/account/{address}")
data = response.json()
return Account(
address=address,
balance=int(data.get("balance", 0)),
nonce=data.get("nonce", 0)
)
except Exception as e:
logger.error(f"Failed to get account balance for {address}: {e}")
raise
def send_transaction(self, tx_data: Dict[str, Any]) -> str:
"""
Send transaction and return transaction hash
Args:
tx_data: Transaction data dictionary
Returns:
Transaction hash
Raises:
ValueError: If transaction data is invalid
NetworkError: If RPC call fails
"""
try:
response = self.client.post("/rpc/sendTx", json=tx_data)
data = response.json()
tx_hash = data.get("hash") or data.get("tx_hash")
if not tx_hash:
raise ValueError("Transaction hash not found in response")
logger.info(f"Transaction sent successfully: {tx_hash}")
return tx_hash
except Exception as e:
logger.error(f"Failed to send transaction: {e}")
raise
def get_status(self) -> Dict[str, Any]:
"""
Get blockchain node status
Returns:
Dictionary with node status information
Raises:
NetworkError: If RPC call fails
"""
try:
response = self.client.get("/rpc/status")
return response.json()
except Exception as e:
logger.error(f"Failed to get node status: {e}")
raise
class BlockchainServiceFactory:
"""Factory for creating blockchain service instances"""
@staticmethod
def create_rpc_service(rpc_url: str, timeout: int = 30) -> RPCBlockchainService:
"""
Create RPC blockchain service
Args:
rpc_url: Blockchain RPC endpoint URL
timeout: Request timeout in seconds
Returns:
RPCBlockchainService instance
"""
return RPCBlockchainService(rpc_url, timeout)
@staticmethod
def create_service(service_type: str = "rpc", **kwargs) -> BlockchainService:
"""
Create blockchain service by type
Args:
service_type: Type of service ("rpc")
**kwargs: Service-specific configuration
Returns:
BlockchainService instance
Raises:
ValueError: If service type is unknown
"""
if service_type == "rpc":
return BlockchainServiceFactory.create_rpc_service(**kwargs)
else:
raise ValueError(f"Unknown service type: {service_type}")

186
aitbc/database_service.py Normal file
View File

@@ -0,0 +1,186 @@
"""
Database service layer for AITBC
Provides high-level database interaction services with connection pooling
"""
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, List
from contextlib import contextmanager
import sqlite3
from pathlib import Path
from aitbc.aitbc_logging import get_logger
logger = get_logger(__name__)
class DatabaseService(ABC):
"""Abstract base class for database service implementations"""
@abstractmethod
def execute_query(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
"""Execute a SELECT query"""
pass
@abstractmethod
def execute_update(self, query: str, params: tuple = ()) -> int:
"""Execute an INSERT/UPDATE/DELETE query"""
pass
@abstractmethod
def execute_transaction(self, queries: List[tuple]) -> bool:
"""Execute multiple queries in a transaction"""
pass
class SQLiteDatabaseService(DatabaseService):
"""SQLite database service with connection pooling"""
def __init__(self, db_path: Path, pool_size: int = 5):
"""
Initialize SQLite database service
Args:
db_path: Path to SQLite database file
pool_size: Connection pool size
"""
self.db_path = db_path
self.pool_size = pool_size
self._connections: List[sqlite3.Connection] = []
self._current_connection_index = 0
# Ensure database exists
self._ensure_database()
logger.info(f"Initialized SQLite database service for {db_path}")
def _ensure_database(self) -> None:
"""Ensure database file and directory exist"""
self.db_path.parent.mkdir(parents=True, exist_ok=True)
if not self.db_path.exists():
self.db_path.touch()
def _get_connection(self) -> sqlite3.Connection:
"""Get a connection from the pool"""
if self._connections:
conn = self._connections[self._current_connection_index]
self._current_connection_index = (self._current_connection_index + 1) % len(self._connections)
return conn
# Create new connection if pool is empty
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
self._connections.append(conn)
return conn
@contextmanager
def get_connection(self):
"""Context manager for database connections"""
conn = self._get_connection()
try:
yield conn
conn.commit()
except Exception as e:
conn.rollback()
logger.error(f"Database error: {e}")
raise
def execute_query(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
"""
Execute a SELECT query
Args:
query: SQL query string
params: Query parameters
Returns:
List of dictionaries with query results
"""
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
def execute_update(self, query: str, params: tuple = ()) -> int:
"""
Execute an INSERT/UPDATE/DELETE query
Args:
query: SQL query string
params: Query parameters
Returns:
Number of rows affected
"""
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(query, params)
return cursor.rowcount
def execute_transaction(self, queries: List[tuple]) -> bool:
"""
Execute multiple queries in a transaction
Args:
queries: List of (query, params) tuples
Returns:
True if transaction succeeded
Raises:
Exception: If transaction fails
"""
with self.get_connection() as conn:
cursor = conn.cursor()
try:
for query, params in queries:
cursor.execute(query, params)
return True
except Exception as e:
logger.error(f"Transaction failed: {e}")
raise
def close(self) -> None:
"""Close all database connections"""
for conn in self._connections:
conn.close()
self._connections.clear()
logger.info("Closed all database connections")
class DatabaseServiceFactory:
"""Factory for creating database service instances"""
@staticmethod
def create_sqlite_service(db_path: Path, pool_size: int = 5) -> SQLiteDatabaseService:
"""
Create SQLite database service
Args:
db_path: Path to SQLite database file
pool_size: Connection pool size
Returns:
SQLiteDatabaseService instance
"""
return SQLiteDatabaseService(db_path, pool_size)
@staticmethod
def create_service(db_type: str = "sqlite", **kwargs) -> DatabaseService:
"""
Create database service by type
Args:
db_type: Type of database ("sqlite")
**kwargs: Database-specific configuration
Returns:
DatabaseService instance
Raises:
ValueError: If database type is unknown
"""
if db_type == "sqlite":
return DatabaseServiceFactory.create_sqlite_service(**kwargs)
else:
raise ValueError(f"Unknown database type: {db_type}")