diff --git a/aitbc/blockchain_service.py b/aitbc/blockchain_service.py new file mode 100644 index 00000000..754fcd0c --- /dev/null +++ b/aitbc/blockchain_service.py @@ -0,0 +1,311 @@ +""" +Blockchain service layer for AITBC +Provides high-level blockchain interaction services with abstraction over RPC calls +""" + +from abc import ABC, abstractmethod +from typing import Optional, Dict, Any, List +from dataclasses import dataclass +from datetime import datetime + +from aitbc.network.http_client import AITBCHTTPClient +from aitbc.aitbc_logging import get_logger + +logger = get_logger(__name__) + + +@dataclass +class Block: + """Block data structure""" + height: int + hash: str + parent_hash: str + timestamp: int + transactions: List[Dict[str, Any]] + miner: Optional[str] = None + gas_used: Optional[int] = None + gas_limit: Optional[int] = None + + +@dataclass +class Transaction: + """Transaction data structure""" + hash: str + from_address: str + to_address: str + value: str + nonce: int + gas: int + gas_price: Optional[str] = None + input_data: Optional[str] = None + block_hash: Optional[str] = None + block_number: Optional[int] = None + status: Optional[str] = None + + +@dataclass +class Account: + """Account data structure""" + address: str + balance: int + nonce: int + + +class BlockchainService(ABC): + """Abstract base class for blockchain service implementations""" + + @abstractmethod + def get_block(self, block_identifier: int | str) -> Block: + """Get block by height or hash""" + pass + + @abstractmethod + def get_head_block(self) -> Block: + """Get current head block""" + pass + + @abstractmethod + def get_transaction(self, tx_hash: str) -> Transaction: + """Get transaction by hash""" + pass + + @abstractmethod + def get_account_balance(self, address: str) -> Account: + """Get account information including balance""" + pass + + @abstractmethod + def send_transaction(self, tx_data: Dict[str, Any]) -> str: + """Send transaction and return transaction hash""" + pass + + @abstractmethod + def get_status(self) -> Dict[str, Any]: + """Get blockchain node status""" + pass + + +class RPCBlockchainService(BlockchainService): + """RPC-based blockchain service implementation""" + + def __init__(self, rpc_url: str, timeout: int = 30): + """ + Initialize RPC blockchain service + + Args: + rpc_url: Blockchain RPC endpoint URL + timeout: Request timeout in seconds + """ + self.rpc_url = rpc_url + self.client = AITBCHTTPClient(base_url=rpc_url, timeout=timeout) + logger.info(f"Initialized RPC blockchain service for {rpc_url}") + + def get_block(self, block_identifier: int | str) -> Block: + """ + Get block by height or hash + + Args: + block_identifier: Block height (int) or hash (str) + + Returns: + Block object with block data + + Raises: + ValueError: If block not found + NetworkError: If RPC call fails + """ + try: + if isinstance(block_identifier, int): + endpoint = f"/rpc/blocks/{block_identifier}" + else: + endpoint = f"/rpc/block/{block_identifier}" + + response = self.client.get(endpoint) + data = response.json() + + return Block( + height=data.get("height", 0), + hash=data.get("hash", ""), + parent_hash=data.get("parent_hash", ""), + timestamp=data.get("timestamp", 0), + transactions=data.get("transactions", []), + miner=data.get("miner"), + gas_used=data.get("gas_used"), + gas_limit=data.get("gas_limit") + ) + except Exception as e: + logger.error(f"Failed to get block {block_identifier}: {e}") + raise + + def get_head_block(self) -> Block: + """ + Get current head block + + Returns: + Block object with head block data + + Raises: + NetworkError: If RPC call fails + """ + try: + response = self.client.get("/rpc/head") + data = response.json() + + return Block( + height=data.get("height", 0), + hash=data.get("hash", ""), + parent_hash=data.get("parent_hash", ""), + timestamp=data.get("timestamp", 0), + transactions=data.get("transactions", []), + miner=data.get("miner"), + gas_used=data.get("gas_used"), + gas_limit=data.get("gas_limit") + ) + except Exception as e: + logger.error(f"Failed to get head block: {e}") + raise + + def get_transaction(self, tx_hash: str) -> Transaction: + """ + Get transaction by hash + + Args: + tx_hash: Transaction hash + + Returns: + Transaction object with transaction data + + Raises: + ValueError: If transaction not found + NetworkError: If RPC call fails + """ + try: + response = self.client.get(f"/rpc/transaction/{tx_hash}") + data = response.json() + + return Transaction( + hash=data.get("hash", ""), + from_address=data.get("from", ""), + to_address=data.get("to", ""), + value=data.get("value", "0"), + nonce=data.get("nonce", 0), + gas=data.get("gas", 0), + gas_price=data.get("gas_price"), + input_data=data.get("input"), + block_hash=data.get("block_hash"), + block_number=data.get("block_number"), + status=data.get("status") + ) + except Exception as e: + logger.error(f"Failed to get transaction {tx_hash}: {e}") + raise + + def get_account_balance(self, address: str) -> Account: + """ + Get account information including balance + + Args: + address: Account address + + Returns: + Account object with account data + + Raises: + ValueError: If address is invalid + NetworkError: If RPC call fails + """ + try: + response = self.client.get(f"/rpc/account/{address}") + data = response.json() + + return Account( + address=address, + balance=int(data.get("balance", 0)), + nonce=data.get("nonce", 0) + ) + except Exception as e: + logger.error(f"Failed to get account balance for {address}: {e}") + raise + + def send_transaction(self, tx_data: Dict[str, Any]) -> str: + """ + Send transaction and return transaction hash + + Args: + tx_data: Transaction data dictionary + + Returns: + Transaction hash + + Raises: + ValueError: If transaction data is invalid + NetworkError: If RPC call fails + """ + try: + response = self.client.post("/rpc/sendTx", json=tx_data) + data = response.json() + + tx_hash = data.get("hash") or data.get("tx_hash") + if not tx_hash: + raise ValueError("Transaction hash not found in response") + + logger.info(f"Transaction sent successfully: {tx_hash}") + return tx_hash + except Exception as e: + logger.error(f"Failed to send transaction: {e}") + raise + + def get_status(self) -> Dict[str, Any]: + """ + Get blockchain node status + + Returns: + Dictionary with node status information + + Raises: + NetworkError: If RPC call fails + """ + try: + response = self.client.get("/rpc/status") + return response.json() + except Exception as e: + logger.error(f"Failed to get node status: {e}") + raise + + +class BlockchainServiceFactory: + """Factory for creating blockchain service instances""" + + @staticmethod + def create_rpc_service(rpc_url: str, timeout: int = 30) -> RPCBlockchainService: + """ + Create RPC blockchain service + + Args: + rpc_url: Blockchain RPC endpoint URL + timeout: Request timeout in seconds + + Returns: + RPCBlockchainService instance + """ + return RPCBlockchainService(rpc_url, timeout) + + @staticmethod + def create_service(service_type: str = "rpc", **kwargs) -> BlockchainService: + """ + Create blockchain service by type + + Args: + service_type: Type of service ("rpc") + **kwargs: Service-specific configuration + + Returns: + BlockchainService instance + + Raises: + ValueError: If service type is unknown + """ + if service_type == "rpc": + return BlockchainServiceFactory.create_rpc_service(**kwargs) + else: + raise ValueError(f"Unknown service type: {service_type}") diff --git a/aitbc/database_service.py b/aitbc/database_service.py new file mode 100644 index 00000000..8e3bc0f7 --- /dev/null +++ b/aitbc/database_service.py @@ -0,0 +1,186 @@ +""" +Database service layer for AITBC +Provides high-level database interaction services with connection pooling +""" + +from abc import ABC, abstractmethod +from typing import Optional, Dict, Any, List +from contextlib import contextmanager +import sqlite3 +from pathlib import Path + +from aitbc.aitbc_logging import get_logger + +logger = get_logger(__name__) + + +class DatabaseService(ABC): + """Abstract base class for database service implementations""" + + @abstractmethod + def execute_query(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]: + """Execute a SELECT query""" + pass + + @abstractmethod + def execute_update(self, query: str, params: tuple = ()) -> int: + """Execute an INSERT/UPDATE/DELETE query""" + pass + + @abstractmethod + def execute_transaction(self, queries: List[tuple]) -> bool: + """Execute multiple queries in a transaction""" + pass + + +class SQLiteDatabaseService(DatabaseService): + """SQLite database service with connection pooling""" + + def __init__(self, db_path: Path, pool_size: int = 5): + """ + Initialize SQLite database service + + Args: + db_path: Path to SQLite database file + pool_size: Connection pool size + """ + self.db_path = db_path + self.pool_size = pool_size + self._connections: List[sqlite3.Connection] = [] + self._current_connection_index = 0 + + # Ensure database exists + self._ensure_database() + + logger.info(f"Initialized SQLite database service for {db_path}") + + def _ensure_database(self) -> None: + """Ensure database file and directory exist""" + self.db_path.parent.mkdir(parents=True, exist_ok=True) + if not self.db_path.exists(): + self.db_path.touch() + + def _get_connection(self) -> sqlite3.Connection: + """Get a connection from the pool""" + if self._connections: + conn = self._connections[self._current_connection_index] + self._current_connection_index = (self._current_connection_index + 1) % len(self._connections) + return conn + + # Create new connection if pool is empty + conn = sqlite3.connect(self.db_path) + conn.row_factory = sqlite3.Row + self._connections.append(conn) + return conn + + @contextmanager + def get_connection(self): + """Context manager for database connections""" + conn = self._get_connection() + try: + yield conn + conn.commit() + except Exception as e: + conn.rollback() + logger.error(f"Database error: {e}") + raise + + def execute_query(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]: + """ + Execute a SELECT query + + Args: + query: SQL query string + params: Query parameters + + Returns: + List of dictionaries with query results + """ + with self.get_connection() as conn: + cursor = conn.cursor() + cursor.execute(query, params) + return [dict(row) for row in cursor.fetchall()] + + def execute_update(self, query: str, params: tuple = ()) -> int: + """ + Execute an INSERT/UPDATE/DELETE query + + Args: + query: SQL query string + params: Query parameters + + Returns: + Number of rows affected + """ + with self.get_connection() as conn: + cursor = conn.cursor() + cursor.execute(query, params) + return cursor.rowcount + + def execute_transaction(self, queries: List[tuple]) -> bool: + """ + Execute multiple queries in a transaction + + Args: + queries: List of (query, params) tuples + + Returns: + True if transaction succeeded + + Raises: + Exception: If transaction fails + """ + with self.get_connection() as conn: + cursor = conn.cursor() + try: + for query, params in queries: + cursor.execute(query, params) + return True + except Exception as e: + logger.error(f"Transaction failed: {e}") + raise + + def close(self) -> None: + """Close all database connections""" + for conn in self._connections: + conn.close() + self._connections.clear() + logger.info("Closed all database connections") + + +class DatabaseServiceFactory: + """Factory for creating database service instances""" + + @staticmethod + def create_sqlite_service(db_path: Path, pool_size: int = 5) -> SQLiteDatabaseService: + """ + Create SQLite database service + + Args: + db_path: Path to SQLite database file + pool_size: Connection pool size + + Returns: + SQLiteDatabaseService instance + """ + return SQLiteDatabaseService(db_path, pool_size) + + @staticmethod + def create_service(db_type: str = "sqlite", **kwargs) -> DatabaseService: + """ + Create database service by type + + Args: + db_type: Type of database ("sqlite") + **kwargs: Database-specific configuration + + Returns: + DatabaseService instance + + Raises: + ValueError: If database type is unknown + """ + if db_type == "sqlite": + return DatabaseServiceFactory.create_sqlite_service(**kwargs) + else: + raise ValueError(f"Unknown database type: {db_type}")