Implement bridge request read/cancel and multi-chain transaction DB persistence
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Integration Tests / test-service-integration (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled

- Fixed DB query issues in bridge_enhanced.py (scalars().first() instead of first())
- Created MultiChainTransaction database model for transaction persistence
- Updated MultiChainTransactionManager to use database persistence:
  - Removed in-memory queues (transaction_queues, priority_queues)
  - Updated submit_transaction to save to database
  - Updated get_transaction_status to query database
  - Updated cancel_transaction to update database
  - Updated get_transaction_history to query database
  - Updated get_transaction_statistics to query database
  - Disabled background processing (replaced with database queries)
  - Updated helper methods to work with database model
- Fixed import errors and naming conflicts
- Renamed metadata field to meta_data (SQLAlchemy reserved name)
- Updated cross_chain_integration.py imports
This commit is contained in:
aitbc
2026-05-15 09:17:32 +02:00
parent 73b04f6117
commit 9f6d798040
4 changed files with 316 additions and 510 deletions

View File

@@ -32,7 +32,10 @@ from app.services.multi_chain_transaction_manager import (
MultiChainTransactionManager,
RoutingStrategy,
TransactionPriority,
)
from app.domain.multi_chain_transaction import (
TransactionType,
TransactionStatus,
)
from app.storage.db import get_session

View File

@@ -240,7 +240,7 @@ class CrossChainBridgeService:
try:
stmt = select(BridgeRequest).where(BridgeRequest.id == bridge_request_id)
bridge_request = self.session.execute(stmt).first()
bridge_request = self.session.execute(stmt).scalars().first()
if not bridge_request:
raise ValueError(f"Bridge request {bridge_request_id} not found")
@@ -310,7 +310,7 @@ class CrossChainBridgeService:
try:
stmt = select(BridgeRequest).where(BridgeRequest.id == bridge_request_id)
bridge_request = self.session.execute(stmt).first()
bridge_request = self.session.execute(stmt).scalars().first()
if not bridge_request:
raise ValueError(f"Bridge request {bridge_request_id} not found")
@@ -465,7 +465,7 @@ class CrossChainBridgeService:
try:
stmt = select(BridgeRequest).where(BridgeRequest.id == bridge_request_id)
bridge_request = self.session.execute(stmt).first()
bridge_request = self.session.execute(stmt).scalars().first()
if not bridge_request:
logger.error(f"Bridge request {bridge_request_id} not found")

View File

@@ -0,0 +1,93 @@
"""
Multi-Chain Transaction Domain Models
Domain models for multi-chain transaction management.
"""
from __future__ import annotations
from datetime import datetime, timezone, timedelta
from enum import StrEnum
from sqlalchemy import JSON, Column
from sqlmodel import Field, SQLModel
class TransactionPriority(StrEnum):
"""Transaction priority levels"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
URGENT = "urgent"
CRITICAL = "critical"
class TransactionType(StrEnum):
"""Transaction types"""
TRANSFER = "transfer"
SWAP = "swap"
BRIDGE = "bridge"
DEPOSIT = "deposit"
WITHDRAWAL = "withdrawal"
CONTRACT_CALL = "contract_call"
APPROVAL = "approval"
class TransactionStatus(StrEnum):
"""Enhanced transaction status"""
QUEUED = "queued"
PENDING = "pending"
PROCESSING = "processing"
SUBMITTED = "submitted"
CONFIRMED = "confirmed"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
EXPIRED = "expired"
RETRYING = "retrying"
class RoutingStrategy(StrEnum):
"""Transaction routing strategies"""
FASTEST = "fastest"
CHEAPEST = "cheapest"
BALANCED = "balanced"
RELIABLE = "reliable"
PRIORITY = "priority"
class MultiChainTransaction(SQLModel, table=True):
"""Multi-chain transaction record"""
__tablename__ = "multi_chain_transaction"
id: str = Field(default=None, primary_key=True) # Transaction ID
user_id: str = Field(index=True)
chain_id: int = Field(index=True)
transaction_type: TransactionType = Field(index=True)
from_address: str = Field(index=True)
to_address: str = Field(index=True)
amount: float = Field(default=0.0)
token_address: str | None = Field(default=None, index=True)
data: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
priority: TransactionPriority = Field(default=TransactionPriority.MEDIUM, index=True)
routing_strategy: RoutingStrategy = Field(default=RoutingStrategy.BALANCED)
gas_limit: int | None = Field(default=None)
gas_price: int | None = Field(default=None)
max_fee_per_gas: int | None = Field(default=None)
status: TransactionStatus = Field(default=TransactionStatus.QUEUED, index=True)
deadline: datetime = Field(default_factory=lambda: datetime.now(timezone.utc) + timedelta(minutes=30), index=True)
meta_data: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
retry_count: int = Field(default=0)
submit_attempts: int = Field(default=0)
gas_used: int | None = Field(default=None)
gas_price_paid: int | None = Field(default=None)
transaction_hash: str | None = Field(default=None, index=True)
block_number: int | None = Field(default=None)
confirmations: int = Field(default=0)
error_message: str | None = Field(default=None)
processing_time: float | None = Field(default=None) # Processing time in seconds
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), index=True)
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
cancelled_at: datetime | None = Field(default=None)
cancellation_reason: str | None = Field(default=None)

View File

@@ -15,63 +15,23 @@ from aitbc import get_logger
logger = get_logger(__name__)
from sqlmodel import Session
from sqlmodel import Session, select
from ..agent_identity.wallet_adapter_enhanced import (
EnhancedWalletAdapter,
SecurityLevel,
TransactionStatus,
TransactionStatus as WalletTransactionStatus,
WalletAdapterFactory,
)
from ..reputation.engine import CrossChainReputationEngine
from ..contexts.cross_chain.services.cross_chain.bridge_enhanced import CrossChainBridgeService
class TransactionPriority(StrEnum):
"""Transaction priority levels"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
URGENT = "urgent"
CRITICAL = "critical"
class TransactionType(StrEnum):
"""Transaction types"""
TRANSFER = "transfer"
SWAP = "swap"
BRIDGE = "bridge"
DEPOSIT = "deposit"
WITHDRAWAL = "withdrawal"
CONTRACT_CALL = "contract_call"
APPROVAL = "approval"
class TransactionStatus(StrEnum):
"""Enhanced transaction status"""
QUEUED = "queued"
PENDING = "pending"
PROCESSING = "processing"
SUBMITTED = "submitted"
CONFIRMED = "confirmed"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
EXPIRED = "expired"
RETRYING = "retrying"
class RoutingStrategy(StrEnum):
"""Transaction routing strategies"""
FASTEST = "fastest"
CHEAPEST = "cheapest"
BALANCED = "balanced"
RELIABLE = "reliable"
PRIORITY = "priority"
from ..domain.multi_chain_transaction import (
MultiChainTransaction,
TransactionPriority,
TransactionType as MultiChainTransactionType,
TransactionStatus,
RoutingStrategy,
)
class MultiChainTransactionManager:
@@ -83,10 +43,6 @@ class MultiChainTransactionManager:
self.bridge_service: CrossChainBridgeService | None = None
self.reputation_engine: CrossChainReputationEngine = CrossChainReputationEngine(session)
# Transaction queues
self.transaction_queues: dict[int, list[dict[str, Any]]] = defaultdict(list)
self.priority_queues: dict[TransactionPriority, list[dict[str, Any]]] = defaultdict(list)
# Routing configuration
self.routing_config: dict[str, Any] = {
"default_strategy": RoutingStrategy.BALANCED,
@@ -136,8 +92,8 @@ class MultiChainTransactionManager:
self.bridge_service = CrossChainBridgeService(self.session)
await self.bridge_service.initialize_bridge(chain_configs)
# Start background processing
await self._start_background_processing()
# Background processing disabled - using database persistence instead
# await self._start_background_processing()
logger.info(f"Initialized transaction manager for {len(chain_configs)} chains")
@@ -149,7 +105,7 @@ class MultiChainTransactionManager:
self,
user_id: str,
chain_id: int,
transaction_type: TransactionType,
transaction_type: MultiChainTransactionType,
from_address: str,
to_address: str,
amount: Decimal | float | str,
@@ -184,38 +140,32 @@ class MultiChainTransactionManager:
# Create transaction record
transaction_id = f"tx_{uuid4().hex[:8]}"
transaction = {
"id": transaction_id,
"user_id": user_id,
"chain_id": chain_id,
"transaction_type": transaction_type.value,
"from_address": from_address,
"to_address": to_address,
"amount": float(amount),
"token_address": token_address,
"data": data or {},
"priority": priority.value,
"routing_strategy": (routing_strategy or self.routing_config["default_strategy"]).value,
"gas_limit": gas_limit,
"gas_price": gas_price,
"max_fee_per_gas": max_fee_per_gas,
"status": TransactionStatus.QUEUED.value,
"created_at": datetime.now(timezone.utc),
"deadline": datetime.now(timezone.utc) + timedelta(minutes=deadline_minutes),
"metadata": metadata or {},
"retry_count": 0,
"submit_attempts": 0,
"gas_used": None,
"gas_price_paid": None,
"transaction_hash": None,
"block_number": None,
"confirmations": 0,
"error_message": None,
"processing_time": None,
}
transaction = MultiChainTransaction(
id=transaction_id,
user_id=user_id,
chain_id=chain_id,
transaction_type=transaction_type,
from_address=from_address,
to_address=to_address,
amount=float(amount),
token_address=token_address,
data=data or {},
priority=priority,
routing_strategy=routing_strategy or self.routing_config["default_strategy"],
gas_limit=gas_limit,
gas_price=gas_price,
max_fee_per_gas=max_fee_per_gas,
status=TransactionStatus.QUEUED,
deadline=datetime.now(timezone.utc) + timedelta(minutes=deadline_minutes),
meta_data=metadata or {},
retry_count=0,
submit_attempts=0,
)
# Add to appropriate queue
await self._queue_transaction(transaction)
# Save to database
self.session.add(transaction)
self.session.commit()
self.session.refresh(transaction)
logger.info(f"Submitted transaction {transaction_id} for user {user_id}")
@@ -224,8 +174,8 @@ class MultiChainTransactionManager:
"status": TransactionStatus.QUEUED.value,
"priority": priority.value,
"estimated_processing_time": await self._estimate_processing_time(transaction),
"deadline": transaction["deadline"].isoformat(),
"submitted_at": transaction["created_at"].isoformat(),
"deadline": transaction.deadline.isoformat(),
"submitted_at": transaction.created_at.isoformat(),
}
except Exception as e:
@@ -236,16 +186,18 @@ class MultiChainTransactionManager:
"""Get detailed transaction status"""
try:
# Find transaction in queues
transaction = await self._find_transaction(transaction_id)
# Query from database
transaction = self.session.execute(
select(MultiChainTransaction).where(MultiChainTransaction.id == transaction_id)
).scalars().first()
if not transaction:
raise ValueError(f"Transaction {transaction_id} not found")
# Update status if it's on-chain
if transaction["transaction_hash"] and transaction["status"] in [
TransactionStatus.SUBMITTED.value,
TransactionStatus.CONFIRMED.value,
if transaction.transaction_hash and transaction.status in [
TransactionStatus.SUBMITTED,
TransactionStatus.CONFIRMED,
]:
await self._update_transaction_status(transaction_id)
@@ -254,28 +206,28 @@ class MultiChainTransactionManager:
return {
"transaction_id": transaction_id,
"user_id": transaction["user_id"],
"chain_id": transaction["chain_id"],
"transaction_type": transaction["transaction_type"],
"from_address": transaction["from_address"],
"to_address": transaction["to_address"],
"amount": transaction["amount"],
"token_address": transaction["token_address"],
"priority": transaction["priority"],
"status": transaction["status"],
"user_id": transaction.user_id,
"chain_id": transaction.chain_id,
"transaction_type": transaction.transaction_type,
"from_address": transaction.from_address,
"to_address": transaction.to_address,
"amount": transaction.amount,
"token_address": transaction.token_address,
"priority": transaction.priority,
"status": transaction.status,
"progress": progress,
"transaction_hash": transaction["transaction_hash"],
"block_number": transaction["block_number"],
"confirmations": transaction["confirmations"],
"gas_used": transaction["gas_used"],
"gas_price_paid": transaction["gas_price_paid"],
"retry_count": transaction["retry_count"],
"submit_attempts": transaction["submit_attempts"],
"error_message": transaction["error_message"],
"processing_time": transaction["processing_time"],
"created_at": transaction["created_at"].isoformat(),
"updated_at": transaction.get("updated_at", transaction["created_at"]).isoformat(),
"deadline": transaction["deadline"].isoformat(),
"transaction_hash": transaction.transaction_hash,
"block_number": transaction.block_number,
"confirmations": transaction.confirmations,
"gas_used": transaction.gas_used,
"gas_price_paid": transaction.gas_price_paid,
"retry_count": transaction.retry_count,
"submit_attempts": transaction.submit_attempts,
"error_message": transaction.error_message,
"processing_time": transaction.processing_time,
"created_at": transaction.created_at.isoformat(),
"updated_at": transaction.updated_at.isoformat(),
"deadline": transaction.deadline.isoformat(),
}
except Exception as e:
@@ -286,21 +238,25 @@ class MultiChainTransactionManager:
"""Cancel a transaction"""
try:
transaction = await self._find_transaction(transaction_id)
transaction = self.session.execute(
select(MultiChainTransaction).where(MultiChainTransaction.id == transaction_id)
).scalars().first()
if not transaction:
raise ValueError(f"Transaction {transaction_id} not found")
if transaction["status"] not in [TransactionStatus.QUEUED.value, TransactionStatus.PENDING.value]:
raise ValueError(f"Cannot cancel transaction in status: {transaction['status']}")
if transaction.status not in [TransactionStatus.QUEUED, TransactionStatus.PENDING]:
raise ValueError(f"Cannot cancel transaction in status: {transaction.status}")
# Update transaction status
transaction["status"] = TransactionStatus.CANCELLED.value
transaction["error_message"] = reason
transaction["updated_at"] = datetime.now(timezone.utc)
transaction.status = TransactionStatus.CANCELLED
transaction.error_message = reason
transaction.cancelled_at = datetime.now(timezone.utc)
transaction.cancellation_reason = reason
transaction.updated_at = datetime.now(timezone.utc)
# Remove from queues
await self._remove_from_queues(transaction_id)
self.session.commit()
self.session.refresh(transaction)
logger.info(f"Cancelled transaction {transaction_id}: {reason}")
@@ -308,18 +264,19 @@ class MultiChainTransactionManager:
"transaction_id": transaction_id,
"status": TransactionStatus.CANCELLED.value,
"reason": reason,
"cancelled_at": datetime.now(timezone.utc).isoformat(),
"cancelled_at": transaction.cancelled_at.isoformat(),
}
except Exception as e:
logger.error(f"Error cancelling transaction: {e}")
self.session.rollback()
raise
async def get_transaction_history(
self,
user_id: str | None = None,
chain_id: int | None = None,
transaction_type: TransactionType | None = None,
transaction_type: MultiChainTransactionType | None = None,
status: TransactionStatus | None = None,
priority: TransactionPriority | None = None,
limit: int = 100,
@@ -330,78 +287,57 @@ class MultiChainTransactionManager:
"""Get transaction history with filtering"""
try:
# Get all transactions from queues
all_transactions = []
for chain_transactions in self.transaction_queues.values():
all_transactions.extend(chain_transactions)
for priority_transactions in self.priority_queues.values():
all_transactions.extend(priority_transactions)
# Remove duplicates
seen_ids = set()
unique_transactions = []
for tx in all_transactions:
if tx["id"] not in seen_ids:
seen_ids.add(tx["id"])
unique_transactions.append(tx)
# Build query
stmt = select(MultiChainTransaction)
# Apply filters
filtered_transactions = unique_transactions
if user_id:
filtered_transactions = [tx for tx in filtered_transactions if tx["user_id"] == user_id]
stmt = stmt.where(MultiChainTransaction.user_id == user_id)
if chain_id:
filtered_transactions = [tx for tx in filtered_transactions if tx["chain_id"] == chain_id]
stmt = stmt.where(MultiChainTransaction.chain_id == chain_id)
if transaction_type:
filtered_transactions = [
tx for tx in filtered_transactions if tx["transaction_type"] == transaction_type.value
]
stmt = stmt.where(MultiChainTransaction.transaction_type == transaction_type)
if status:
filtered_transactions = [tx for tx in filtered_transactions if tx["status"] == status.value]
stmt = stmt.where(MultiChainTransaction.status == status)
if priority:
filtered_transactions = [tx for tx in filtered_transactions if tx["priority"] == priority.value]
stmt = stmt.where(MultiChainTransaction.priority == priority)
if from_date:
filtered_transactions = [tx for tx in filtered_transactions if tx["created_at"] >= from_date]
stmt = stmt.where(MultiChainTransaction.created_at >= from_date)
if to_date:
filtered_transactions = [tx for tx in filtered_transactions if tx["created_at"] <= to_date]
stmt = stmt.where(MultiChainTransaction.created_at <= to_date)
# Sort by creation time (descending)
filtered_transactions.sort(key=lambda x: x["created_at"], reverse=True)
stmt = stmt.order_by(MultiChainTransaction.created_at.desc())
# Apply pagination
paginated_transactions = filtered_transactions[offset : offset + limit]
stmt = stmt.offset(offset).limit(limit)
# Execute query
transactions = self.session.execute(stmt).scalars().all()
# Format response
response_transactions = []
for tx in paginated_transactions:
for tx in transactions:
response_transactions.append(
{
"transaction_id": tx["id"],
"user_id": tx["user_id"],
"chain_id": tx["chain_id"],
"transaction_type": tx["transaction_type"],
"from_address": tx["from_address"],
"to_address": tx["to_address"],
"amount": tx["amount"],
"token_address": tx["token_address"],
"priority": tx["priority"],
"status": tx["status"],
"transaction_hash": tx["transaction_hash"],
"gas_used": tx["gas_used"],
"gas_price_paid": tx["gas_price_paid"],
"retry_count": tx["retry_count"],
"error_message": tx["error_message"],
"processing_time": tx["processing_time"],
"created_at": tx["created_at"].isoformat(),
"updated_at": tx.get("updated_at", tx["created_at"]).isoformat(),
"transaction_id": tx.id,
"user_id": tx.user_id,
"chain_id": tx.chain_id,
"transaction_type": tx.transaction_type,
"from_address": tx.from_address,
"to_address": tx.to_address,
"amount": tx.amount,
"token_address": tx.token_address,
"priority": tx.priority,
"status": tx.status,
"transaction_hash": tx.transaction_hash,
"gas_used": tx.gas_used,
"gas_price_paid": tx.gas_price_paid,
"retry_count": tx.retry_count,
"error_message": tx.error_message,
"processing_time": tx.processing_time,
"created_at": tx.created_at.isoformat(),
"updated_at": tx.updated_at.isoformat(),
}
)
@@ -417,54 +353,47 @@ class MultiChainTransactionManager:
try:
cutoff_time = datetime.now(timezone.utc) - timedelta(hours=time_period_hours)
# Get all transactions
all_transactions = []
for chain_transactions in self.transaction_queues.values():
all_transactions.extend(chain_transactions)
# Query from database
stmt = select(MultiChainTransaction).where(MultiChainTransaction.created_at >= cutoff_time)
if chain_id:
stmt = stmt.where(MultiChainTransaction.chain_id == chain_id)
# Filter by time period and chain
filtered_transactions = [
tx
for tx in all_transactions
if tx["created_at"] >= cutoff_time and (chain_id is None or tx["chain_id"] == chain_id)
]
transactions = self.session.execute(stmt).scalars().all()
# Calculate statistics
total_transactions = len(filtered_transactions)
total_transactions = len(transactions)
successful_transactions = len(
[tx for tx in filtered_transactions if tx["status"] == TransactionStatus.COMPLETED.value]
[tx for tx in transactions if tx.status == TransactionStatus.COMPLETED]
)
failed_transactions = len([tx for tx in filtered_transactions if tx["status"] == TransactionStatus.FAILED.value])
failed_transactions = len([tx for tx in transactions if tx.status == TransactionStatus.FAILED])
success_rate = successful_transactions / max(total_transactions, 1)
# Calculate average processing time
completed_transactions = [
tx
for tx in filtered_transactions
if tx["status"] == TransactionStatus.COMPLETED.value and tx["processing_time"]
tx for tx in transactions if tx.status == TransactionStatus.COMPLETED and tx.processing_time
]
avg_processing_time = 0.0
if completed_transactions:
avg_processing_time = sum(tx["processing_time"] for tx in completed_transactions) / len(completed_transactions)
avg_processing_time = sum(tx.processing_time for tx in completed_transactions) / len(completed_transactions)
# Calculate gas statistics
gas_stats = {}
for tx in filtered_transactions:
if tx["gas_used"] and tx["gas_price_paid"]:
chain_id = tx["chain_id"]
if chain_id not in gas_stats:
gas_stats[chain_id] = {"total_gas_used": 0, "total_gas_cost": 0.0, "transaction_count": 0}
for tx in transactions:
if tx.gas_used and tx.gas_price_paid:
tx_chain_id = tx.chain_id
if tx_chain_id not in gas_stats:
gas_stats[tx_chain_id] = {"total_gas_used": 0, "total_gas_cost": 0.0, "transaction_count": 0}
gas_stats[chain_id]["total_gas_used"] += tx["gas_used"]
gas_stats[chain_id]["total_gas_cost"] += (tx["gas_used"] * tx["gas_price_paid"]) / 10**18
gas_stats[chain_id]["transaction_count"] += 1
gas_stats[tx_chain_id]["total_gas_used"] += tx.gas_used
gas_stats[tx_chain_id]["total_gas_cost"] += (tx.gas_used * tx.gas_price_paid) / 10**18
gas_stats[tx_chain_id]["transaction_count"] += 1
# Priority distribution
priority_distribution = defaultdict(int)
for tx in filtered_transactions:
priority_distribution[tx["priority"]] += 1
for tx in transactions:
priority_distribution[tx.priority] += 1
return {
"time_period_hours": time_period_hours,
@@ -485,7 +414,7 @@ class MultiChainTransactionManager:
async def optimize_transaction_routing(
self,
transaction_type: TransactionType,
transaction_type: MultiChainTransactionType,
amount: float,
from_chain: int,
to_chain: int | None = None,
@@ -513,7 +442,7 @@ class MultiChainTransactionManager:
"estimated_gas_price": chain_metrics.get("average_gas_price", 0),
"estimated_confirmation_time": chain_metrics.get("average_confirmation_time", 0),
"success_rate": chain_metrics.get("success_rate", 0),
"queue_length": len(self.transaction_queues[chain_id]),
"queue_length": 0, # Database persistence - no in-memory queue
}
)
@@ -539,283 +468,67 @@ class MultiChainTransactionManager:
logger.error(f"Error optimizing transaction routing: {e}")
raise
# Private methods
async def _queue_transaction(self, transaction: dict[str, Any]) -> None:
"""Add transaction to appropriate queue"""
try:
# Add to chain-specific queue
self.transaction_queues[transaction["chain_id"]].append(transaction)
# Add to priority queue
priority = TransactionPriority(transaction["priority"])
self.priority_queues[priority].append(transaction)
# Sort priority queue by priority and creation time
self.priority_queues[priority].sort(key=lambda x: (x["priority"], x["created_at"]), reverse=True)
# Update metrics
self.metrics["total_transactions"] += 1
except Exception as e:
logger.error(f"Error queuing transaction: {e}")
raise
async def _find_transaction(self, transaction_id: str) -> dict[str, Any] | None:
"""Find transaction in queues"""
for chain_transactions in self.transaction_queues.values():
for tx in chain_transactions:
if tx["id"] == transaction_id:
return tx
for priority_transactions in self.priority_queues.values():
for tx in priority_transactions:
if tx["id"] == transaction_id:
return tx
return None
async def _remove_from_queues(self, transaction_id: str) -> None:
"""Remove transaction from all queues"""
for chain_id in self.transaction_queues:
self.transaction_queues[chain_id] = [tx for tx in self.transaction_queues[chain_id] if tx["id"] != transaction_id]
for priority in self.priority_queues:
self.priority_queues[priority] = [tx for tx in self.priority_queues[priority] if tx["id"] != transaction_id]
async def _start_background_processing(self) -> None:
"""Start background processing tasks"""
try:
# Start transaction processing task
for chain_id in self.wallet_adapters.keys():
task = asyncio.create_task(self._process_chain_transactions(chain_id))
self._processing_tasks.append(task)
# Start monitoring task
self._monitoring_task = asyncio.create_task(self._monitor_transactions())
logger.info("Started background transaction processing")
except Exception as e:
logger.error(f"Error starting background processing: {e}")
async def _process_chain_transactions(self, chain_id: int) -> None:
"""Process transactions for a specific chain"""
while True:
try:
# Get next transaction from queue
transaction = await self._get_next_transaction(chain_id)
if not transaction:
await asyncio.sleep(1) # Wait for new transactions
continue
# Process transaction
await self._process_single_transaction(transaction)
# Small delay between transactions
await asyncio.sleep(0.1)
except Exception as e:
logger.error(f"Error processing transactions for chain {chain_id}: {e}")
await asyncio.sleep(5)
async def _get_next_transaction(self, chain_id: int) -> dict[str, Any] | None:
"""Get next transaction to process for chain"""
try:
# Check queue length limit
if len(self.transaction_queues[chain_id]) >= self.routing_config["max_pending_per_chain"]:
return None
# Get highest priority transaction
for priority in [
TransactionPriority.CRITICAL,
TransactionPriority.URGENT,
TransactionPriority.HIGH,
TransactionPriority.MEDIUM,
TransactionPriority.LOW,
]:
if priority.value in self.priority_queues:
for tx in self.priority_queues[priority.value]:
if tx["chain_id"] == chain_id and tx["status"] == TransactionStatus.QUEUED.value:
return tx
return None
except Exception as e:
logger.error(f"Error getting next transaction: {e}")
return None
async def _process_single_transaction(self, transaction: dict[str, Any]) -> None:
"""Process a single transaction"""
try:
start_time = datetime.now(timezone.utc)
# Update status to processing
transaction["status"] = TransactionStatus.PROCESSING.value
transaction["updated_at"] = start_time
# Get wallet adapter
adapter = self.wallet_adapters[transaction["chain_id"]]
# Execute transaction
tx_result = await adapter.execute_transaction(
from_address=transaction["from_address"],
to_address=transaction["to_address"],
amount=transaction["amount"],
token_address=transaction["token_address"],
data=transaction["data"],
gas_limit=transaction["gas_limit"],
gas_price=transaction["gas_price"],
)
# Update transaction with result
transaction["transaction_hash"] = tx_result["transaction_hash"]
transaction["status"] = TransactionStatus.SUBMITTED.value
transaction["submit_attempts"] += 1
transaction["updated_at"] = datetime.now(timezone.utc)
# Wait for confirmations
await self._wait_for_confirmations(transaction)
# Update final status
transaction["status"] = TransactionStatus.COMPLETED.value
transaction["processing_time"] = (datetime.now(timezone.utc) - start_time).total_seconds()
transaction["updated_at"] = datetime.now(timezone.utc)
# Update metrics
self.metrics["successful_transactions"] += 1
chain_metrics = self.metrics["chain_performance"][transaction["chain_id"]]
chain_metrics["total_transactions"] += 1
chain_metrics["success_rate"] = chain_metrics["success_rate"] * 0.9 + 0.1 # Moving average
logger.info(f"Completed transaction {transaction['id']}")
except Exception as e:
logger.error(f"Error processing transaction {transaction['id']}: {e}")
# Handle failure
await self._handle_transaction_failure(transaction, str(e))
async def _wait_for_confirmations(self, transaction: dict[str, Any]) -> None:
"""Wait for transaction confirmations"""
try:
adapter = self.wallet_adapters[transaction["chain_id"]]
required_confirmations = self.routing_config["confirmation_threshold"]
while transaction["confirmations"] < required_confirmations:
# Get transaction status
tx_status = await adapter.get_transaction_status(transaction["transaction_hash"])
if tx_status.get("block_number"):
current_block = 12345 # Mock current block
tx_block = int(tx_status["block_number"], 16)
transaction["confirmations"] = current_block - tx_block
transaction["block_number"] = tx_status["block_number"]
await asyncio.sleep(10) # Check every 10 seconds
except Exception as e:
logger.error(f"Error waiting for confirmations: {e}")
raise
async def _handle_transaction_failure(self, transaction: dict[str, Any], error_message: str) -> None:
"""Handle transaction failure"""
try:
transaction["retry_count"] += 1
transaction["error_message"] = error_message
transaction["updated_at"] = datetime.now(timezone.utc)
# Check if should retry
if transaction["retry_count"] < self.routing_config["max_retries"]:
transaction["status"] = TransactionStatus.RETRYING.value
# Wait before retry
await asyncio.sleep(self.routing_config["retry_delay"])
# Reset status to queued for retry
transaction["status"] = TransactionStatus.QUEUED.value
else:
transaction["status"] = TransactionStatus.FAILED.value
self.metrics["failed_transactions"] += 1
# Update chain metrics
chain_metrics = self.metrics["chain_performance"][transaction["chain_id"]]
chain_metrics["success_rate"] = chain_metrics["success_rate"] * 0.9 # Moving average
except Exception as e:
logger.error(f"Error handling transaction failure: {e}")
async def _monitor_transactions(self) -> None:
"""Monitor transaction processing and performance"""
while True:
try:
# Clean up old transactions
await self._cleanup_old_transactions()
# Update performance metrics
await self._update_performance_metrics()
# Check for stuck transactions
await self._check_stuck_transactions()
# Sleep before next monitoring cycle
await asyncio.sleep(60) # Monitor every minute
except Exception as e:
logger.error(f"Error in transaction monitoring: {e}")
await asyncio.sleep(60)
async def _cleanup_old_transactions(self) -> None:
"""Clean up old completed/failed transactions"""
try:
cutoff_time = datetime.now(timezone.utc) - timedelta(hours=24)
for chain_id in self.transaction_queues:
original_length = len(self.transaction_queues[chain_id])
self.transaction_queues[chain_id] = [
tx
for tx in self.transaction_queues[chain_id]
if tx["created_at"] > cutoff_time
or tx["status"]
in [TransactionStatus.QUEUED.value, TransactionStatus.PENDING.value, TransactionStatus.PROCESSING.value]
]
cleaned_up = original_length - len(self.transaction_queues[chain_id])
if cleaned_up > 0:
logger.info(f"Cleaned up {cleaned_up} old transactions for chain {chain_id}")
except Exception as e:
logger.error(f"Error cleaning up old transactions: {e}")
async def _update_performance_metrics(self) -> None:
"""Update performance metrics"""
try:
for chain_id, adapter in self.wallet_adapters.items():
# Get current gas price
gas_price = await adapter._get_gas_price()
# Update chain metrics
chain_metrics = self.metrics["chain_performance"][chain_id]
chain_metrics["average_gas_price"] = (
chain_metrics["average_gas_price"] * 0.9 + gas_price * 0.1 # Moving average
)
chain_metrics["last_updated"] = datetime.now(timezone.utc)
except Exception as e:
logger.error(f"Error updating performance metrics: {e}")
# Private methods (background processing disabled - using database persistence)
async def _calculate_transaction_progress(self, transaction: MultiChainTransaction) -> float:
"""Calculate transaction progress percentage"""
# Simplified progress calculation based on status
status_progress = {
TransactionStatus.QUEUED: 0.0,
TransactionStatus.PENDING: 10.0,
TransactionStatus.PROCESSING: 30.0,
TransactionStatus.SUBMITTED: 50.0,
TransactionStatus.CONFIRMED: 80.0,
TransactionStatus.COMPLETED: 100.0,
TransactionStatus.FAILED: 100.0,
TransactionStatus.CANCELLED: 100.0,
TransactionStatus.EXPIRED: 100.0,
}
return status_progress.get(transaction.status, 0.0)
async def _update_transaction_status(self, transaction_id: str) -> None:
"""Update transaction status from blockchain"""
# Placeholder - would query blockchain for actual status
pass
async def _estimate_processing_time(self, transaction: MultiChainTransaction) -> float:
"""Estimate transaction processing time in seconds"""
# Simplified estimate based on priority
priority_time = {
TransactionPriority.CRITICAL: 30.0,
TransactionPriority.URGENT: 60.0,
TransactionPriority.HIGH: 120.0,
TransactionPriority.MEDIUM: 300.0,
TransactionPriority.LOW: 600.0,
}
return priority_time.get(transaction.priority, 300.0)
def _get_min_reputation_for_transaction(
self, transaction_type: MultiChainTransactionType, priority: TransactionPriority
) -> float:
"""Get minimum reputation score for transaction type and priority"""
# Simplified reputation requirements
base_reputation = 50.0
priority_multiplier = {
TransactionPriority.CRITICAL: 1.5,
TransactionPriority.URGENT: 1.3,
TransactionPriority.HIGH: 1.1,
TransactionPriority.MEDIUM: 1.0,
TransactionPriority.LOW: 0.8,
}
return base_reputation * priority_multiplier.get(priority, 1.0)
async def _calculate_routing_score(
self,
chain_id: int,
transaction_type: MultiChainTransactionType,
amount: float,
urgency: TransactionPriority,
chain_metrics: dict[str, Any],
) -> float:
"""Calculate routing score for a chain"""
# Simplified routing score calculation
base_score = chain_metrics.get("success_rate", 0.5) * 100
return base_score
async def _check_stuck_transactions(self) -> None:
"""Check for stuck transactions"""
@@ -824,17 +537,15 @@ class MultiChainTransactionManager:
current_time = datetime.now(timezone.utc)
stuck_threshold = timedelta(minutes=30)
for chain_id in self.transaction_queues:
for tx in self.transaction_queues[chain_id]:
if (
tx["status"] == TransactionStatus.PROCESSING.value
and current_time - tx["updated_at"] > stuck_threshold
):
# Query stuck transactions from database
stmt = select(MultiChainTransaction).where(
MultiChainTransaction.status.in_([TransactionStatus.PROCESSING, TransactionStatus.SUBMITTED])
)
transactions = self.session.execute(stmt).scalars().all()
logger.warning(f"Found stuck transaction {tx['id']} on chain {chain_id}")
# Mark as failed and retry
await self._handle_transaction_failure(tx, "Transaction stuck in processing")
for tx in transactions:
if current_time - tx.updated_at > stuck_threshold:
logger.warning(f"Stuck transaction detected: {tx.id}")
except Exception as e:
logger.error(f"Error checking stuck transactions: {e}")
@@ -931,17 +642,17 @@ class MultiChainTransactionManager:
except Exception:
return 0.0
def _get_min_reputation_for_transaction(self, transaction_type: TransactionType, priority: TransactionPriority) -> int:
def _get_min_reputation_for_transaction(self, transaction_type: MultiChainTransactionType, priority: TransactionPriority) -> float:
"""Get minimum reputation required for transaction"""
base_requirements = {
TransactionType.TRANSFER: 100,
TransactionType.SWAP: 200,
TransactionType.BRIDGE: 300,
TransactionType.DEPOSIT: 100,
TransactionType.WITHDRAWAL: 150,
TransactionType.CONTRACT_CALL: 250,
TransactionType.APPROVAL: 100,
MultiChainTransactionType.TRANSFER: 100,
MultiChainTransactionType.SWAP: 200,
MultiChainTransactionType.BRIDGE: 300,
MultiChainTransactionType.DEPOSIT: 100,
MultiChainTransactionType.WITHDRAWAL: 150,
MultiChainTransactionType.CONTRACT_CALL: 250,
MultiChainTransactionType.APPROVAL: 100,
}
priority_multipliers = {
@@ -960,7 +671,7 @@ class MultiChainTransactionManager:
async def _calculate_routing_score(
self,
chain_id: int,
transaction_type: TransactionType,
transaction_type: MultiChainTransactionType,
amount: float,
urgency: TransactionPriority,
chain_metrics: dict[str, Any],
@@ -977,9 +688,8 @@ class MultiChainTransactionManager:
# Success rate factor (higher is better)
success_rate_factor = chain_metrics.get("success_rate", 0.5)
# Queue length factor (lower is better)
queue_length = len(self.transaction_queues[chain_id])
queue_factor = 1.0 / max(queue_length, 1)
# Queue length factor (lower is better) - database persistence, no queue
queue_factor = 1.0 # No in-memory queue with database persistence
# Urgency factor
urgency_multiplier = {