Files
aitbc/cli/commands/hermes.py
aitbc 852f2e5a8a
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Documentation Validation / validate-docs (push) Has been cancelled
Documentation Validation / validate-policies-strict (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Node Failover Simulation / failover-test (push) Has been cancelled
Integration Tests / test-service-integration (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
CLI Tests / test-cli (push) Has been cancelled
Blockchain Synchronization Verification / sync-verification (push) Successful in 11s
Contract Performance Benchmarks / benchmark-gas-usage (push) Successful in 1m36s
Contract Performance Benchmarks / benchmark-execution-time (push) Successful in 1m24s
Contract Performance Benchmarks / benchmark-throughput (push) Successful in 1m25s
Cross-Chain Functionality Tests / test-cross-chain-sync (push) Successful in 2s
Cross-Chain Functionality Tests / test-cross-chain-transactions (push) Successful in 5s
Cross-Chain Functionality Tests / test-cross-chain-bridge (push) Has been skipped
Cross-Chain Functionality Tests / test-multi-chain-consensus (push) Successful in 3s
Cross-Chain Functionality Tests / aggregate-results (push) Has been skipped
Multi-Chain Island Architecture Tests / test-multi-chain-island (push) Successful in 2s
Multi-Node Blockchain Health Monitoring / health-check (push) Successful in 3s
P2P Network Verification / p2p-verification (push) Successful in 2s
Smart Contract Tests / test-solidity (map[name:aitbc-contracts path:contracts]) (push) Failing after 1m28s
Smart Contract Tests / test-solidity (map[name:aitbc-token path:packages/solidity/aitbc-token]) (push) Successful in 21s
Smart Contract Tests / test-foundry (push) Failing after 20s
Smart Contract Tests / lint-solidity (push) Successful in 30s
Smart Contract Tests / deploy-contracts (push) Successful in 1m40s
Systemd Sync / sync-systemd (push) Successful in 26s
Contract Performance Benchmarks / compare-benchmarks (push) Successful in 4s
Rename openclaw to hermes across documentation and workflows
- Update workflow paths from docs/openclaw to docs/hermes
- Rename skill prefixes from openclaw-* to hermes-*
- Update agent skill references in refactoring and analysis docs
- Rename OPENCLAW_AITBC_MASTERY_PLAN.md to reflect hermes branding
- Update CLI examples and command references throughout documentation
2026-05-07 11:42:06 +02:00

902 lines
32 KiB
Python
Executable File

"""hermes integration commands for AITBC CLI"""
import click
import httpx
import json
import time
import os
import datetime
import subprocess
from typing import Optional, Dict, Any, List
from utils import output, error, success, warning
@click.group()
def hermes():
"""hermes integration with edge computing deployment"""
pass
@click.group()
def deploy():
"""Agent deployment operations"""
pass
hermes.add_command(deploy)
@deploy.command()
@click.argument("agent_id")
@click.option("--region", required=True, help="Deployment region")
@click.option("--instances", default=1, help="Number of instances to deploy")
@click.option("--instance-type", default="standard", help="Instance type")
@click.option("--edge-locations", help="Comma-separated edge locations")
@click.option("--auto-scale", is_flag=True, help="Enable auto-scaling")
@click.pass_context
def deploy_agent(ctx, agent_id: str, region: str, instances: int, instance_type: str,
edge_locations: Optional[str], auto_scale: bool):
"""Deploy agent to hermes network"""
config = ctx.obj['config']
deployment_data = {
"agent_id": agent_id,
"region": region,
"instances": instances,
"instance_type": instance_type,
"auto_scale": auto_scale
}
if edge_locations:
deployment_data["edge_locations"] = [loc.strip() for loc in edge_locations.split(',')]
try:
with httpx.Client() as client:
response = client.post(
f"{config.coordinator_url}/hermes/deploy",
headers={"X-Api-Key": config.api_key or ""},
json=deployment_data
)
if response.status_code == 202:
deployment = response.json()
success(f"Agent deployment started: {deployment['id']}")
output(deployment, ctx.obj['output_format'])
else:
error(f"Failed to start deployment: {response.status_code}")
if response.text:
error(response.text)
ctx.exit(1)
except Exception as e:
error(f"Network error: {e}")
ctx.exit(1)
@click.argument("deployment_id")
@click.option("--instances", required=True, type=int, help="New number of instances")
@click.option("--auto-scale", is_flag=True, help="Enable auto-scaling")
@click.option("--min-instances", default=1, help="Minimum instances for auto-scaling")
@click.option("--max-instances", default=10, help="Maximum instances for auto-scaling")
@click.pass_context
def scale(ctx, deployment_id: str, instances: int, auto_scale: bool, min_instances: int, max_instances: int):
"""Scale agent deployment"""
config = ctx.obj['config']
scale_data = {
"instances": instances,
"auto_scale": auto_scale,
"min_instances": min_instances,
"max_instances": max_instances
}
try:
with httpx.Client() as client:
response = client.post(
f"{config.coordinator_url}/hermes/deployments/{deployment_id}/scale",
headers={"X-Api-Key": config.api_key or ""},
json=scale_data
)
if response.status_code == 200:
result = response.json()
success(f"Deployment scaled successfully")
output(result, ctx.obj['output_format'])
else:
error(f"Failed to scale deployment: {response.status_code}")
if response.text:
error(response.text)
ctx.exit(1)
except Exception as e:
error(f"Network error: {e}")
ctx.exit(1)
@deploy.command()
@click.argument("deployment_id")
@click.option("--objective", default="cost",
type=click.Choice(["cost", "performance", "latency", "efficiency"]),
help="Optimization objective")
@click.pass_context
def optimize(ctx, deployment_id: str, objective: str):
"""Optimize agent deployment"""
config = ctx.obj['config']
optimization_data = {"objective": objective}
try:
with httpx.Client() as client:
response = client.post(
f"{config.coordinator_url}/hermes/deployments/{deployment_id}/optimize",
headers={"X-Api-Key": config.api_key or ""},
json=optimization_data
)
if response.status_code == 200:
result = response.json()
success(f"Deployment optimization completed")
output(result, ctx.obj['output_format'])
else:
error(f"Failed to optimize deployment: {response.status_code}")
if response.text:
error(response.text)
ctx.exit(1)
except Exception as e:
error(f"Network error: {e}")
ctx.exit(1)
@click.group()
def monitor():
"""hermes monitoring operations"""
pass
hermes.add_command(monitor)
@monitor.command()
@click.argument("deployment_id")
@click.option("--metrics", default="latency,cost", help="Comma-separated metrics to monitor")
@click.option("--real-time", is_flag=True, help="Show real-time metrics")
@click.option("--interval", default=10, help="Update interval for real-time monitoring")
@click.pass_context
def monitor_metrics(ctx, deployment_id: str, metrics: str, real_time: bool, interval: int):
"""Monitor hermes agent performance"""
config = ctx.obj['config']
params = {"metrics": [m.strip() for m in metrics.split(',')]}
def get_metrics():
try:
with httpx.Client() as client:
response = client.get(
f"{config.coordinator_url}/hermes/deployments/{deployment_id}/metrics",
headers={"X-Api-Key": config.api_key or ""},
params=params
)
if response.status_code == 200:
return response.json()
else:
error(f"Failed to get metrics: {response.status_code}")
return None
except Exception as e:
error(f"Network error: {e}")
return None
if real_time:
click.echo(f"Monitoring deployment {deployment_id} (Ctrl+C to stop)...")
while True:
metrics_data = get_metrics()
if metrics_data:
click.clear()
click.echo(f"Deployment ID: {deployment_id}")
click.echo(f"Status: {metrics_data.get('status', 'Unknown')}")
click.echo(f"Instances: {metrics_data.get('instances', 'N/A')}")
metrics_list = metrics_data.get('metrics', {})
for metric in [m.strip() for m in metrics.split(',')]:
if metric in metrics_list:
value = metrics_list[metric]
click.echo(f"{metric.title()}: {value}")
if metrics_data.get('status') in ['terminated', 'failed']:
break
time.sleep(interval)
else:
metrics_data = get_metrics()
if metrics_data:
output(metrics_data, ctx.obj['output_format'])
@monitor.command()
@click.argument("deployment_id")
@click.pass_context
def status(ctx, deployment_id: str):
"""Get deployment status"""
config = ctx.obj['config']
try:
with httpx.Client() as client:
response = client.get(
f"{config.coordinator_url}/hermes/deployments/{deployment_id}/status",
headers={"X-Api-Key": config.api_key or ""}
)
if response.status_code == 200:
status_data = response.json()
output(status_data, ctx.obj['output_format'])
else:
error(f"Failed to get deployment status: {response.status_code}")
ctx.exit(1)
except Exception as e:
error(f"Network error: {e}")
ctx.exit(1)
@click.group()
def edge():
"""Edge computing operations"""
pass
hermes.add_command(edge)
@edge.command()
@click.argument("agent_id")
@click.option("--locations", required=True, help="Comma-separated edge locations")
@click.option("--strategy", default="latency",
type=click.Choice(["latency", "cost", "availability", "hybrid"]),
help="Edge deployment strategy")
@click.option("--replicas", default=1, help="Number of replicas per location")
@click.pass_context
def deploy(ctx, agent_id: str, locations: str, strategy: str, replicas: int):
"""Deploy agent to edge locations"""
config = ctx.obj['config']
edge_data = {
"agent_id": agent_id,
"locations": [loc.strip() for loc in locations.split(',')],
"strategy": strategy,
"replicas": replicas
}
try:
with httpx.Client() as client:
response = client.post(
f"{config.coordinator_url}/hermes/edge/deploy",
headers={"X-Api-Key": config.api_key or ""},
json=edge_data
)
if response.status_code == 202:
deployment = response.json()
success(f"Edge deployment started: {deployment['id']}")
output(deployment, ctx.obj['output_format'])
else:
error(f"Failed to start edge deployment: {response.status_code}")
if response.text:
error(response.text)
ctx.exit(1)
except Exception as e:
error(f"Network error: {e}")
ctx.exit(1)
@edge.command()
@click.option("--location", help="Filter by location")
@click.pass_context
def resources(ctx, location: Optional[str]):
"""Manage edge resources"""
config = ctx.obj['config']
params = {}
if location:
params["location"] = location
try:
with httpx.Client() as client:
response = client.get(
f"{config.coordinator_url}/hermes/edge/resources",
headers={"X-Api-Key": config.api_key or ""},
params=params
)
if response.status_code == 200:
resources = response.json()
output(resources, ctx.obj['output_format'])
else:
error(f"Failed to get edge resources: {response.status_code}")
ctx.exit(1)
except Exception as e:
error(f"Network error: {e}")
ctx.exit(1)
@edge.command()
@click.argument("deployment_id")
@click.option("--latency-target", type=int, help="Target latency in milliseconds")
@click.option("--cost-budget", type=float, help="Cost budget")
@click.option("--availability", type=float, help="Target availability (0.0-1.0)")
@click.pass_context
def optimize(ctx, deployment_id: str, latency_target: Optional[int],
cost_budget: Optional[float], availability: Optional[float]):
"""Optimize edge deployment performance"""
config = ctx.obj['config']
optimization_data = {}
if latency_target:
optimization_data["latency_target_ms"] = latency_target
if cost_budget:
optimization_data["cost_budget"] = cost_budget
if availability:
optimization_data["availability_target"] = availability
try:
with httpx.Client() as client:
response = client.post(
f"{config.coordinator_url}/hermes/edge/deployments/{deployment_id}/optimize",
headers={"X-Api-Key": config.api_key or ""},
json=optimization_data
)
if response.status_code == 200:
result = response.json()
success(f"Edge optimization completed")
output(result, ctx.obj['output_format'])
else:
error(f"Failed to optimize edge deployment: {response.status_code}")
if response.text:
error(response.text)
ctx.exit(1)
except Exception as e:
error(f"Network error: {e}")
ctx.exit(1)
@edge.command()
@click.argument("deployment_id")
@click.option("--standards", help="Comma-separated compliance standards")
@click.pass_context
def compliance(ctx, deployment_id: str, standards: Optional[str]):
"""Check edge security compliance"""
config = ctx.obj['config']
params = {}
if standards:
params["standards"] = [s.strip() for s in standards.split(',')]
try:
with httpx.Client() as client:
response = client.get(
f"{config.coordinator_url}/hermes/edge/deployments/{deployment_id}/compliance",
headers={"X-Api-Key": config.api_key or ""},
params=params
)
if response.status_code == 200:
compliance_data = response.json()
output(compliance_data, ctx.obj['output_format'])
else:
error(f"Failed to check compliance: {response.status_code}")
ctx.exit(1)
except Exception as e:
error(f"Network error: {e}")
ctx.exit(1)
@click.group()
def routing():
"""Agent skill routing and job offloading"""
pass
hermes.add_command(routing)
@routing.command()
@click.argument("deployment_id")
@click.option("--algorithm", default="load-balanced",
type=click.Choice(["load-balanced", "skill-based", "cost-based", "latency-based"]),
help="Routing algorithm")
@click.option("--weights", help="Comma-separated weights for routing factors")
@click.pass_context
def optimize(ctx, deployment_id: str, algorithm: str, weights: Optional[str]):
"""Optimize agent skill routing"""
config = ctx.obj['config']
routing_data = {"algorithm": algorithm}
if weights:
routing_data["weights"] = [w.strip() for w in weights.split(',')]
try:
with httpx.Client() as client:
response = client.post(
f"{config.coordinator_url}/hermes/routing/deployments/{deployment_id}/optimize",
headers={"X-Api-Key": config.api_key or ""},
json=routing_data
)
if response.status_code == 200:
result = response.json()
success(f"Routing optimization completed")
output(result, ctx.obj['output_format'])
else:
error(f"Failed to optimize routing: {response.status_code}")
if response.text:
error(response.text)
ctx.exit(1)
except Exception as e:
error(f"Network error: {e}")
ctx.exit(1)
@routing.command()
@click.argument("deployment_id")
@click.pass_context
def status(ctx, deployment_id: str):
"""Get routing status and statistics"""
config = ctx.obj['config']
try:
with httpx.Client() as client:
response = client.get(
f"{config.coordinator_url}/hermes/routing/deployments/{deployment_id}/status",
headers={"X-Api-Key": config.api_key or ""}
)
if response.status_code == 200:
status_data = response.json()
output(status_data, ctx.obj['output_format'])
else:
error(f"Failed to get routing status: {response.status_code}")
ctx.exit(1)
except Exception as e:
error(f"Network error: {e}")
ctx.exit(1)
@click.group()
def ecosystem():
"""hermes ecosystem development"""
pass
hermes.add_command(ecosystem)
@click.group()
def train():
"""Agent training operations"""
pass
hermes.add_command(train)
@ecosystem.command()
@click.option("--name", required=True, help="Solution name")
@click.option("--type", required=True,
type=click.Choice(["agent", "workflow", "integration", "tool"]),
help="Solution type")
@click.option("--description", default="", help="Solution description")
@click.option("--package", type=click.File('rb'), help="Solution package file")
@click.pass_context
def create(ctx, name: str, type: str, description: str, package):
"""Create hermes ecosystem solution"""
config = ctx.obj['config']
solution_data = {
"name": name,
"type": type,
"description": description
}
files = {}
if package:
files["package"] = package.read()
try:
with httpx.Client() as client:
response = client.post(
f"{config.coordinator_url}/hermes/ecosystem/solutions",
headers={"X-Api-Key": config.api_key or ""},
data=solution_data,
files=files
)
if response.status_code == 201:
solution = response.json()
success(f"hermes solution created: {solution['id']}")
output(solution, ctx.obj['output_format'])
else:
error(f"Failed to create solution: {response.status_code}")
if response.text:
error(response.text)
ctx.exit(1)
except Exception as e:
error(f"Network error: {e}")
ctx.exit(1)
@ecosystem.command()
@click.option("--type", help="Filter by solution type")
@click.option("--category", help="Filter by category")
@click.option("--limit", default=20, help="Number of solutions to list")
@click.pass_context
def list(ctx, type: Optional[str], category: Optional[str], limit: int):
"""List hermes ecosystem solutions"""
config = ctx.obj['config']
params = {"limit": limit}
if type:
params["type"] = type
if category:
params["category"] = category
try:
with httpx.Client() as client:
response = client.get(
f"{config.coordinator_url}/hermes/ecosystem/solutions",
headers={"X-Api-Key": config.api_key or ""},
params=params
)
if response.status_code == 200:
solutions = response.json()
output(solutions, ctx.obj['output_format'])
else:
error(f"Failed to list solutions: {response.status_code}")
ctx.exit(1)
except Exception as e:
error(f"Network error: {e}")
ctx.exit(1)
@ecosystem.command()
@click.argument("solution_id")
@click.pass_context
def install(ctx, solution_id: str):
"""Install hermes ecosystem solution"""
config = ctx.obj['config']
try:
with httpx.Client() as client:
response = client.post(
f"{config.coordinator_url}/hermes/ecosystem/solutions/{solution_id}/install",
headers={"X-Api-Key": config.api_key or ""}
)
if response.status_code == 200:
result = response.json()
success(f"Solution installed successfully")
output(result, ctx.obj['output_format'])
else:
error(f"Failed to install solution: {response.status_code}")
if response.text:
error(response.text)
ctx.exit(1)
except Exception as e:
error(f"Network error: {e}")
ctx.exit(1)
@train.command()
@click.option("--agent-id", required=True, help="Agent ID to train")
@click.option("--stage", required=True, help="Training stage (stage1_foundation, stage2_operations_mastery, etc.)")
@click.option("--training-data", required=True, type=click.Path(exists=True), help="Path to training data JSON file")
@click.option("--log-level", default="INFO", type=click.Choice(["DEBUG", "INFO", "SUCCESS", "WARNING", "ERROR"]), help="Logging level")
@click.pass_context
def agent(ctx, agent_id: str, stage: str, training_data: str, log_level: str):
"""Train hermes agent on AITBC operations"""
config = ctx.obj['config']
# Load training data
try:
with open(training_data, 'r') as f:
training_config = json.load(f)
except Exception as e:
error(f"Failed to load training data: {e}")
ctx.exit(1)
# Validate training data matches stage
if training_config.get('stage') != stage:
error(f"Training data stage mismatch: expected {stage}, got {training_config.get('stage')}")
ctx.exit(1)
# Initialize logging
log_dir = "/var/log/aitbc/agent-training"
os.makedirs(log_dir, exist_ok=True)
log_file = f"{log_dir}/agent_{agent_id}_{stage}_{int(time.time())}.log"
def log_entry(level: str, message: str, **kwargs):
timestamp = datetime.datetime.now().isoformat()
log_entry_data = {
"timestamp": timestamp,
"agent_id": agent_id,
"stage": stage,
"level": level,
"message": message,
**kwargs
}
with open(log_file, 'a') as f:
f.write(json.dumps(log_entry_data) + '\n')
if log_level == "DEBUG" or (log_level == "INFO" and level in ["INFO", "SUCCESS", "WARNING", "ERROR"]) or (log_level == "SUCCESS" and level in ["SUCCESS", "ERROR"]) or (log_level == "WARNING" and level in ["WARNING", "ERROR"]) or (log_level == "ERROR" and level == "ERROR"):
click.echo(f"[{timestamp}] [{level}] {message}")
log_entry("INFO", f"Starting agent training for {agent_id} on stage {stage}")
log_entry("INFO", f"Training data loaded from {training_data}")
# Execute training operations
operations = training_config.get('training_data', {}).get('operations', [])
completed_ops = 0
failed_ops = 0
for i, op in enumerate(operations, 1):
op_name = op.get('operation')
parameters = op.get('parameters', {})
expected_result = op.get('expected_result', {})
success_criteria = op.get('success_criteria', {})
log_entry("INFO", f"Executing operation {i}/{len(operations)}: {op_name}", operation=op_name, parameters=parameters)
start_time = time.time()
try:
# Simulate operation execution (replace with actual CLI calls)
# This would call the actual AITBC CLI commands
result = {
"status": "success",
"operation": op_name,
"timestamp": datetime.datetime.now().isoformat()
}
duration_ms = int((time.time() - start_time) * 1000)
# Check success criteria
success = True
if success_criteria.get('status') and result.get('status') != success_criteria['status']:
success = False
if success_criteria.get('performance', {}).get('max_duration_ms') and duration_ms > success_criteria['performance']['max_duration_ms']:
success = False
if success:
completed_ops += 1
log_entry("SUCCESS", f"Operation {op_name} completed", operation=op_name, duration_ms=duration_ms, result=result)
else:
failed_ops += 1
log_entry("WARNING", f"Operation {op_name} completed but did not meet success criteria", operation=op_name, duration_ms=duration_ms, result=result, success_criteria=success_criteria)
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
failed_ops += 1
log_entry("ERROR", f"Operation {op_name} failed: {e}", operation=op_name, duration_ms=duration_ms, error=str(e))
# Summary
total_ops = len(operations)
success_rate = (completed_ops / total_ops * 100) if total_ops > 0 else 0
log_entry("INFO", f"Training completed: {completed_ops}/{total_ops} operations successful ({success_rate:.1f}%)",
total_operations=total_ops, completed=completed_ops, failed=failed_ops, success_rate=success_rate)
success(f"Agent training completed: {completed_ops}/{total_ops} operations successful")
output({
"agent_id": agent_id,
"stage": stage,
"total_operations": total_ops,
"completed": completed_ops,
"failed": failed_ops,
"success_rate": f"{success_rate:.1f}%",
"log_file": log_file
}, ctx.obj['output_format'])
@train.command()
@click.option("--agent-id", required=True, help="Agent ID to validate")
@click.option("--stage", required=True, help="Training stage to validate")
@click.pass_context
def validate(ctx, agent_id: str, stage: str):
"""Validate agent training progress"""
config = ctx.obj['config']
# Load training data for validation
training_data_path = f"/opt/aitbc/docs/agent-training/{stage}.json"
try:
with open(training_data_path, 'r') as f:
training_config = json.load(f)
except Exception as e:
error(f"Failed to load training data: {e}")
ctx.exit(1)
# Run exam tests
exam_tests = training_config.get('validation', {}).get('exam_tests', [])
passing_score = training_config.get('validation', {}).get('passing_score', 80)
click.echo(f"Running {len(exam_tests)} exam tests for agent {agent_id} on stage {stage}...")
passed_tests = 0
total_weight = sum(test.get('weight', 1) for test in exam_tests)
earned_weight = 0
test_results = []
for i, test in enumerate(exam_tests, 1):
test_name = test.get('test_name')
operation = test.get('operation')
test_case = test.get('test_case', {})
expected_output = test.get('expected_output', {})
weight = test.get('weight', 1)
click.echo(f"Test {i}/{len(exam_tests)}: {test_name} (weight: {weight})")
# Execute actual operation and validate against expected output
try:
# Map operation names to CLI commands
operation_mapping = {
"wallet_create": "wallet create",
"wallet_balance": "wallet balance",
"blockchain_status": "blockchain status",
"service_status": "system status"
}
cli_command = operation_mapping.get(operation, operation)
if not cli_command:
warning(f"Operation {operation} not mapped to CLI command")
test_passed = False
else:
# Build CLI command arguments
cmd_args = []
for key, value in test_case.items():
if key == "wallet":
cmd_args.append(value)
elif key == "password":
cmd_args.append(value)
elif key == "name":
cmd_args.extend(["--name", value])
elif key == "service":
cmd_args.extend(["--service", value])
# Execute CLI command
try:
result = subprocess.run(
["python", "/opt/aitbc/cli/unified_cli.py", cli_command] + cmd_args,
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
# Parse output and validate against expected output
# For now, consider it passed if command executed successfully
test_passed = True
success(f"Test passed: {test_name}")
else:
test_passed = False
error(f"Test failed: {test_name} - CLI command failed")
except subprocess.TimeoutExpired:
test_passed = False
error(f"Test failed: {test_name} - Command timeout")
except Exception as e:
test_passed = False
error(f"Test failed: {test_name} - {e}")
except Exception as e:
test_passed = False
error(f"Test failed: {test_name} - {e}")
if test_passed:
passed_tests += 1
earned_weight += weight
test_results.append({
"test_name": test_name,
"operation": operation,
"passed": test_passed,
"weight": weight
})
score = (earned_weight / total_weight * 100) if total_weight > 0 else 0
if score >= passing_score:
success(f"Validation passed: {score:.1f}% (required: {passing_score}%)")
else:
error(f"Validation failed: {score:.1f}% (required: {passing_score}%)")
ctx.exit(1)
output({
"agent_id": agent_id,
"stage": stage,
"total_tests": len(exam_tests),
"passed_tests": passed_tests,
"score": f"{score:.1f}%",
"passing_score": f"{passing_score}%",
"validation": "passed" if score >= passing_score else "failed",
"test_results": test_results
}, ctx.obj['output_format'])
@train.command()
@click.option("--agent-id", required=True, help="Agent ID to certify")
@click.pass_context
def certify(ctx, agent_id: str):
"""Certify agent mastery"""
config = ctx.obj['config']
click.echo(f"Certifying agent {agent_id}...")
# Check all stages
stages = [
"stage1_foundation",
"stage2_operations_mastery",
"stage3_ai_operations",
"stage4_marketplace_economics",
"stage5_expert_operations",
"stage6_agent_identity_sdk",
"stage7_cross_node_training"
]
certified_stages = []
failed_stages = []
for stage in stages:
click.echo(f"Checking {stage}...")
# Simulate stage validation (replace with actual validation)
stage_passed = True # Placeholder
if stage_passed:
certified_stages.append(stage)
success(f"Stage certified: {stage}")
else:
failed_stages.append(stage)
warning(f"Stage not certified: {stage}")
if len(failed_stages) == 0:
success(f"Agent {agent_id} fully certified!")
certification_status = "fully_certified"
elif len(certified_stages) > 0:
warning(f"Agent {agent_id} partially certified ({len(certified_stages)}/{len(stages)} stages)")
certification_status = "partially_certified"
else:
error(f"Agent {agent_id} not certified")
ctx.exit(1)
output({
"agent_id": agent_id,
"certification_status": certification_status,
"certified_stages": certified_stages,
"failed_stages": failed_stages,
"total_stages": len(stages),
"certified_count": len(certified_stages)
}, ctx.obj['output_format'])
@hermes.command()
@click.argument("deployment_id")
@click.pass_context
def terminate(ctx, deployment_id: str):
"""Terminate hermes deployment"""
config = ctx.obj['config']
if not click.confirm(f"Terminate deployment {deployment_id}? This action cannot be undone."):
click.echo("Operation cancelled")
return
try:
with httpx.Client() as client:
response = client.delete(
f"{config.coordinator_url}/hermes/deployments/{deployment_id}",
headers={"X-Api-Key": config.api_key or ""}
)
if response.status_code == 200:
result = response.json()
success(f"Deployment {deployment_id} terminated")
output(result, ctx.obj['output_format'])
else:
error(f"Failed to terminate deployment: {response.status_code}")
if response.text:
error(response.text)
ctx.exit(1)
except Exception as e:
error(f"Network error: {e}")
ctx.exit(1)