- Change file mode from 644 to 755 for all project files - Add chain_id parameter to get_balance RPC endpoint with default "ait-devnet" - Rename Miner.extra_meta_data to extra_metadata for consistency
266 lines
11 KiB
Python
Executable File
266 lines
11 KiB
Python
Executable File
"""
|
|
Marketplace Adaptive Resource Scaler
|
|
Implements predictive and reactive auto-scaling of marketplace resources based on demand.
|
|
"""
|
|
|
|
import asyncio
import logging
import math
import random
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
class ScalingPolicy:
    """Tunable limits and thresholds that drive auto-scaling decisions.

    The object is a plain value holder: every constructor argument is stored
    unchanged on an attribute of the same name.

    Args:
        min_nodes: Lower bound for the node count.
        max_nodes: Upper bound for the node count.
        target_utilization: Utilization the scaler sizes the cluster towards.
        scale_up_threshold: Utilization above which a scale-up is considered.
        scale_down_threshold: Utilization below which a scale-down is
            considered.
        cooldown_period_sec: Minimum number of seconds between two scaling
            actions.
        predictive_scaling: Whether historical demand is used to anticipate
            load.
    """

    def __init__(
        self,
        min_nodes: int = 2,
        max_nodes: int = 100,
        target_utilization: float = 0.75,
        scale_up_threshold: float = 0.85,
        scale_down_threshold: float = 0.40,
        cooldown_period_sec: int = 300,  # 5 minutes between scaling actions
        predictive_scaling: bool = True,
    ):
        # Cluster size bounds.
        self.min_nodes, self.max_nodes = min_nodes, max_nodes

        # Utilization set-point and the thresholds around it.
        (
            self.target_utilization,
            self.scale_up_threshold,
            self.scale_down_threshold,
        ) = (target_utilization, scale_up_threshold, scale_down_threshold)

        # Pacing and prediction switches.
        self.cooldown_period_sec, self.predictive_scaling = (
            cooldown_period_sec,
            predictive_scaling,
        )
|
|
|
|
class ResourceScaler:
    """Adaptive resource scaling engine for the AITBC marketplace.

    Combines reactive rules (current utilization and queue depth) with an
    optional predictive rule (per-hour-of-week utilization averages) to decide
    when to add or remove nodes, subject to the limits in a ``ScalingPolicy``.

    Attributes:
        policy: The scaling policy in effect.
        current_nodes: Node count the scaler believes is provisioned.
        active_cpu_nodes / active_gpu_nodes: Simulated split of that count.
        last_scaling_action_time: ``time.time()`` of the last successful
            scaling action (0 when none happened yet).
        scaling_history: Records of successful scaling actions, newest last,
            capped at ``MAX_HISTORY_RECORDS`` entries.
        historical_demand: Maps hour of week (0-167) to an exponential moving
            average of observed utilization.
        is_running: True between ``start()`` and ``stop()``.
    """

    HOURS_PER_WEEK = 168
    # Weight of the newest sample in the per-hour exponential moving average.
    DEMAND_EMA_ALPHA = 0.1
    # Upper bound on the number of retained history records.
    MAX_HISTORY_RECORDS = 1000
    # A queue deeper than this many jobs per node triggers a reactive scale-up.
    QUEUE_DEPTH_PER_NODE_LIMIT = 5
    # Fraction of the current nodes removed by one scale-down (at least 1 node).
    SCALE_DOWN_FRACTION = 0.2

    def __init__(
        self,
        policy: Optional["ScalingPolicy"] = None,
        *,
        evaluation_interval_sec: float = 10.0,
        provisioning_delay_sec: float = 2.0,
    ):
        """Create a scaler.

        Args:
            policy: Scaling policy; a default ``ScalingPolicy()`` is used when
                omitted.
            evaluation_interval_sec: Pause between two iterations of the
                background loop.
            provisioning_delay_sec: Simulated latency of one scaling call.
        """
        self.policy = policy or ScalingPolicy()
        self.evaluation_interval_sec = evaluation_interval_sec
        self.provisioning_delay_sec = provisioning_delay_sec

        # Current state
        self.current_nodes = self.policy.min_nodes
        self.active_gpu_nodes = 0
        self.active_cpu_nodes = self.policy.min_nodes

        self.last_scaling_action_time = 0
        self.scaling_history = []

        # Historical demand tracking for predictive scaling
        # Format: hour_of_week (0-167) -> avg_utilization
        self.historical_demand = {}

        self.is_running = False
        self._scaler_task = None

    async def start(self):
        """Start the background scaling loop; a no-op if already running."""
        if self.is_running:
            return
        self.is_running = True
        self._scaler_task = asyncio.create_task(self._scaling_loop())
        logger.info(
            "Resource Scaler started (Min: %s, Max: %s)",
            self.policy.min_nodes,
            self.policy.max_nodes,
        )

    async def stop(self):
        """Stop the background loop and wait until its task has finished."""
        self.is_running = False
        task, self._scaler_task = self._scaler_task, None
        if task is not None:
            task.cancel()
            # Wait for the task to unwind. return_exceptions=True turns the
            # task's own CancelledError (or any late failure) into a result
            # instead of raising it here.
            await asyncio.gather(task, return_exceptions=True)
        logger.info("Resource Scaler stopped")

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive datetime.

        Same value as the deprecated ``datetime.utcnow()``, so ISO timestamps
        keep their existing format (no UTC offset suffix).
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @classmethod
    def _hour_of_week(cls, moment: datetime, offset_hours: int = 0) -> int:
        """Map a datetime to its hour-of-week slot (0-167, Monday 00:00 = 0)."""
        return (moment.weekday() * 24 + moment.hour + offset_hours) % cls.HOURS_PER_WEEK

    def update_historical_demand(self, utilization: float):
        """Fold a utilization sample into the average for the current UTC hour of week."""
        slot = self._hour_of_week(self._utcnow())
        previous = self.historical_demand.get(slot)
        if previous is None:
            # First sample for this slot seeds the average.
            self.historical_demand[slot] = utilization
        else:
            # Exponential moving average (favor recent data)
            alpha = self.DEMAND_EMA_ALPHA
            self.historical_demand[slot] = (previous * (1.0 - alpha)) + (utilization * alpha)

    def _predict_demand(self, lookahead_hours: int = 1) -> float:
        """Predict expected utilization based on historical patterns.

        Returns the stored average for the hour of week ``lookahead_hours``
        ahead of now. If that slot has no data, returns the mean over all
        recorded slots. Returns 0.0 when predictive scaling is disabled or no
        history exists.
        """
        if not self.policy.predictive_scaling or not self.historical_demand:
            return 0.0

        target_hour = self._hour_of_week(self._utcnow(), lookahead_hours)
        known = self.historical_demand.get(target_hour)
        if known is not None:
            return known

        # No sample for that slot yet: fall back to the overall mean.
        return sum(self.historical_demand.values()) / len(self.historical_demand)

    async def _scaling_loop(self):
        """Background task that evaluates scaling rules periodically."""
        while self.is_running:
            try:
                # In a real system, we'd fetch this from the Monitor or Coordinator
                # Here we simulate fetching current metrics
                current_utilization = self._get_current_utilization()
                current_queue_depth = self._get_queue_depth()

                self.update_historical_demand(current_utilization)
                await self.evaluate_scaling(current_utilization, current_queue_depth)
            except asyncio.CancelledError:
                # Propagate cancellation so the task really ends as cancelled.
                raise
            except Exception as e:
                # Keep the loop alive, but record the traceback.
                logger.exception("Error in scaling loop: %s", e)

            await asyncio.sleep(self.evaluation_interval_sec)

    def _plan_scaling(
        self,
        current_utilization: float,
        queue_depth: int,
        predicted_utilization: float,
    ) -> Tuple[Optional[str], int, str]:
        """Decide which action the metrics call for, without side effects.

        Returns:
            ``(action, target_nodes, reason)``. ``action`` is ``"scale_up"``,
            ``"scale_down"`` or ``None``; when it is ``None`` the target equals
            the current node count and the reason is empty.
        """
        policy = self.policy
        nodes = self.current_nodes
        no_change = (None, nodes, "")

        # Reactive scale up: utilization or backlog is too high right now.
        if (
            current_utilization > policy.scale_up_threshold
            or queue_depth > nodes * self.QUEUE_DEPTH_PER_NODE_LIMIT
        ):
            desired_increase = math.ceil(
                nodes * (current_utilization / policy.target_utilization - 1.0)
            )
            # Ensure we add at least 1, but bounded by queue depth and max_nodes
            nodes_to_add = max(1, min(desired_increase, max(1, queue_depth // 2)))
            target = min(policy.max_nodes, nodes + nodes_to_add)
            if target > nodes:
                return (
                    "scale_up",
                    target,
                    f"High utilization ({current_utilization*100:.1f}%) or queue depth ({queue_depth})",
                )
            return no_change

        # Predictive scale up (proactive): add nodes more conservatively.
        if policy.predictive_scaling and predicted_utilization > policy.scale_up_threshold:
            target = min(policy.max_nodes, nodes + 1)
            if target > nodes:
                return (
                    "scale_up",
                    target,
                    f"Predictive scaling (expected {predicted_utilization*100:.1f}% util)",
                )
            return no_change

        # Scale down: idle now and, when prediction is on, not expected to be busy.
        if current_utilization < policy.scale_down_threshold and queue_depth == 0:
            if not policy.predictive_scaling or predicted_utilization < policy.target_utilization:
                nodes_to_remove = max(1, int(nodes * self.SCALE_DOWN_FRACTION))
                target = max(policy.min_nodes, nodes - nodes_to_remove)
                if target < nodes:
                    return (
                        "scale_down",
                        target,
                        f"Low utilization ({current_utilization*100:.1f}%)",
                    )

        return no_change

    async def evaluate_scaling(self, current_utilization: float, queue_depth: int) -> Optional[Dict[str, Any]]:
        """Evaluate if scaling action is needed and execute if necessary.

        Args:
            current_utilization: Current cluster utilization as a fraction.
            queue_depth: Number of queued jobs.

        Returns:
            The history record of the action that was carried out, or ``None``
            when the cooldown is active, no action is needed, or the scaling
            call reported failure.
        """
        now = time.time()

        # Check cooldown
        if now - self.last_scaling_action_time < self.policy.cooldown_period_sec:
            return None

        predicted_utilization = self._predict_demand()
        action, target_nodes, reason = self._plan_scaling(
            current_utilization, queue_depth, predicted_utilization
        )
        if action is None or target_nodes == self.current_nodes:
            return None

        diff = abs(target_nodes - self.current_nodes)
        succeeded = await self._execute_scaling(action, diff, target_nodes)
        if not succeeded:
            # Do not commit state or start the cooldown for an action that did
            # not happen; the next evaluation may retry.
            logger.warning(
                "Auto-scaler: %s to %s nodes failed; state left unchanged",
                action.upper(),
                target_nodes,
            )
            return None

        record = {
            "timestamp": self._utcnow().isoformat(),
            "action": action,
            "nodes_changed": diff,
            "new_total": target_nodes,
            "reason": reason,
            "metrics_at_time": {
                "utilization": current_utilization,
                "queue_depth": queue_depth,
                "predicted_utilization": predicted_utilization,
            },
        }

        self.scaling_history.append(record)
        # Keep history manageable
        if len(self.scaling_history) > self.MAX_HISTORY_RECORDS:
            del self.scaling_history[: -self.MAX_HISTORY_RECORDS]

        self.last_scaling_action_time = now
        self.current_nodes = target_nodes

        logger.info(
            "Auto-scaler: %s to %s nodes. Reason: %s", action.upper(), target_nodes, reason
        )
        return record

    async def _execute_scaling(self, action: str, count: int, new_total: int) -> bool:
        """Execute the actual scaling action (e.g. interacting with Kubernetes/Docker/Cloud provider).

        This implementation only simulates the call: it waits
        ``provisioning_delay_sec`` and adjusts the CPU/GPU counters.

        Returns:
            True when the action was applied.
        """
        # In production, this would call cloud APIs (AWS AutoScaling, K8s Scale, etc.)
        logger.debug("Executing %s by %s nodes...", action, count)

        # Simulate API delay
        await asyncio.sleep(self.provisioning_delay_sec)

        if action == "scale_up":
            # Simulate provisioning new instances
            # We assume a mix of CPU and GPU instances based on demand
            new_gpus = count // 2
            new_cpus = count - new_gpus
            self.active_gpu_nodes += new_gpus
            self.active_cpu_nodes += new_cpus
        elif action == "scale_down":
            # Simulate de-provisioning
            # Remove CPU nodes first, but never drop below min_nodes CPU
            # nodes; whatever remains is taken from the GPU nodes.
            remove_cpus = min(count, max(0, self.active_cpu_nodes - self.policy.min_nodes))
            remove_gpus = count - remove_cpus

            self.active_cpu_nodes -= remove_cpus
            self.active_gpu_nodes = max(0, self.active_gpu_nodes - remove_gpus)

        return True

    # --- Simulation helpers ---
    def _get_current_utilization(self) -> float:
        """Simulate getting current cluster utilization."""
        # In reality, fetch from MarketplaceMonitor or Coordinator
        # Base utilization with some noise
        base = 0.6
        return max(0.1, min(0.99, base + random.uniform(-0.2, 0.3)))

    def _get_queue_depth(self) -> int:
        """Simulate getting current queue depth."""
        # Roughly one call in five reports a burst of queued jobs.
        if random.random() > 0.8:
            return random.randint(10, 50)
        return random.randint(0, 5)

    def get_status(self) -> Dict[str, Any]:
        """Get current scaler status as a plain dictionary."""
        return {
            "status": "running" if self.is_running else "stopped",
            "current_nodes": {
                "total": self.current_nodes,
                "cpu_nodes": self.active_cpu_nodes,
                "gpu_nodes": self.active_gpu_nodes,
            },
            "policy": {
                "min_nodes": self.policy.min_nodes,
                "max_nodes": self.policy.max_nodes,
                "target_utilization": self.policy.target_utilization,
            },
            "last_action": self.scaling_history[-1] if self.scaling_history else None,
            "prediction": {
                "next_hour_utilization_estimate": round(self._predict_demand(1), 3)
            },
        }
|