Files
aitbc/dev/onboarding/onboarding-monitor.py
oib 50954a4b31 chore(systemd): remove obsolete systemd service files and update infrastructure documentation
- Remove 8 unused systemd service files from coordinator-api/systemd/
  - aitbc-adaptive-learning.service (port 8005)
  - aitbc-advanced-ai.service
  - aitbc-enterprise-api.service
  - aitbc-gpu-multimodal.service (port 8003)
  - aitbc-marketplace-enhanced.service (port 8006)
  - aitbc-modality-optimization.service (port 8004)
  - aitbc-multimodal.service (port 8002)
  - aitbc-openclaw-enhanced.service (port 8007)
2026-03-04 12:16:50 +01:00

425 lines
17 KiB
Python
Executable File

#!/usr/bin/env python3
"""
onboarding-monitor.py - Monitor agent onboarding success and performance
This script monitors the success rate of agent onboarding, tracks metrics,
and provides insights for improving the onboarding process.
"""
import asyncio
import json
import logging
import sys
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path

import requests
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class OnboardingMonitor:
"""Monitor agent onboarding metrics and performance"""
def __init__(self):
self.metrics = {
'total_onboardings': 0,
'successful_onboardings': 0,
'failed_onboardings': 0,
'agent_type_distribution': defaultdict(int),
'completion_times': [],
'failure_points': defaultdict(int),
'daily_stats': defaultdict(dict),
'error_patterns': defaultdict(int)
}
def load_existing_data(self):
"""Load existing onboarding data"""
data_file = Path('/tmp/aitbc-onboarding-metrics.json')
if data_file.exists():
try:
with open(data_file, 'r') as f:
data = json.load(f)
self.metrics.update(data)
logger.info(f"Loaded existing metrics: {data.get('total_onboardings', 0)} onboardings")
except Exception as e:
logger.error(f"Failed to load existing data: {e}")
def save_metrics(self):
"""Save current metrics to file"""
try:
data_file = Path('/tmp/aitbc-onboarding-metrics.json')
with open(data_file, 'w') as f:
json.dump(dict(self.metrics), f, indent=2)
except Exception as e:
logger.error(f"Failed to save metrics: {e}")
def scan_onboarding_reports(self):
"""Scan for onboarding report files"""
reports = []
report_dir = Path('/tmp')
for report_file in report_dir.glob('aitbc-onboarding-*.json'):
try:
with open(report_file, 'r') as f:
report = json.load(f)
reports.append(report)
except Exception as e:
logger.error(f"Failed to read report {report_file}: {e}")
return reports
def analyze_reports(self, reports):
"""Analyze onboarding reports and update metrics"""
for report in reports:
try:
onboarding = report.get('onboarding', {})
# Update basic metrics
self.metrics['total_onboardings'] += 1
if onboarding.get('status') == 'success':
self.metrics['successful_onboardings'] += 1
# Track completion time
duration = onboarding.get('duration_minutes', 0)
self.metrics['completion_times'].append(duration)
# Track agent type distribution
agent_type = self.extract_agent_type(report)
if agent_type:
self.metrics['agent_type_distribution'][agent_type] += 1
# Track daily stats
date = datetime.fromisoformat(onboarding['timestamp']).date()
self.metrics['daily_stats'][date]['successful'] = \
self.metrics['daily_stats'][date].get('successful', 0) + 1
self.metrics['daily_stats'][date]['total'] = \
self.metrics['daily_stats'][date].get('total', 0) + 1
else:
self.metrics['failed_onboardings'] += 1
# Track failure points
steps_completed = onboarding.get('steps_completed', [])
expected_steps = ['environment_check', 'capability_assessment',
'agent_type_recommendation', 'agent_creation',
'network_registration', 'swarm_integration',
'participation_started', 'report_generated']
for step in expected_steps:
if step not in steps_completed:
self.metrics['failure_points'][step] += 1
# Track errors
for error in onboarding.get('errors', []):
self.metrics['error_patterns'][error] += 1
# Track daily failures
date = datetime.fromisoformat(onboarding['timestamp']).date()
self.metrics['daily_stats'][date]['failed'] = \
self.metrics['daily_stats'][date].get('failed', 0) + 1
self.metrics['daily_stats'][date]['total'] = \
self.metrics['daily_stats'][date].get('total', 0) + 1
except Exception as e:
logger.error(f"Failed to analyze report: {e}")
def extract_agent_type(self, report):
"""Extract agent type from report"""
try:
agent_capabilities = report.get('agent_capabilities', {})
compute_type = agent_capabilities.get('specialization')
# Map specialization to agent type
type_mapping = {
'inference': 'compute_provider',
'training': 'compute_provider',
'processing': 'compute_consumer',
'coordination': 'swarm_coordinator',
'development': 'platform_builder'
}
return type_mapping.get(compute_type, 'unknown')
except:
return 'unknown'
def calculate_metrics(self):
"""Calculate derived metrics"""
metrics = {}
# Success rate
if self.metrics['total_onboardings'] > 0:
metrics['success_rate'] = (self.metrics['successful_onboardings'] /
self.metrics['total_onboardings']) * 100
else:
metrics['success_rate'] = 0
# Average completion time
if self.metrics['completion_times']:
metrics['avg_completion_time'] = sum(self.metrics['completion_times']) / len(self.metrics['completion_times'])
else:
metrics['avg_completion_time'] = 0
# Most common failure point
if self.metrics['failure_points']:
metrics['most_common_failure'] = max(self.metrics['failure_points'],
key=self.metrics['failure_points'].get)
else:
metrics['most_common_failure'] = 'none'
# Most common error
if self.metrics['error_patterns']:
metrics['most_common_error'] = max(self.metrics['error_patterns'],
key=self.metrics['error_patterns'].get)
else:
metrics['most_common_error'] = 'none'
# Agent type distribution percentages
total_agents = sum(self.metrics['agent_type_distribution'].values())
if total_agents > 0:
metrics['agent_type_percentages'] = {
agent_type: (count / total_agents) * 100
for agent_type, count in self.metrics['agent_type_distribution'].items()
}
else:
metrics['agent_type_percentages'] = {}
return metrics
def generate_report(self):
"""Generate comprehensive onboarding report"""
metrics = self.calculate_metrics()
report = {
'timestamp': datetime.utcnow().isoformat(),
'summary': {
'total_onboardings': self.metrics['total_onboardings'],
'successful_onboardings': self.metrics['successful_onboardings'],
'failed_onboardings': self.metrics['failed_onboardings'],
'success_rate': metrics['success_rate'],
'avg_completion_time_minutes': metrics['avg_completion_time']
},
'agent_type_distribution': dict(self.metrics['agent_type_distribution']),
'agent_type_percentages': metrics['agent_type_percentages'],
'failure_analysis': {
'most_common_failure_point': metrics['most_common_failure'],
'failure_points': dict(self.metrics['failure_points']),
'most_common_error': metrics['most_common_error'],
'error_patterns': dict(self.metrics['error_patterns'])
},
'daily_stats': dict(self.metrics['daily_stats']),
'recommendations': self.generate_recommendations(metrics)
}
return report
def generate_recommendations(self, metrics):
"""Generate improvement recommendations"""
recommendations = []
# Success rate recommendations
if metrics['success_rate'] < 80:
recommendations.append({
'priority': 'high',
'issue': 'Low success rate',
'recommendation': 'Review onboarding process for common failure points',
'action': 'Focus on fixing: ' + metrics['most_common_failure']
})
elif metrics['success_rate'] < 95:
recommendations.append({
'priority': 'medium',
'issue': 'Moderate success rate',
'recommendation': 'Optimize onboarding for better success rate',
'action': 'Monitor and improve failure points'
})
# Completion time recommendations
if metrics['avg_completion_time'] > 20:
recommendations.append({
'priority': 'medium',
'issue': 'Slow onboarding process',
'recommendation': 'Optimize onboarding steps for faster completion',
'action': 'Reduce time in capability assessment and registration'
})
# Agent type distribution recommendations
if 'compute_provider' not in metrics['agent_type_percentages'] or \
metrics['agent_type_percentages'].get('compute_provider', 0) < 20:
recommendations.append({
'priority': 'low',
'issue': 'Low compute provider adoption',
'recommendation': 'Improve compute provider onboarding experience',
'action': 'Simplify GPU setup and resource offering process'
})
# Error pattern recommendations
if metrics['most_common_error'] != 'none':
recommendations.append({
'priority': 'high',
'issue': f'Recurring error: {metrics["most_common_error"]}',
'recommendation': 'Fix common error pattern',
'action': 'Add better error handling and user guidance'
})
return recommendations
def print_dashboard(self):
"""Print a dashboard view of current metrics"""
metrics = self.calculate_metrics()
print("🤖 AITBC Agent Onboarding Dashboard")
print("=" * 50)
print()
# Summary stats
print("📊 SUMMARY:")
print(f" Total Onboardings: {self.metrics['total_onboardings']}")
print(f" Success Rate: {metrics['success_rate']:.1f}%")
print(f" Avg Completion Time: {metrics['avg_completion_time']:.1f} minutes")
print()
# Agent type distribution
print("🎯 AGENT TYPE DISTRIBUTION:")
for agent_type, count in self.metrics['agent_type_distribution'].items():
percentage = metrics['agent_type_percentages'].get(agent_type, 0)
print(f" {agent_type}: {count} ({percentage:.1f}%)")
print()
# Recent performance
print("📈 RECENT PERFORMANCE (Last 7 Days):")
recent_date = datetime.now().date() - timedelta(days=7)
recent_successful = 0
recent_total = 0
for date, stats in self.metrics['daily_stats'].items():
if date >= recent_date:
recent_total += stats.get('total', 0)
recent_successful += stats.get('successful', 0)
if recent_total > 0:
recent_success_rate = (recent_successful / recent_total) * 100
print(f" Success Rate: {recent_success_rate:.1f}% ({recent_successful}/{recent_total})")
else:
print(" No recent data available")
print()
# Issues
if metrics['most_common_failure'] != 'none':
print("⚠️ COMMON ISSUES:")
print(f" Most Common Failure: {metrics['most_common_failure']}")
if metrics['most_common_error'] != 'none':
print(f" Most Common Error: {metrics['most_common_error']}")
print()
# Recommendations
recommendations = self.generate_recommendations(metrics)
if recommendations:
print("💡 RECOMMENDATIONS:")
for rec in recommendations[:3]: # Show top 3
priority_emoji = "🔴" if rec['priority'] == 'high' else "🟡" if rec['priority'] == 'medium' else "🟢"
print(f" {priority_emoji} {rec['issue']}")
print(f" {rec['recommendation']}")
print()
def export_csv(self):
"""Export metrics to CSV format"""
import csv
from io import StringIO
output = StringIO()
writer = csv.writer(output)
# Write header
writer.writerow(['Date', 'Total', 'Successful', 'Failed', 'Success Rate', 'Avg Time'])
# Write daily stats
for date, stats in sorted(self.metrics['daily_stats'].items()):
total = stats.get('total', 0)
successful = stats.get('successful', 0)
failed = stats.get('failed', 0)
success_rate = (successful / total * 100) if total > 0 else 0
writer.writerow([
date,
total,
successful,
failed,
f"{success_rate:.1f}%",
"N/A" # Would need to calculate daily average
])
csv_content = output.getvalue()
# Save to file
csv_file = Path('/tmp/aitbc-onboarding-metrics.csv')
with open(csv_file, 'w') as f:
f.write(csv_content)
print(f"📊 Metrics exported to: {csv_file}")
def run_monitoring(self):
"""Run continuous monitoring"""
print("🔍 Starting onboarding monitoring...")
print("Press Ctrl+C to stop monitoring")
print()
try:
while True:
# Load existing data
self.load_existing_data()
# Scan for new reports
reports = self.scan_onboarding_reports()
if reports:
print(f"📊 Processing {len(reports)} new onboarding reports...")
self.analyze_reports(reports)
self.save_metrics()
# Print updated dashboard
self.print_dashboard()
# Wait before next scan
time.sleep(300) # 5 minutes
except KeyboardInterrupt:
print("\n👋 Monitoring stopped by user")
except Exception as e:
logger.error(f"Monitoring error: {e}")
def main():
"""Main entry point"""
monitor = OnboardingMonitor()
# Parse command line arguments
if len(sys.argv) > 1:
command = sys.argv[1]
if command == 'dashboard':
monitor.load_existing_data()
monitor.print_dashboard()
elif command == 'export':
monitor.load_existing_data()
monitor.export_csv()
elif command == 'report':
monitor.load_existing_data()
report = monitor.generate_report()
print(json.dumps(report, indent=2))
elif command == 'monitor':
monitor.run_monitoring()
else:
print("Usage: python3 onboarding-monitor.py [dashboard|export|report|monitor]")
sys.exit(1)
else:
# Default: show dashboard
monitor.load_existing_data()
monitor.print_dashboard()
if __name__ == "__main__":
main()