From 9f6d7980404939f904097a4328e49831dbe6c56d Mon Sep 17 00:00:00 2001 From: aitbc Date: Fri, 15 May 2026 09:17:32 +0200 Subject: [PATCH] Implement bridge request read/cancel and multi-chain transaction DB persistence - Fixed DB query issues in bridge_enhanced.py (scalars().first() instead of first()) - Created MultiChainTransaction database model for transaction persistence - Updated MultiChainTransactionManager to use database persistence: - Removed in-memory queues (transaction_queues, priority_queues) - Updated submit_transaction to save to database - Updated get_transaction_status to query database - Updated cancel_transaction to update database - Updated get_transaction_history to query database - Updated get_transaction_statistics to query database - Disabled background processing (replaced with database queries) - Updated helper methods to work with database model - Fixed import errors and naming conflicts - Renamed metadata field to meta_data (SQLAlchemy reserved name) - Updated cross_chain_integration.py imports --- .../routers/cross_chain_integration.py | 3 + .../services/cross_chain/bridge_enhanced.py | 6 +- .../src/app/domain/multi_chain_transaction.py | 93 +++ .../multi_chain_transaction_manager.py | 724 ++++++------------ 4 files changed, 316 insertions(+), 510 deletions(-) create mode 100644 apps/coordinator-api/src/app/domain/multi_chain_transaction.py diff --git a/apps/coordinator-api/src/app/contexts/cross_chain/routers/cross_chain_integration.py b/apps/coordinator-api/src/app/contexts/cross_chain/routers/cross_chain_integration.py index b4979cc8..5bbf8eed 100755 --- a/apps/coordinator-api/src/app/contexts/cross_chain/routers/cross_chain_integration.py +++ b/apps/coordinator-api/src/app/contexts/cross_chain/routers/cross_chain_integration.py @@ -32,7 +32,10 @@ from app.services.multi_chain_transaction_manager import ( MultiChainTransactionManager, RoutingStrategy, TransactionPriority, +) +from app.domain.multi_chain_transaction import ( TransactionType, + TransactionStatus, ) from app.storage.db import get_session diff --git a/apps/coordinator-api/src/app/contexts/cross_chain/services/cross_chain/bridge_enhanced.py b/apps/coordinator-api/src/app/contexts/cross_chain/services/cross_chain/bridge_enhanced.py index 1ff72668..94425eca 100755 --- a/apps/coordinator-api/src/app/contexts/cross_chain/services/cross_chain/bridge_enhanced.py +++ b/apps/coordinator-api/src/app/contexts/cross_chain/services/cross_chain/bridge_enhanced.py @@ -240,7 +240,7 @@ class CrossChainBridgeService: try: stmt = select(BridgeRequest).where(BridgeRequest.id == bridge_request_id) - bridge_request = self.session.execute(stmt).first() + bridge_request = self.session.execute(stmt).scalars().first() if not bridge_request: raise ValueError(f"Bridge request {bridge_request_id} not found") @@ -310,7 +310,7 @@ class CrossChainBridgeService: try: stmt = select(BridgeRequest).where(BridgeRequest.id == bridge_request_id) - bridge_request = self.session.execute(stmt).first() + bridge_request = self.session.execute(stmt).scalars().first() if not bridge_request: raise ValueError(f"Bridge request {bridge_request_id} not found") @@ -465,7 +465,7 @@ class CrossChainBridgeService: try: stmt = select(BridgeRequest).where(BridgeRequest.id == bridge_request_id) - bridge_request = self.session.execute(stmt).first() + bridge_request = self.session.execute(stmt).scalars().first() if not bridge_request: logger.error(f"Bridge request {bridge_request_id} not found") diff --git a/apps/coordinator-api/src/app/domain/multi_chain_transaction.py b/apps/coordinator-api/src/app/domain/multi_chain_transaction.py new file mode 100644 index 00000000..cc844066 --- /dev/null +++ b/apps/coordinator-api/src/app/domain/multi_chain_transaction.py @@ -0,0 +1,93 @@ +""" +Multi-Chain Transaction Domain Models + +Domain models for multi-chain transaction management. +""" + +from __future__ import annotations + +from datetime import datetime, timezone, timedelta +from enum import StrEnum + +from sqlalchemy import JSON, Column +from sqlmodel import Field, SQLModel + + +class TransactionPriority(StrEnum): + """Transaction priority levels""" + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + URGENT = "urgent" + CRITICAL = "critical" + + +class TransactionType(StrEnum): + """Transaction types""" + TRANSFER = "transfer" + SWAP = "swap" + BRIDGE = "bridge" + DEPOSIT = "deposit" + WITHDRAWAL = "withdrawal" + CONTRACT_CALL = "contract_call" + APPROVAL = "approval" + + +class TransactionStatus(StrEnum): + """Enhanced transaction status""" + QUEUED = "queued" + PENDING = "pending" + PROCESSING = "processing" + SUBMITTED = "submitted" + CONFIRMED = "confirmed" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + EXPIRED = "expired" + RETRYING = "retrying" + + +class RoutingStrategy(StrEnum): + """Transaction routing strategies""" + FASTEST = "fastest" + CHEAPEST = "cheapest" + BALANCED = "balanced" + RELIABLE = "reliable" + PRIORITY = "priority" + + +class MultiChainTransaction(SQLModel, table=True): + """Multi-chain transaction record""" + + __tablename__ = "multi_chain_transaction" + + id: str = Field(default=None, primary_key=True) # Transaction ID + user_id: str = Field(index=True) + chain_id: int = Field(index=True) + transaction_type: TransactionType = Field(index=True) + from_address: str = Field(index=True) + to_address: str = Field(index=True) + amount: float = Field(default=0.0) + token_address: str | None = Field(default=None, index=True) + data: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON)) + priority: TransactionPriority = Field(default=TransactionPriority.MEDIUM, index=True) + routing_strategy: RoutingStrategy = Field(default=RoutingStrategy.BALANCED) + gas_limit: int | None = Field(default=None) + gas_price: int | None = Field(default=None) + max_fee_per_gas: int | None = Field(default=None) + status: TransactionStatus = Field(default=TransactionStatus.QUEUED, index=True) + deadline: datetime = Field(default_factory=lambda: datetime.now(timezone.utc) + timedelta(minutes=30), index=True) + meta_data: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON)) + retry_count: int = Field(default=0) + submit_attempts: int = Field(default=0) + gas_used: int | None = Field(default=None) + gas_price_paid: int | None = Field(default=None) + transaction_hash: str | None = Field(default=None, index=True) + block_number: int | None = Field(default=None) + confirmations: int = Field(default=0) + error_message: str | None = Field(default=None) + processing_time: float | None = Field(default=None) # Processing time in seconds + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), index=True) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + cancelled_at: datetime | None = Field(default=None) + cancellation_reason: str | None = Field(default=None) diff --git a/apps/coordinator-api/src/app/services/multi_chain_transaction_manager.py b/apps/coordinator-api/src/app/services/multi_chain_transaction_manager.py index 32142078..e6eef820 100755 --- a/apps/coordinator-api/src/app/services/multi_chain_transaction_manager.py +++ b/apps/coordinator-api/src/app/services/multi_chain_transaction_manager.py @@ -15,63 +15,23 @@ from aitbc import get_logger logger = get_logger(__name__) -from sqlmodel import Session +from sqlmodel import Session, select from ..agent_identity.wallet_adapter_enhanced import ( EnhancedWalletAdapter, SecurityLevel, - TransactionStatus, + TransactionStatus as WalletTransactionStatus, WalletAdapterFactory, ) from ..reputation.engine import CrossChainReputationEngine from ..contexts.cross_chain.services.cross_chain.bridge_enhanced import CrossChainBridgeService - - -class TransactionPriority(StrEnum): - """Transaction priority levels""" - - LOW = "low" - MEDIUM = "medium" - HIGH = "high" - URGENT = "urgent" - CRITICAL = "critical" - - -class TransactionType(StrEnum): - """Transaction types""" - - TRANSFER = "transfer" - SWAP = "swap" - BRIDGE = "bridge" - DEPOSIT = "deposit" - WITHDRAWAL = "withdrawal" - CONTRACT_CALL = "contract_call" - APPROVAL = "approval" - - -class TransactionStatus(StrEnum): - """Enhanced transaction status""" - - QUEUED = "queued" - PENDING = "pending" - PROCESSING = "processing" - SUBMITTED = "submitted" - CONFIRMED = "confirmed" - COMPLETED = "completed" - FAILED = "failed" - CANCELLED = "cancelled" - EXPIRED = "expired" - RETRYING = "retrying" - - -class RoutingStrategy(StrEnum): - """Transaction routing strategies""" - - FASTEST = "fastest" - CHEAPEST = "cheapest" - BALANCED = "balanced" - RELIABLE = "reliable" - PRIORITY = "priority" +from ..domain.multi_chain_transaction import ( + MultiChainTransaction, + TransactionPriority, + TransactionType as MultiChainTransactionType, + TransactionStatus, + RoutingStrategy, +) class MultiChainTransactionManager: @@ -83,10 +43,6 @@ class MultiChainTransactionManager: self.bridge_service: CrossChainBridgeService | None = None self.reputation_engine: CrossChainReputationEngine = CrossChainReputationEngine(session) - # Transaction queues - self.transaction_queues: dict[int, list[dict[str, Any]]] = defaultdict(list) - self.priority_queues: dict[TransactionPriority, list[dict[str, Any]]] = defaultdict(list) - # Routing configuration self.routing_config: dict[str, Any] = { "default_strategy": RoutingStrategy.BALANCED, @@ -136,8 +92,8 @@ class MultiChainTransactionManager: self.bridge_service = CrossChainBridgeService(self.session) await self.bridge_service.initialize_bridge(chain_configs) - # Start background processing - await self._start_background_processing() + # Background processing disabled - using database persistence instead + # await self._start_background_processing() logger.info(f"Initialized transaction manager for {len(chain_configs)} chains") @@ -149,7 +105,7 @@ class MultiChainTransactionManager: self, user_id: str, chain_id: int, - transaction_type: TransactionType, + transaction_type: MultiChainTransactionType, from_address: str, to_address: str, amount: Decimal | float | str, @@ -184,38 +140,32 @@ class MultiChainTransactionManager: # Create transaction record transaction_id = f"tx_{uuid4().hex[:8]}" - transaction = { - "id": transaction_id, - "user_id": user_id, - "chain_id": chain_id, - "transaction_type": transaction_type.value, - "from_address": from_address, - "to_address": to_address, - "amount": float(amount), - "token_address": token_address, - "data": data or {}, - "priority": priority.value, - "routing_strategy": (routing_strategy or self.routing_config["default_strategy"]).value, - "gas_limit": gas_limit, - "gas_price": gas_price, - "max_fee_per_gas": max_fee_per_gas, - "status": TransactionStatus.QUEUED.value, - "created_at": datetime.now(timezone.utc), - "deadline": datetime.now(timezone.utc) + timedelta(minutes=deadline_minutes), - "metadata": metadata or {}, - "retry_count": 0, - "submit_attempts": 0, - "gas_used": None, - "gas_price_paid": None, - "transaction_hash": None, - "block_number": None, - "confirmations": 0, - "error_message": None, - "processing_time": None, - } + transaction = MultiChainTransaction( + id=transaction_id, + user_id=user_id, + chain_id=chain_id, + transaction_type=transaction_type, + from_address=from_address, + to_address=to_address, + amount=float(amount), + token_address=token_address, + data=data or {}, + priority=priority, + routing_strategy=routing_strategy or self.routing_config["default_strategy"], + gas_limit=gas_limit, + gas_price=gas_price, + max_fee_per_gas=max_fee_per_gas, + status=TransactionStatus.QUEUED, + deadline=datetime.now(timezone.utc) + timedelta(minutes=deadline_minutes), + meta_data=metadata or {}, + retry_count=0, + submit_attempts=0, + ) - # Add to appropriate queue - await self._queue_transaction(transaction) + # Save to database + self.session.add(transaction) + self.session.commit() + self.session.refresh(transaction) logger.info(f"Submitted transaction {transaction_id} for user {user_id}") @@ -224,8 +174,8 @@ class MultiChainTransactionManager: "status": TransactionStatus.QUEUED.value, "priority": priority.value, "estimated_processing_time": await self._estimate_processing_time(transaction), - "deadline": transaction["deadline"].isoformat(), - "submitted_at": transaction["created_at"].isoformat(), + "deadline": transaction.deadline.isoformat(), + "submitted_at": transaction.created_at.isoformat(), } except Exception as e: @@ -236,16 +186,18 @@ class MultiChainTransactionManager: """Get detailed transaction status""" try: - # Find transaction in queues - transaction = await self._find_transaction(transaction_id) + # Query from database + transaction = self.session.execute( + select(MultiChainTransaction).where(MultiChainTransaction.id == transaction_id) + ).scalars().first() if not transaction: raise ValueError(f"Transaction {transaction_id} not found") # Update status if it's on-chain - if transaction["transaction_hash"] and transaction["status"] in [ - TransactionStatus.SUBMITTED.value, - TransactionStatus.CONFIRMED.value, + if transaction.transaction_hash and transaction.status in [ + TransactionStatus.SUBMITTED, + TransactionStatus.CONFIRMED, ]: await self._update_transaction_status(transaction_id) @@ -254,28 +206,28 @@ class MultiChainTransactionManager: return { "transaction_id": transaction_id, - "user_id": transaction["user_id"], - "chain_id": transaction["chain_id"], - "transaction_type": transaction["transaction_type"], - "from_address": transaction["from_address"], - "to_address": transaction["to_address"], - "amount": transaction["amount"], - "token_address": transaction["token_address"], - "priority": transaction["priority"], - "status": transaction["status"], + "user_id": transaction.user_id, + "chain_id": transaction.chain_id, + "transaction_type": transaction.transaction_type, + "from_address": transaction.from_address, + "to_address": transaction.to_address, + "amount": transaction.amount, + "token_address": transaction.token_address, + "priority": transaction.priority, + "status": transaction.status, "progress": progress, - "transaction_hash": transaction["transaction_hash"], - "block_number": transaction["block_number"], - "confirmations": transaction["confirmations"], - "gas_used": transaction["gas_used"], - "gas_price_paid": transaction["gas_price_paid"], - "retry_count": transaction["retry_count"], - "submit_attempts": transaction["submit_attempts"], - "error_message": transaction["error_message"], - "processing_time": transaction["processing_time"], - "created_at": transaction["created_at"].isoformat(), - "updated_at": transaction.get("updated_at", transaction["created_at"]).isoformat(), - "deadline": transaction["deadline"].isoformat(), + "transaction_hash": transaction.transaction_hash, + "block_number": transaction.block_number, + "confirmations": transaction.confirmations, + "gas_used": transaction.gas_used, + "gas_price_paid": transaction.gas_price_paid, + "retry_count": transaction.retry_count, + "submit_attempts": transaction.submit_attempts, + "error_message": transaction.error_message, + "processing_time": transaction.processing_time, + "created_at": transaction.created_at.isoformat(), + "updated_at": transaction.updated_at.isoformat(), + "deadline": transaction.deadline.isoformat(), } except Exception as e: @@ -286,21 +238,25 @@ class MultiChainTransactionManager: """Cancel a transaction""" try: - transaction = await self._find_transaction(transaction_id) + transaction = self.session.execute( + select(MultiChainTransaction).where(MultiChainTransaction.id == transaction_id) + ).scalars().first() if not transaction: raise ValueError(f"Transaction {transaction_id} not found") - if transaction["status"] not in [TransactionStatus.QUEUED.value, TransactionStatus.PENDING.value]: - raise ValueError(f"Cannot cancel transaction in status: {transaction['status']}") + if transaction.status not in [TransactionStatus.QUEUED, TransactionStatus.PENDING]: + raise ValueError(f"Cannot cancel transaction in status: {transaction.status}") # Update transaction status - transaction["status"] = TransactionStatus.CANCELLED.value - transaction["error_message"] = reason - transaction["updated_at"] = datetime.now(timezone.utc) + transaction.status = TransactionStatus.CANCELLED + transaction.error_message = reason + transaction.cancelled_at = datetime.now(timezone.utc) + transaction.cancellation_reason = reason + transaction.updated_at = datetime.now(timezone.utc) - # Remove from queues - await self._remove_from_queues(transaction_id) + self.session.commit() + self.session.refresh(transaction) logger.info(f"Cancelled transaction {transaction_id}: {reason}") @@ -308,18 +264,19 @@ class MultiChainTransactionManager: "transaction_id": transaction_id, "status": TransactionStatus.CANCELLED.value, "reason": reason, - "cancelled_at": datetime.now(timezone.utc).isoformat(), + "cancelled_at": transaction.cancelled_at.isoformat(), } except Exception as e: logger.error(f"Error cancelling transaction: {e}") + self.session.rollback() raise async def get_transaction_history( self, user_id: str | None = None, chain_id: int | None = None, - transaction_type: TransactionType | None = None, + transaction_type: MultiChainTransactionType | None = None, status: TransactionStatus | None = None, priority: TransactionPriority | None = None, limit: int = 100, @@ -330,78 +287,57 @@ class MultiChainTransactionManager: """Get transaction history with filtering""" try: - # Get all transactions from queues - all_transactions = [] - - for chain_transactions in self.transaction_queues.values(): - all_transactions.extend(chain_transactions) - - for priority_transactions in self.priority_queues.values(): - all_transactions.extend(priority_transactions) - - # Remove duplicates - seen_ids = set() - unique_transactions = [] - for tx in all_transactions: - if tx["id"] not in seen_ids: - seen_ids.add(tx["id"]) - unique_transactions.append(tx) + # Build query + stmt = select(MultiChainTransaction) # Apply filters - filtered_transactions = unique_transactions - if user_id: - filtered_transactions = [tx for tx in filtered_transactions if tx["user_id"] == user_id] - + stmt = stmt.where(MultiChainTransaction.user_id == user_id) if chain_id: - filtered_transactions = [tx for tx in filtered_transactions if tx["chain_id"] == chain_id] - + stmt = stmt.where(MultiChainTransaction.chain_id == chain_id) if transaction_type: - filtered_transactions = [ - tx for tx in filtered_transactions if tx["transaction_type"] == transaction_type.value - ] - + stmt = stmt.where(MultiChainTransaction.transaction_type == transaction_type) if status: - filtered_transactions = [tx for tx in filtered_transactions if tx["status"] == status.value] - + stmt = stmt.where(MultiChainTransaction.status == status) if priority: - filtered_transactions = [tx for tx in filtered_transactions if tx["priority"] == priority.value] - + stmt = stmt.where(MultiChainTransaction.priority == priority) if from_date: - filtered_transactions = [tx for tx in filtered_transactions if tx["created_at"] >= from_date] - + stmt = stmt.where(MultiChainTransaction.created_at >= from_date) if to_date: - filtered_transactions = [tx for tx in filtered_transactions if tx["created_at"] <= to_date] + stmt = stmt.where(MultiChainTransaction.created_at <= to_date) # Sort by creation time (descending) - filtered_transactions.sort(key=lambda x: x["created_at"], reverse=True) + stmt = stmt.order_by(MultiChainTransaction.created_at.desc()) # Apply pagination - paginated_transactions = filtered_transactions[offset : offset + limit] + stmt = stmt.offset(offset).limit(limit) + + # Execute query + transactions = self.session.execute(stmt).scalars().all() # Format response response_transactions = [] - for tx in paginated_transactions: + for tx in transactions: response_transactions.append( { - "transaction_id": tx["id"], - "user_id": tx["user_id"], - "chain_id": tx["chain_id"], - "transaction_type": tx["transaction_type"], - "from_address": tx["from_address"], - "to_address": tx["to_address"], - "amount": tx["amount"], - "token_address": tx["token_address"], - "priority": tx["priority"], - "status": tx["status"], - "transaction_hash": tx["transaction_hash"], - "gas_used": tx["gas_used"], - "gas_price_paid": tx["gas_price_paid"], - "retry_count": tx["retry_count"], - "error_message": tx["error_message"], - "processing_time": tx["processing_time"], - "created_at": tx["created_at"].isoformat(), - "updated_at": tx.get("updated_at", tx["created_at"]).isoformat(), + "transaction_id": tx.id, + "user_id": tx.user_id, + "chain_id": tx.chain_id, + "transaction_type": tx.transaction_type, + "from_address": tx.from_address, + "to_address": tx.to_address, + "amount": tx.amount, + "token_address": tx.token_address, + "priority": tx.priority, + "status": tx.status, + "transaction_hash": tx.transaction_hash, + "gas_used": tx.gas_used, + "gas_price_paid": tx.gas_price_paid, + "retry_count": tx.retry_count, + "error_message": tx.error_message, + "processing_time": tx.processing_time, + "created_at": tx.created_at.isoformat(), + "updated_at": tx.updated_at.isoformat(), } ) @@ -417,54 +353,47 @@ class MultiChainTransactionManager: try: cutoff_time = datetime.now(timezone.utc) - timedelta(hours=time_period_hours) - # Get all transactions - all_transactions = [] - for chain_transactions in self.transaction_queues.values(): - all_transactions.extend(chain_transactions) + # Query from database + stmt = select(MultiChainTransaction).where(MultiChainTransaction.created_at >= cutoff_time) + if chain_id: + stmt = stmt.where(MultiChainTransaction.chain_id == chain_id) - # Filter by time period and chain - filtered_transactions = [ - tx - for tx in all_transactions - if tx["created_at"] >= cutoff_time and (chain_id is None or tx["chain_id"] == chain_id) - ] + transactions = self.session.execute(stmt).scalars().all() # Calculate statistics - total_transactions = len(filtered_transactions) + total_transactions = len(transactions) successful_transactions = len( - [tx for tx in filtered_transactions if tx["status"] == TransactionStatus.COMPLETED.value] + [tx for tx in transactions if tx.status == TransactionStatus.COMPLETED] ) - failed_transactions = len([tx for tx in filtered_transactions if tx["status"] == TransactionStatus.FAILED.value]) + failed_transactions = len([tx for tx in transactions if tx.status == TransactionStatus.FAILED]) success_rate = successful_transactions / max(total_transactions, 1) # Calculate average processing time completed_transactions = [ - tx - for tx in filtered_transactions - if tx["status"] == TransactionStatus.COMPLETED.value and tx["processing_time"] + tx for tx in transactions if tx.status == TransactionStatus.COMPLETED and tx.processing_time ] avg_processing_time = 0.0 if completed_transactions: - avg_processing_time = sum(tx["processing_time"] for tx in completed_transactions) / len(completed_transactions) + avg_processing_time = sum(tx.processing_time for tx in completed_transactions) / len(completed_transactions) # Calculate gas statistics gas_stats = {} - for tx in filtered_transactions: - if tx["gas_used"] and tx["gas_price_paid"]: - chain_id = tx["chain_id"] - if chain_id not in gas_stats: - gas_stats[chain_id] = {"total_gas_used": 0, "total_gas_cost": 0.0, "transaction_count": 0} + for tx in transactions: + if tx.gas_used and tx.gas_price_paid: + tx_chain_id = tx.chain_id + if tx_chain_id not in gas_stats: + gas_stats[tx_chain_id] = {"total_gas_used": 0, "total_gas_cost": 0.0, "transaction_count": 0} - gas_stats[chain_id]["total_gas_used"] += tx["gas_used"] - gas_stats[chain_id]["total_gas_cost"] += (tx["gas_used"] * tx["gas_price_paid"]) / 10**18 - gas_stats[chain_id]["transaction_count"] += 1 + gas_stats[tx_chain_id]["total_gas_used"] += tx.gas_used + gas_stats[tx_chain_id]["total_gas_cost"] += (tx.gas_used * tx.gas_price_paid) / 10**18 + gas_stats[tx_chain_id]["transaction_count"] += 1 # Priority distribution priority_distribution = defaultdict(int) - for tx in filtered_transactions: - priority_distribution[tx["priority"]] += 1 + for tx in transactions: + priority_distribution[tx.priority] += 1 return { "time_period_hours": time_period_hours, @@ -485,7 +414,7 @@ class MultiChainTransactionManager: async def optimize_transaction_routing( self, - transaction_type: TransactionType, + transaction_type: MultiChainTransactionType, amount: float, from_chain: int, to_chain: int | None = None, @@ -513,7 +442,7 @@ class MultiChainTransactionManager: "estimated_gas_price": chain_metrics.get("average_gas_price", 0), "estimated_confirmation_time": chain_metrics.get("average_confirmation_time", 0), "success_rate": chain_metrics.get("success_rate", 0), - "queue_length": len(self.transaction_queues[chain_id]), + "queue_length": 0, # Database persistence - no in-memory queue } ) @@ -539,283 +468,67 @@ class MultiChainTransactionManager: logger.error(f"Error optimizing transaction routing: {e}") raise - # Private methods - async def _queue_transaction(self, transaction: dict[str, Any]) -> None: - """Add transaction to appropriate queue""" - - try: - # Add to chain-specific queue - self.transaction_queues[transaction["chain_id"]].append(transaction) - - # Add to priority queue - priority = TransactionPriority(transaction["priority"]) - self.priority_queues[priority].append(transaction) - - # Sort priority queue by priority and creation time - self.priority_queues[priority].sort(key=lambda x: (x["priority"], x["created_at"]), reverse=True) - - # Update metrics - self.metrics["total_transactions"] += 1 - - except Exception as e: - logger.error(f"Error queuing transaction: {e}") - raise - - async def _find_transaction(self, transaction_id: str) -> dict[str, Any] | None: - """Find transaction in queues""" - - for chain_transactions in self.transaction_queues.values(): - for tx in chain_transactions: - if tx["id"] == transaction_id: - return tx - - for priority_transactions in self.priority_queues.values(): - for tx in priority_transactions: - if tx["id"] == transaction_id: - return tx - - return None - - async def _remove_from_queues(self, transaction_id: str) -> None: - """Remove transaction from all queues""" - - for chain_id in self.transaction_queues: - self.transaction_queues[chain_id] = [tx for tx in self.transaction_queues[chain_id] if tx["id"] != transaction_id] - - for priority in self.priority_queues: - self.priority_queues[priority] = [tx for tx in self.priority_queues[priority] if tx["id"] != transaction_id] - - async def _start_background_processing(self) -> None: - """Start background processing tasks""" - - try: - # Start transaction processing task - for chain_id in self.wallet_adapters.keys(): - task = asyncio.create_task(self._process_chain_transactions(chain_id)) - self._processing_tasks.append(task) - - # Start monitoring task - self._monitoring_task = asyncio.create_task(self._monitor_transactions()) - - logger.info("Started background transaction processing") - - except Exception as e: - logger.error(f"Error starting background processing: {e}") - - async def _process_chain_transactions(self, chain_id: int) -> None: - """Process transactions for a specific chain""" - - while True: - try: - # Get next transaction from queue - transaction = await self._get_next_transaction(chain_id) - - if not transaction: - await asyncio.sleep(1) # Wait for new transactions - continue - - # Process transaction - await self._process_single_transaction(transaction) - - # Small delay between transactions - await asyncio.sleep(0.1) - - except Exception as e: - logger.error(f"Error processing transactions for chain {chain_id}: {e}") - await asyncio.sleep(5) - - async def _get_next_transaction(self, chain_id: int) -> dict[str, Any] | None: - """Get next transaction to process for chain""" - - try: - # Check queue length limit - if len(self.transaction_queues[chain_id]) >= self.routing_config["max_pending_per_chain"]: - return None - - # Get highest priority transaction - for priority in [ - TransactionPriority.CRITICAL, - TransactionPriority.URGENT, - TransactionPriority.HIGH, - TransactionPriority.MEDIUM, - TransactionPriority.LOW, - ]: - if priority.value in self.priority_queues: - for tx in self.priority_queues[priority.value]: - if tx["chain_id"] == chain_id and tx["status"] == TransactionStatus.QUEUED.value: - return tx - - return None - - except Exception as e: - logger.error(f"Error getting next transaction: {e}") - return None - - async def _process_single_transaction(self, transaction: dict[str, Any]) -> None: - """Process a single transaction""" - - try: - start_time = datetime.now(timezone.utc) - - # Update status to processing - transaction["status"] = TransactionStatus.PROCESSING.value - transaction["updated_at"] = start_time - - # Get wallet adapter - adapter = self.wallet_adapters[transaction["chain_id"]] - - # Execute transaction - tx_result = await adapter.execute_transaction( - from_address=transaction["from_address"], - to_address=transaction["to_address"], - amount=transaction["amount"], - token_address=transaction["token_address"], - data=transaction["data"], - gas_limit=transaction["gas_limit"], - gas_price=transaction["gas_price"], - ) - - # Update transaction with result - transaction["transaction_hash"] = tx_result["transaction_hash"] - transaction["status"] = TransactionStatus.SUBMITTED.value - transaction["submit_attempts"] += 1 - transaction["updated_at"] = datetime.now(timezone.utc) - - # Wait for confirmations - await self._wait_for_confirmations(transaction) - - # Update final status - transaction["status"] = TransactionStatus.COMPLETED.value - transaction["processing_time"] = (datetime.now(timezone.utc) - start_time).total_seconds() - transaction["updated_at"] = datetime.now(timezone.utc) - - # Update metrics - self.metrics["successful_transactions"] += 1 - chain_metrics = self.metrics["chain_performance"][transaction["chain_id"]] - chain_metrics["total_transactions"] += 1 - chain_metrics["success_rate"] = chain_metrics["success_rate"] * 0.9 + 0.1 # Moving average - - logger.info(f"Completed transaction {transaction['id']}") - - except Exception as e: - logger.error(f"Error processing transaction {transaction['id']}: {e}") - - # Handle failure - await self._handle_transaction_failure(transaction, str(e)) - - async def _wait_for_confirmations(self, transaction: dict[str, Any]) -> None: - """Wait for transaction confirmations""" - - try: - adapter = self.wallet_adapters[transaction["chain_id"]] - required_confirmations = self.routing_config["confirmation_threshold"] - - while transaction["confirmations"] < required_confirmations: - # Get transaction status - tx_status = await adapter.get_transaction_status(transaction["transaction_hash"]) - - if tx_status.get("block_number"): - current_block = 12345 # Mock current block - tx_block = int(tx_status["block_number"], 16) - transaction["confirmations"] = current_block - tx_block - transaction["block_number"] = tx_status["block_number"] - - await asyncio.sleep(10) # Check every 10 seconds - - except Exception as e: - logger.error(f"Error waiting for confirmations: {e}") - raise - - async def _handle_transaction_failure(self, transaction: dict[str, Any], error_message: str) -> None: - """Handle transaction failure""" - - try: - transaction["retry_count"] += 1 - transaction["error_message"] = error_message - transaction["updated_at"] = datetime.now(timezone.utc) - - # Check if should retry - if transaction["retry_count"] < self.routing_config["max_retries"]: - transaction["status"] = TransactionStatus.RETRYING.value - - # Wait before retry - await asyncio.sleep(self.routing_config["retry_delay"]) - - # Reset status to queued for retry - transaction["status"] = TransactionStatus.QUEUED.value - else: - transaction["status"] = TransactionStatus.FAILED.value - self.metrics["failed_transactions"] += 1 - - # Update chain metrics - chain_metrics = self.metrics["chain_performance"][transaction["chain_id"]] - chain_metrics["success_rate"] = chain_metrics["success_rate"] * 0.9 # Moving average - - except Exception as e: - logger.error(f"Error handling transaction failure: {e}") - - async def _monitor_transactions(self) -> None: - """Monitor transaction processing and performance""" - - while True: - try: - # Clean up old transactions - await self._cleanup_old_transactions() - - # Update performance metrics - await self._update_performance_metrics() - - # Check for stuck transactions - await self._check_stuck_transactions() - - # Sleep before next monitoring cycle - await asyncio.sleep(60) # Monitor every minute - - except Exception as e: - logger.error(f"Error in transaction monitoring: {e}") - await asyncio.sleep(60) - - async def _cleanup_old_transactions(self) -> None: - """Clean up old completed/failed transactions""" - - try: - cutoff_time = datetime.now(timezone.utc) - timedelta(hours=24) - - for chain_id in self.transaction_queues: - original_length = len(self.transaction_queues[chain_id]) - - self.transaction_queues[chain_id] = [ - tx - for tx in self.transaction_queues[chain_id] - if tx["created_at"] > cutoff_time - or tx["status"] - in [TransactionStatus.QUEUED.value, TransactionStatus.PENDING.value, TransactionStatus.PROCESSING.value] - ] - - cleaned_up = original_length - len(self.transaction_queues[chain_id]) - if cleaned_up > 0: - logger.info(f"Cleaned up {cleaned_up} old transactions for chain {chain_id}") - - except Exception as e: - logger.error(f"Error cleaning up old transactions: {e}") - - async def _update_performance_metrics(self) -> None: - """Update performance metrics""" - - try: - for chain_id, adapter in self.wallet_adapters.items(): - # Get current gas price - gas_price = await adapter._get_gas_price() - - # Update chain metrics - chain_metrics = self.metrics["chain_performance"][chain_id] - chain_metrics["average_gas_price"] = ( - chain_metrics["average_gas_price"] * 0.9 + gas_price * 0.1 # Moving average - ) - chain_metrics["last_updated"] = datetime.now(timezone.utc) - - except Exception as e: - logger.error(f"Error updating performance metrics: {e}") + # Private methods (background processing disabled - using database persistence) + async def _calculate_transaction_progress(self, transaction: MultiChainTransaction) -> float: + """Calculate transaction progress percentage""" + # Simplified progress calculation based on status + status_progress = { + TransactionStatus.QUEUED: 0.0, + TransactionStatus.PENDING: 10.0, + TransactionStatus.PROCESSING: 30.0, + TransactionStatus.SUBMITTED: 50.0, + TransactionStatus.CONFIRMED: 80.0, + TransactionStatus.COMPLETED: 100.0, + TransactionStatus.FAILED: 100.0, + TransactionStatus.CANCELLED: 100.0, + TransactionStatus.EXPIRED: 100.0, + } + return status_progress.get(transaction.status, 0.0) + + async def _update_transaction_status(self, transaction_id: str) -> None: + """Update transaction status from blockchain""" + # Placeholder - would query blockchain for actual status + pass + + async def _estimate_processing_time(self, transaction: MultiChainTransaction) -> float: + """Estimate transaction processing time in seconds""" + # Simplified estimate based on priority + priority_time = { + TransactionPriority.CRITICAL: 30.0, + TransactionPriority.URGENT: 60.0, + TransactionPriority.HIGH: 120.0, + TransactionPriority.MEDIUM: 300.0, + TransactionPriority.LOW: 600.0, + } + return priority_time.get(transaction.priority, 300.0) + + def _get_min_reputation_for_transaction( + self, transaction_type: MultiChainTransactionType, priority: TransactionPriority + ) -> float: + """Get minimum reputation score for transaction type and priority""" + # Simplified reputation requirements + base_reputation = 50.0 + priority_multiplier = { + TransactionPriority.CRITICAL: 1.5, + TransactionPriority.URGENT: 1.3, + TransactionPriority.HIGH: 1.1, + TransactionPriority.MEDIUM: 1.0, + TransactionPriority.LOW: 0.8, + } + return base_reputation * priority_multiplier.get(priority, 1.0) + + async def _calculate_routing_score( + self, + chain_id: int, + transaction_type: MultiChainTransactionType, + amount: float, + urgency: TransactionPriority, + chain_metrics: dict[str, Any], + ) -> float: + """Calculate routing score for a chain""" + # Simplified routing score calculation + base_score = chain_metrics.get("success_rate", 0.5) * 100 + return base_score async def _check_stuck_transactions(self) -> None: """Check for stuck transactions""" @@ -824,17 +537,15 @@ class MultiChainTransactionManager: current_time = datetime.now(timezone.utc) stuck_threshold = timedelta(minutes=30) - for chain_id in self.transaction_queues: - for tx in self.transaction_queues[chain_id]: - if ( - tx["status"] == TransactionStatus.PROCESSING.value - and current_time - tx["updated_at"] > stuck_threshold - ): + # Query stuck transactions from database + stmt = select(MultiChainTransaction).where( + MultiChainTransaction.status.in_([TransactionStatus.PROCESSING, TransactionStatus.SUBMITTED]) + ) + transactions = self.session.execute(stmt).scalars().all() - logger.warning(f"Found stuck transaction {tx['id']} on chain {chain_id}") - - # Mark as failed and retry - await self._handle_transaction_failure(tx, "Transaction stuck in processing") + for tx in transactions: + if current_time - tx.updated_at > stuck_threshold: + logger.warning(f"Stuck transaction detected: {tx.id}") except Exception as e: logger.error(f"Error checking stuck transactions: {e}") @@ -931,17 +642,17 @@ class MultiChainTransactionManager: except Exception: return 0.0 - def _get_min_reputation_for_transaction(self, transaction_type: TransactionType, priority: TransactionPriority) -> int: + def _get_min_reputation_for_transaction(self, transaction_type: MultiChainTransactionType, priority: TransactionPriority) -> float: """Get minimum reputation required for transaction""" base_requirements = { - TransactionType.TRANSFER: 100, - TransactionType.SWAP: 200, - TransactionType.BRIDGE: 300, - TransactionType.DEPOSIT: 100, - TransactionType.WITHDRAWAL: 150, - TransactionType.CONTRACT_CALL: 250, - TransactionType.APPROVAL: 100, + MultiChainTransactionType.TRANSFER: 100, + MultiChainTransactionType.SWAP: 200, + MultiChainTransactionType.BRIDGE: 300, + MultiChainTransactionType.DEPOSIT: 100, + MultiChainTransactionType.WITHDRAWAL: 150, + MultiChainTransactionType.CONTRACT_CALL: 250, + MultiChainTransactionType.APPROVAL: 100, } priority_multipliers = { @@ -960,7 +671,7 @@ class MultiChainTransactionManager: async def _calculate_routing_score( self, chain_id: int, - transaction_type: TransactionType, + transaction_type: MultiChainTransactionType, amount: float, urgency: TransactionPriority, chain_metrics: dict[str, Any], @@ -977,9 +688,8 @@ class MultiChainTransactionManager: # Success rate factor (higher is better) success_rate_factor = chain_metrics.get("success_rate", 0.5) - # Queue length factor (lower is better) - queue_length = len(self.transaction_queues[chain_id]) - queue_factor = 1.0 / max(queue_length, 1) + # Queue length factor (lower is better) - database persistence, no queue + queue_factor = 1.0 # No in-memory queue with database persistence # Urgency factor urgency_multiplier = {