Files
aitbc/dev/scripts/production_monitoring.py
oib 50954a4b31 chore(systemd): remove obsolete systemd service files and update infrastructure documentation
- Remove 8 unused systemd service files from coordinator-api/systemd/
  - aitbc-adaptive-learning.service (port 8005)
  - aitbc-advanced-ai.service
  - aitbc-enterprise-api.service
  - aitbc-gpu-multimodal.service (port 8003)
  - aitbc-marketplace-enhanced.service (port 8006)
  - aitbc-modality-optimization.service (port 8004)
  - aitbc-multimodal.service (port 8002)
  - aitbc-openclaw-enhanced.service (port 8007
2026-03-04 12:16:50 +01:00

719 lines
27 KiB
Python

"""
AITBC Production Monitoring and Analytics
This module provides comprehensive monitoring and analytics capabilities
for the AITBC production environment, including metrics collection,
alerting, and dashboard generation.
"""
import asyncio
import json
import logging
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from pathlib import Path
import subprocess
import psutil
import aiohttp
import statistics
@dataclass
class SystemMetrics:
"""System performance metrics."""
timestamp: float
cpu_percent: float
memory_percent: float
disk_usage: float
network_io: Dict[str, int]
process_count: int
load_average: List[float]
@dataclass
class ApplicationMetrics:
"""Application performance metrics."""
timestamp: float
active_users: int
api_requests: int
response_time_avg: float
response_time_p95: float
error_rate: float
throughput: float
cache_hit_rate: float
@dataclass
class BlockchainMetrics:
"""Blockchain network metrics."""
timestamp: float
block_height: int
gas_price: float
transaction_count: int
network_hashrate: float
peer_count: int
sync_status: str
@dataclass
class SecurityMetrics:
"""Security monitoring metrics."""
timestamp: float
failed_logins: int
suspicious_ips: int
security_events: int
vulnerability_scans: int
blocked_requests: int
audit_log_entries: int
class ProductionMonitor:
"""Production monitoring system."""
def __init__(self, config_path: str = "config/monitoring_config.json"):
self.config = self._load_config(config_path)
self.logger = self._setup_logging()
self.metrics_history = {
"system": [],
"application": [],
"blockchain": [],
"security": []
}
self.alerts = []
self.dashboards = {}
def _load_config(self, config_path: str) -> Dict:
"""Load monitoring configuration."""
default_config = {
"collection_interval": 60, # seconds
"retention_days": 30,
"alert_thresholds": {
"cpu_percent": 80,
"memory_percent": 85,
"disk_usage": 90,
"error_rate": 5.0,
"response_time_p95": 2000, # ms
"failed_logins": 10,
"security_events": 5
},
"endpoints": {
"health": "https://api.aitbc.dev/health",
"metrics": "https://api.aitbc.dev/metrics",
"blockchain": "https://api.aitbc.dev/blockchain/stats",
"security": "https://api.aitbc.dev/security/stats"
},
"notifications": {
"slack_webhook": os.getenv("SLACK_WEBHOOK_URL"),
"email_smtp": os.getenv("SMTP_SERVER"),
"pagerduty_key": os.getenv("PAGERDUTY_KEY")
}
}
config_file = Path(config_path)
if config_file.exists():
with open(config_file, 'r') as f:
user_config = json.load(f)
default_config.update(user_config)
return default_config
def _setup_logging(self) -> logging.Logger:
"""Setup logging for monitoring system."""
logger = logging.getLogger("production_monitor")
logger.setLevel(logging.INFO)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
async def collect_system_metrics(self) -> SystemMetrics:
"""Collect system performance metrics."""
try:
# CPU metrics
cpu_percent = psutil.cpu_percent(interval=1)
load_avg = list(psutil.getloadavg())
# Memory metrics
memory = psutil.virtual_memory()
memory_percent = memory.percent
# Disk metrics
disk = psutil.disk_usage('/')
disk_usage = (disk.used / disk.total) * 100
# Network metrics
network = psutil.net_io_counters()
network_io = {
"bytes_sent": network.bytes_sent,
"bytes_recv": network.bytes_recv,
"packets_sent": network.packets_sent,
"packets_recv": network.packets_recv
}
# Process metrics
process_count = len(psutil.pids())
return SystemMetrics(
timestamp=time.time(),
cpu_percent=cpu_percent,
memory_percent=memory_percent,
disk_usage=disk_usage,
network_io=network_io,
process_count=process_count,
load_average=load_avg
)
except Exception as e:
self.logger.error(f"Error collecting system metrics: {e}")
return None
async def collect_application_metrics(self) -> ApplicationMetrics:
"""Collect application performance metrics."""
try:
async with aiohttp.ClientSession() as session:
# Get metrics from application
async with session.get(self.config["endpoints"]["metrics"]) as response:
if response.status == 200:
data = await response.json()
return ApplicationMetrics(
timestamp=time.time(),
active_users=data.get("active_users", 0),
api_requests=data.get("api_requests", 0),
response_time_avg=data.get("response_time_avg", 0),
response_time_p95=data.get("response_time_p95", 0),
error_rate=data.get("error_rate", 0),
throughput=data.get("throughput", 0),
cache_hit_rate=data.get("cache_hit_rate", 0)
)
# Fallback metrics if API is unavailable
return ApplicationMetrics(
timestamp=time.time(),
active_users=0,
api_requests=0,
response_time_avg=0,
response_time_p95=0,
error_rate=0,
throughput=0,
cache_hit_rate=0
)
except Exception as e:
self.logger.error(f"Error collecting application metrics: {e}")
return None
async def collect_blockchain_metrics(self) -> BlockchainMetrics:
"""Collect blockchain network metrics."""
try:
async with aiohttp.ClientSession() as session:
async with session.get(self.config["endpoints"]["blockchain"]) as response:
if response.status == 200:
data = await response.json()
return BlockchainMetrics(
timestamp=time.time(),
block_height=data.get("block_height", 0),
gas_price=data.get("gas_price", 0),
transaction_count=data.get("transaction_count", 0),
network_hashrate=data.get("network_hashrate", 0),
peer_count=data.get("peer_count", 0),
sync_status=data.get("sync_status", "unknown")
)
return BlockchainMetrics(
timestamp=time.time(),
block_height=0,
gas_price=0,
transaction_count=0,
network_hashrate=0,
peer_count=0,
sync_status="unknown"
)
except Exception as e:
self.logger.error(f"Error collecting blockchain metrics: {e}")
return None
async def collect_security_metrics(self) -> SecurityMetrics:
"""Collect security monitoring metrics."""
try:
async with aiohttp.ClientSession() as session:
async with session.get(self.config["endpoints"]["security"]) as response:
if response.status == 200:
data = await response.json()
return SecurityMetrics(
timestamp=time.time(),
failed_logins=data.get("failed_logins", 0),
suspicious_ips=data.get("suspicious_ips", 0),
security_events=data.get("security_events", 0),
vulnerability_scans=data.get("vulnerability_scans", 0),
blocked_requests=data.get("blocked_requests", 0),
audit_log_entries=data.get("audit_log_entries", 0)
)
return SecurityMetrics(
timestamp=time.time(),
failed_logins=0,
suspicious_ips=0,
security_events=0,
vulnerability_scans=0,
blocked_requests=0,
audit_log_entries=0
)
except Exception as e:
self.logger.error(f"Error collecting security metrics: {e}")
return None
async def collect_all_metrics(self) -> Dict[str, Any]:
"""Collect all metrics."""
tasks = [
self.collect_system_metrics(),
self.collect_application_metrics(),
self.collect_blockchain_metrics(),
self.collect_security_metrics()
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return {
"system": results[0] if not isinstance(results[0], Exception) else None,
"application": results[1] if not isinstance(results[1], Exception) else None,
"blockchain": results[2] if not isinstance(results[2], Exception) else None,
"security": results[3] if not isinstance(results[3], Exception) else None
}
async def check_alerts(self, metrics: Dict[str, Any]) -> List[Dict]:
"""Check metrics against alert thresholds."""
alerts = []
thresholds = self.config["alert_thresholds"]
# System alerts
if metrics["system"]:
sys_metrics = metrics["system"]
if sys_metrics.cpu_percent > thresholds["cpu_percent"]:
alerts.append({
"type": "system",
"metric": "cpu_percent",
"value": sys_metrics.cpu_percent,
"threshold": thresholds["cpu_percent"],
"severity": "warning" if sys_metrics.cpu_percent < 90 else "critical",
"message": f"High CPU usage: {sys_metrics.cpu_percent:.1f}%"
})
if sys_metrics.memory_percent > thresholds["memory_percent"]:
alerts.append({
"type": "system",
"metric": "memory_percent",
"value": sys_metrics.memory_percent,
"threshold": thresholds["memory_percent"],
"severity": "warning" if sys_metrics.memory_percent < 95 else "critical",
"message": f"High memory usage: {sys_metrics.memory_percent:.1f}%"
})
if sys_metrics.disk_usage > thresholds["disk_usage"]:
alerts.append({
"type": "system",
"metric": "disk_usage",
"value": sys_metrics.disk_usage,
"threshold": thresholds["disk_usage"],
"severity": "critical",
"message": f"High disk usage: {sys_metrics.disk_usage:.1f}%"
})
# Application alerts
if metrics["application"]:
app_metrics = metrics["application"]
if app_metrics.error_rate > thresholds["error_rate"]:
alerts.append({
"type": "application",
"metric": "error_rate",
"value": app_metrics.error_rate,
"threshold": thresholds["error_rate"],
"severity": "warning" if app_metrics.error_rate < 10 else "critical",
"message": f"High error rate: {app_metrics.error_rate:.1f}%"
})
if app_metrics.response_time_p95 > thresholds["response_time_p95"]:
alerts.append({
"type": "application",
"metric": "response_time_p95",
"value": app_metrics.response_time_p95,
"threshold": thresholds["response_time_p95"],
"severity": "warning",
"message": f"High response time: {app_metrics.response_time_p95:.0f}ms"
})
# Security alerts
if metrics["security"]:
sec_metrics = metrics["security"]
if sec_metrics.failed_logins > thresholds["failed_logins"]:
alerts.append({
"type": "security",
"metric": "failed_logins",
"value": sec_metrics.failed_logins,
"threshold": thresholds["failed_logins"],
"severity": "warning",
"message": f"High failed login count: {sec_metrics.failed_logins}"
})
if sec_metrics.security_events > thresholds["security_events"]:
alerts.append({
"type": "security",
"metric": "security_events",
"value": sec_metrics.security_events,
"threshold": thresholds["security_events"],
"severity": "critical",
"message": f"High security events: {sec_metrics.security_events}"
})
return alerts
async def send_alert(self, alert: Dict) -> bool:
"""Send alert notification."""
try:
# Log alert
self.logger.warning(f"ALERT: {alert['message']}")
# Send to Slack
if self.config["notifications"]["slack_webhook"]:
await self._send_slack_alert(alert)
# Send to PagerDuty for critical alerts
if alert["severity"] == "critical" and self.config["notifications"]["pagerduty_key"]:
await self._send_pagerduty_alert(alert)
# Store alert
alert["timestamp"] = time.time()
self.alerts.append(alert)
return True
except Exception as e:
self.logger.error(f"Error sending alert: {e}")
return False
async def _send_slack_alert(self, alert: Dict) -> bool:
"""Send alert to Slack."""
try:
webhook_url = self.config["notifications"]["slack_webhook"]
color = {
"warning": "warning",
"critical": "danger",
"info": "good"
}.get(alert["severity"], "warning")
payload = {
"text": f"AITBC Alert: {alert['message']}",
"attachments": [{
"color": color,
"fields": [
{"title": "Type", "value": alert["type"], "short": True},
{"title": "Metric", "value": alert["metric"], "short": True},
{"title": "Value", "value": str(alert["value"]), "short": True},
{"title": "Threshold", "value": str(alert["threshold"]), "short": True},
{"title": "Severity", "value": alert["severity"], "short": True}
],
"timestamp": int(time.time())
}]
}
async with aiohttp.ClientSession() as session:
async with session.post(webhook_url, json=payload) as response:
return response.status == 200
except Exception as e:
self.logger.error(f"Error sending Slack alert: {e}")
return False
async def _send_pagerduty_alert(self, alert: Dict) -> bool:
"""Send alert to PagerDuty."""
try:
api_key = self.config["notifications"]["pagerduty_key"]
payload = {
"routing_key": api_key,
"event_action": "trigger",
"payload": {
"summary": f"AITBC Alert: {alert['message']}",
"source": "aitbc-monitor",
"severity": alert["severity"],
"timestamp": datetime.now().isoformat(),
"custom_details": alert
}
}
async with aiohttp.ClientSession() as session:
async with session.post(
"https://events.pagerduty.com/v2/enqueue",
json=payload
) as response:
return response.status == 202
except Exception as e:
self.logger.error(f"Error sending PagerDuty alert: {e}")
return False
async def generate_dashboard(self) -> Dict:
"""Generate monitoring dashboard data."""
try:
# Get recent metrics (last hour)
cutoff_time = time.time() - 3600
recent_metrics = {
"system": [m for m in self.metrics_history["system"] if m.timestamp > cutoff_time],
"application": [m for m in self.metrics_history["application"] if m.timestamp > cutoff_time],
"blockchain": [m for m in self.metrics_history["blockchain"] if m.timestamp > cutoff_time],
"security": [m for m in self.metrics_history["security"] if m.timestamp > cutoff_time]
}
dashboard = {
"timestamp": time.time(),
"status": "healthy",
"alerts": self.alerts[-10:], # Last 10 alerts
"metrics": {
"current": await self.collect_all_metrics(),
"trends": self._calculate_trends(recent_metrics),
"summaries": self._calculate_summaries(recent_metrics)
}
}
# Determine overall status
critical_alerts = [a for a in self.alerts if a.get("severity") == "critical"]
if critical_alerts:
dashboard["status"] = "critical"
elif self.alerts:
dashboard["status"] = "warning"
return dashboard
except Exception as e:
self.logger.error(f"Error generating dashboard: {e}")
return {"status": "error", "error": str(e)}
def _calculate_trends(self, recent_metrics: Dict) -> Dict:
"""Calculate metric trends."""
trends = {}
for metric_type, metrics in recent_metrics.items():
if not metrics:
continue
# Calculate trend for each numeric field
if metric_type == "system" and metrics:
trends["system"] = {
"cpu_trend": self._calculate_trend([m.cpu_percent for m in metrics]),
"memory_trend": self._calculate_trend([m.memory_percent for m in metrics]),
"disk_trend": self._calculate_trend([m.disk_usage for m in metrics])
}
elif metric_type == "application" and metrics:
trends["application"] = {
"response_time_trend": self._calculate_trend([m.response_time_avg for m in metrics]),
"error_rate_trend": self._calculate_trend([m.error_rate for m in metrics]),
"throughput_trend": self._calculate_trend([m.throughput for m in metrics])
}
return trends
def _calculate_trend(self, values: List[float]) -> str:
"""Calculate trend direction."""
if len(values) < 2:
return "stable"
# Simple linear regression to determine trend
n = len(values)
x = list(range(n))
x_mean = sum(x) / n
y_mean = sum(values) / n
numerator = sum((x[i] - x_mean) * (values[i] - y_mean) for i in range(n))
denominator = sum((x[i] - x_mean) ** 2 for i in range(n))
if denominator == 0:
return "stable"
slope = numerator / denominator
if slope > 0.1:
return "increasing"
elif slope < -0.1:
return "decreasing"
else:
return "stable"
def _calculate_summaries(self, recent_metrics: Dict) -> Dict:
"""Calculate metric summaries."""
summaries = {}
for metric_type, metrics in recent_metrics.items():
if not metrics:
continue
if metric_type == "system" and metrics:
summaries["system"] = {
"avg_cpu": statistics.mean([m.cpu_percent for m in metrics]),
"max_cpu": max([m.cpu_percent for m in metrics]),
"avg_memory": statistics.mean([m.memory_percent for m in metrics]),
"max_memory": max([m.memory_percent for m in metrics]),
"avg_disk": statistics.mean([m.disk_usage for m in metrics])
}
elif metric_type == "application" and metrics:
summaries["application"] = {
"avg_response_time": statistics.mean([m.response_time_avg for m in metrics]),
"max_response_time": max([m.response_time_p95 for m in metrics]),
"avg_error_rate": statistics.mean([m.error_rate for m in metrics]),
"total_requests": sum([m.api_requests for m in metrics]),
"avg_throughput": statistics.mean([m.throughput for m in metrics])
}
return summaries
async def store_metrics(self, metrics: Dict) -> None:
"""Store metrics in history."""
try:
timestamp = time.time()
# Add to history
if metrics["system"]:
self.metrics_history["system"].append(metrics["system"])
if metrics["application"]:
self.metrics_history["application"].append(metrics["application"])
if metrics["blockchain"]:
self.metrics_history["blockchain"].append(metrics["blockchain"])
if metrics["security"]:
self.metrics_history["security"].append(metrics["security"])
# Cleanup old metrics
cutoff_time = timestamp - (self.config["retention_days"] * 24 * 3600)
for metric_type in self.metrics_history:
self.metrics_history[metric_type] = [
m for m in self.metrics_history[metric_type]
if m.timestamp > cutoff_time
]
# Save to file
await self._save_metrics_to_file()
except Exception as e:
self.logger.error(f"Error storing metrics: {e}")
async def _save_metrics_to_file(self) -> None:
"""Save metrics to file."""
try:
metrics_file = Path("data/metrics_history.json")
metrics_file.parent.mkdir(exist_ok=True)
# Convert dataclasses to dicts for JSON serialization
serializable_history = {}
for metric_type, metrics in self.metrics_history.items():
serializable_history[metric_type] = [
asdict(m) if hasattr(m, '__dict__') else m
for m in metrics
]
with open(metrics_file, 'w') as f:
json.dump(serializable_history, f, indent=2)
except Exception as e:
self.logger.error(f"Error saving metrics to file: {e}")
async def run_monitoring_cycle(self) -> None:
"""Run a complete monitoring cycle."""
try:
# Collect metrics
metrics = await self.collect_all_metrics()
# Store metrics
await self.store_metrics(metrics)
# Check alerts
alerts = await self.check_alerts(metrics)
# Send alerts
for alert in alerts:
await self.send_alert(alert)
# Generate dashboard
dashboard = await self.generate_dashboard()
# Log summary
self.logger.info(f"Monitoring cycle completed. Status: {dashboard['status']}")
if alerts:
self.logger.warning(f"Generated {len(alerts)} alerts")
except Exception as e:
self.logger.error(f"Error in monitoring cycle: {e}")
async def start_monitoring(self) -> None:
"""Start continuous monitoring."""
self.logger.info("Starting production monitoring")
while True:
try:
await self.run_monitoring_cycle()
await asyncio.sleep(self.config["collection_interval"])
except KeyboardInterrupt:
self.logger.info("Monitoring stopped by user")
break
except Exception as e:
self.logger.error(f"Error in monitoring loop: {e}")
await asyncio.sleep(60) # Wait before retrying
# CLI interface
async def main():
"""Main CLI interface."""
import argparse
parser = argparse.ArgumentParser(description="AITBC Production Monitoring")
parser.add_argument("--start", action="store_true", help="Start monitoring")
parser.add_argument("--collect", action="store_true", help="Collect metrics once")
parser.add_argument("--dashboard", action="store_true", help="Generate dashboard")
parser.add_argument("--alerts", action="store_true", help="Check alerts")
args = parser.parse_args()
monitor = ProductionMonitor()
if args.start:
await monitor.start_monitoring()
elif args.collect:
metrics = await monitor.collect_all_metrics()
print(json.dumps(metrics, indent=2, default=str))
elif args.dashboard:
dashboard = await monitor.generate_dashboard()
print(json.dumps(dashboard, indent=2, default=str))
elif args.alerts:
metrics = await monitor.collect_all_metrics()
alerts = await monitor.check_alerts(metrics)
print(json.dumps(alerts, indent=2, default=str))
else:
print("Use --help to see available options")
if __name__ == "__main__":
asyncio.run(main())