Files
aitbc/cli/handlers/workflow.py
aitbc b58ca5db7c
Some checks failed
Blockchain Synchronization Verification / sync-verification (push) Waiting to run
Cross-Chain Functionality Tests / test-cross-chain-sync (push) Waiting to run
Cross-Chain Functionality Tests / test-cross-chain-transactions (push) Waiting to run
Cross-Chain Functionality Tests / test-multi-chain-consensus (push) Waiting to run
Cross-Chain Functionality Tests / aggregate-results (push) Blocked by required conditions
Multi-Chain Island Architecture Tests / test-multi-chain-island (push) Waiting to run
Multi-Node Blockchain Health Monitoring / health-check (push) Waiting to run
Node Failover Simulation / failover-test (push) Waiting to run
P2P Network Verification / p2p-verification (push) Waiting to run
API Endpoint Tests / test-api-endpoints (push) Has been cancelled
Coverage Phase 1 (70% Target) / test-coverage-70 (push) Has been cancelled
Coverage Phase 2 (85% Target) / test-coverage-85 (push) Has been cancelled
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Integration Tests / test-service-integration (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Production Tests / Production Integration Tests (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled
CLI Tests / test-cli (push) Has been cancelled
Remove zero-address dev mode fallback and harden security: update JWT secret, enforce authentication, add SSL verification option, implement actual resource handlers with system metrics
2026-05-27 12:20:07 +02:00

130 lines
4.1 KiB
Python

"""Workflow command handlers for AITBC CLI."""
import json
import logging
import requests
from datetime import datetime
logger = logging.getLogger(__name__)
COORDINATOR_URL = "http://localhost:8011"
CLIENT_API_KEY = "aitbc-client-key-secure-token-production"
def handle_workflow_create(args, render_mapping):
"""Handle workflow create command - creates an AI job as a workflow."""
name = getattr(args, "name", None) or "unnamed-workflow"
template = getattr(args, "template", "custom")
model = getattr(args, "model", "llama2:7b")
prompt = getattr(args, "prompt", "Hello")
# Create a job through the coordinator API
job_data = {
"payload": {
"type": "inference",
"model": model,
"prompt": prompt
},
"constraints": {
"max_price": 0.1,
"region": "localhost"
},
"ttl_seconds": 900
}
headers = {
"X-Api-Key": CLIENT_API_KEY,
"Content-Type": "application/json"
}
try:
response = requests.post(
f"{COORDINATOR_URL}/v1/jobs",
json=job_data,
headers=headers,
timeout=10
)
response.raise_for_status()
result = response.json()
workflow_data = {
"workflow_id": result.get("job_id"),
"name": name,
"template": template,
"status": "created",
"model": model,
"estimated_duration": "1-2 minutes"
}
logger.info(f"Workflow created: {workflow_data['workflow_id']}")
render_mapping("Workflow:", workflow_data)
except Exception as e:
logger.error(f"Failed to create workflow: {e}")
render_mapping("Error:", {"message": str(e)})
def handle_workflow_schedule(args, render_mapping):
"""Handle workflow schedule command - schedules recurring AI jobs."""
name = getattr(args, "name", None)
cron = getattr(args, "cron", None)
command = getattr(args, "command", None)
# For now, return scheduling info (actual scheduling would require a scheduler service)
schedule_data = {
"schedule_id": f"schedule_{int(datetime.now().timestamp())}",
"workflow_name": name,
"cron_expression": cron,
"command": command,
"status": "scheduled",
"next_run": "pending",
"note": "Scheduler service integration required for actual execution"
}
logger.info(f"Workflow scheduled: {schedule_data['schedule_id']}")
render_mapping("Schedule:", schedule_data)
def handle_workflow_monitor(args, output_format, render_mapping):
"""Handle workflow monitor command - monitors job status through coordinator."""
name = getattr(args, "name", None)
headers = {
"X-Api-Key": CLIENT_API_KEY,
"Content-Type": "application/json"
}
try:
response = requests.get(
f"{COORDINATOR_URL}/v1/jobs",
headers=headers,
timeout=10
)
response.raise_for_status()
result = response.json()
jobs = result.get("items", [])
running = sum(1 for j in jobs if j.get("state") == "RUNNING")
completed = sum(1 for j in jobs if j.get("state") == "COMPLETED")
failed = sum(1 for j in jobs if j.get("state") == "FAILED")
monitor_data = {
"status": "active",
"workflows_running": running,
"workflows_completed": completed,
"workflows_failed": failed,
"total_jobs": len(jobs),
"last_check": datetime.now().isoformat()
}
if output_format(args) == "json":
logger.info(json.dumps(monitor_data, indent=2))
else:
render_mapping("Workflow Monitor:", monitor_data)
except Exception as e:
logger.error(f"Failed to monitor workflows: {e}")
monitor_data = {
"status": "error",
"message": str(e),
"last_check": datetime.now().isoformat()
}
render_mapping("Workflow Monitor:", monitor_data)