feat(coordinator-api): integrate dynamic pricing engine with GPU marketplace and add agent identity router

- Add DynamicPricingEngine and MarketDataCollector dependencies to GPU marketplace endpoints
- Implement dynamic pricing calculation for GPU registration with market_balance strategy
- Calculate real-time dynamic prices at booking time with confidence scores and pricing factors
- Enhance /marketplace/pricing/{model} endpoint with comprehensive dynamic pricing analysis
  - Add static vs dynamic price
This commit is contained in:
oib
2026-02-28 22:57:10 +01:00
parent 85ae21a568
commit 0e6c9eda72
83 changed files with 30189 additions and 134 deletions

View File

@@ -0,0 +1,366 @@
"""
Agent Identity Domain Models for Cross-Chain Agent Identity Management
Implements SQLModel definitions for unified agent identity across multiple blockchains
"""
from datetime import datetime
from typing import Optional, Dict, List, Any
from uuid import uuid4
from enum import Enum
from sqlmodel import SQLModel, Field, Column, JSON
from sqlalchemy import DateTime, Index
class IdentityStatus(str, Enum):
"""Agent identity status enumeration"""
ACTIVE = "active"
INACTIVE = "inactive"
SUSPENDED = "suspended"
REVOKED = "revoked"
class VerificationType(str, Enum):
"""Identity verification type enumeration"""
BASIC = "basic"
ADVANCED = "advanced"
ZERO_KNOWLEDGE = "zero-knowledge"
MULTI_SIGNATURE = "multi-signature"
class ChainType(str, Enum):
"""Blockchain chain type enumeration"""
ETHEREUM = "ethereum"
POLYGON = "polygon"
BSC = "bsc"
ARBITRUM = "arbitrum"
OPTIMISM = "optimism"
AVALANCHE = "avalanche"
SOLANA = "solana"
CUSTOM = "custom"
class AgentIdentity(SQLModel, table=True):
"""Unified agent identity across blockchains"""
__tablename__ = "agent_identities"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: f"identity_{uuid4().hex[:8]}", primary_key=True)
agent_id: str = Field(index=True, unique=True) # Links to AIAgentWorkflow.id
owner_address: str = Field(index=True)
# Identity metadata
display_name: str = Field(max_length=100, default="")
description: str = Field(default="")
avatar_url: str = Field(default="")
# Status and verification
status: IdentityStatus = Field(default=IdentityStatus.ACTIVE)
verification_level: VerificationType = Field(default=VerificationType.BASIC)
is_verified: bool = Field(default=False)
verified_at: Optional[datetime] = Field(default=None)
# Cross-chain capabilities
supported_chains: List[str] = Field(default_factory=list, sa_column=Column(JSON))
primary_chain: int = Field(default=1) # Default to Ethereum mainnet
# Reputation and trust
reputation_score: float = Field(default=0.0)
total_transactions: int = Field(default=0)
successful_transactions: int = Field(default=0)
last_activity: Optional[datetime] = Field(default=None)
# Metadata and settings
identity_data: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
settings_data: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
tags: List[str] = Field(default_factory=list, sa_column=Column(JSON))
# Timestamps
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
# Indexes for performance
__table_args__ = (
Index('idx_agent_identity_owner', 'owner_address'),
Index('idx_agent_identity_status', 'status'),
Index('idx_agent_identity_verified', 'is_verified'),
Index('idx_agent_identity_reputation', 'reputation_score'),
)
class CrossChainMapping(SQLModel, table=True):
"""Mapping of agent identity across different blockchains"""
__tablename__ = "cross_chain_mappings"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: f"mapping_{uuid4().hex[:8]}", primary_key=True)
agent_id: str = Field(index=True)
chain_id: int = Field(index=True)
chain_type: ChainType = Field(default=ChainType.ETHEREUM)
chain_address: str = Field(index=True)
# Verification and status
is_verified: bool = Field(default=False)
verified_at: Optional[datetime] = Field(default=None)
verification_proof: Optional[Dict[str, Any]] = Field(default=None, sa_column=Column(JSON))
# Wallet information
wallet_address: Optional[str] = Field(default=None)
wallet_type: str = Field(default="agent-wallet") # agent-wallet, external-wallet, etc.
# Chain-specific metadata
chain_metadata: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
nonce: Optional[int] = Field(default=None)
# Activity tracking
last_transaction: Optional[datetime] = Field(default=None)
transaction_count: int = Field(default=0)
# Timestamps
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
# Unique constraint
__table_args__ = (
Index('idx_cross_chain_agent_chain', 'agent_id', 'chain_id'),
Index('idx_cross_chain_address', 'chain_address'),
Index('idx_cross_chain_verified', 'is_verified'),
)
class IdentityVerification(SQLModel, table=True):
"""Verification records for cross-chain identities"""
__tablename__ = "identity_verifications"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: f"verify_{uuid4().hex[:8]}", primary_key=True)
agent_id: str = Field(index=True)
chain_id: int = Field(index=True)
# Verification details
verification_type: VerificationType
verifier_address: str = Field(index=True) # Who performed the verification
proof_hash: str = Field(index=True)
proof_data: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
# Status and results
is_valid: bool = Field(default=True)
verification_result: str = Field(default="pending") # pending, approved, rejected
rejection_reason: Optional[str] = Field(default=None)
# Expiration and renewal
expires_at: Optional[datetime] = Field(default=None)
renewed_at: Optional[datetime] = Field(default=None)
# Metadata
verification_metadata: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
# Timestamps
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
# Indexes
__table_args__ = (
Index('idx_identity_verify_agent_chain', 'agent_id', 'chain_id'),
Index('idx_identity_verify_verifier', 'verifier_address'),
Index('idx_identity_verify_hash', 'proof_hash'),
Index('idx_identity_verify_result', 'verification_result'),
)
class AgentWallet(SQLModel, table=True):
"""Agent wallet information for cross-chain operations"""
__tablename__ = "agent_wallets"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: f"wallet_{uuid4().hex[:8]}", primary_key=True)
agent_id: str = Field(index=True)
chain_id: int = Field(index=True)
chain_address: str = Field(index=True)
# Wallet details
wallet_type: str = Field(default="agent-wallet")
contract_address: Optional[str] = Field(default=None)
# Financial information
balance: float = Field(default=0.0)
spending_limit: float = Field(default=0.0)
total_spent: float = Field(default=0.0)
# Status and permissions
is_active: bool = Field(default=True)
permissions: List[str] = Field(default_factory=list, sa_column=Column(JSON))
# Security
requires_multisig: bool = Field(default=False)
multisig_threshold: int = Field(default=1)
multisig_signers: List[str] = Field(default_factory=list, sa_column=Column(JSON))
# Activity tracking
last_transaction: Optional[datetime] = Field(default=None)
transaction_count: int = Field(default=0)
# Timestamps
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
# Indexes
__table_args__ = (
Index('idx_agent_wallet_agent_chain', 'agent_id', 'chain_id'),
Index('idx_agent_wallet_address', 'chain_address'),
Index('idx_agent_wallet_active', 'is_active'),
)
# Request/Response Models for API
class AgentIdentityCreate(SQLModel):
"""Request model for creating agent identities"""
agent_id: str
owner_address: str
display_name: str = Field(max_length=100, default="")
description: str = Field(default="")
avatar_url: str = Field(default="")
supported_chains: List[int] = Field(default_factory=list)
primary_chain: int = Field(default=1)
metadata: Dict[str, Any] = Field(default_factory=dict)
tags: List[str] = Field(default_factory=list)
class AgentIdentityUpdate(SQLModel):
"""Request model for updating agent identities"""
display_name: Optional[str] = Field(default=None, max_length=100)
description: Optional[str] = Field(default=None)
avatar_url: Optional[str] = Field(default=None)
status: Optional[IdentityStatus] = Field(default=None)
verification_level: Optional[VerificationType] = Field(default=None)
supported_chains: Optional[List[int]] = Field(default=None)
primary_chain: Optional[int] = Field(default=None)
metadata: Optional[Dict[str, Any]] = Field(default=None)
settings: Optional[Dict[str, Any]] = Field(default=None)
tags: Optional[List[str]] = Field(default=None)
class CrossChainMappingCreate(SQLModel):
"""Request model for creating cross-chain mappings"""
agent_id: str
chain_id: int
chain_type: ChainType = Field(default=ChainType.ETHEREUM)
chain_address: str
wallet_address: Optional[str] = Field(default=None)
wallet_type: str = Field(default="agent-wallet")
chain_metadata: Dict[str, Any] = Field(default_factory=dict)
class CrossChainMappingUpdate(SQLModel):
"""Request model for updating cross-chain mappings"""
chain_address: Optional[str] = Field(default=None)
wallet_address: Optional[str] = Field(default=None)
wallet_type: Optional[str] = Field(default=None)
chain_metadata: Optional[Dict[str, Any]] = Field(default=None)
is_verified: Optional[bool] = Field(default=None)
class IdentityVerificationCreate(SQLModel):
"""Request model for creating identity verifications"""
agent_id: str
chain_id: int
verification_type: VerificationType
verifier_address: str
proof_hash: str
proof_data: Dict[str, Any] = Field(default_factory=dict)
expires_at: Optional[datetime] = Field(default=None)
verification_metadata: Dict[str, Any] = Field(default_factory=dict)
class AgentWalletCreate(SQLModel):
"""Request model for creating agent wallets"""
agent_id: str
chain_id: int
chain_address: str
wallet_type: str = Field(default="agent-wallet")
contract_address: Optional[str] = Field(default=None)
spending_limit: float = Field(default=0.0)
permissions: List[str] = Field(default_factory=list)
requires_multisig: bool = Field(default=False)
multisig_threshold: int = Field(default=1)
multisig_signers: List[str] = Field(default_factory=list)
class AgentWalletUpdate(SQLModel):
"""Request model for updating agent wallets"""
contract_address: Optional[str] = Field(default=None)
spending_limit: Optional[float] = Field(default=None)
permissions: Optional[List[str]] = Field(default=None)
is_active: Optional[bool] = Field(default=None)
requires_multisig: Optional[bool] = Field(default=None)
multisig_threshold: Optional[int] = Field(default=None)
multisig_signers: Optional[List[str]] = Field(default=None)
# Response Models
class AgentIdentityResponse(SQLModel):
"""Response model for agent identity"""
id: str
agent_id: str
owner_address: str
display_name: str
description: str
avatar_url: str
status: IdentityStatus
verification_level: VerificationType
is_verified: bool
verified_at: Optional[datetime]
supported_chains: List[str]
primary_chain: int
reputation_score: float
total_transactions: int
successful_transactions: int
last_activity: Optional[datetime]
metadata: Dict[str, Any]
tags: List[str]
created_at: datetime
updated_at: datetime
class CrossChainMappingResponse(SQLModel):
"""Response model for cross-chain mapping"""
id: str
agent_id: str
chain_id: int
chain_type: ChainType
chain_address: str
is_verified: bool
verified_at: Optional[datetime]
wallet_address: Optional[str]
wallet_type: str
chain_metadata: Dict[str, Any]
last_transaction: Optional[datetime]
transaction_count: int
created_at: datetime
updated_at: datetime
class AgentWalletResponse(SQLModel):
"""Response model for agent wallet"""
id: str
agent_id: str
chain_id: int
chain_address: str
wallet_type: str
contract_address: Optional[str]
balance: float
spending_limit: float
total_spent: float
is_active: bool
permissions: List[str]
requires_multisig: bool
multisig_threshold: int
multisig_signers: List[str]
last_transaction: Optional[datetime]
transaction_count: int
created_at: datetime
updated_at: datetime

View File

@@ -0,0 +1,271 @@
"""
Agent Portfolio Domain Models
Domain models for agent portfolio management, trading strategies, and risk assessment.
"""
from __future__ import annotations
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional
from uuid import uuid4
from sqlalchemy import Column, JSON
from sqlmodel import Field, SQLModel, Relationship
class StrategyType(str, Enum):
CONSERVATIVE = "conservative"
BALANCED = "balanced"
AGGRESSIVE = "aggressive"
DYNAMIC = "dynamic"
class TradeStatus(str, Enum):
PENDING = "pending"
EXECUTED = "executed"
FAILED = "failed"
CANCELLED = "cancelled"
class RiskLevel(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class PortfolioStrategy(SQLModel, table=True):
"""Trading strategy configuration for agent portfolios"""
__tablename__ = "portfolio_strategy"
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
strategy_type: StrategyType = Field(index=True)
target_allocations: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
max_drawdown: float = Field(default=20.0) # Maximum drawdown percentage
rebalance_frequency: int = Field(default=86400) # Rebalancing frequency in seconds
volatility_threshold: float = Field(default=15.0) # Volatility threshold for rebalancing
is_active: bool = Field(default=True, index=True)
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
# Relationships
portfolios: List["AgentPortfolio"] = Relationship(back_populates="strategy")
class AgentPortfolio(SQLModel, table=True):
"""Portfolio managed by an autonomous agent"""
__tablename__ = "agent_portfolio"
id: Optional[int] = Field(default=None, primary_key=True)
agent_address: str = Field(index=True)
strategy_id: int = Field(foreign_key="portfolio_strategy.id", index=True)
contract_portfolio_id: Optional[str] = Field(default=None, index=True)
initial_capital: float = Field(default=0.0)
total_value: float = Field(default=0.0)
risk_score: float = Field(default=0.0) # Risk score (0-100)
risk_tolerance: float = Field(default=50.0) # Risk tolerance percentage
is_active: bool = Field(default=True, index=True)
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
updated_at: datetime = Field(default_factory=datetime.utcnow)
last_rebalance: datetime = Field(default_factory=datetime.utcnow)
# Relationships
strategy: PortfolioStrategy = Relationship(back_populates="portfolios")
assets: List["PortfolioAsset"] = Relationship(back_populates="portfolio")
trades: List["PortfolioTrade"] = Relationship(back_populates="portfolio")
risk_metrics: Optional["RiskMetrics"] = Relationship(back_populates="portfolio")
class PortfolioAsset(SQLModel, table=True):
"""Asset holdings within a portfolio"""
__tablename__ = "portfolio_asset"
id: Optional[int] = Field(default=None, primary_key=True)
portfolio_id: int = Field(foreign_key="agent_portfolio.id", index=True)
token_symbol: str = Field(index=True)
token_address: str = Field(index=True)
balance: float = Field(default=0.0)
target_allocation: float = Field(default=0.0) # Target allocation percentage
current_allocation: float = Field(default=0.0) # Current allocation percentage
average_cost: float = Field(default=0.0) # Average cost basis
unrealized_pnl: float = Field(default=0.0) # Unrealized profit/loss
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
# Relationships
portfolio: AgentPortfolio = Relationship(back_populates="assets")
class PortfolioTrade(SQLModel, table=True):
"""Trade executed within a portfolio"""
__tablename__ = "portfolio_trade"
id: Optional[int] = Field(default=None, primary_key=True)
portfolio_id: int = Field(foreign_key="agent_portfolio.id", index=True)
sell_token: str = Field(index=True)
buy_token: str = Field(index=True)
sell_amount: float = Field(default=0.0)
buy_amount: float = Field(default=0.0)
price: float = Field(default=0.0)
fee_amount: float = Field(default=0.0)
status: TradeStatus = Field(default=TradeStatus.PENDING, index=True)
transaction_hash: Optional[str] = Field(default=None, index=True)
executed_at: Optional[datetime] = Field(default=None, index=True)
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
# Relationships
portfolio: AgentPortfolio = Relationship(back_populates="trades")
class RiskMetrics(SQLModel, table=True):
"""Risk assessment metrics for a portfolio"""
__tablename__ = "risk_metrics"
id: Optional[int] = Field(default=None, primary_key=True)
portfolio_id: int = Field(foreign_key="agent_portfolio.id", index=True)
volatility: float = Field(default=0.0) # Portfolio volatility
max_drawdown: float = Field(default=0.0) # Maximum drawdown
sharpe_ratio: float = Field(default=0.0) # Sharpe ratio
beta: float = Field(default=0.0) # Beta coefficient
alpha: float = Field(default=0.0) # Alpha coefficient
var_95: float = Field(default=0.0) # Value at Risk at 95% confidence
var_99: float = Field(default=0.0) # Value at Risk at 99% confidence
correlation_matrix: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
risk_level: RiskLevel = Field(default=RiskLevel.LOW, index=True)
overall_risk_score: float = Field(default=0.0) # Overall risk score (0-100)
stress_test_results: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
updated_at: datetime = Field(default_factory=datetime.utcnow)
# Relationships
portfolio: AgentPortfolio = Relationship(back_populates="risk_metrics")
class RebalanceHistory(SQLModel, table=True):
"""History of portfolio rebalancing events"""
__tablename__ = "rebalance_history"
id: Optional[int] = Field(default=None, primary_key=True)
portfolio_id: int = Field(foreign_key="agent_portfolio.id", index=True)
trigger_reason: str = Field(index=True) # Reason for rebalancing
pre_rebalance_value: float = Field(default=0.0)
post_rebalance_value: float = Field(default=0.0)
trades_executed: int = Field(default=0)
rebalance_cost: float = Field(default=0.0) # Cost of rebalancing
execution_time_ms: int = Field(default=0) # Execution time in milliseconds
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
class PerformanceMetrics(SQLModel, table=True):
"""Performance metrics for portfolios"""
__tablename__ = "performance_metrics"
id: Optional[int] = Field(default=None, primary_key=True)
portfolio_id: int = Field(foreign_key="agent_portfolio.id", index=True)
period: str = Field(index=True) # Performance period (1d, 7d, 30d, etc.)
total_return: float = Field(default=0.0) # Total return percentage
annualized_return: float = Field(default=0.0) # Annualized return
volatility: float = Field(default=0.0) # Period volatility
max_drawdown: float = Field(default=0.0) # Maximum drawdown in period
win_rate: float = Field(default=0.0) # Win rate percentage
profit_factor: float = Field(default=0.0) # Profit factor
sharpe_ratio: float = Field(default=0.0) # Sharpe ratio
sortino_ratio: float = Field(default=0.0) # Sortino ratio
calmar_ratio: float = Field(default=0.0) # Calmar ratio
benchmark_return: float = Field(default=0.0) # Benchmark return
alpha: float = Field(default=0.0) # Alpha vs benchmark
beta: float = Field(default=0.0) # Beta vs benchmark
tracking_error: float = Field(default=0.0) # Tracking error
information_ratio: float = Field(default=0.0) # Information ratio
updated_at: datetime = Field(default_factory=datetime.utcnow)
period_start: datetime = Field(default_factory=datetime.utcnow)
period_end: datetime = Field(default_factory=datetime.utcnow)
class PortfolioAlert(SQLModel, table=True):
"""Alerts for portfolio events"""
__tablename__ = "portfolio_alert"
id: Optional[int] = Field(default=None, primary_key=True)
portfolio_id: int = Field(foreign_key="agent_portfolio.id", index=True)
alert_type: str = Field(index=True) # Type of alert
severity: str = Field(index=True) # Severity level
message: str = Field(default="")
metadata: Dict[str, str] = Field(default_factory=dict, sa_column=Column(JSON))
is_acknowledged: bool = Field(default=False, index=True)
acknowledged_at: Optional[datetime] = Field(default=None)
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
resolved_at: Optional[datetime] = Field(default=None)
class StrategySignal(SQLModel, table=True):
"""Trading signals generated by strategies"""
__tablename__ = "strategy_signal"
id: Optional[int] = Field(default=None, primary_key=True)
strategy_id: int = Field(foreign_key="portfolio_strategy.id", index=True)
signal_type: str = Field(index=True) # BUY, SELL, HOLD
token_symbol: str = Field(index=True)
confidence: float = Field(default=0.0) # Confidence level (0-1)
price_target: float = Field(default=0.0) # Target price
stop_loss: float = Field(default=0.0) # Stop loss price
time_horizon: str = Field(default="1d") # Time horizon
reasoning: str = Field(default="") # Signal reasoning
metadata: Dict[str, str] = Field(default_factory=dict, sa_column=Column(JSON))
is_executed: bool = Field(default=False, index=True)
executed_at: Optional[datetime] = Field(default=None)
expires_at: datetime = Field(default_factory=lambda: datetime.utcnow() + timedelta(hours=24))
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
class PortfolioSnapshot(SQLModel, table=True):
"""Daily snapshot of portfolio state"""
__tablename__ = "portfolio_snapshot"
id: Optional[int] = Field(default=None, primary_key=True)
portfolio_id: int = Field(foreign_key="agent_portfolio.id", index=True)
snapshot_date: datetime = Field(index=True)
total_value: float = Field(default=0.0)
cash_balance: float = Field(default=0.0)
asset_count: int = Field(default=0)
top_holdings: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
sector_allocation: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
geographic_allocation: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
risk_metrics: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
performance_metrics: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
created_at: datetime = Field(default_factory=datetime.utcnow)
class TradingRule(SQLModel, table=True):
"""Trading rules and constraints for portfolios"""
__tablename__ = "trading_rule"
id: Optional[int] = Field(default=None, primary_key=True)
portfolio_id: int = Field(foreign_key="agent_portfolio.id", index=True)
rule_type: str = Field(index=True) # Type of rule
rule_name: str = Field(index=True)
parameters: Dict[str, str] = Field(default_factory=dict, sa_column=Column(JSON))
is_active: bool = Field(default=True, index=True)
priority: int = Field(default=0) # Rule priority (higher = more important)
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
class MarketCondition(SQLModel, table=True):
"""Market conditions affecting portfolio decisions"""
__tablename__ = "market_condition"
id: Optional[int] = Field(default=None, primary_key=True)
condition_type: str = Field(index=True) # BULL, BEAR, SIDEWAYS, VOLATILE
market_index: str = Field(index=True) # Market index (SPY, QQQ, etc.)
confidence: float = Field(default=0.0) # Confidence in condition
indicators: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
sentiment_score: float = Field(default=0.0) # Market sentiment score
volatility_index: float = Field(default=0.0) # VIX or similar
trend_strength: float = Field(default=0.0) # Trend strength
support_level: float = Field(default=0.0) # Support level
resistance_level: float = Field(default=0.0) # Resistance level
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
expires_at: datetime = Field(default_factory=lambda: datetime.utcnow() + timedelta(hours=24))

View File

@@ -0,0 +1,329 @@
"""
AMM Domain Models
Domain models for automated market making, liquidity pools, and swap transactions.
"""
from __future__ import annotations
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional
from uuid import uuid4
from sqlalchemy import Column, JSON
from sqlmodel import Field, SQLModel, Relationship
class PoolStatus(str, Enum):
ACTIVE = "active"
INACTIVE = "inactive"
PAUSED = "paused"
MAINTENANCE = "maintenance"
class SwapStatus(str, Enum):
PENDING = "pending"
EXECUTED = "executed"
FAILED = "failed"
CANCELLED = "cancelled"
class LiquidityPositionStatus(str, Enum):
ACTIVE = "active"
WITHDRAWN = "withdrawn"
PENDING = "pending"
class LiquidityPool(SQLModel, table=True):
"""Liquidity pool for automated market making"""
__tablename__ = "liquidity_pool"
id: Optional[int] = Field(default=None, primary_key=True)
contract_pool_id: str = Field(index=True) # Contract pool ID
token_a: str = Field(index=True) # Token A address
token_b: str = Field(index=True) # Token B address
token_a_symbol: str = Field(index=True) # Token A symbol
token_b_symbol: str = Field(index=True) # Token B symbol
fee_percentage: float = Field(default=0.3) # Trading fee percentage
reserve_a: float = Field(default=0.0) # Token A reserve
reserve_b: float = Field(default=0.0) # Token B reserve
total_liquidity: float = Field(default=0.0) # Total liquidity tokens
total_supply: float = Field(default=0.0) # Total LP token supply
apr: float = Field(default=0.0) # Annual percentage rate
volume_24h: float = Field(default=0.0) # 24h trading volume
fees_24h: float = Field(default=0.0) # 24h fee revenue
tvl: float = Field(default=0.0) # Total value locked
utilization_rate: float = Field(default=0.0) # Pool utilization rate
price_impact_threshold: float = Field(default=0.05) # Price impact threshold
max_slippage: float = Field(default=0.05) # Maximum slippage
is_active: bool = Field(default=True, index=True)
status: PoolStatus = Field(default=PoolStatus.ACTIVE, index=True)
created_by: str = Field(index=True) # Creator address
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
updated_at: datetime = Field(default_factory=datetime.utcnow)
last_trade_time: Optional[datetime] = Field(default=None)
# Relationships
positions: List["LiquidityPosition"] = Relationship(back_populates="pool")
swaps: List["SwapTransaction"] = Relationship(back_populates="pool")
metrics: List["PoolMetrics"] = Relationship(back_populates="pool")
incentives: List["IncentiveProgram"] = Relationship(back_populates="pool")
class LiquidityPosition(SQLModel, table=True):
"""Liquidity provider position in a pool"""
__tablename__ = "liquidity_position"
id: Optional[int] = Field(default=None, primary_key=True)
pool_id: int = Field(foreign_key="liquidity_pool.id", index=True)
provider_address: str = Field(index=True)
liquidity_amount: float = Field(default=0.0) # Amount of liquidity tokens
shares_owned: float = Field(default=0.0) # Percentage of pool owned
deposit_amount_a: float = Field(default=0.0) # Initial token A deposit
deposit_amount_b: float = Field(default=0.0) # Initial token B deposit
current_amount_a: float = Field(default=0.0) # Current token A amount
current_amount_b: float = Field(default=0.0) # Current token B amount
unrealized_pnl: float = Field(default=0.0) # Unrealized P&L
fees_earned: float = Field(default=0.0) # Fees earned
impermanent_loss: float = Field(default=0.0) # Impermanent loss
status: LiquidityPositionStatus = Field(default=LiquidityPositionStatus.ACTIVE, index=True)
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
updated_at: datetime = Field(default_factory=datetime.utcnow)
last_deposit: Optional[datetime] = Field(default=None)
last_withdrawal: Optional[datetime] = Field(default=None)
# Relationships
pool: LiquidityPool = Relationship(back_populates="positions")
fee_claims: List["FeeClaim"] = Relationship(back_populates="position")
class SwapTransaction(SQLModel, table=True):
"""Swap transaction executed in a pool"""
__tablename__ = "swap_transaction"
id: Optional[int] = Field(default=None, primary_key=True)
pool_id: int = Field(foreign_key="liquidity_pool.id", index=True)
user_address: str = Field(index=True)
token_in: str = Field(index=True)
token_out: str = Field(index=True)
amount_in: float = Field(default=0.0)
amount_out: float = Field(default=0.0)
price: float = Field(default=0.0) # Execution price
price_impact: float = Field(default=0.0) # Price impact
slippage: float = Field(default=0.0) # Slippage percentage
fee_amount: float = Field(default=0.0) # Fee amount
fee_percentage: float = Field(default=0.0) # Applied fee percentage
status: SwapStatus = Field(default=SwapStatus.PENDING, index=True)
transaction_hash: Optional[str] = Field(default=None, index=True)
block_number: Optional[int] = Field(default=None)
gas_used: Optional[int] = Field(default=None)
gas_price: Optional[float] = Field(default=None)
executed_at: Optional[datetime] = Field(default=None, index=True)
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
deadline: datetime = Field(default_factory=lambda: datetime.utcnow() + timedelta(minutes=20))
# Relationships
pool: LiquidityPool = Relationship(back_populates="swaps")
class PoolMetrics(SQLModel, table=True):
"""Historical metrics for liquidity pools"""
__tablename__ = "pool_metrics"
id: Optional[int] = Field(default=None, primary_key=True)
pool_id: int = Field(foreign_key="liquidity_pool.id", index=True)
timestamp: datetime = Field(index=True)
total_volume_24h: float = Field(default=0.0)
total_fees_24h: float = Field(default=0.0)
total_value_locked: float = Field(default=0.0)
apr: float = Field(default=0.0)
utilization_rate: float = Field(default=0.0)
liquidity_depth: float = Field(default=0.0) # Liquidity depth at 1% price impact
price_volatility: float = Field(default=0.0) # Price volatility
swap_count_24h: int = Field(default=0) # Number of swaps in 24h
unique_traders_24h: int = Field(default=0) # Unique traders in 24h
average_trade_size: float = Field(default=0.0) # Average trade size
impermanent_loss_24h: float = Field(default=0.0) # 24h impermanent loss
liquidity_provider_count: int = Field(default=0) # Number of liquidity providers
top_lps: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON)) # Top LPs by share
created_at: datetime = Field(default_factory=datetime.utcnow)
# Relationships
pool: LiquidityPool = Relationship(back_populates="metrics")
class FeeStructure(SQLModel, table=True):
"""Fee structure for liquidity pools"""
__tablename__ = "fee_structure"
id: Optional[int] = Field(default=None, primary_key=True)
pool_id: int = Field(foreign_key="liquidity_pool.id", index=True)
base_fee_percentage: float = Field(default=0.3) # Base fee percentage
current_fee_percentage: float = Field(default=0.3) # Current fee percentage
volatility_adjustment: float = Field(default=0.0) # Volatility-based adjustment
volume_adjustment: float = Field(default=0.0) # Volume-based adjustment
liquidity_adjustment: float = Field(default=0.0) # Liquidity-based adjustment
time_adjustment: float = Field(default=0.0) # Time-based adjustment
adjusted_at: datetime = Field(default_factory=datetime.utcnow)
expires_at: datetime = Field(default_factory=lambda: datetime.utcnow() + timedelta(hours=24))
adjustment_reason: str = Field(default="") # Reason for adjustment
created_at: datetime = Field(default_factory=datetime.utcnow)
class IncentiveProgram(SQLModel, table=True):
"""Incentive program for liquidity providers"""
__tablename__ = "incentive_program"
id: Optional[int] = Field(default=None, primary_key=True)
pool_id: int = Field(foreign_key="liquidity_pool.id", index=True)
program_name: str = Field(index=True)
reward_token: str = Field(index=True) # Reward token address
daily_reward_amount: float = Field(default=0.0) # Daily reward amount
total_reward_amount: float = Field(default=0.0) # Total reward amount
remaining_reward_amount: float = Field(default=0.0) # Remaining rewards
incentive_multiplier: float = Field(default=1.0) # Incentive multiplier
duration_days: int = Field(default=30) # Program duration in days
minimum_liquidity: float = Field(default=0.0) # Minimum liquidity to qualify
maximum_liquidity: float = Field(default=0.0) # Maximum liquidity cap (0 = no cap)
vesting_period_days: int = Field(default=0) # Vesting period (0 = no vesting)
is_active: bool = Field(default=True, index=True)
start_time: datetime = Field(default_factory=datetime.utcnow)
end_time: datetime = Field(default_factory=lambda: datetime.utcnow() + timedelta(days=30))
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
# Relationships
pool: LiquidityPool = Relationship(back_populates="incentives")
rewards: List["LiquidityReward"] = Relationship(back_populates="program")
class LiquidityReward(SQLModel, table=True):
"""Reward earned by liquidity providers"""
__tablename__ = "liquidity_reward"
id: Optional[int] = Field(default=None, primary_key=True)
program_id: int = Field(foreign_key="incentive_program.id", index=True)
position_id: int = Field(foreign_key="liquidity_position.id", index=True)
provider_address: str = Field(index=True)
reward_amount: float = Field(default=0.0)
reward_token: str = Field(index=True)
liquidity_share: float = Field(default=0.0) # Share of pool liquidity
time_weighted_share: float = Field(default=0.0) # Time-weighted share
is_claimed: bool = Field(default=False, index=True)
claimed_at: Optional[datetime] = Field(default=None)
claim_transaction_hash: Optional[str] = Field(default=None)
vesting_start: Optional[datetime] = Field(default=None)
vesting_end: Optional[datetime] = Field(default=None)
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
# Relationships
program: IncentiveProgram = Relationship(back_populates="rewards")
position: LiquidityPosition = Relationship(back_populates="fee_claims")
class FeeClaim(SQLModel, table=True):
"""Fee claim by liquidity providers"""
__tablename__ = "fee_claim"
id: Optional[int] = Field(default=None, primary_key=True)
position_id: int = Field(foreign_key="liquidity_position.id", index=True)
provider_address: str = Field(index=True)
fee_amount: float = Field(default=0.0)
fee_token: str = Field(index=True)
claim_period_start: datetime = Field(index=True)
claim_period_end: datetime = Field(index=True)
liquidity_share: float = Field(default=0.0) # Share of pool liquidity
is_claimed: bool = Field(default=False, index=True)
claimed_at: Optional[datetime] = Field(default=None)
claim_transaction_hash: Optional[str] = Field(default=None)
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
# Relationships
position: LiquidityPosition = Relationship(back_populates="fee_claims")
class PoolConfiguration(SQLModel, table=True):
"""Configuration settings for liquidity pools"""
__tablename__ = "pool_configuration"
id: Optional[int] = Field(default=None, primary_key=True)
pool_id: int = Field(foreign_key="liquidity_pool.id", index=True)
config_key: str = Field(index=True)
config_value: str = Field(default="")
config_type: str = Field(default="string") # string, number, boolean, json
is_active: bool = Field(default=True)
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
class PoolAlert(SQLModel, table=True):
"""Alerts for pool events and conditions"""
__tablename__ = "pool_alert"
id: Optional[int] = Field(default=None, primary_key=True)
pool_id: int = Field(foreign_key="liquidity_pool.id", index=True)
alert_type: str = Field(index=True) # LOW_LIQUIDITY, HIGH_VOLATILITY, etc.
severity: str = Field(index=True) # LOW, MEDIUM, HIGH, CRITICAL
title: str = Field(default="")
message: str = Field(default="")
metadata: Dict[str, str] = Field(default_factory=dict, sa_column=Column(JSON))
threshold_value: float = Field(default=0.0) # Threshold that triggered alert
current_value: float = Field(default=0.0) # Current value
is_acknowledged: bool = Field(default=False, index=True)
acknowledged_by: Optional[str] = Field(default=None)
acknowledged_at: Optional[datetime] = Field(default=None)
is_resolved: bool = Field(default=False, index=True)
resolved_at: Optional[datetime] = Field(default=None)
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
expires_at: datetime = Field(default_factory=lambda: datetime.utcnow() + timedelta(hours=24))
class PoolSnapshot(SQLModel, table=True):
"""Daily snapshot of pool state"""
__tablename__ = "pool_snapshot"
id: Optional[int] = Field(default=None, primary_key=True)
pool_id: int = Field(foreign_key="liquidity_pool.id", index=True)
snapshot_date: datetime = Field(index=True)
reserve_a: float = Field(default=0.0)
reserve_b: float = Field(default=0.0)
total_liquidity: float = Field(default=0.0)
price_a_to_b: float = Field(default=0.0) # Price of A in terms of B
price_b_to_a: float = Field(default=0.0) # Price of B in terms of A
volume_24h: float = Field(default=0.0)
fees_24h: float = Field(default=0.0)
tvl: float = Field(default=0.0)
apr: float = Field(default=0.0)
utilization_rate: float = Field(default=0.0)
liquidity_provider_count: int = Field(default=0)
swap_count_24h: int = Field(default=0)
average_slippage: float = Field(default=0.0)
average_price_impact: float = Field(default=0.0)
impermanent_loss: float = Field(default=0.0)
created_at: datetime = Field(default_factory=datetime.utcnow)
class ArbitrageOpportunity(SQLModel, table=True):
"""Arbitrage opportunities across pools"""
__tablename__ = "arbitrage_opportunity"
id: Optional[int] = Field(default=None, primary_key=True)
token_a: str = Field(index=True)
token_b: str = Field(index=True)
pool_1_id: int = Field(foreign_key="liquidity_pool.id", index=True)
pool_2_id: int = Field(foreign_key="liquidity_pool.id", index=True)
price_1: float = Field(default=0.0) # Price in pool 1
price_2: float = Field(default=0.0) # Price in pool 2
price_difference: float = Field(default=0.0) # Price difference percentage
potential_profit: float = Field(default=0.0) # Potential profit amount
gas_cost_estimate: float = Field(default=0.0) # Estimated gas cost
net_profit: float = Field(default=0.0) # Net profit after gas
required_amount: float = Field(default=0.0) # Amount needed for arbitrage
confidence: float = Field(default=0.0) # Confidence in opportunity
is_executed: bool = Field(default=False, index=True)
executed_at: Optional[datetime] = Field(default=None)
execution_tx_hash: Optional[str] = Field(default=None)
actual_profit: Optional[float] = Field(default=None)
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
expires_at: datetime = Field(default_factory=lambda: datetime.utcnow() + timedelta(minutes=5))

View File

@@ -0,0 +1,357 @@
"""
Cross-Chain Bridge Domain Models
Domain models for cross-chain asset transfers, bridge requests, and validator management.
"""
from __future__ import annotations
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional
from uuid import uuid4
from sqlalchemy import Column, JSON
from sqlmodel import Field, SQLModel, Relationship
class BridgeRequestStatus(str, Enum):
PENDING = "pending"
CONFIRMED = "confirmed"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
EXPIRED = "expired"
RESOLVED = "resolved"
class ChainType(str, Enum):
ETHEREUM = "ethereum"
POLYGON = "polygon"
BSC = "bsc"
ARBITRUM = "arbitrum"
OPTIMISM = "optimism"
AVALANCHE = "avalanche"
FANTOM = "fantom"
HARMONY = "harmony"
class TransactionType(str, Enum):
INITIATION = "initiation"
CONFIRMATION = "confirmation"
COMPLETION = "completion"
REFUND = "refund"
DISPUTE = "dispute"
class ValidatorStatus(str, Enum):
ACTIVE = "active"
INACTIVE = "inactive"
SUSPENDED = "suspended"
SLASHED = "slashed"
class BridgeRequest(SQLModel, table=True):
"""Cross-chain bridge transfer request"""
__tablename__ = "bridge_request"
id: Optional[int] = Field(default=None, primary_key=True)
contract_request_id: str = Field(index=True) # Contract request ID
sender_address: str = Field(index=True)
recipient_address: str = Field(index=True)
source_token: str = Field(index=True) # Source token address
target_token: str = Field(index=True) # Target token address
source_chain_id: int = Field(index=True)
target_chain_id: int = Field(index=True)
amount: float = Field(default=0.0)
bridge_fee: float = Field(default=0.0)
total_amount: float = Field(default=0.0) # Amount including fee
exchange_rate: float = Field(default=1.0) # Exchange rate between tokens
status: BridgeRequestStatus = Field(default=BridgeRequestStatus.PENDING, index=True)
zk_proof: Optional[str] = Field(default=None) # Zero-knowledge proof
merkle_proof: Optional[str] = Field(default=None) # Merkle proof for completion
lock_tx_hash: Optional[str] = Field(default=None, index=True) # Lock transaction hash
unlock_tx_hash: Optional[str] = Field(default=None, index=True) # Unlock transaction hash
confirmations: int = Field(default=0) # Number of confirmations received
required_confirmations: int = Field(default=3) # Required confirmations
dispute_reason: Optional[str] = Field(default=None)
resolution_action: Optional[str] = Field(default=None)
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
updated_at: datetime = Field(default_factory=datetime.utcnow)
confirmed_at: Optional[datetime] = Field(default=None)
completed_at: Optional[datetime] = Field(default=None)
resolved_at: Optional[datetime] = Field(default=None)
expires_at: datetime = Field(default_factory=lambda: datetime.utcnow() + timedelta(hours=24))
# Relationships
transactions: List["BridgeTransaction"] = Relationship(back_populates="bridge_request")
disputes: List["BridgeDispute"] = Relationship(back_populates="bridge_request")
class SupportedToken(SQLModel, table=True):
"""Supported tokens for cross-chain bridging"""
__tablename__ = "supported_token"
id: Optional[int] = Field(default=None, primary_key=True)
token_address: str = Field(index=True)
token_symbol: str = Field(index=True)
token_name: str = Field(default="")
decimals: int = Field(default=18)
bridge_limit: float = Field(default=1000000.0) # Maximum bridge amount
fee_percentage: float = Field(default=0.5) # Bridge fee percentage
min_amount: float = Field(default=0.01) # Minimum bridge amount
max_amount: float = Field(default=1000000.0) # Maximum bridge amount
requires_whitelist: bool = Field(default=False)
is_active: bool = Field(default=True, index=True)
is_wrapped: bool = Field(default=False) # Whether it's a wrapped token
original_token: Optional[str] = Field(default=None) # Original token address for wrapped tokens
supported_chains: List[int] = Field(default_factory=list, sa_column=Column(JSON))
bridge_contracts: Dict[int, str] = Field(default_factory=dict, sa_column=Column(JSON)) # Chain ID -> Contract address
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
class ChainConfig(SQLModel, table=True):
"""Configuration for supported blockchain networks"""
__tablename__ = "chain_config"
id: Optional[int] = Field(default=None, primary_key=True)
chain_id: int = Field(index=True)
chain_name: str = Field(index=True)
chain_type: ChainType = Field(index=True)
rpc_url: str = Field(default="")
block_explorer_url: str = Field(default="")
bridge_contract_address: str = Field(default="")
native_token: str = Field(default="")
native_token_symbol: str = Field(default="")
block_time: int = Field(default=12) # Average block time in seconds
min_confirmations: int = Field(default=3) # Minimum confirmations for finality
avg_block_time: int = Field(default=12) # Average block time
finality_time: int = Field(default=300) # Time to finality in seconds
gas_token: str = Field(default="")
max_gas_price: float = Field(default=0.0) # Maximum gas price
is_active: bool = Field(default=True, index=True)
is_testnet: bool = Field(default=False)
requires_validator: bool = Field(default=True) # Whether validator confirmation is required
validator_threshold: float = Field(default=0.67) # Validator threshold percentage
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
class Validator(SQLModel, table=True):
"""Bridge validator for cross-chain confirmations"""
__tablename__ = "validator"
id: Optional[int] = Field(default=None, primary_key=True)
validator_address: str = Field(index=True)
validator_name: str = Field(default="")
weight: int = Field(default=1) # Validator weight
commission_rate: float = Field(default=0.0) # Commission rate
total_validations: int = Field(default=0) # Total number of validations
successful_validations: int = Field(default=0) # Successful validations
failed_validations: int = Field(default=0) # Failed validations
slashed_amount: float = Field(default=0.0) # Total amount slashed
earned_fees: float = Field(default=0.0) # Total fees earned
reputation_score: float = Field(default=100.0) # Reputation score (0-100)
uptime_percentage: float = Field(default=100.0) # Uptime percentage
last_validation: Optional[datetime] = Field(default=None)
last_seen: Optional[datetime] = Field(default=None)
status: ValidatorStatus = Field(default=ValidatorStatus.ACTIVE, index=True)
is_active: bool = Field(default=True, index=True)
supported_chains: List[int] = Field(default_factory=list, sa_column=Column(JSON))
metadata: Dict[str, str] = Field(default_factory=dict, sa_column=Column(JSON))
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
# Relationships
transactions: List["BridgeTransaction"] = Relationship(back_populates="validator")
class BridgeTransaction(SQLModel, table=True):
"""Transactions related to bridge requests"""
__tablename__ = "bridge_transaction"
id: Optional[int] = Field(default=None, primary_key=True)
bridge_request_id: int = Field(foreign_key="bridge_request.id", index=True)
validator_address: Optional[str] = Field(default=None, index=True)
transaction_type: TransactionType = Field(index=True)
transaction_hash: Optional[str] = Field(default=None, index=True)
block_number: Optional[int] = Field(default=None)
block_hash: Optional[str] = Field(default=None)
gas_used: Optional[int] = Field(default=None)
gas_price: Optional[float] = Field(default=None)
transaction_cost: Optional[float] = Field(default=None)
signature: Optional[str] = Field(default=None) # Validator signature
merkle_proof: Optional[List[str]] = Field(default_factory=list, sa_column=Column(JSON))
confirmations: int = Field(default=0) # Number of confirmations
is_successful: bool = Field(default=False)
error_message: Optional[str] = Field(default=None)
retry_count: int = Field(default=0)
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
confirmed_at: Optional[datetime] = Field(default=None)
completed_at: Optional[datetime] = Field(default=None)
# Relationships
bridge_request: BridgeRequest = Relationship(back_populates="transactions")
validator: Optional[Validator] = Relationship(back_populates="transactions")
class BridgeDispute(SQLModel, table=True):
"""Dispute records for failed bridge transfers"""
__tablename__ = "bridge_dispute"
id: Optional[int] = Field(default=None, primary_key=True)
bridge_request_id: int = Field(foreign_key="bridge_request.id", index=True)
dispute_type: str = Field(index=True) # TIMEOUT, INSUFFICIENT_FUNDS, VALIDATOR_MISBEHAVIOR, etc.
dispute_reason: str = Field(default="")
dispute_status: str = Field(default="open") # open, investigating, resolved, rejected
reporter_address: str = Field(index=True)
evidence: Dict[str, str] = Field(default_factory=dict, sa_column=Column(JSON))
resolution_action: Optional[str] = Field(default=None)
resolution_details: Optional[str] = Field(default=None)
refund_amount: Optional[float] = Field(default=None)
compensation_amount: Optional[float] = Field(default=None)
penalty_amount: Optional[float] = Field(default=None)
investigator_address: Optional[str] = Field(default=None)
investigation_notes: Optional[str] = Field(default=None)
is_resolved: bool = Field(default=False, index=True)
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
updated_at: datetime = Field(default_factory=datetime.utcnow)
resolved_at: Optional[datetime] = Field(default=None)
# Relationships
bridge_request: BridgeRequest = Relationship(back_populates="disputes")
class MerkleProof(SQLModel, table=True):
"""Merkle proofs for bridge transaction verification"""
__tablename__ = "merkle_proof"
id: Optional[int] = Field(default=None, primary_key=True)
bridge_request_id: int = Field(foreign_key="bridge_request.id", index=True)
proof_hash: str = Field(index=True) # Merkle proof hash
merkle_root: str = Field(index=True) # Merkle root
proof_data: List[str] = Field(default_factory=list, sa_column=Column(JSON)) # Proof data
leaf_index: int = Field(default=0) # Leaf index in tree
tree_depth: int = Field(default=0) # Tree depth
is_valid: bool = Field(default=False)
verified_at: Optional[datetime] = Field(default=None)
expires_at: datetime = Field(default_factory=lambda: datetime.utcnow() + timedelta(hours=24))
created_at: datetime = Field(default_factory=datetime.utcnow)
class BridgeStatistics(SQLModel, table=True):
"""Statistics for bridge operations"""
__tablename__ = "bridge_statistics"
id: Optional[int] = Field(default=None, primary_key=True)
chain_id: int = Field(index=True)
token_address: str = Field(index=True)
date: datetime = Field(index=True)
total_volume: float = Field(default=0.0) # Total volume for the day
total_transactions: int = Field(default=0) # Total number of transactions
successful_transactions: int = Field(default=0) # Successful transactions
failed_transactions: int = Field(default=0) # Failed transactions
total_fees: float = Field(default=0.0) # Total fees collected
average_transaction_time: float = Field(default=0.0) # Average time in minutes
average_transaction_size: float = Field(default=0.0) # Average transaction size
unique_users: int = Field(default=0) # Unique users for the day
peak_hour_volume: float = Field(default=0.0) # Peak hour volume
peak_hour_transactions: int = Field(default=0) # Peak hour transactions
created_at: datetime = Field(default_factory=datetime.utcnow)
class BridgeAlert(SQLModel, table=True):
"""Alerts for bridge operations and issues"""
__tablename__ = "bridge_alert"
id: Optional[int] = Field(default=None, primary_key=True)
alert_type: str = Field(index=True) # HIGH_FAILURE_RATE, LOW_LIQUIDITY, VALIDATOR_OFFLINE, etc.
severity: str = Field(index=True) # LOW, MEDIUM, HIGH, CRITICAL
chain_id: Optional[int] = Field(default=None, index=True)
token_address: Optional[str] = Field(default=None, index=True)
validator_address: Optional[str] = Field(default=None, index=True)
bridge_request_id: Optional[int] = Field(default=None, index=True)
title: str = Field(default="")
message: str = Field(default="")
metadata: Dict[str, str] = Field(default_factory=dict, sa_column=Column(JSON))
threshold_value: float = Field(default=0.0) # Threshold that triggered alert
current_value: float = Field(default=0.0) # Current value
is_acknowledged: bool = Field(default=False, index=True)
acknowledged_by: Optional[str] = Field(default=None)
acknowledged_at: Optional[datetime] = Field(default=None)
is_resolved: bool = Field(default=False, index=True)
resolved_at: Optional[datetime] = Field(default=None)
resolution_notes: Optional[str] = Field(default=None)
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
expires_at: datetime = Field(default_factory=lambda: datetime.utcnow() + timedelta(hours=24))
class BridgeConfiguration(SQLModel, table=True):
"""Configuration settings for bridge operations"""
__tablename__ = "bridge_configuration"
id: Optional[int] = Field(default=None, primary_key=True)
config_key: str = Field(index=True)
config_value: str = Field(default="")
config_type: str = Field(default="string") # string, number, boolean, json
description: str = Field(default="")
is_active: bool = Field(default=True)
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
class LiquidityPool(SQLModel, table=True):
"""Liquidity pools for bridge operations"""
__tablename__ = "bridge_liquidity_pool"
id: Optional[int] = Field(default=None, primary_key=True)
chain_id: int = Field(index=True)
token_address: str = Field(index=True)
pool_address: str = Field(index=True)
total_liquidity: float = Field(default=0.0) # Total liquidity in pool
available_liquidity: float = Field(default=0.0) # Available liquidity
utilized_liquidity: float = Field(default=0.0) # Utilized liquidity
utilization_rate: float = Field(default=0.0) # Utilization rate
interest_rate: float = Field(default=0.0) # Interest rate
last_updated: datetime = Field(default_factory=datetime.utcnow)
is_active: bool = Field(default=True, index=True)
created_at: datetime = Field(default_factory=datetime.utcnow)
class BridgeSnapshot(SQLModel, table=True):
"""Daily snapshot of bridge operations"""
__tablename__ = "bridge_snapshot"
id: Optional[int] = Field(default=None, primary_key=True)
snapshot_date: datetime = Field(index=True)
total_volume_24h: float = Field(default=0.0)
total_transactions_24h: int = Field(default=0)
successful_transactions_24h: int = Field(default=0)
failed_transactions_24h: int = Field(default=0)
total_fees_24h: float = Field(default=0.0)
average_transaction_time: float = Field(default=0.0)
unique_users_24h: int = Field(default=0)
active_validators: int = Field(default=0)
total_liquidity: float = Field(default=0.0)
bridge_utilization: float = Field(default=0.0)
top_tokens: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
top_chains: Dict[str, int] = Field(default_factory=dict, sa_column=Column(JSON))
created_at: datetime = Field(default_factory=datetime.utcnow)
class ValidatorReward(SQLModel, table=True):
"""Rewards earned by validators"""
__tablename__ = "validator_reward"
id: Optional[int] = Field(default=None, primary_key=True)
validator_address: str = Field(index=True)
bridge_request_id: int = Field(foreign_key="bridge_request.id", index=True)
reward_amount: float = Field(default=0.0)
reward_token: str = Field(index=True)
reward_type: str = Field(index=True) # VALIDATION_FEE, PERFORMANCE_BONUS, etc.
reward_period: str = Field(index=True) # Daily, weekly, monthly
is_claimed: bool = Field(default=False, index=True)
claimed_at: Optional[datetime] = Field(default=None)
claim_transaction_hash: Optional[str] = Field(default=None)
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)

View File

@@ -0,0 +1,268 @@
"""
Cross-Chain Reputation Extensions
Extends the existing reputation system with cross-chain capabilities
"""
from datetime import datetime, date
from typing import Optional, Dict, List, Any
from uuid import uuid4
from enum import Enum
from sqlmodel import SQLModel, Field, Column, JSON, Index
from sqlalchemy import DateTime, func
from .reputation import AgentReputation, ReputationEvent, ReputationLevel
class CrossChainReputationConfig(SQLModel, table=True):
"""Chain-specific reputation configuration for cross-chain aggregation"""
__tablename__ = "cross_chain_reputation_configs"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: f"config_{uuid4().hex[:8]}", primary_key=True)
chain_id: int = Field(index=True, unique=True)
# Weighting configuration
chain_weight: float = Field(default=1.0) # Weight in cross-chain aggregation
base_reputation_bonus: float = Field(default=0.0) # Base reputation for new agents
# Scoring configuration
transaction_success_weight: float = Field(default=0.1)
transaction_failure_weight: float = Field(default=-0.2)
dispute_penalty_weight: float = Field(default=-0.3)
# Thresholds
minimum_transactions_for_score: int = Field(default=5)
reputation_decay_rate: float = Field(default=0.01) # Daily decay rate
anomaly_detection_threshold: float = Field(default=0.3) # Score change threshold
# Configuration metadata
is_active: bool = Field(default=True)
configuration_data: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
# Timestamps
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
class CrossChainReputationAggregation(SQLModel, table=True):
"""Aggregated cross-chain reputation data"""
__tablename__ = "cross_chain_reputation_aggregations"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: f"agg_{uuid4().hex[:8]}", primary_key=True)
agent_id: str = Field(index=True)
# Aggregated scores
aggregated_score: float = Field(index=True, ge=0.0, le=1.0)
weighted_score: float = Field(default=0.0, ge=0.0, le=1.0)
normalized_score: float = Field(default=0.0, ge=0.0, le=1.0)
# Chain breakdown
chain_count: int = Field(default=0)
active_chains: List[int] = Field(default_factory=list, sa_column=Column(JSON))
chain_scores: Dict[int, float] = Field(default_factory=dict, sa_column=Column(JSON))
chain_weights: Dict[int, float] = Field(default_factory=dict, sa_column=Column(JSON))
# Consistency metrics
score_variance: float = Field(default=0.0)
score_range: float = Field(default=0.0)
consistency_score: float = Field(default=1.0, ge=0.0, le=1.0)
# Verification status
verification_status: str = Field(default="pending") # pending, verified, failed
verification_details: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
# Timestamps
last_updated: datetime = Field(default_factory=datetime.utcnow)
created_at: datetime = Field(default_factory=datetime.utcnow)
# Indexes
__table_args__ = (
Index('idx_cross_chain_agg_agent', 'agent_id'),
Index('idx_cross_chain_agg_score', 'aggregated_score'),
Index('idx_cross_chain_agg_updated', 'last_updated'),
Index('idx_cross_chain_agg_status', 'verification_status'),
)
class CrossChainReputationEvent(SQLModel, table=True):
"""Cross-chain reputation events and synchronizations"""
__tablename__ = "cross_chain_reputation_events"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: f"event_{uuid4().hex[:8]}", primary_key=True)
agent_id: str = Field(index=True)
source_chain_id: int = Field(index=True)
target_chain_id: Optional[int] = Field(index=True)
# Event details
event_type: str = Field(max_length=50) # aggregation, migration, verification, etc.
impact_score: float = Field(ge=-1.0, le=1.0)
description: str = Field(default="")
# Cross-chain data
source_reputation: Optional[float] = Field(default=None)
target_reputation: Optional[float] = Field(default=None)
reputation_change: Optional[float] = Field(default=None)
# Event metadata
event_data: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
source: str = Field(default="system") # system, user, oracle, etc.
verified: bool = Field(default=False)
# Timestamps
created_at: datetime = Field(default_factory=datetime.utcnow)
processed_at: Optional[datetime] = None
# Indexes
__table_args__ = (
Index('idx_cross_chain_event_agent', 'agent_id'),
Index('idx_cross_chain_event_chains', 'source_chain_id', 'target_chain_id'),
Index('idx_cross_chain_event_type', 'event_type'),
Index('idx_cross_chain_event_created', 'created_at'),
)
class ReputationMetrics(SQLModel, table=True):
"""Aggregated reputation metrics for analytics"""
__tablename__ = "reputation_metrics"
__table_args__ = {"extend_existing": True}
id: str = Field(default_factory=lambda: f"metrics_{uuid4().hex[:8]}", primary_key=True)
chain_id: int = Field(index=True)
metric_date: date = Field(index=True)
# Aggregated metrics
total_agents: int = Field(default=0)
average_reputation: float = Field(default=0.0)
reputation_distribution: Dict[str, int] = Field(default_factory=dict, sa_column=Column(JSON))
# Performance metrics
total_transactions: int = Field(default=0)
success_rate: float = Field(default=0.0)
dispute_rate: float = Field(default=0.0)
# Distribution metrics
level_distribution: Dict[str, int] = Field(default_factory=dict, sa_column=Column(JSON))
score_distribution: Dict[str, int] = Field(default_factory=dict, sa_column=Column(JSON))
# Cross-chain metrics
cross_chain_agents: int = Field(default=0)
average_consistency_score: float = Field(default=0.0)
chain_diversity_score: float = Field(default=0.0)
# Timestamps
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
# Request/Response Models for Cross-Chain API
class CrossChainReputationRequest(SQLModel):
"""Request model for cross-chain reputation operations"""
agent_id: str
chain_ids: Optional[List[int]] = None
include_history: bool = False
include_metrics: bool = False
aggregation_method: str = "weighted" # weighted, average, normalized
class CrossChainReputationUpdateRequest(SQLModel):
"""Request model for cross-chain reputation updates"""
agent_id: str
chain_id: int
reputation_score: float = Field(ge=0.0, le=1.0)
transaction_data: Dict[str, Any] = Field(default_factory=dict)
source: str = "system"
description: str = ""
class CrossChainAggregationRequest(SQLModel):
"""Request model for cross-chain aggregation"""
agent_ids: List[str]
chain_ids: Optional[List[int]] = None
aggregation_method: str = "weighted"
force_recalculate: bool = False
class CrossChainVerificationRequest(SQLModel):
"""Request model for cross-chain reputation verification"""
agent_id: str
threshold: float = Field(default=0.5)
verification_method: str = "consistency" # consistency, weighted, minimum
include_details: bool = False
# Response Models
class CrossChainReputationResponse(SQLModel):
"""Response model for cross-chain reputation"""
agent_id: str
chain_reputations: Dict[int, Dict[str, Any]]
aggregated_score: float
weighted_score: float
normalized_score: float
chain_count: int
active_chains: List[int]
consistency_score: float
verification_status: str
last_updated: datetime
metadata: Dict[str, Any] = Field(default_factory=dict)
class CrossChainAnalyticsResponse(SQLModel):
"""Response model for cross-chain analytics"""
chain_id: Optional[int]
total_agents: int
cross_chain_agents: int
average_reputation: float
average_consistency_score: float
chain_diversity_score: float
reputation_distribution: Dict[str, int]
level_distribution: Dict[str, int]
score_distribution: Dict[str, int]
performance_metrics: Dict[str, Any]
cross_chain_metrics: Dict[str, Any]
generated_at: datetime
class ReputationAnomalyResponse(SQLModel):
"""Response model for reputation anomalies"""
agent_id: str
chain_id: int
anomaly_type: str
detected_at: datetime
description: str
severity: str
previous_score: float
current_score: float
score_change: float
confidence: float
metadata: Dict[str, Any] = Field(default_factory=dict)
class CrossChainLeaderboardResponse(SQLModel):
"""Response model for cross-chain reputation leaderboard"""
agents: List[CrossChainReputationResponse]
total_count: int
page: int
page_size: int
chain_filter: Optional[int]
sort_by: str
sort_order: str
last_updated: datetime
class ReputationVerificationResponse(SQLModel):
"""Response model for reputation verification"""
agent_id: str
threshold: float
is_verified: bool
verification_score: float
chain_verifications: Dict[int, bool]
verification_details: Dict[str, Any]
consistency_analysis: Dict[str, Any]
verified_at: datetime

View File

@@ -0,0 +1,566 @@
"""
Pricing Models for Dynamic Pricing Database Schema
SQLModel definitions for pricing history, strategies, and market metrics
"""
from __future__ import annotations
from datetime import datetime
from enum import Enum
from typing import Optional, Dict, Any, List
from uuid import uuid4
from sqlalchemy import Column, JSON, Index
from sqlmodel import Field, SQLModel, Text
class PricingStrategyType(str, Enum):
"""Pricing strategy types for database"""
AGGRESSIVE_GROWTH = "aggressive_growth"
PROFIT_MAXIMIZATION = "profit_maximization"
MARKET_BALANCE = "market_balance"
COMPETITIVE_RESPONSE = "competitive_response"
DEMAND_ELASTICITY = "demand_elasticity"
PENETRATION_PRICING = "penetration_pricing"
PREMIUM_PRICING = "premium_pricing"
COST_PLUS = "cost_plus"
VALUE_BASED = "value_based"
COMPETITOR_BASED = "competitor_based"
class ResourceType(str, Enum):
"""Resource types for pricing"""
GPU = "gpu"
SERVICE = "service"
STORAGE = "storage"
NETWORK = "network"
COMPUTE = "compute"
class PriceTrend(str, Enum):
"""Price trend indicators"""
INCREASING = "increasing"
DECREASING = "decreasing"
STABLE = "stable"
VOLATILE = "volatile"
UNKNOWN = "unknown"
class PricingHistory(SQLModel, table=True):
"""Historical pricing data for analysis and machine learning"""
__tablename__ = "pricing_history"
__table_args__ = {
"extend_existing": True,
"indexes": [
Index("idx_pricing_history_resource_timestamp", "resource_id", "timestamp"),
Index("idx_pricing_history_type_region", "resource_type", "region"),
Index("idx_pricing_history_timestamp", "timestamp"),
Index("idx_pricing_history_provider", "provider_id")
]
}
id: str = Field(default_factory=lambda: f"ph_{uuid4().hex[:12]}", primary_key=True)
resource_id: str = Field(index=True)
resource_type: ResourceType = Field(index=True)
provider_id: Optional[str] = Field(default=None, index=True)
region: str = Field(default="global", index=True)
# Pricing data
price: float = Field(index=True)
base_price: float
price_change: Optional[float] = None # Change from previous price
price_change_percent: Optional[float] = None # Percentage change
# Market conditions at time of pricing
demand_level: float = Field(index=True)
supply_level: float = Field(index=True)
market_volatility: float
utilization_rate: float
# Strategy and factors
strategy_used: PricingStrategyType = Field(index=True)
strategy_parameters: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
pricing_factors: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
# Performance metrics
confidence_score: float
forecast_accuracy: Optional[float] = None
recommendation_followed: Optional[bool] = None
# Metadata
timestamp: datetime = Field(default_factory=datetime.utcnow, index=True)
created_at: datetime = Field(default_factory=datetime.utcnow)
# Additional context
competitor_prices: List[float] = Field(default_factory=list, sa_column=Column(JSON))
market_sentiment: float = Field(default=0.0)
external_factors: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
# Reasoning and audit trail
price_reasoning: List[str] = Field(default_factory=list, sa_column=Column(JSON))
audit_log: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
class ProviderPricingStrategy(SQLModel, table=True):
"""Provider pricing strategies and configurations"""
__tablename__ = "provider_pricing_strategies"
__table_args__ = {
"extend_existing": True,
"indexes": [
Index("idx_provider_strategies_provider", "provider_id"),
Index("idx_provider_strategies_type", "strategy_type"),
Index("idx_provider_strategies_active", "is_active"),
Index("idx_provider_strategies_resource", "resource_type", "provider_id")
]
}
id: str = Field(default_factory=lambda: f"pps_{uuid4().hex[:12]}", primary_key=True)
provider_id: str = Field(index=True)
strategy_type: PricingStrategyType = Field(index=True)
resource_type: Optional[ResourceType] = Field(default=None, index=True)
# Strategy configuration
strategy_name: str
strategy_description: Optional[str] = None
parameters: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
# Constraints and limits
min_price: Optional[float] = None
max_price: Optional[float] = None
max_change_percent: float = Field(default=0.5)
min_change_interval: int = Field(default=300) # seconds
strategy_lock_period: int = Field(default=3600) # seconds
# Strategy rules
rules: List[Dict[str, Any]] = Field(default_factory=list, sa_column=Column(JSON))
custom_conditions: List[str] = Field(default_factory=list, sa_column=Column(JSON))
# Status and metadata
is_active: bool = Field(default=True, index=True)
auto_optimize: bool = Field(default=True)
learning_enabled: bool = Field(default=True)
priority: int = Field(default=5) # 1-10 priority level
# Geographic scope
regions: List[str] = Field(default_factory=list, sa_column=Column(JSON))
global_strategy: bool = Field(default=True)
# Performance tracking
total_revenue_impact: float = Field(default=0.0)
market_share_impact: float = Field(default=0.0)
customer_satisfaction_impact: float = Field(default=0.0)
strategy_effectiveness_score: float = Field(default=0.0)
# Timestamps
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
last_applied: Optional[datetime] = None
expires_at: Optional[datetime] = None
# Audit information
created_by: Optional[str] = None
updated_by: Optional[str] = None
version: int = Field(default=1)
class MarketMetrics(SQLModel, table=True):
"""Real-time and historical market metrics"""
__tablename__ = "market_metrics"
__table_args__ = {
"extend_existing": True,
"indexes": [
Index("idx_market_metrics_region_type", "region", "resource_type"),
Index("idx_market_metrics_timestamp", "timestamp"),
Index("idx_market_metrics_demand", "demand_level"),
Index("idx_market_metrics_supply", "supply_level"),
Index("idx_market_metrics_composite", "region", "resource_type", "timestamp")
]
}
id: str = Field(default_factory=lambda: f"mm_{uuid4().hex[:12]}", primary_key=True)
region: str = Field(index=True)
resource_type: ResourceType = Field(index=True)
# Core market metrics
demand_level: float = Field(index=True)
supply_level: float = Field(index=True)
average_price: float = Field(index=True)
price_volatility: float = Field(index=True)
utilization_rate: float = Field(index=True)
# Market depth and liquidity
total_capacity: float
available_capacity: float
pending_orders: int
completed_orders: int
order_book_depth: float
# Competitive landscape
competitor_count: int
average_competitor_price: float
price_spread: float # Difference between highest and lowest prices
market_concentration: float # HHI or similar metric
# Market sentiment and activity
market_sentiment: float = Field(default=0.0)
trading_volume: float
price_momentum: float # Rate of price change
liquidity_score: float
# Regional factors
regional_multiplier: float = Field(default=1.0)
currency_adjustment: float = Field(default=1.0)
regulatory_factors: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
# Data quality and confidence
data_sources: List[str] = Field(default_factory=list, sa_column=Column(JSON))
confidence_score: float
data_freshness: int # Age of data in seconds
completeness_score: float
# Timestamps
timestamp: datetime = Field(default_factory=datetime.utcnow, index=True)
created_at: datetime = Field(default_factory=datetime.utcnow)
# Additional metrics
custom_metrics: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
external_factors: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
class PriceForecast(SQLModel, table=True):
"""Price forecasting data and accuracy tracking"""
__tablename__ = "price_forecasts"
__table_args__ = {
"extend_existing": True,
"indexes": [
Index("idx_price_forecasts_resource", "resource_id"),
Index("idx_price_forecasts_target", "target_timestamp"),
Index("idx_price_forecasts_created", "created_at"),
Index("idx_price_forecasts_horizon", "forecast_horizon_hours")
]
}
id: str = Field(default_factory=lambda: f"pf_{uuid4().hex[:12]}", primary_key=True)
resource_id: str = Field(index=True)
resource_type: ResourceType = Field(index=True)
region: str = Field(default="global", index=True)
# Forecast parameters
forecast_horizon_hours: int = Field(index=True)
model_version: str
strategy_used: PricingStrategyType
# Forecast data points
forecast_points: List[Dict[str, Any]] = Field(default_factory=list, sa_column=Column(JSON))
confidence_intervals: Dict[str, List[float]] = Field(default_factory=dict, sa_column=Column(JSON))
# Forecast metadata
average_forecast_price: float
price_range_forecast: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
trend_forecast: PriceTrend
volatility_forecast: float
# Model performance
model_confidence: float
accuracy_score: Optional[float] = None # Populated after actual prices are known
mean_absolute_error: Optional[float] = None
mean_absolute_percentage_error: Optional[float] = None
# Input data used for forecast
input_data_summary: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
market_conditions_at_forecast: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
# Timestamps
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
target_timestamp: datetime = Field(index=True) # When forecast is for
evaluated_at: Optional[datetime] = None # When forecast was evaluated
# Status and outcomes
forecast_status: str = Field(default="pending") # pending, evaluated, expired
outcome: Optional[str] = None # accurate, inaccurate, mixed
lessons_learned: List[str] = Field(default_factory=list, sa_column=Column(JSON))
class PricingOptimization(SQLModel, table=True):
"""Pricing optimization experiments and results"""
__tablename__ = "pricing_optimizations"
__table_args__ = {
"extend_existing": True,
"indexes": [
Index("idx_pricing_opt_provider", "provider_id"),
Index("idx_pricing_opt_experiment", "experiment_id"),
Index("idx_pricing_opt_status", "status"),
Index("idx_pricing_opt_created", "created_at")
]
}
id: str = Field(default_factory=lambda: f"po_{uuid4().hex[:12]}", primary_key=True)
experiment_id: str = Field(index=True)
provider_id: str = Field(index=True)
resource_type: Optional[ResourceType] = Field(default=None, index=True)
# Experiment configuration
experiment_name: str
experiment_type: str # ab_test, multivariate, optimization
hypothesis: str
control_strategy: PricingStrategyType
test_strategy: PricingStrategyType
# Experiment parameters
sample_size: int
confidence_level: float = Field(default=0.95)
statistical_power: float = Field(default=0.8)
minimum_detectable_effect: float
# Experiment scope
regions: List[str] = Field(default_factory=list, sa_column=Column(JSON))
duration_days: int
start_date: datetime
end_date: Optional[datetime] = None
# Results
control_performance: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
test_performance: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
statistical_significance: Optional[float] = None
effect_size: Optional[float] = None
# Business impact
revenue_impact: Optional[float] = None
profit_impact: Optional[float] = None
market_share_impact: Optional[float] = None
customer_satisfaction_impact: Optional[float] = None
# Status and metadata
status: str = Field(default="planned") # planned, running, completed, failed
conclusion: Optional[str] = None
recommendations: List[str] = Field(default_factory=list, sa_column=Column(JSON))
# Timestamps
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
updated_at: datetime = Field(default_factory=datetime.utcnow)
completed_at: Optional[datetime] = None
# Audit trail
created_by: Optional[str] = None
reviewed_by: Optional[str] = None
approved_by: Optional[str] = None
class PricingAlert(SQLModel, table=True):
"""Pricing alerts and notifications"""
__tablename__ = "pricing_alerts"
__table_args__ = {
"extend_existing": True,
"indexes": [
Index("idx_pricing_alerts_provider", "provider_id"),
Index("idx_pricing_alerts_type", "alert_type"),
Index("idx_pricing_alerts_status", "status"),
Index("idx_pricing_alerts_severity", "severity"),
Index("idx_pricing_alerts_created", "created_at")
]
}
id: str = Field(default_factory=lambda: f"pa_{uuid4().hex[:12]}", primary_key=True)
provider_id: Optional[str] = Field(default=None, index=True)
resource_id: Optional[str] = Field(default=None, index=True)
resource_type: Optional[ResourceType] = Field(default=None, index=True)
# Alert details
alert_type: str = Field(index=True) # price_volatility, strategy_performance, market_change, etc.
severity: str = Field(index=True) # low, medium, high, critical
title: str
description: str
# Alert conditions
trigger_conditions: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
threshold_values: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
actual_values: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
# Alert context
market_conditions: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
strategy_context: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
historical_context: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
# Recommendations and actions
recommendations: List[str] = Field(default_factory=list, sa_column=Column(JSON))
automated_actions_taken: List[str] = Field(default_factory=list, sa_column=Column(JSON))
manual_actions_required: List[str] = Field(default_factory=list, sa_column=Column(JSON))
# Status and resolution
status: str = Field(default="active") # active, acknowledged, resolved, dismissed
resolution: Optional[str] = None
resolution_notes: Optional[str] = Field(default=None, sa_column=Text)
# Impact assessment
business_impact: Optional[str] = None
revenue_impact_estimate: Optional[float] = None
customer_impact_estimate: Optional[str] = None
# Timestamps
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
first_seen: datetime = Field(default_factory=datetime.utcnow)
last_seen: datetime = Field(default_factory=datetime.utcnow)
acknowledged_at: Optional[datetime] = None
resolved_at: Optional[datetime] = None
# Communication
notification_sent: bool = Field(default=False)
notification_channels: List[str] = Field(default_factory=list, sa_column=Column(JSON))
escalation_level: int = Field(default=0)
class PricingRule(SQLModel, table=True):
"""Custom pricing rules and conditions"""
__tablename__ = "pricing_rules"
__table_args__ = {
"extend_existing": True,
"indexes": [
Index("idx_pricing_rules_provider", "provider_id"),
Index("idx_pricing_rules_strategy", "strategy_id"),
Index("idx_pricing_rules_active", "is_active"),
Index("idx_pricing_rules_priority", "priority")
]
}
id: str = Field(default_factory=lambda: f"pr_{uuid4().hex[:12]}", primary_key=True)
provider_id: Optional[str] = Field(default=None, index=True)
strategy_id: Optional[str] = Field(default=None, index=True)
# Rule definition
rule_name: str
rule_description: Optional[str] = None
rule_type: str # condition, action, constraint, optimization
# Rule logic
condition_expression: str = Field(..., description="Logical condition for rule")
action_expression: str = Field(..., description="Action to take when condition is met")
priority: int = Field(default=5, index=True) # 1-10 priority
# Rule scope
resource_types: List[ResourceType] = Field(default_factory=list, sa_column=Column(JSON))
regions: List[str] = Field(default_factory=list, sa_column=Column(JSON))
time_conditions: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
# Rule parameters
parameters: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
thresholds: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
multipliers: Dict[str, float] = Field(default_factory=dict, sa_column=Column(JSON))
# Status and execution
is_active: bool = Field(default=True, index=True)
execution_count: int = Field(default=0)
success_count: int = Field(default=0)
failure_count: int = Field(default=0)
last_executed: Optional[datetime] = None
last_success: Optional[datetime] = None
# Performance metrics
average_execution_time: Optional[float] = None
success_rate: float = Field(default=1.0)
business_impact: Optional[float] = None
# Timestamps
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
expires_at: Optional[datetime] = None
# Audit trail
created_by: Optional[str] = None
updated_by: Optional[str] = None
version: int = Field(default=1)
change_log: List[Dict[str, Any]] = Field(default_factory=list, sa_column=Column(JSON))
class PricingAuditLog(SQLModel, table=True):
"""Audit log for pricing changes and decisions"""
__tablename__ = "pricing_audit_log"
__table_args__ = {
"extend_existing": True,
"indexes": [
Index("idx_pricing_audit_provider", "provider_id"),
Index("idx_pricing_audit_resource", "resource_id"),
Index("idx_pricing_audit_action", "action_type"),
Index("idx_pricing_audit_timestamp", "timestamp"),
Index("idx_pricing_audit_user", "user_id")
]
}
id: str = Field(default_factory=lambda: f"pal_{uuid4().hex[:12]}", primary_key=True)
provider_id: Optional[str] = Field(default=None, index=True)
resource_id: Optional[str] = Field(default=None, index=True)
user_id: Optional[str] = Field(default=None, index=True)
# Action details
action_type: str = Field(index=True) # price_change, strategy_update, rule_creation, etc.
action_description: str
action_source: str # manual, automated, api, system
# State changes
before_state: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
after_state: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
changed_fields: List[str] = Field(default_factory=list, sa_column=Column(JSON))
# Context and reasoning
decision_reasoning: Optional[str] = Field(default=None, sa_column=Text)
market_conditions: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
business_context: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
# Impact and outcomes
immediate_impact: Optional[Dict[str, float]] = Field(default_factory=dict, sa_column=Column(JSON))
expected_impact: Optional[Dict[str, float]] = Field(default_factory=dict, sa_column=Column(JSON))
actual_impact: Optional[Dict[str, float]] = Field(default_factory=dict, sa_column=Column(JSON))
# Compliance and approval
compliance_flags: List[str] = Field(default_factory=list, sa_column=Column(JSON))
approval_required: bool = Field(default=False)
approved_by: Optional[str] = None
approved_at: Optional[datetime] = None
# Technical details
api_endpoint: Optional[str] = None
request_id: Optional[str] = None
session_id: Optional[str] = None
ip_address: Optional[str] = None
# Timestamps
timestamp: datetime = Field(default_factory=datetime.utcnow, index=True)
created_at: datetime = Field(default_factory=datetime.utcnow)
# Additional metadata
metadata: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON))
tags: List[str] = Field(default_factory=list, sa_column=Column(JSON))
# View definitions for common queries
class PricingSummaryView(SQLModel):
"""View for pricing summary analytics"""
__tablename__ = "pricing_summary_view"
provider_id: str
resource_type: ResourceType
region: str
current_price: float
price_trend: PriceTrend
price_volatility: float
utilization_rate: float
strategy_used: PricingStrategyType
strategy_effectiveness: float
last_updated: datetime
total_revenue_7d: float
market_share: float
class MarketHeatmapView(SQLModel):
"""View for market heatmap data"""
__tablename__ = "market_heatmap_view"
region: str
resource_type: ResourceType
demand_level: float
supply_level: float
average_price: float
price_volatility: float
utilization_rate: float
market_sentiment: float
competitor_count: int
timestamp: datetime

View File

@@ -0,0 +1,721 @@
"""
Pricing Strategies Domain Module
Defines various pricing strategies and their configurations for dynamic pricing
"""
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from enum import Enum
from datetime import datetime
import json
class PricingStrategy(str, Enum):
"""Dynamic pricing strategy types"""
AGGRESSIVE_GROWTH = "aggressive_growth"
PROFIT_MAXIMIZATION = "profit_maximization"
MARKET_BALANCE = "market_balance"
COMPETITIVE_RESPONSE = "competitive_response"
DEMAND_ELASTICITY = "demand_elasticity"
PENETRATION_PRICING = "penetration_pricing"
PREMIUM_PRICING = "premium_pricing"
COST_PLUS = "cost_plus"
VALUE_BASED = "value_based"
COMPETITOR_BASED = "competitor_based"
class StrategyPriority(str, Enum):
"""Strategy priority levels"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class RiskTolerance(str, Enum):
"""Risk tolerance levels for pricing strategies"""
CONSERVATIVE = "conservative"
MODERATE = "moderate"
AGGRESSIVE = "aggressive"
@dataclass
class StrategyParameters:
"""Parameters for pricing strategy configuration"""
# Base pricing parameters
base_multiplier: float = 1.0
min_price_margin: float = 0.1 # 10% minimum margin
max_price_margin: float = 2.0 # 200% maximum margin
# Market sensitivity parameters
demand_sensitivity: float = 0.5 # 0-1, how much demand affects price
supply_sensitivity: float = 0.3 # 0-1, how much supply affects price
competition_sensitivity: float = 0.4 # 0-1, how much competition affects price
# Time-based parameters
peak_hour_multiplier: float = 1.2
off_peak_multiplier: float = 0.8
weekend_multiplier: float = 1.1
# Performance parameters
performance_bonus_rate: float = 0.1 # 10% bonus for high performance
performance_penalty_rate: float = 0.05 # 5% penalty for low performance
# Risk management parameters
max_price_change_percent: float = 0.3 # Maximum 30% change per update
volatility_threshold: float = 0.2 # Trigger for circuit breaker
confidence_threshold: float = 0.7 # Minimum confidence for price changes
# Strategy-specific parameters
growth_target_rate: float = 0.15 # 15% growth target for growth strategies
profit_target_margin: float = 0.25 # 25% profit target for profit strategies
market_share_target: float = 0.1 # 10% market share target
# Regional parameters
regional_adjustments: Dict[str, float] = field(default_factory=dict)
# Custom parameters
custom_parameters: Dict[str, Any] = field(default_factory=dict)
@dataclass
class StrategyRule:
"""Individual rule within a pricing strategy"""
rule_id: str
name: str
description: str
condition: str # Expression that evaluates to True/False
action: str # Action to take when condition is met
priority: StrategyPriority
enabled: bool = True
created_at: datetime = field(default_factory=datetime.utcnow)
# Rule execution tracking
execution_count: int = 0
last_executed: Optional[datetime] = None
success_rate: float = 1.0
@dataclass
class PricingStrategyConfig:
"""Complete configuration for a pricing strategy"""
strategy_id: str
name: str
description: str
strategy_type: PricingStrategy
parameters: StrategyParameters
rules: List[StrategyRule] = field(default_factory=list)
# Strategy metadata
risk_tolerance: RiskTolerance = RiskTolerance.MODERATE
priority: StrategyPriority = StrategyPriority.MEDIUM
auto_optimize: bool = True
learning_enabled: bool = True
# Strategy constraints
min_price: Optional[float] = None
max_price: Optional[float] = None
resource_types: List[str] = field(default_factory=list)
regions: List[str] = field(default_factory=list)
# Performance tracking
created_at: datetime = field(default_factory=datetime.utcnow)
updated_at: datetime = field(default_factory=datetime.utcnow)
last_applied: Optional[datetime] = None
# Strategy effectiveness metrics
total_revenue_impact: float = 0.0
market_share_impact: float = 0.0
customer_satisfaction_impact: float = 0.0
strategy_effectiveness_score: float = 0.0
class StrategyLibrary:
"""Library of predefined pricing strategies"""
@staticmethod
def get_aggressive_growth_strategy() -> PricingStrategyConfig:
"""Get aggressive growth strategy configuration"""
parameters = StrategyParameters(
base_multiplier=0.85,
min_price_margin=0.05, # Lower margins for growth
max_price_margin=1.5,
demand_sensitivity=0.3, # Less sensitive to demand
supply_sensitivity=0.2,
competition_sensitivity=0.6, # Highly competitive
peak_hour_multiplier=1.1,
off_peak_multiplier=0.7,
weekend_multiplier=1.05,
performance_bonus_rate=0.05,
performance_penalty_rate=0.02,
growth_target_rate=0.25, # 25% growth target
market_share_target=0.15 # 15% market share target
)
rules = [
StrategyRule(
rule_id="growth_competitive_undercut",
name="Competitive Undercutting",
description="Undercut competitors by 5% to gain market share",
condition="competitor_price > 0 and current_price > competitor_price * 0.95",
action="set_price = competitor_price * 0.95",
priority=StrategyPriority.HIGH
),
StrategyRule(
rule_id="growth_volume_discount",
name="Volume Discount",
description="Offer discounts for high-volume customers",
condition="customer_volume > threshold and customer_loyalty < 6_months",
action="apply_discount = 0.1",
priority=StrategyPriority.MEDIUM
)
]
return PricingStrategyConfig(
strategy_id="aggressive_growth_v1",
name="Aggressive Growth Strategy",
description="Focus on rapid market share acquisition through competitive pricing",
strategy_type=PricingStrategy.AGGRESSIVE_GROWTH,
parameters=parameters,
rules=rules,
risk_tolerance=RiskTolerance.AGGRESSIVE,
priority=StrategyPriority.HIGH
)
@staticmethod
def get_profit_maximization_strategy() -> PricingStrategyConfig:
"""Get profit maximization strategy configuration"""
parameters = StrategyParameters(
base_multiplier=1.25,
min_price_margin=0.3, # Higher margins for profit
max_price_margin=3.0,
demand_sensitivity=0.7, # Highly sensitive to demand
supply_sensitivity=0.4,
competition_sensitivity=0.2, # Less competitive focus
peak_hour_multiplier=1.4,
off_peak_multiplier=1.0,
weekend_multiplier=1.2,
performance_bonus_rate=0.15,
performance_penalty_rate=0.08,
profit_target_margin=0.35, # 35% profit target
max_price_change_percent=0.2 # More conservative changes
)
rules = [
StrategyRule(
rule_id="profit_demand_premium",
name="Demand Premium Pricing",
description="Apply premium pricing during high demand periods",
condition="demand_level > 0.8 and competitor_capacity < 0.7",
action="set_price = current_price * 1.3",
priority=StrategyPriority.CRITICAL
),
StrategyRule(
rule_id="profit_performance_premium",
name="Performance Premium",
description="Charge premium for high-performance resources",
condition="performance_score > 0.9 and customer_satisfaction > 0.85",
action="apply_premium = 0.2",
priority=StrategyPriority.HIGH
)
]
return PricingStrategyConfig(
strategy_id="profit_maximization_v1",
name="Profit Maximization Strategy",
description="Maximize profit margins through premium pricing and demand capture",
strategy_type=PricingStrategy.PROFIT_MAXIMIZATION,
parameters=parameters,
rules=rules,
risk_tolerance=RiskTolerance.MODERATE,
priority=StrategyPriority.HIGH
)
@staticmethod
def get_market_balance_strategy() -> PricingStrategyConfig:
"""Get market balance strategy configuration"""
parameters = StrategyParameters(
base_multiplier=1.0,
min_price_margin=0.15,
max_price_margin=2.0,
demand_sensitivity=0.5,
supply_sensitivity=0.3,
competition_sensitivity=0.4,
peak_hour_multiplier=1.2,
off_peak_multiplier=0.8,
weekend_multiplier=1.1,
performance_bonus_rate=0.1,
performance_penalty_rate=0.05,
volatility_threshold=0.15, # Lower volatility threshold
confidence_threshold=0.8 # Higher confidence requirement
)
rules = [
StrategyRule(
rule_id="balance_market_follow",
name="Market Following",
description="Follow market trends while maintaining stability",
condition="market_trend == increasing and price_position < market_average",
action="adjust_price = market_average * 0.98",
priority=StrategyPriority.MEDIUM
),
StrategyRule(
rule_id="balance_stability_maintain",
name="Stability Maintenance",
description="Maintain price stability during volatile periods",
condition="volatility > 0.15 and confidence < 0.7",
action="freeze_price = true",
priority=StrategyPriority.HIGH
)
]
return PricingStrategyConfig(
strategy_id="market_balance_v1",
name="Market Balance Strategy",
description="Maintain balanced pricing that follows market trends while ensuring stability",
strategy_type=PricingStrategy.MARKET_BALANCE,
parameters=parameters,
rules=rules,
risk_tolerance=RiskTolerance.MODERATE,
priority=StrategyPriority.MEDIUM
)
@staticmethod
def get_competitive_response_strategy() -> PricingStrategyConfig:
"""Get competitive response strategy configuration"""
parameters = StrategyParameters(
base_multiplier=0.95,
min_price_margin=0.1,
max_price_margin=1.8,
demand_sensitivity=0.4,
supply_sensitivity=0.3,
competition_sensitivity=0.8, # Highly competitive
peak_hour_multiplier=1.15,
off_peak_multiplier=0.85,
weekend_multiplier=1.05,
performance_bonus_rate=0.08,
performance_penalty_rate=0.03
)
rules = [
StrategyRule(
rule_id="competitive_price_match",
name="Price Matching",
description="Match or beat competitor prices",
condition="competitor_price < current_price * 0.95",
action="set_price = competitor_price * 0.98",
priority=StrategyPriority.CRITICAL
),
StrategyRule(
rule_id="competitive_promotion_response",
name="Promotion Response",
description="Respond to competitor promotions",
condition="competitor_promotion == true and market_share_declining",
action="apply_promotion = competitor_promotion_rate * 1.1",
priority=StrategyPriority.HIGH
)
]
return PricingStrategyConfig(
strategy_id="competitive_response_v1",
name="Competitive Response Strategy",
description="Reactively respond to competitor pricing actions to maintain market position",
strategy_type=PricingStrategy.COMPETITIVE_RESPONSE,
parameters=parameters,
rules=rules,
risk_tolerance=RiskTolerance.MODERATE,
priority=StrategyPriority.HIGH
)
@staticmethod
def get_demand_elasticity_strategy() -> PricingStrategyConfig:
"""Get demand elasticity strategy configuration"""
parameters = StrategyParameters(
base_multiplier=1.0,
min_price_margin=0.12,
max_price_margin=2.2,
demand_sensitivity=0.8, # Highly sensitive to demand
supply_sensitivity=0.3,
competition_sensitivity=0.4,
peak_hour_multiplier=1.3,
off_peak_multiplier=0.7,
weekend_multiplier=1.1,
performance_bonus_rate=0.1,
performance_penalty_rate=0.05,
max_price_change_percent=0.4 # Allow larger changes for elasticity
)
rules = [
StrategyRule(
rule_id="elasticity_demand_capture",
name="Demand Capture",
description="Aggressively price to capture demand surges",
condition="demand_growth_rate > 0.2 and supply_constraint == true",
action="set_price = current_price * 1.25",
priority=StrategyPriority.HIGH
),
StrategyRule(
rule_id="elasticity_demand_stimulation",
name="Demand Stimulation",
description="Lower prices to stimulate demand during lulls",
condition="demand_level < 0.4 and inventory_turnover < threshold",
action="apply_discount = 0.15",
priority=StrategyPriority.MEDIUM
)
]
return PricingStrategyConfig(
strategy_id="demand_elasticity_v1",
name="Demand Elasticity Strategy",
description="Dynamically adjust prices based on demand elasticity to optimize revenue",
strategy_type=PricingStrategy.DEMAND_ELASTICITY,
parameters=parameters,
rules=rules,
risk_tolerance=RiskTolerance.AGGRESSIVE,
priority=StrategyPriority.MEDIUM
)
@staticmethod
def get_penetration_pricing_strategy() -> PricingStrategyConfig:
"""Get penetration pricing strategy configuration"""
parameters = StrategyParameters(
base_multiplier=0.7, # Low initial prices
min_price_margin=0.05,
max_price_margin=1.5,
demand_sensitivity=0.3,
supply_sensitivity=0.2,
competition_sensitivity=0.7,
peak_hour_multiplier=1.0,
off_peak_multiplier=0.6,
weekend_multiplier=0.9,
growth_target_rate=0.3, # 30% growth target
market_share_target=0.2 # 20% market share target
)
rules = [
StrategyRule(
rule_id="penetration_market_entry",
name="Market Entry Pricing",
description="Very low prices for new market entry",
condition="market_share < 0.05 and time_in_market < 6_months",
action="set_price = cost * 1.1",
priority=StrategyPriority.CRITICAL
),
StrategyRule(
rule_id="penetration_gradual_increase",
name="Gradual Price Increase",
description="Gradually increase prices after market penetration",
condition="market_share > 0.1 and customer_loyalty > 12_months",
action="increase_price = 0.05",
priority=StrategyPriority.MEDIUM
)
]
return PricingStrategyConfig(
strategy_id="penetration_pricing_v1",
name="Penetration Pricing Strategy",
description="Low initial prices to gain market share, followed by gradual increases",
strategy_type=PricingStrategy.PENETRATION_PRICING,
parameters=parameters,
rules=rules,
risk_tolerance=RiskTolerance.AGGRESSIVE,
priority=StrategyPriority.HIGH
)
@staticmethod
def get_premium_pricing_strategy() -> PricingStrategyConfig:
"""Get premium pricing strategy configuration"""
parameters = StrategyParameters(
base_multiplier=1.8, # High base prices
min_price_margin=0.5,
max_price_margin=4.0,
demand_sensitivity=0.2, # Less sensitive to demand
supply_sensitivity=0.3,
competition_sensitivity=0.1, # Ignore competition
peak_hour_multiplier=1.5,
off_peak_multiplier=1.2,
weekend_multiplier=1.4,
performance_bonus_rate=0.2,
performance_penalty_rate=0.1,
profit_target_margin=0.4 # 40% profit target
)
rules = [
StrategyRule(
rule_id="premium_quality_assurance",
name="Quality Assurance Premium",
description="Maintain premium pricing for quality assurance",
condition="quality_score > 0.95 and brand_recognition > high",
action="maintain_premium = true",
priority=StrategyPriority.CRITICAL
),
StrategyRule(
rule_id="premium_exclusivity",
name="Exclusivity Pricing",
description="Premium pricing for exclusive features",
condition="exclusive_features == true and customer_segment == premium",
action="apply_premium = 0.3",
priority=StrategyPriority.HIGH
)
]
return PricingStrategyConfig(
strategy_id="premium_pricing_v1",
name="Premium Pricing Strategy",
description="High-end pricing strategy focused on quality and exclusivity",
strategy_type=PricingStrategy.PREMIUM_PRICING,
parameters=parameters,
rules=rules,
risk_tolerance=RiskTolerance.CONSERVATIVE,
priority=StrategyPriority.MEDIUM
)
@staticmethod
def get_all_strategies() -> Dict[PricingStrategy, PricingStrategyConfig]:
"""Get all available pricing strategies"""
return {
PricingStrategy.AGGRESSIVE_GROWTH: StrategyLibrary.get_aggressive_growth_strategy(),
PricingStrategy.PROFIT_MAXIMIZATION: StrategyLibrary.get_profit_maximization_strategy(),
PricingStrategy.MARKET_BALANCE: StrategyLibrary.get_market_balance_strategy(),
PricingStrategy.COMPETITIVE_RESPONSE: StrategyLibrary.get_competitive_response_strategy(),
PricingStrategy.DEMAND_ELASTICITY: StrategyLibrary.get_demand_elasticity_strategy(),
PricingStrategy.PENETRATION_PRICING: StrategyLibrary.get_penetration_pricing_strategy(),
PricingStrategy.PREMIUM_PRICING: StrategyLibrary.get_premium_pricing_strategy()
}
class StrategyOptimizer:
"""Optimizes pricing strategies based on performance data"""
def __init__(self):
self.performance_history: Dict[str, List[Dict[str, Any]]] = {}
self.optimization_rules = self._initialize_optimization_rules()
def optimize_strategy(
self,
strategy_config: PricingStrategyConfig,
performance_data: Dict[str, Any]
) -> PricingStrategyConfig:
"""Optimize strategy parameters based on performance"""
strategy_id = strategy_config.strategy_id
# Store performance data
if strategy_id not in self.performance_history:
self.performance_history[strategy_id] = []
self.performance_history[strategy_id].append({
"timestamp": datetime.utcnow(),
"performance": performance_data
})
# Apply optimization rules
optimized_config = self._apply_optimization_rules(strategy_config, performance_data)
# Update strategy effectiveness score
optimized_config.strategy_effectiveness_score = self._calculate_effectiveness_score(
performance_data
)
return optimized_config
def _initialize_optimization_rules(self) -> List[Dict[str, Any]]:
"""Initialize optimization rules"""
return [
{
"name": "Revenue Optimization",
"condition": "revenue_growth < target and price_elasticity > 0.5",
"action": "decrease_base_multiplier",
"adjustment": -0.05
},
{
"name": "Margin Protection",
"condition": "profit_margin < minimum and demand_inelastic",
"action": "increase_base_multiplier",
"adjustment": 0.03
},
{
"name": "Market Share Growth",
"condition": "market_share_declining and competitive_pressure_high",
"action": "increase_competition_sensitivity",
"adjustment": 0.1
},
{
"name": "Volatility Reduction",
"condition": "price_volatility > threshold and customer_complaints_high",
"action": "decrease_max_price_change",
"adjustment": -0.1
},
{
"name": "Demand Capture",
"condition": "demand_surge_detected and capacity_available",
"action": "increase_demand_sensitivity",
"adjustment": 0.15
}
]
def _apply_optimization_rules(
self,
strategy_config: PricingStrategyConfig,
performance_data: Dict[str, Any]
) -> PricingStrategyConfig:
"""Apply optimization rules to strategy configuration"""
# Create a copy to avoid modifying the original
optimized_config = PricingStrategyConfig(
strategy_id=strategy_config.strategy_id,
name=strategy_config.name,
description=strategy_config.description,
strategy_type=strategy_config.strategy_type,
parameters=StrategyParameters(
base_multiplier=strategy_config.parameters.base_multiplier,
min_price_margin=strategy_config.parameters.min_price_margin,
max_price_margin=strategy_config.parameters.max_price_margin,
demand_sensitivity=strategy_config.parameters.demand_sensitivity,
supply_sensitivity=strategy_config.parameters.supply_sensitivity,
competition_sensitivity=strategy_config.parameters.competition_sensitivity,
peak_hour_multiplier=strategy_config.parameters.peak_hour_multiplier,
off_peak_multiplier=strategy_config.parameters.off_peak_multiplier,
weekend_multiplier=strategy_config.parameters.weekend_multiplier,
performance_bonus_rate=strategy_config.parameters.performance_bonus_rate,
performance_penalty_rate=strategy_config.parameters.performance_penalty_rate,
max_price_change_percent=strategy_config.parameters.max_price_change_percent,
volatility_threshold=strategy_config.parameters.volatility_threshold,
confidence_threshold=strategy_config.parameters.confidence_threshold,
growth_target_rate=strategy_config.parameters.growth_target_rate,
profit_target_margin=strategy_config.parameters.profit_target_margin,
market_share_target=strategy_config.parameters.market_share_target,
regional_adjustments=strategy_config.parameters.regional_adjustments.copy(),
custom_parameters=strategy_config.parameters.custom_parameters.copy()
),
rules=strategy_config.rules.copy(),
risk_tolerance=strategy_config.risk_tolerance,
priority=strategy_config.priority,
auto_optimize=strategy_config.auto_optimize,
learning_enabled=strategy_config.learning_enabled,
min_price=strategy_config.min_price,
max_price=strategy_config.max_price,
resource_types=strategy_config.resource_types.copy(),
regions=strategy_config.regions.copy()
)
# Apply each optimization rule
for rule in self.optimization_rules:
if self._evaluate_rule_condition(rule["condition"], performance_data):
self._apply_rule_action(optimized_config, rule["action"], rule["adjustment"])
return optimized_config
def _evaluate_rule_condition(self, condition: str, performance_data: Dict[str, Any]) -> bool:
"""Evaluate optimization rule condition"""
# Simple condition evaluation (in production, use a proper expression evaluator)
try:
# Replace variables with actual values
condition_eval = condition
# Common performance metrics
metrics = {
"revenue_growth": performance_data.get("revenue_growth", 0),
"price_elasticity": performance_data.get("price_elasticity", 0.5),
"profit_margin": performance_data.get("profit_margin", 0.2),
"market_share_declining": performance_data.get("market_share_declining", False),
"competitive_pressure_high": performance_data.get("competitive_pressure_high", False),
"price_volatility": performance_data.get("price_volatility", 0.1),
"customer_complaints_high": performance_data.get("customer_complaints_high", False),
"demand_surge_detected": performance_data.get("demand_surge_detected", False),
"capacity_available": performance_data.get("capacity_available", True)
}
# Simple condition parsing
for key, value in metrics.items():
condition_eval = condition_eval.replace(key, str(value))
# Evaluate simple conditions
if "and" in condition_eval:
parts = condition_eval.split(" and ")
return all(self._evaluate_simple_condition(part.strip()) for part in parts)
else:
return self._evaluate_simple_condition(condition_eval.strip())
except Exception as e:
return False
def _evaluate_simple_condition(self, condition: str) -> bool:
"""Evaluate a simple condition"""
try:
# Handle common comparison operators
if "<" in condition:
left, right = condition.split("<", 1)
return float(left.strip()) < float(right.strip())
elif ">" in condition:
left, right = condition.split(">", 1)
return float(left.strip()) > float(right.strip())
elif "==" in condition:
left, right = condition.split("==", 1)
return left.strip() == right.strip()
elif "True" in condition:
return True
elif "False" in condition:
return False
else:
return bool(condition)
except Exception:
return False
def _apply_rule_action(self, config: PricingStrategyConfig, action: str, adjustment: float):
"""Apply optimization rule action"""
if action == "decrease_base_multiplier":
config.parameters.base_multiplier = max(0.5, config.parameters.base_multiplier + adjustment)
elif action == "increase_base_multiplier":
config.parameters.base_multiplier = min(2.0, config.parameters.base_multiplier + adjustment)
elif action == "increase_competition_sensitivity":
config.parameters.competition_sensitivity = min(1.0, config.parameters.competition_sensitivity + adjustment)
elif action == "decrease_max_price_change":
config.parameters.max_price_change_percent = max(0.1, config.parameters.max_price_change_percent + adjustment)
elif action == "increase_demand_sensitivity":
config.parameters.demand_sensitivity = min(1.0, config.parameters.demand_sensitivity + adjustment)
def _calculate_effectiveness_score(self, performance_data: Dict[str, Any]) -> float:
"""Calculate overall strategy effectiveness score"""
# Weight different performance metrics
weights = {
"revenue_growth": 0.3,
"profit_margin": 0.25,
"market_share": 0.2,
"customer_satisfaction": 0.15,
"price_stability": 0.1
}
score = 0.0
total_weight = 0.0
for metric, weight in weights.items():
if metric in performance_data:
value = performance_data[metric]
# Normalize values to 0-1 scale
if metric in ["revenue_growth", "profit_margin", "market_share", "customer_satisfaction"]:
normalized_value = min(1.0, max(0.0, value))
else: # price_stability (lower is better, so invert)
normalized_value = min(1.0, max(0.0, 1.0 - value))
score += normalized_value * weight
total_weight += weight
return score / total_weight if total_weight > 0 else 0.5