chore(cleanup): remove obsolete scripts and update paths for production deployment

- Remove dev/scripts/check-file-organization.sh (obsolete organization checker)
- Remove dev/scripts/community_onboarding.py (unused 559-line automation script)
- Update gpu_miner_host.py log path from /home/oib/windsurf/aitbc to /opt/aitbc
- Add service status and standardization badges to README.md
This commit is contained in:
oib
2026-03-04 13:24:38 +01:00
parent 50954a4b31
commit 3df0a9ed62
126 changed files with 1870 additions and 458 deletions

View File

@@ -0,0 +1,547 @@
#!/usr/bin/env python3
"""
AITBC Performance Baseline Testing
This script establishes performance baselines for the AITBC platform,
including API response times, throughput, resource usage, and user experience metrics.
"""
import asyncio
import json
import logging
import time
import statistics
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from pathlib import Path
import aiohttp
import psutil
import subprocess
import sys
@dataclass
class PerformanceMetric:
    """Individual performance measurement."""

    # Wall-clock time (time.time()) when the sample was recorded.
    timestamp: float
    # Sample kind: "response_time" or "error" (see _simulate_user).
    metric_name: str
    # Measured magnitude: latency in ms for "response_time", 1 for "error".
    value: float
    # Unit string, e.g. "ms" or "count".
    unit: str
    # Free-form extras, e.g. {"endpoint": <endpoint name>}.
    context: Dict[str, Any]
@dataclass
class BaselineResult:
    """Performance baseline result."""

    # Baseline key, e.g. "<scenario>_response_time_p95".
    metric_name: str
    # Value the baseline pins (p95 latency in establish_baseline).
    baseline_value: float
    unit: str
    # Number of successful samples behind the statistics below.
    samples: int
    min_value: float
    max_value: float
    mean_value: float
    median_value: float
    std_deviation: float
    percentile_95: float
    percentile_99: float
    status: str  # "pass", "warning", "fail"
    # Configured limit the baseline was graded against.
    threshold: Optional[float]
class PerformanceBaseline:
    """Performance baseline testing system."""

    def __init__(self, config_path: str = "config/performance_config.json"):
        # Merged defaults + user overrides (see _load_config).
        self.config = self._load_config(config_path)
        self.logger = self._setup_logging()
        # Previously persisted baselines, keyed "<scenario>_response_time_p95".
        self.baselines = self._load_baselines()
        # NOTE(review): current_metrics is never referenced elsewhere in this file.
        self.current_metrics = []
def _load_config(self, config_path: str) -> Dict:
"""Load performance testing configuration."""
default_config = {
"test_duration": 300, # 5 minutes
"concurrent_users": 10,
"ramp_up_time": 60, # 1 minute
"endpoints": {
"health": "https://api.aitbc.dev/health",
"users": "https://api.aitbc.dev/api/v1/users",
"transactions": "https://api.aitbc.dev/api/v1/transactions",
"blockchain": "https://api.aitbc.dev/api/v1/blockchain/status",
"marketplace": "https://api.aitbc.dev/api/v1/marketplace/listings"
},
"thresholds": {
"response_time_p95": 2000, # ms
"response_time_p99": 5000, # ms
"error_rate": 1.0, # %
"throughput_min": 100, # requests/second
"cpu_max": 80, # %
"memory_max": 85, # %
"disk_io_max": 100 # MB/s
},
"scenarios": {
"light_load": {"users": 5, "duration": 60},
"medium_load": {"users": 20, "duration": 120},
"heavy_load": {"users": 50, "duration": 180},
"stress_test": {"users": 100, "duration": 300}
}
}
config_file = Path(config_path)
if config_file.exists():
with open(config_file, 'r') as f:
user_config = json.load(f)
default_config.update(user_config)
return default_config
def _setup_logging(self) -> logging.Logger:
"""Setup logging for performance testing."""
logger = logging.getLogger("performance_baseline")
logger.setLevel(logging.INFO)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def _load_baselines(self) -> Dict:
"""Load existing baselines."""
baseline_file = Path("data/performance_baselines.json")
if baseline_file.exists():
with open(baseline_file, 'r') as f:
return json.load(f)
return {}
def _save_baselines(self) -> None:
"""Save baselines to file."""
baseline_file = Path("data/performance_baselines.json")
baseline_file.parent.mkdir(exist_ok=True)
with open(baseline_file, 'w') as f:
json.dump(self.baselines, f, indent=2)
async def measure_api_response_time(self, endpoint: str, method: str = "GET",
                                    payload: Dict = None) -> float:
    """Time one HTTP request to *endpoint* and return the latency in ms.

    Returns -1 on any failure (connection error, unsupported method, ...).
    """
    started = time.time()
    try:
        async with aiohttp.ClientSession() as session:
            verb = method.upper()
            if verb == "GET":
                async with session.get(endpoint) as response:
                    await response.text()
            elif verb == "POST":
                async with session.post(endpoint, json=payload) as response:
                    await response.text()
            else:
                raise ValueError(f"Unsupported method: {method}")
        return (time.time() - started) * 1000  # seconds -> milliseconds
    except Exception as e:
        self.logger.error(f"Error measuring {endpoint}: {e}")
        return -1  # sentinel: request failed
async def run_load_test(self, scenario: str) -> Dict[str, Any]:
    """Run load test scenario.

    Spawns `users` concurrent simulated users for `duration` seconds while a
    background task samples host resources into results["system_metrics"],
    then aggregates latency, error-rate and throughput statistics.
    """
    scenario_config = self.config["scenarios"][scenario]
    users = scenario_config["users"]
    duration = scenario_config["duration"]
    self.logger.info(f"Running {scenario} load test: {users} users for {duration}s")
    results = {
        "scenario": scenario,
        "users": users,
        "duration": duration,
        "start_time": time.time(),
        "metrics": {},
        "system_metrics": []
    }
    # Start system monitoring (runs until cancelled below)
    monitoring_task = asyncio.create_task(self._monitor_system_resources(results))
    # Run concurrent requests
    tasks = []
    for i in range(users):
        task = asyncio.create_task(self._simulate_user(duration))
        tasks.append(task)
    # Wait for all tasks to complete; exceptions are returned, not raised
    user_results = await asyncio.gather(*tasks, return_exceptions=True)
    # Stop monitoring
    monitoring_task.cancel()
    # Process results
    all_response_times = []
    error_count = 0
    total_requests = 0
    for user_result in user_results:
        if isinstance(user_result, Exception):
            # NOTE(review): a crashed user counts as one error but adds nothing
            # to total_requests, which skews error_rate slightly.
            error_count += 1
            continue
        for metric in user_result:
            if metric.metric_name == "response_time" and metric.value > 0:
                all_response_times.append(metric.value)
            elif metric.metric_name == "error":
                error_count += 1
            total_requests += 1
    # Calculate statistics (only when at least one request succeeded)
    if all_response_times:
        results["metrics"]["response_time"] = {
            "samples": len(all_response_times),
            "min": min(all_response_times),
            "max": max(all_response_times),
            "mean": statistics.mean(all_response_times),
            "median": statistics.median(all_response_times),
            "std_dev": statistics.stdev(all_response_times) if len(all_response_times) > 1 else 0,
            "p95": self._percentile(all_response_times, 95),
            "p99": self._percentile(all_response_times, 99)
        }
    results["metrics"]["error_rate"] = (error_count / total_requests * 100) if total_requests > 0 else 0
    results["metrics"]["throughput"] = total_requests / duration
    results["end_time"] = time.time()
    return results
async def _simulate_user(self, duration: int) -> List[PerformanceMetric]:
    """Simulate one virtual user issuing requests until *duration* elapses.

    Each iteration hits a randomly chosen configured endpoint, records either
    a "response_time" or an "error" metric, then sleeps a random think time.

    FIX: endpoint choice and think time previously derived from
    hash(str(time.time())), which is biased, PYTHONHASHSEED-dependent, and
    can repeat within a clock tick; use the stdlib random module instead.
    """
    import random  # local import: module is not imported at file top

    metrics: List[PerformanceMetric] = []
    deadline = time.time() + duration
    endpoint_names = list(self.config["endpoints"].keys())
    while time.time() < deadline:
        # Uniform random endpoint selection
        name = random.choice(endpoint_names)
        url = self.config["endpoints"][name]
        response_time = await self.measure_api_response_time(url)
        if response_time > 0:
            metrics.append(PerformanceMetric(
                timestamp=time.time(),
                metric_name="response_time",
                value=response_time,
                unit="ms",
                context={"endpoint": name}
            ))
        else:
            # measure_api_response_time returns -1 on failure
            metrics.append(PerformanceMetric(
                timestamp=time.time(),
                metric_name="error",
                value=1,
                unit="count",
                context={"endpoint": name}
            ))
        # Random think time (1-5 seconds)
        await asyncio.sleep(random.uniform(1, 5))
    return metrics
async def _monitor_system_resources(self, results: Dict) -> None:
    """Append CPU/memory/disk samples to results["system_metrics"] until cancelled."""
    try:
        while True:
            usage = psutil.cpu_percent(interval=1)
            mem = psutil.virtual_memory()
            io = psutil.disk_io_counters()
            results["system_metrics"].append({
                "timestamp": time.time(),
                "cpu_percent": usage,
                "memory_percent": mem.percent,
                "disk_read_bytes": io.read_bytes,
                "disk_write_bytes": io.write_bytes
            })
            await asyncio.sleep(5)  # Sample every 5 seconds
    except asyncio.CancelledError:
        # Normal shutdown path: run_load_test cancels this task when done.
        self.logger.info("System monitoring stopped")
    except Exception as e:
        self.logger.error(f"Error in system monitoring: {e}")
def _percentile(self, values: List[float], percentile: float) -> float:
"""Calculate percentile of values."""
if not values:
return 0
sorted_values = sorted(values)
index = (percentile / 100) * (len(sorted_values) - 1)
if index.is_integer():
return sorted_values[int(index)]
else:
lower = sorted_values[int(index)]
upper = sorted_values[int(index) + 1]
return lower + (upper - lower) * (index - int(index))
async def establish_baseline(self, scenario: str) -> BaselineResult:
    """Run *scenario* once and record its p95 response time as the baseline.

    Returns the stored BaselineResult, or None when the run produced no
    successful samples.
    """
    self.logger.info(f"Establishing baseline for {scenario}")
    test_results = await self.run_load_test(scenario)
    rt = test_results["metrics"].get("response_time", {})
    if not rt:
        return None
    limit = self.config["thresholds"]["response_time_p95"]
    # Grade the measurement against the configured p95 threshold:
    # over the limit fails, within 80-100% of it warns, otherwise passes.
    if rt["p95"] > limit:
        status = "fail"
    elif rt["p95"] > limit * 0.8:
        status = "warning"
    else:
        status = "pass"
    baseline = BaselineResult(
        metric_name=f"{scenario}_response_time_p95",
        baseline_value=rt["p95"],
        unit="ms",
        samples=rt["samples"],
        min_value=rt["min"],
        max_value=rt["max"],
        mean_value=rt["mean"],
        median_value=rt["median"],
        std_deviation=rt["std_dev"],
        percentile_95=rt["p95"],
        percentile_99=rt["p99"],
        status=status,
        threshold=limit,
    )
    # Persist so later runs can compare against this measurement.
    self.baselines[f"{scenario}_response_time_p95"] = asdict(baseline)
    self._save_baselines()
    return baseline
async def compare_with_baseline(self, scenario: str) -> Dict[str, Any]:
    """Compare current performance with established baseline.

    Re-runs the scenario and returns a comparison dict with per-metric and
    overall pass/warning/fail statuses, or {"error": ...} when no baseline
    has been established yet.
    """
    self.logger.info(f"Comparing {scenario} with baseline")
    # Run current test
    current_results = await self.run_load_test(scenario)
    # Get baseline
    baseline_key = f"{scenario}_response_time_p95"
    baseline_data = self.baselines.get(baseline_key)
    if not baseline_data:
        return {"error": "No baseline found for scenario"}
    comparison = {
        "scenario": scenario,
        "baseline": baseline_data,
        "current": current_results["metrics"],
        "comparison": {},
        "status": "unknown"
    }
    # Compare response times
    current_p95 = current_results["metrics"].get("response_time", {}).get("p95", 0)
    baseline_p95 = baseline_data["baseline_value"]
    if current_p95 > 0:
        percent_change = ((current_p95 - baseline_p95) / baseline_p95) * 100
        comparison["comparison"]["response_time_p95"] = {
            "baseline": baseline_p95,
            "current": current_p95,
            "percent_change": percent_change,
            # <10% slower passes, <25% warns, otherwise fails
            "status": "pass" if percent_change < 10 else "warning" if percent_change < 25 else "fail"
        }
    # Compare error rates
    current_error_rate = current_results["metrics"].get("error_rate", 0)
    # NOTE(review): baselines stored by establish_baseline contain only
    # BaselineResult fields, so "error_rate" always falls back to 0 here.
    baseline_error_rate = baseline_data.get("error_rate", 0)
    error_change = current_error_rate - baseline_error_rate
    comparison["comparison"]["error_rate"] = {
        "baseline": baseline_error_rate,
        "current": current_error_rate,
        "change": error_change,
        "status": "pass" if error_change < 0.5 else "warning" if error_change < 2.0 else "fail"
    }
    # Compare throughput
    current_throughput = current_results["metrics"].get("throughput", 0)
    # NOTE(review): same as above — "throughput" is never stored in baselines,
    # so baseline_throughput is always 0 and this block is effectively skipped.
    baseline_throughput = baseline_data.get("throughput", 0)
    if baseline_throughput > 0:
        throughput_change = ((current_throughput - baseline_throughput) / baseline_throughput) * 100
        comparison["comparison"]["throughput"] = {
            "baseline": baseline_throughput,
            "current": current_throughput,
            "percent_change": throughput_change,
            # Throughput drops are graded: >-10% passes, >-25% warns
            "status": "pass" if throughput_change > -10 else "warning" if throughput_change > -25 else "fail"
        }
    # Overall status: worst individual status wins
    statuses = [cmp.get("status") for cmp in comparison["comparison"].values()]
    if "fail" in statuses:
        comparison["status"] = "fail"
    elif "warning" in statuses:
        comparison["status"] = "warning"
    else:
        comparison["status"] = "pass"
    return comparison
async def run_all_scenarios(self) -> Dict[str, Any]:
    """Run every configured scenario, creating or comparing baselines."""
    results: Dict[str, Any] = {}
    for name in self.config["scenarios"]:
        try:
            self.logger.info(f"Running scenario: {name}")
            if f"{name}_response_time_p95" in self.baselines:
                # A baseline exists: measure drift against it.
                results[name] = await self.compare_with_baseline(name)
            else:
                # First run for this scenario: record a fresh baseline.
                baseline = await self.establish_baseline(name)
                results[name] = {"baseline": asdict(baseline)}
        except Exception as e:
            self.logger.error(f"Error running scenario {name}: {e}")
            results[name] = {"error": str(e)}
    return results
async def generate_performance_report(self) -> Dict[str, Any]:
    """Generate comprehensive performance report.

    Runs every scenario, aggregates pass/warning/fail counts, writes the
    report to data/performance_report.json, and returns it.
    """
    self.logger.info("Generating performance report")
    # Run all scenarios
    scenario_results = await self.run_all_scenarios()
    # Calculate overall metrics
    total_scenarios = len(scenario_results)
    passed_scenarios = len([r for r in scenario_results.values() if r.get("status") == "pass"])
    warning_scenarios = len([r for r in scenario_results.values() if r.get("status") == "warning"])
    failed_scenarios = len([r for r in scenario_results.values() if r.get("status") == "fail"])
    # BUGFIX: the "warning" arm previously re-tested failed_scenarios == 0
    # (identical to the "pass" arm) and was unreachable; warnings now
    # correctly downgrade the overall status.
    if failed_scenarios > 0:
        overall_status = "fail"
    elif warning_scenarios > 0:
        overall_status = "warning"
    else:
        overall_status = "pass"
    report = {
        "timestamp": datetime.now().isoformat(),
        "summary": {
            "total_scenarios": total_scenarios,
            "passed": passed_scenarios,
            "warnings": warning_scenarios,
            "failed": failed_scenarios,
            "success_rate": (passed_scenarios / total_scenarios * 100) if total_scenarios > 0 else 0,
            "overall_status": overall_status
        },
        "scenarios": scenario_results,
        "baselines": self.baselines,
        "thresholds": self.config["thresholds"],
        "recommendations": self._generate_recommendations(scenario_results)
    }
    # Save report
    report_file = Path("data/performance_report.json")
    report_file.parent.mkdir(exist_ok=True)
    with open(report_file, 'w') as f:
        json.dump(report, f, indent=2)
    return report
def _generate_recommendations(self, scenario_results: Dict) -> List[str]:
"""Generate performance recommendations."""
recommendations = []
for scenario, result in scenario_results.items():
if result.get("status") == "fail":
recommendations.append(f"URGENT: {scenario} scenario failed performance tests")
elif result.get("status") == "warning":
recommendations.append(f"Review {scenario} scenario performance degradation")
# Check for common issues
high_response_times = []
high_error_rates = []
for scenario, result in scenario_results.items():
if "comparison" in result:
comp = result["comparison"]
if comp.get("response_time_p95", {}).get("status") == "fail":
high_response_times.append(scenario)
if comp.get("error_rate", {}).get("status") == "fail":
high_error_rates.append(scenario)
if high_response_times:
recommendations.append(f"High response times detected in: {', '.join(high_response_times)}")
if high_error_rates:
recommendations.append(f"High error rates detected in: {', '.join(high_error_rates)}")
if not recommendations:
recommendations.append("All performance tests passed. System is performing within expected parameters.")
return recommendations
# CLI interface
async def main():
    """Command-line entry point for baseline testing."""
    import argparse
    parser = argparse.ArgumentParser(description="AITBC Performance Baseline Testing")
    parser.add_argument("--scenario", help="Run specific scenario")
    parser.add_argument("--baseline", help="Establish baseline for scenario")
    parser.add_argument("--compare", help="Compare scenario with baseline")
    parser.add_argument("--all", action="store_true", help="Run all scenarios")
    parser.add_argument("--report", action="store_true", help="Generate performance report")
    args = parser.parse_args()
    runner = PerformanceBaseline()
    if args.scenario:
        # --scenario combined with --baseline / --compare selects the action
        if args.baseline:
            result = await runner.establish_baseline(args.scenario)
            print(f"Baseline established: {result}")
        elif args.compare:
            comparison = await runner.compare_with_baseline(args.scenario)
            print(json.dumps(comparison, indent=2))
        else:
            outcome = await runner.run_load_test(args.scenario)
            print(json.dumps(outcome, indent=2, default=str))
    elif args.all:
        outcome = await runner.run_all_scenarios()
        print(json.dumps(outcome, indent=2, default=str))
    elif args.report:
        report = await runner.generate_performance_report()
        print(json.dumps(report, indent=2))
    else:
        print("Use --help to see available options")


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,718 @@
"""
AITBC Production Monitoring and Analytics
This module provides comprehensive monitoring and analytics capabilities
for the AITBC production environment, including metrics collection,
alerting, and dashboard generation.
"""
import asyncio
import json
import logging
import os
import statistics
import subprocess
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any

import aiohttp
import psutil
@dataclass
class SystemMetrics:
    """System performance metrics."""

    # Wall-clock time (time.time()) of the sample.
    timestamp: float
    cpu_percent: float
    memory_percent: float
    # Percentage of the root filesystem used.
    disk_usage: float
    # Cumulative NIC counters: bytes_sent/bytes_recv/packets_sent/packets_recv.
    network_io: Dict[str, int]
    process_count: int
    # 1/5/15-minute load averages from psutil.getloadavg().
    load_average: List[float]
@dataclass
class ApplicationMetrics:
    """Application performance metrics (as reported by the /metrics endpoint)."""

    timestamp: float
    active_users: int
    api_requests: int
    # Average and 95th-percentile response times.
    response_time_avg: float
    response_time_p95: float
    error_rate: float
    throughput: float
    cache_hit_rate: float
@dataclass
class BlockchainMetrics:
    """Blockchain network metrics (as reported by the blockchain stats endpoint)."""

    timestamp: float
    block_height: int
    gas_price: float
    transaction_count: int
    network_hashrate: float
    peer_count: int
    # Free-form status string; "unknown" when the endpoint is unavailable.
    sync_status: str
@dataclass
class SecurityMetrics:
    """Security monitoring metrics (as reported by the security stats endpoint)."""

    timestamp: float
    failed_logins: int
    suspicious_ips: int
    security_events: int
    vulnerability_scans: int
    blocked_requests: int
    audit_log_entries: int
class ProductionMonitor:
    """Production monitoring system."""

    def __init__(self, config_path: str = "config/monitoring_config.json"):
        # Merged defaults + user overrides (see _load_config).
        self.config = self._load_config(config_path)
        self.logger = self._setup_logging()
        # Rolling per-category sample history, pruned by store_metrics().
        self.metrics_history = {
            "system": [],
            "application": [],
            "blockchain": [],
            "security": []
        }
        # Alerts raised so far (send_alert appends here).
        self.alerts = []
        # NOTE(review): dashboards is never populated anywhere in this file.
        self.dashboards = {}
def _load_config(self, config_path: str) -> Dict:
"""Load monitoring configuration."""
default_config = {
"collection_interval": 60, # seconds
"retention_days": 30,
"alert_thresholds": {
"cpu_percent": 80,
"memory_percent": 85,
"disk_usage": 90,
"error_rate": 5.0,
"response_time_p95": 2000, # ms
"failed_logins": 10,
"security_events": 5
},
"endpoints": {
"health": "https://api.aitbc.dev/health",
"metrics": "https://api.aitbc.dev/metrics",
"blockchain": "https://api.aitbc.dev/blockchain/stats",
"security": "https://api.aitbc.dev/security/stats"
},
"notifications": {
"slack_webhook": os.getenv("SLACK_WEBHOOK_URL"),
"email_smtp": os.getenv("SMTP_SERVER"),
"pagerduty_key": os.getenv("PAGERDUTY_KEY")
}
}
config_file = Path(config_path)
if config_file.exists():
with open(config_file, 'r') as f:
user_config = json.load(f)
default_config.update(user_config)
return default_config
def _setup_logging(self) -> logging.Logger:
"""Setup logging for monitoring system."""
logger = logging.getLogger("production_monitor")
logger.setLevel(logging.INFO)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
async def collect_system_metrics(self) -> SystemMetrics:
    """Sample host CPU/memory/disk/network counters via psutil.

    Returns None when sampling fails.
    """
    try:
        cpu_now = psutil.cpu_percent(interval=1)
        load_avg = list(psutil.getloadavg())
        vm = psutil.virtual_memory()
        root = psutil.disk_usage('/')
        nic = psutil.net_io_counters()
        return SystemMetrics(
            timestamp=time.time(),
            cpu_percent=cpu_now,
            memory_percent=vm.percent,
            disk_usage=(root.used / root.total) * 100,  # percent of root fs
            network_io={
                "bytes_sent": nic.bytes_sent,
                "bytes_recv": nic.bytes_recv,
                "packets_sent": nic.packets_sent,
                "packets_recv": nic.packets_recv
            },
            process_count=len(psutil.pids()),
            load_average=load_avg
        )
    except Exception as e:
        self.logger.error(f"Error collecting system metrics: {e}")
        return None
async def collect_application_metrics(self) -> ApplicationMetrics:
    """Fetch application KPIs from the configured /metrics endpoint.

    Falls back to all-zero metrics when the endpoint answers with a
    non-200 status; returns None when the request itself fails.
    """
    try:
        url = self.config["endpoints"]["metrics"]
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    data = await response.json()
                    return ApplicationMetrics(
                        timestamp=time.time(),
                        active_users=data.get("active_users", 0),
                        api_requests=data.get("api_requests", 0),
                        response_time_avg=data.get("response_time_avg", 0),
                        response_time_p95=data.get("response_time_p95", 0),
                        error_rate=data.get("error_rate", 0),
                        throughput=data.get("throughput", 0),
                        cache_hit_rate=data.get("cache_hit_rate", 0)
                    )
        # Fallback metrics if API is unavailable (non-200 answer)
        return ApplicationMetrics(
            timestamp=time.time(),
            active_users=0,
            api_requests=0,
            response_time_avg=0,
            response_time_p95=0,
            error_rate=0,
            throughput=0,
            cache_hit_rate=0
        )
    except Exception as e:
        self.logger.error(f"Error collecting application metrics: {e}")
        return None
async def collect_blockchain_metrics(self) -> BlockchainMetrics:
    """Fetch chain statistics from the configured blockchain endpoint.

    Falls back to zeroed metrics with sync_status "unknown" on a non-200
    answer; returns None when the request itself fails.
    """
    try:
        url = self.config["endpoints"]["blockchain"]
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    data = await response.json()
                    return BlockchainMetrics(
                        timestamp=time.time(),
                        block_height=data.get("block_height", 0),
                        gas_price=data.get("gas_price", 0),
                        transaction_count=data.get("transaction_count", 0),
                        network_hashrate=data.get("network_hashrate", 0),
                        peer_count=data.get("peer_count", 0),
                        sync_status=data.get("sync_status", "unknown")
                    )
        # Non-200 answer: report an empty snapshot
        return BlockchainMetrics(
            timestamp=time.time(),
            block_height=0,
            gas_price=0,
            transaction_count=0,
            network_hashrate=0,
            peer_count=0,
            sync_status="unknown"
        )
    except Exception as e:
        self.logger.error(f"Error collecting blockchain metrics: {e}")
        return None
async def collect_security_metrics(self) -> SecurityMetrics:
    """Fetch security counters from the configured security endpoint.

    Falls back to all-zero metrics on a non-200 answer; returns None when
    the request itself fails.
    """
    try:
        url = self.config["endpoints"]["security"]
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    data = await response.json()
                    return SecurityMetrics(
                        timestamp=time.time(),
                        failed_logins=data.get("failed_logins", 0),
                        suspicious_ips=data.get("suspicious_ips", 0),
                        security_events=data.get("security_events", 0),
                        vulnerability_scans=data.get("vulnerability_scans", 0),
                        blocked_requests=data.get("blocked_requests", 0),
                        audit_log_entries=data.get("audit_log_entries", 0)
                    )
        # Non-200 answer: report an empty snapshot
        return SecurityMetrics(
            timestamp=time.time(),
            failed_logins=0,
            suspicious_ips=0,
            security_events=0,
            vulnerability_scans=0,
            blocked_requests=0,
            audit_log_entries=0
        )
    except Exception as e:
        self.logger.error(f"Error collecting security metrics: {e}")
        return None
async def collect_all_metrics(self) -> Dict[str, Any]:
    """Collect system, application, blockchain and security metrics concurrently."""
    names = ("system", "application", "blockchain", "security")
    collectors = (
        self.collect_system_metrics(),
        self.collect_application_metrics(),
        self.collect_blockchain_metrics(),
        self.collect_security_metrics(),
    )
    outcomes = await asyncio.gather(*collectors, return_exceptions=True)
    # A collector that raised contributes None instead of the exception object.
    return {
        name: (value if not isinstance(value, Exception) else None)
        for name, value in zip(names, outcomes)
    }
async def check_alerts(self, metrics: Dict[str, Any]) -> List[Dict]:
"""Check metrics against alert thresholds."""
alerts = []
thresholds = self.config["alert_thresholds"]
# System alerts
if metrics["system"]:
sys_metrics = metrics["system"]
if sys_metrics.cpu_percent > thresholds["cpu_percent"]:
alerts.append({
"type": "system",
"metric": "cpu_percent",
"value": sys_metrics.cpu_percent,
"threshold": thresholds["cpu_percent"],
"severity": "warning" if sys_metrics.cpu_percent < 90 else "critical",
"message": f"High CPU usage: {sys_metrics.cpu_percent:.1f}%"
})
if sys_metrics.memory_percent > thresholds["memory_percent"]:
alerts.append({
"type": "system",
"metric": "memory_percent",
"value": sys_metrics.memory_percent,
"threshold": thresholds["memory_percent"],
"severity": "warning" if sys_metrics.memory_percent < 95 else "critical",
"message": f"High memory usage: {sys_metrics.memory_percent:.1f}%"
})
if sys_metrics.disk_usage > thresholds["disk_usage"]:
alerts.append({
"type": "system",
"metric": "disk_usage",
"value": sys_metrics.disk_usage,
"threshold": thresholds["disk_usage"],
"severity": "critical",
"message": f"High disk usage: {sys_metrics.disk_usage:.1f}%"
})
# Application alerts
if metrics["application"]:
app_metrics = metrics["application"]
if app_metrics.error_rate > thresholds["error_rate"]:
alerts.append({
"type": "application",
"metric": "error_rate",
"value": app_metrics.error_rate,
"threshold": thresholds["error_rate"],
"severity": "warning" if app_metrics.error_rate < 10 else "critical",
"message": f"High error rate: {app_metrics.error_rate:.1f}%"
})
if app_metrics.response_time_p95 > thresholds["response_time_p95"]:
alerts.append({
"type": "application",
"metric": "response_time_p95",
"value": app_metrics.response_time_p95,
"threshold": thresholds["response_time_p95"],
"severity": "warning",
"message": f"High response time: {app_metrics.response_time_p95:.0f}ms"
})
# Security alerts
if metrics["security"]:
sec_metrics = metrics["security"]
if sec_metrics.failed_logins > thresholds["failed_logins"]:
alerts.append({
"type": "security",
"metric": "failed_logins",
"value": sec_metrics.failed_logins,
"threshold": thresholds["failed_logins"],
"severity": "warning",
"message": f"High failed login count: {sec_metrics.failed_logins}"
})
if sec_metrics.security_events > thresholds["security_events"]:
alerts.append({
"type": "security",
"metric": "security_events",
"value": sec_metrics.security_events,
"threshold": thresholds["security_events"],
"severity": "critical",
"message": f"High security events: {sec_metrics.security_events}"
})
return alerts
async def send_alert(self, alert: Dict) -> bool:
    """Dispatch *alert* to the configured channels and record it locally.

    Returns True on success, False when any step raised.
    """
    try:
        self.logger.warning(f"ALERT: {alert['message']}")
        notify = self.config["notifications"]
        if notify["slack_webhook"]:
            await self._send_slack_alert(alert)
        # PagerDuty is reserved for critical alerts only.
        if alert["severity"] == "critical" and notify["pagerduty_key"]:
            await self._send_pagerduty_alert(alert)
        alert["timestamp"] = time.time()  # NOTE: mutates the caller's dict
        self.alerts.append(alert)
        return True
    except Exception as e:
        self.logger.error(f"Error sending alert: {e}")
        return False
async def _send_slack_alert(self, alert: Dict) -> bool:
    """POST *alert* to the configured Slack webhook; True on HTTP 200."""
    try:
        # Map our severities onto Slack attachment colors.
        severity_colors = {"warning": "warning", "critical": "danger", "info": "good"}
        body = {
            "text": f"AITBC Alert: {alert['message']}",
            "attachments": [{
                "color": severity_colors.get(alert["severity"], "warning"),
                "fields": [
                    {"title": "Type", "value": alert["type"], "short": True},
                    {"title": "Metric", "value": alert["metric"], "short": True},
                    {"title": "Value", "value": str(alert["value"]), "short": True},
                    {"title": "Threshold", "value": str(alert["threshold"]), "short": True},
                    {"title": "Severity", "value": alert["severity"], "short": True}
                ],
                "timestamp": int(time.time())
            }]
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(self.config["notifications"]["slack_webhook"], json=body) as response:
                return response.status == 200
    except Exception as e:
        self.logger.error(f"Error sending Slack alert: {e}")
        return False
async def _send_pagerduty_alert(self, alert: Dict) -> bool:
    """Trigger a PagerDuty event for *alert*; True on HTTP 202 (accepted)."""
    try:
        event = {
            "routing_key": self.config["notifications"]["pagerduty_key"],
            "event_action": "trigger",
            "payload": {
                "summary": f"AITBC Alert: {alert['message']}",
                "source": "aitbc-monitor",
                "severity": alert["severity"],
                "timestamp": datetime.now().isoformat(),
                "custom_details": alert
            }
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://events.pagerduty.com/v2/enqueue",
                json=event
            ) as response:
                # PagerDuty Events API answers 202 when the event is enqueued.
                return response.status == 202
    except Exception as e:
        self.logger.error(f"Error sending PagerDuty alert: {e}")
        return False
async def generate_dashboard(self) -> Dict:
    """Generate monitoring dashboard data.

    Combines a fresh live metric sample with trend/summary views derived
    from the last hour of history, plus the ten most recent alerts.
    """
    try:
        # Get recent metrics (last hour)
        cutoff_time = time.time() - 3600
        recent_metrics = {
            "system": [m for m in self.metrics_history["system"] if m.timestamp > cutoff_time],
            "application": [m for m in self.metrics_history["application"] if m.timestamp > cutoff_time],
            "blockchain": [m for m in self.metrics_history["blockchain"] if m.timestamp > cutoff_time],
            "security": [m for m in self.metrics_history["security"] if m.timestamp > cutoff_time]
        }
        dashboard = {
            "timestamp": time.time(),
            "status": "healthy",
            "alerts": self.alerts[-10:],  # Last 10 alerts
            "metrics": {
                "current": await self.collect_all_metrics(),
                "trends": self._calculate_trends(recent_metrics),
                "summaries": self._calculate_summaries(recent_metrics)
            }
        }
        # Determine overall status
        # NOTE(review): this scans the full alert history, not just recent
        # alerts, so a single old critical alert keeps the status "critical".
        critical_alerts = [a for a in self.alerts if a.get("severity") == "critical"]
        if critical_alerts:
            dashboard["status"] = "critical"
        elif self.alerts:
            dashboard["status"] = "warning"
        return dashboard
    except Exception as e:
        self.logger.error(f"Error generating dashboard: {e}")
        return {"status": "error", "error": str(e)}
def _calculate_trends(self, recent_metrics: Dict) -> Dict:
"""Calculate metric trends."""
trends = {}
for metric_type, metrics in recent_metrics.items():
if not metrics:
continue
# Calculate trend for each numeric field
if metric_type == "system" and metrics:
trends["system"] = {
"cpu_trend": self._calculate_trend([m.cpu_percent for m in metrics]),
"memory_trend": self._calculate_trend([m.memory_percent for m in metrics]),
"disk_trend": self._calculate_trend([m.disk_usage for m in metrics])
}
elif metric_type == "application" and metrics:
trends["application"] = {
"response_time_trend": self._calculate_trend([m.response_time_avg for m in metrics]),
"error_rate_trend": self._calculate_trend([m.error_rate for m in metrics]),
"throughput_trend": self._calculate_trend([m.throughput for m in metrics])
}
return trends
def _calculate_trend(self, values: List[float]) -> str:
"""Calculate trend direction."""
if len(values) < 2:
return "stable"
# Simple linear regression to determine trend
n = len(values)
x = list(range(n))
x_mean = sum(x) / n
y_mean = sum(values) / n
numerator = sum((x[i] - x_mean) * (values[i] - y_mean) for i in range(n))
denominator = sum((x[i] - x_mean) ** 2 for i in range(n))
if denominator == 0:
return "stable"
slope = numerator / denominator
if slope > 0.1:
return "increasing"
elif slope < -0.1:
return "decreasing"
else:
return "stable"
def _calculate_summaries(self, recent_metrics: Dict) -> Dict:
"""Calculate metric summaries."""
summaries = {}
for metric_type, metrics in recent_metrics.items():
if not metrics:
continue
if metric_type == "system" and metrics:
summaries["system"] = {
"avg_cpu": statistics.mean([m.cpu_percent for m in metrics]),
"max_cpu": max([m.cpu_percent for m in metrics]),
"avg_memory": statistics.mean([m.memory_percent for m in metrics]),
"max_memory": max([m.memory_percent for m in metrics]),
"avg_disk": statistics.mean([m.disk_usage for m in metrics])
}
elif metric_type == "application" and metrics:
summaries["application"] = {
"avg_response_time": statistics.mean([m.response_time_avg for m in metrics]),
"max_response_time": max([m.response_time_p95 for m in metrics]),
"avg_error_rate": statistics.mean([m.error_rate for m in metrics]),
"total_requests": sum([m.api_requests for m in metrics]),
"avg_throughput": statistics.mean([m.throughput for m in metrics])
}
return summaries
async def store_metrics(self, metrics: Dict) -> None:
    """Append the latest samples to history, prune old ones, and persist."""
    try:
        now = time.time()
        # Only store categories whose collection actually succeeded.
        for category in ("system", "application", "blockchain", "security"):
            sample = metrics[category]
            if sample:
                self.metrics_history[category].append(sample)
        # Drop samples older than the configured retention window.
        cutoff = now - (self.config["retention_days"] * 24 * 3600)
        for category, samples in self.metrics_history.items():
            self.metrics_history[category] = [s for s in samples if s.timestamp > cutoff]
        await self._save_metrics_to_file()
    except Exception as e:
        self.logger.error(f"Error storing metrics: {e}")
async def _save_metrics_to_file(self) -> None:
    """Serialize the metrics history to data/metrics_history.json."""
    try:
        out_path = Path("data/metrics_history.json")
        out_path.parent.mkdir(exist_ok=True)
        # Dataclass samples must become plain dicts before JSON encoding.
        dumpable = {
            category: [asdict(s) if hasattr(s, '__dict__') else s for s in samples]
            for category, samples in self.metrics_history.items()
        }
        with open(out_path, 'w') as f:
            json.dump(dumpable, f, indent=2)
    except Exception as e:
        self.logger.error(f"Error saving metrics to file: {e}")
async def run_monitoring_cycle(self) -> None:
    """Perform one full pass: collect, persist, alert, and refresh the dashboard."""
    try:
        snapshot = await self.collect_all_metrics()
        await self.store_metrics(snapshot)
        triggered = await self.check_alerts(snapshot)
        for alert in triggered:
            await self.send_alert(alert)
        dashboard = await self.generate_dashboard()
        # One summary line per cycle keeps the log scannable.
        self.logger.info(f"Monitoring cycle completed. Status: {dashboard['status']}")
        if triggered:
            self.logger.warning(f"Generated {len(triggered)} alerts")
    except Exception as e:
        self.logger.error(f"Error in monitoring cycle: {e}")
async def start_monitoring(self) -> None:
    """Run monitoring cycles forever, sleeping collection_interval between them."""
    self.logger.info("Starting production monitoring")
    while True:
        try:
            await self.run_monitoring_cycle()
            await asyncio.sleep(self.config["collection_interval"])
        except KeyboardInterrupt:
            self.logger.info("Monitoring stopped by user")
            break
        except Exception as e:
            # Keep the loop alive on unexpected errors, backing off a minute.
            self.logger.error(f"Error in monitoring loop: {e}")
            await asyncio.sleep(60)
# CLI interface
async def main():
    """Command-line entry point for the production monitor."""
    import argparse
    parser = argparse.ArgumentParser(description="AITBC Production Monitoring")
    parser.add_argument("--start", action="store_true", help="Start monitoring")
    parser.add_argument("--collect", action="store_true", help="Collect metrics once")
    parser.add_argument("--dashboard", action="store_true", help="Generate dashboard")
    parser.add_argument("--alerts", action="store_true", help="Check alerts")
    args = parser.parse_args()
    svc = ProductionMonitor()
    if args.start:
        await svc.start_monitoring()
    elif args.collect:
        snapshot = await svc.collect_all_metrics()
        print(json.dumps(snapshot, indent=2, default=str))
    elif args.dashboard:
        dashboard = await svc.generate_dashboard()
        print(json.dumps(dashboard, indent=2, default=str))
    elif args.alerts:
        snapshot = await svc.collect_all_metrics()
        triggered = await svc.check_alerts(snapshot)
        print(json.dumps(triggered, indent=2, default=str))
    else:
        print("Use --help to see available options")


if __name__ == "__main__":
    asyncio.run(main())