Files
aitbc/cli/commands/analytics.py
aitbc1 c0952c2525 refactor: flatten CLI directory structure - remove 'box in a box'
BEFORE:
/opt/aitbc/cli/
├── aitbc_cli/                    # Python package (box in a box)
│   ├── commands/
│   ├── main.py
│   └── ...
├── setup.py

AFTER:
/opt/aitbc/cli/                    # Flat structure
├── commands/                      # Direct access
├── main.py                        # Direct access
├── auth/
├── config/
├── core/
├── models/
├── utils/
├── plugins.py
└── setup.py

CHANGES MADE:
- Moved all files from aitbc_cli/ to cli/ root
- Fixed all relative imports (from . to absolute imports)
- Updated setup.py entry point: aitbc_cli.main → main
- Added CLI directory to Python path in entry script
- Simplified deployment.py to remove dependency on deleted core.deployment
- Fixed import paths in all command files
- Recreated virtual environment with new structure

BENEFITS:
- Eliminated 'box in a box' nesting
- Simpler directory structure
- Direct access to all modules
- Cleaner imports
- Easier maintenance and development
- CLI works with both 'python main.py' and 'aitbc' commands
2026-03-26 09:12:02 +01:00

403 lines
18 KiB
Python
Executable File

"""Analytics and monitoring commands for AITBC CLI"""
import click
import asyncio
from datetime import datetime, timedelta
from typing import Optional
from core.config import load_multichain_config
from core.analytics import ChainAnalytics
from utils import output, error, success
@click.group()
def analytics():
"""Chain analytics and monitoring commands"""
pass
@analytics.command()
@click.option('--chain-id', help='Specific chain ID to analyze')
@click.option('--hours', default=24, help='Time range in hours')
@click.option('--format', type=click.Choice(['table', 'json']), default='table', help='Output format')
@click.pass_context
def summary(ctx, chain_id, hours, format):
"""Get performance summary for chains"""
try:
config = load_multichain_config()
analytics = ChainAnalytics(config)
if chain_id:
# Single chain summary
summary = analytics.get_chain_performance_summary(chain_id, hours)
if not summary:
error(f"No data available for chain {chain_id}")
raise click.Abort()
# Format summary for display
summary_data = [
{"Metric": "Chain ID", "Value": summary["chain_id"]},
{"Metric": "Time Range", "Value": f"{summary['time_range_hours']} hours"},
{"Metric": "Data Points", "Value": summary["data_points"]},
{"Metric": "Health Score", "Value": f"{summary['health_score']:.1f}/100"},
{"Metric": "Active Alerts", "Value": summary["active_alerts"]},
{"Metric": "Avg TPS", "Value": f"{summary['statistics']['tps']['avg']:.2f}"},
{"Metric": "Avg Block Time", "Value": f"{summary['statistics']['block_time']['avg']:.2f}s"},
{"Metric": "Avg Gas Price", "Value": f"{summary['statistics']['gas_price']['avg']:,} wei"}
]
output(summary_data, ctx.obj.get('output_format', format), title=f"Chain Summary: {chain_id}")
else:
# Cross-chain analysis
analysis = analytics.get_cross_chain_analysis()
if not analysis:
error("No analytics data available")
raise click.Abort()
# Overview data
overview_data = [
{"Metric": "Total Chains", "Value": analysis["total_chains"]},
{"Metric": "Active Chains", "Value": analysis["active_chains"]},
{"Metric": "Total Alerts", "Value": analysis["alerts_summary"]["total_alerts"]},
{"Metric": "Critical Alerts", "Value": analysis["alerts_summary"]["critical_alerts"]},
{"Metric": "Total Memory Usage", "Value": f"{analysis['resource_usage']['total_memory_mb']:.1f}MB"},
{"Metric": "Total Disk Usage", "Value": f"{analysis['resource_usage']['total_disk_mb']:.1f}MB"},
{"Metric": "Total Clients", "Value": analysis["resource_usage"]["total_clients"]},
{"Metric": "Total Agents", "Value": analysis["resource_usage"]["total_agents"]}
]
output(overview_data, ctx.obj.get('output_format', format), title="Cross-Chain Analysis Overview")
# Performance comparison
if analysis["performance_comparison"]:
comparison_data = [
{
"Chain ID": chain_id,
"TPS": f"{data['tps']:.2f}",
"Block Time": f"{data['block_time']:.2f}s",
"Health Score": f"{data['health_score']:.1f}/100"
}
for chain_id, data in analysis["performance_comparison"].items()
]
output(comparison_data, ctx.obj.get('output_format', format), title="Chain Performance Comparison")
except Exception as e:
error(f"Error getting analytics summary: {str(e)}")
raise click.Abort()
@analytics.command()
@click.option('--realtime', is_flag=True, help='Real-time monitoring')
@click.option('--interval', default=30, help='Update interval in seconds')
@click.option('--chain-id', help='Monitor specific chain')
@click.pass_context
def monitor(ctx, realtime, interval, chain_id):
"""Monitor chain performance in real-time"""
try:
config = load_multichain_config()
analytics = ChainAnalytics(config)
if realtime:
# Real-time monitoring
from rich.console import Console
from rich.live import Live
from rich.table import Table
import time
console = Console()
def generate_monitor_table():
try:
# Collect latest metrics
asyncio.run(analytics.collect_all_metrics())
table = Table(title=f"Chain Monitor - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
table.add_column("Chain ID", style="cyan")
table.add_column("TPS", style="green")
table.add_column("Block Time", style="yellow")
table.add_column("Health", style="red")
table.add_column("Alerts", style="magenta")
if chain_id:
# Single chain monitoring
summary = analytics.get_chain_performance_summary(chain_id, 1)
if summary:
health_color = "green" if summary["health_score"] > 70 else "yellow" if summary["health_score"] > 40 else "red"
table.add_row(
chain_id,
f"{summary['statistics']['tps']['avg']:.2f}",
f"{summary['statistics']['block_time']['avg']:.2f}s",
f"[{health_color}]{summary['health_score']:.1f}[/{health_color}]",
str(summary["active_alerts"])
)
else:
# All chains monitoring
analysis = analytics.get_cross_chain_analysis()
for chain_id, data in analysis["performance_comparison"].items():
health_color = "green" if data["health_score"] > 70 else "yellow" if data["health_score"] > 40 else "red"
table.add_row(
chain_id,
f"{data['tps']:.2f}",
f"{data['block_time']:.2f}s",
f"[{health_color}]{data['health_score']:.1f}[/{health_color}]",
str(len([a for a in analytics.alerts if a.chain_id == chain_id]))
)
return table
except Exception as e:
return f"Error collecting metrics: {e}"
with Live(generate_monitor_table(), refresh_per_second=1) as live:
try:
while True:
live.update(generate_monitor_table())
time.sleep(interval)
except KeyboardInterrupt:
console.print("\n[yellow]Monitoring stopped by user[/yellow]")
else:
# Single snapshot
asyncio.run(analytics.collect_all_metrics())
if chain_id:
summary = analytics.get_chain_performance_summary(chain_id, 1)
if not summary:
error(f"No data available for chain {chain_id}")
raise click.Abort()
monitor_data = [
{"Metric": "Chain ID", "Value": summary["chain_id"]},
{"Metric": "Current TPS", "Value": f"{summary['statistics']['tps']['avg']:.2f}"},
{"Metric": "Current Block Time", "Value": f"{summary['statistics']['block_time']['avg']:.2f}s"},
{"Metric": "Health Score", "Value": f"{summary['health_score']:.1f}/100"},
{"Metric": "Active Alerts", "Value": summary["active_alerts"]},
{"Metric": "Memory Usage", "Value": f"{summary['latest_metrics']['memory_usage_mb']:.1f}MB"},
{"Metric": "Disk Usage", "Value": f"{summary['latest_metrics']['disk_usage_mb']:.1f}MB"},
{"Metric": "Active Nodes", "Value": summary["latest_metrics"]["active_nodes"]},
{"Metric": "Client Count", "Value": summary["latest_metrics"]["client_count"]},
{"Metric": "Agent Count", "Value": summary["latest_metrics"]["agent_count"]}
]
output(monitor_data, ctx.obj.get('output_format', 'table'), title=f"Chain Monitor: {chain_id}")
else:
analysis = analytics.get_cross_chain_analysis()
monitor_data = [
{"Metric": "Total Chains", "Value": analysis["total_chains"]},
{"Metric": "Active Chains", "Value": analysis["active_chains"]},
{"Metric": "Total Memory Usage", "Value": f"{analysis['resource_usage']['total_memory_mb']:.1f}MB"},
{"Metric": "Total Disk Usage", "Value": f"{analysis['resource_usage']['total_disk_mb']:.1f}MB"},
{"Metric": "Total Clients", "Value": analysis["resource_usage"]["total_clients"]},
{"Metric": "Total Agents", "Value": analysis["resource_usage"]["total_agents"]},
{"Metric": "Total Alerts", "Value": analysis["alerts_summary"]["total_alerts"]},
{"Metric": "Critical Alerts", "Value": analysis["alerts_summary"]["critical_alerts"]}
]
output(monitor_data, ctx.obj.get('output_format', 'table'), title="System Monitor")
except Exception as e:
error(f"Error during monitoring: {str(e)}")
raise click.Abort()
@analytics.command()
@click.option('--chain-id', help='Specific chain ID for predictions')
@click.option('--hours', default=24, help='Prediction time horizon in hours')
@click.option('--format', type=click.Choice(['table', 'json']), default='table', help='Output format')
@click.pass_context
def predict(ctx, chain_id, hours, format):
"""Predict chain performance"""
try:
config = load_multichain_config()
analytics = ChainAnalytics(config)
# Collect current metrics first
asyncio.run(analytics.collect_all_metrics())
if chain_id:
# Single chain prediction
predictions = asyncio.run(analytics.predict_chain_performance(chain_id, hours))
if not predictions:
error(f"No prediction data available for chain {chain_id}")
raise click.Abort()
prediction_data = [
{
"Metric": pred.metric,
"Predicted Value": f"{pred.predicted_value:.2f}",
"Confidence": f"{pred.confidence:.1%}",
"Time Horizon": f"{pred.time_horizon_hours}h"
}
for pred in predictions
]
output(prediction_data, ctx.obj.get('output_format', format), title=f"Performance Predictions: {chain_id}")
else:
# All chains prediction
analysis = analytics.get_cross_chain_analysis()
all_predictions = {}
for chain_id in analysis["performance_comparison"].keys():
predictions = asyncio.run(analytics.predict_chain_performance(chain_id, hours))
if predictions:
all_predictions[chain_id] = predictions
if not all_predictions:
error("No prediction data available")
raise click.Abort()
# Format predictions for display
prediction_data = []
for chain_id, predictions in all_predictions.items():
for pred in predictions:
prediction_data.append({
"Chain ID": chain_id,
"Metric": pred.metric,
"Predicted Value": f"{pred.predicted_value:.2f}",
"Confidence": f"{pred.confidence:.1%}",
"Time Horizon": f"{pred.time_horizon_hours}h"
})
output(prediction_data, ctx.obj.get('output_format', format), title="Chain Performance Predictions")
except Exception as e:
error(f"Error generating predictions: {str(e)}")
raise click.Abort()
@analytics.command()
@click.option('--chain-id', help='Specific chain ID for recommendations')
@click.option('--format', type=click.Choice(['table', 'json']), default='table', help='Output format')
@click.pass_context
def optimize(ctx, chain_id, format):
"""Get optimization recommendations"""
try:
config = load_multichain_config()
analytics = ChainAnalytics(config)
# Collect current metrics first
asyncio.run(analytics.collect_all_metrics())
if chain_id:
# Single chain recommendations
recommendations = analytics.get_optimization_recommendations(chain_id)
if not recommendations:
success(f"No optimization recommendations for chain {chain_id}")
return
recommendation_data = [
{
"Type": rec["type"],
"Priority": rec["priority"],
"Issue": rec["issue"],
"Current Value": rec["current_value"],
"Recommended Action": rec["recommended_action"],
"Expected Improvement": rec["expected_improvement"]
}
for rec in recommendations
]
output(recommendation_data, ctx.obj.get('output_format', format), title=f"Optimization Recommendations: {chain_id}")
else:
# All chains recommendations
analysis = analytics.get_cross_chain_analysis()
all_recommendations = {}
for chain_id in analysis["performance_comparison"].keys():
recommendations = analytics.get_optimization_recommendations(chain_id)
if recommendations:
all_recommendations[chain_id] = recommendations
if not all_recommendations:
success("No optimization recommendations available")
return
# Format recommendations for display
recommendation_data = []
for chain_id, recommendations in all_recommendations.items():
for rec in recommendations:
recommendation_data.append({
"Chain ID": chain_id,
"Type": rec["type"],
"Priority": rec["priority"],
"Issue": rec["issue"],
"Current Value": rec["current_value"],
"Recommended Action": rec["recommended_action"]
})
output(recommendation_data, ctx.obj.get('output_format', format), title="Chain Optimization Recommendations")
except Exception as e:
error(f"Error getting optimization recommendations: {str(e)}")
raise click.Abort()
@analytics.command()
@click.option('--severity', type=click.Choice(['all', 'critical', 'warning']), default='all', help='Alert severity filter')
@click.option('--hours', default=24, help='Time range in hours')
@click.option('--format', type=click.Choice(['table', 'json']), default='table', help='Output format')
@click.pass_context
def alerts(ctx, severity, hours, format):
"""View performance alerts"""
try:
config = load_multichain_config()
analytics = ChainAnalytics(config)
# Collect current metrics first
asyncio.run(analytics.collect_all_metrics())
# Filter alerts
cutoff_time = datetime.now() - timedelta(hours=hours)
filtered_alerts = [
alert for alert in analytics.alerts
if alert.timestamp >= cutoff_time
]
if severity != 'all':
filtered_alerts = [a for a in filtered_alerts if a.severity == severity]
if not filtered_alerts:
success("No alerts found")
return
alert_data = [
{
"Chain ID": alert.chain_id,
"Type": alert.alert_type,
"Severity": alert.severity,
"Message": alert.message,
"Current Value": f"{alert.current_value:.2f}",
"Threshold": f"{alert.threshold:.2f}",
"Time": alert.timestamp.strftime("%Y-%m-%d %H:%M:%S")
}
for alert in filtered_alerts
]
output(alert_data, ctx.obj.get('output_format', format), title=f"Performance Alerts (Last {hours}h)")
except Exception as e:
error(f"Error getting alerts: {str(e)}")
raise click.Abort()
@analytics.command()
@click.option('--format', type=click.Choice(['json']), default='json', help='Output format')
@click.pass_context
def dashboard(ctx, format):
"""Get complete dashboard data"""
try:
config = load_multichain_config()
analytics = ChainAnalytics(config)
# Collect current metrics
asyncio.run(analytics.collect_all_metrics())
# Get dashboard data
dashboard_data = analytics.get_dashboard_data()
if format == 'json':
import json
click.echo(json.dumps(dashboard_data, indent=2, default=str))
else:
error("Dashboard data only available in JSON format")
raise click.Abort()
except Exception as e:
error(f"Error getting dashboard data: {str(e)}")
raise click.Abort()