Integrate agent coordinator API calls into CLI for agent registration, discovery, and task distribution stats

- Add handle_ai_distribution_stats() to query task distribution statistics from coordinator
- Update handle_agent_sdk_action() register to call /agents/register endpoint with full registration data
- Update handle_agent_sdk_action() list to call /agents/discover endpoint with filtering
- Update handle_agent_sdk_action() status to call /agents/{agent_id} endpoint
- Add handle_agent_sdk_action() update-status to call /agents/{agent_id}/status endpoint
- Add CLI
This commit is contained in:
aitbc
2026-05-07 18:41:03 +02:00
parent 8d79522e4b
commit 5343d20f6d
6 changed files with 343 additions and 25 deletions

View File

@@ -0,0 +1,174 @@
# AITBC Training - Agent Coordinator Architecture
## Important: Real Coordinator Location
The actual AITBC Agent Coordinator service is located at:
- **Path:** `/opt/aitbc/apps/agent-coordinator/src/app/`
- **Port:** 9001
- **Service:** `aitbc-agent-coordinator.service`
## DO NOT Use
- **Wrong location:** `/opt/aitbc/apps/agent-services/agent-coordinator/src/coordinator.py`
- This is a different/older implementation and is NOT the active service
## Key Components
### Core Files
- `agent_discovery.py` - Redis-backed agent registry with persistence
- `load_balancer.py` - Load balancer with multiple strategies (least_connections, round_robin, etc.)
- `routers/agents.py` - Agent management REST API endpoints
- `routers/tasks.py` - Task submission and distribution API endpoints
- `lifespan.py` - Service initialization and component startup
- `state.py` - Global state management for coordinator components
### Service Initialization
The service initializes in `lifespan.py`:
1. Creates `AgentRegistry()` with Redis backing
2. Starts registry Redis connection
3. Creates `LoadBalancer(registry)` with least_connections strategy
4. Creates `TaskDistributor(balancer)` with priority queues
5. Starts background task distribution loop
## Agent Registration
### API Endpoint
```
POST /agents/register
```
### Example
```bash
curl -X POST http://localhost:9001/agents/register \
-H "Content-Type: application/json" \
-d '{
"agent_id": "hermes-agent",
"agent_type": "worker",
"capabilities": ["data-processing", "analysis", "general"],
"services": ["task-execution", "analysis"],
"endpoints": {"http": "http://localhost:9002"},
"metadata": {"version": "1.0.0"}
}'
```
### Response
```json
{
"status": "success",
"message": "Agent hermes-agent registered successfully",
"agent_id": "hermes-agent",
"registered_at": "2026-05-07T16:26:55.464178+00:00"
}
```
## Task Distribution
### API Endpoint
```
POST /tasks/submit
```
### Example
```bash
curl -X POST http://localhost:9001/tasks/submit \
-H "Content-Type: application/json" \
-d '{
"task_data": {
"model": "llama2",
"prompt": "test prompt"
},
"priority": "normal",
"requirements": {}
}'
```
### Distribution Flow
1. Task submitted to `TaskDistributor`
2. Distributor finds eligible agents via `AgentRegistry.discover_agents()`
3. Load balancer selects agent using configured strategy (default: least_connections)
4. Task assigned to selected agent
5. Agent metrics updated in Redis
## Task Status
### API Endpoint
```
GET /tasks/status
```
### Example
```bash
curl http://localhost:9001/tasks/status
```
### Response
```json
{
"status": "success",
"stats": {
"tasks_distributed": 1,
"tasks_completed": 1,
"tasks_failed": 0,
"load_balancer_stats": {
"strategy": "least_connections",
"active_agents": 1,
"total_assignments": 1,
"avg_agent_load": 1
}
}
}
```
## Agent Discovery
### API Endpoint
```
POST /agents/discover
```
### Example
```bash
curl -X POST http://localhost:9001/agents/discover \
-H "Content-Type: application/json" \
-d '{
"status": "active",
"agent_type": "worker"
}'
```
## Redis Persistence
The agent registry uses Redis for persistence:
- Agent data stored as hashes: `agent:{agent_id}`
- Active agents indexed in set: `agents:active`
- Load metrics tracked per agent
- Health scores calculated from heartbeats
## Service Status
### Health Check
```bash
curl http://localhost:9001/health
```
### Service Management
```bash
systemctl status aitbc-agent-coordinator.service
systemctl restart aitbc-agent-coordinator.service
journalctl -u aitbc-agent-coordinator.service -f
```
## Cross-Node Distribution
For multi-node setups, register agents on each node:
```bash
# Register agent on aitbc1
curl -X POST http://aitbc1:9001/agents/register \
-d '{"agent_id":"aitbc1-agent", ...}'
# Submit task on localhost
curl -X POST http://localhost:9001/tasks/submit \
-d '{"task_data":{...}}'
# Task will be distributed to any active agent across nodes
```

View File

@@ -203,6 +203,28 @@ def handle_ai_stats(args, default_rpc_url, output_format, render_mapping):
sys.exit(1)
def handle_ai_distribution_stats(args, default_coordinator_url, output_format, render_mapping):
"""Handle task distribution statistics query from agent coordinator."""
coordinator_url = getattr(args, 'coordinator_url', None) or default_coordinator_url
print(f"Getting task distribution statistics from {coordinator_url}...")
try:
response = requests.get(f"{coordinator_url}/tasks/status", timeout=10)
if response.status_code == 200:
stats = response.json()
if output_format(args) == "json":
print(json.dumps(stats, indent=2))
else:
render_mapping("Task distribution statistics:", stats)
else:
print(f"Query failed: {response.status_code}")
print(f"Error: {response.text}")
sys.exit(1)
except Exception as e:
print(f"Error getting distribution stats: {e}")
sys.exit(1)
def handle_ai_service_list(args, ai_operations, render_mapping):
"""Handle AI service list command."""
result = ai_operations("service_list")

View File

@@ -128,45 +128,145 @@ def handle_agent_sdk_action(args, render_mapping):
print(f"Agent SDK created: {name}")
render_mapping("Agent SDK:", sdk_data)
elif action == "update-status":
agent_id = getattr(args, "agent_id", None)
status = getattr(args, "status", None)
load_metrics = getattr(args, "load_metrics", {})
coordinator_url = getattr(args, "coordinator_url", "http://localhost:9001")
if not agent_id or not status:
print("Error: --agent-id and --status are required")
sys.exit(1)
status_update_request = {
"status": status,
"load_metrics": load_metrics if isinstance(load_metrics, dict) else {}
}
print(f"Updating agent {agent_id} status to {status}...")
try:
import requests
response = requests.put(
f"{coordinator_url}/agents/{agent_id}/status",
json=status_update_request,
timeout=30
)
if response.status_code == 200:
result = response.json()
print(f"Agent status updated successfully")
render_mapping("Status Update:", result)
else:
print(f"Status update failed: {response.status_code}")
print(f"Error: {response.text}")
sys.exit(1)
except Exception as e:
print(f"Error updating agent status: {e}")
sys.exit(1)
elif action == "register":
agent_id = getattr(args, "agent_id", None)
agent_type = getattr(args, "type", "worker")
capabilities = getattr(args, "capabilities", [])
services = getattr(args, "services", [])
endpoints = getattr(args, "endpoints", {})
metadata = getattr(args, "metadata", {})
coordinator_url = getattr(args, "coordinator_url", "http://localhost:9001")
registration_data = {
# Build registration request
registration_request = {
"agent_id": agent_id,
"registered": True,
"coordinator": getattr(args, "coordinator_url", "http://localhost:9001"),
"timestamp": __import__('datetime').datetime.now().isoformat()
"agent_type": agent_type,
"capabilities": capabilities if isinstance(capabilities, list) else (capabilities.split(",") if capabilities else []),
"services": services if isinstance(services, list) else (services.split(",") if services else []),
"endpoints": endpoints if isinstance(endpoints, dict) else (json.loads(endpoints) if endpoints else {}),
"metadata": metadata if isinstance(metadata, dict) else (json.loads(metadata) if metadata else {})
}
print(f"Agent registered: {agent_id}")
render_mapping("Registration:", registration_data)
print(f"Registering agent {agent_id} with coordinator at {coordinator_url}...")
try:
import requests
response = requests.post(
f"{coordinator_url}/agents/register",
json=registration_request,
timeout=30
)
if response.status_code in (200, 201):
result = response.json()
print(f"Agent registered successfully")
render_mapping("Registration:", result)
else:
print(f"Registration failed: {response.status_code}")
print(f"Error: {response.text}")
sys.exit(1)
except Exception as e:
print(f"Error registering agent: {e}")
sys.exit(1)
elif action == "list":
agents_data = {
"agents": [
{"agent_id": "agent_1", "name": "data-analyzer", "status": "active"},
{"agent_id": "agent_2", "name": "trading-bot", "status": "completed"},
{"agent_id": "agent_3", "name": "content-generator", "status": "failed"}
],
"total_count": 3
}
# Agent discovery via coordinator
coordinator_url = getattr(args, "coordinator_url", "http://localhost:9001")
status = getattr(args, "status", None)
agent_type = getattr(args, "agent_type", None)
print("Local agents listed")
render_mapping("Agents:", agents_data)
query = {}
if status:
query["status"] = status
if agent_type:
query["agent_type"] = agent_type
print(f"Discovering agents from coordinator at {coordinator_url}...")
try:
import requests
response = requests.post(
f"{coordinator_url}/agents/discover",
json=query,
timeout=30
)
if response.status_code == 200:
result = response.json()
print(f"Found {result.get('count', 0)} agents")
render_mapping("Agents:", result)
else:
print(f"Discovery failed: {response.status_code}")
print(f"Error: {response.text}")
sys.exit(1)
except Exception as e:
print(f"Error discovering agents: {e}")
sys.exit(1)
elif action == "status":
agent_id = getattr(args, "agent_id", None)
coordinator_url = getattr(args, "coordinator_url", "http://localhost:9001")
status_data = {
"agent_id": agent_id,
"status": "active",
"uptime": "2h 30m",
"jobs_completed": 15,
"success_rate": "93%"
}
print(f"Getting agent info for {agent_id} from coordinator at {coordinator_url}...")
print(f"Agent status: {agent_id}")
render_mapping("Status:", status_data)
try:
import requests
response = requests.get(
f"{coordinator_url}/agents/{agent_id}",
timeout=30
)
if response.status_code == 200:
result = response.json()
print(f"Agent info retrieved")
render_mapping("Agent:", result)
elif response.status_code == 404:
print(f"Agent not found: {agent_id}")
sys.exit(1)
else:
print(f"Query failed: {response.status_code}")
print(f"Error: {response.text}")
sys.exit(1)
except Exception as e:
print(f"Error getting agent info: {e}")
sys.exit(1)
elif action == "capabilities":
caps_data = {

View File

@@ -72,6 +72,11 @@ def register(subparsers: argparse._SubParsersAction, ctx: ParserContext) -> None
# agent sdk register
agent_sdk_register_parser = agent_sdk_subparsers.add_parser("register", help="Register agent with coordinator")
agent_sdk_register_parser.add_argument("--agent-id", required=True, help="Agent ID")
agent_sdk_register_parser.add_argument("--type", choices=["provider", "consumer", "general", "worker"], default="worker", help="Agent type")
agent_sdk_register_parser.add_argument("--capabilities", help="Comma-separated agent capabilities")
agent_sdk_register_parser.add_argument("--services", help="Comma-separated available services")
agent_sdk_register_parser.add_argument("--endpoints", help="JSON string of service endpoints")
agent_sdk_register_parser.add_argument("--metadata", help="JSON string of metadata")
agent_sdk_register_parser.add_argument("--coordinator-url", default="http://localhost:9001", help="Coordinator URL")
agent_sdk_register_parser.set_defaults(handler=ctx.handle_agent_sdk_action, agent_sdk_action="register")
@@ -83,8 +88,17 @@ def register(subparsers: argparse._SubParsersAction, ctx: ParserContext) -> None
# agent sdk status
agent_sdk_status_parser = agent_sdk_subparsers.add_parser("status", help="Get agent status")
agent_sdk_status_parser.add_argument("--agent-id", required=True, help="Agent ID")
agent_sdk_status_parser.add_argument("--coordinator-url", default="http://localhost:9001", help="Coordinator URL")
agent_sdk_status_parser.set_defaults(handler=ctx.handle_agent_sdk_action, agent_sdk_action="status")
# agent sdk update-status
agent_sdk_update_status_parser = agent_sdk_subparsers.add_parser("update-status", help="Update agent status")
agent_sdk_update_status_parser.add_argument("--agent-id", required=True, help="Agent ID")
agent_sdk_update_status_parser.add_argument("--status", required=True, help="New status (active, inactive, busy)")
agent_sdk_update_status_parser.add_argument("--load-metrics", help="JSON string of load metrics")
agent_sdk_update_status_parser.add_argument("--coordinator-url", default="http://localhost:9001", help="Coordinator URL")
agent_sdk_update_status_parser.set_defaults(handler=ctx.handle_agent_sdk_action, agent_sdk_action="update-status")
# agent sdk capabilities
agent_sdk_caps_parser = agent_sdk_subparsers.add_parser("capabilities", help="Show system capabilities")
agent_sdk_caps_parser.set_defaults(handler=ctx.handle_agent_sdk_action, agent_sdk_action="capabilities")

View File

@@ -78,3 +78,7 @@ def register(subparsers: argparse._SubParsersAction, ctx: ParserContext) -> None
ai_stats_parser.add_argument("--chain-id", help="Chain ID")
ai_stats_parser.add_argument("--rpc-url", default=ctx.default_rpc_url)
ai_stats_parser.set_defaults(handler=ctx.handle_ai_stats)
ai_distribution_stats_parser = ai_subparsers.add_parser("distribution-stats", help="Task distribution statistics from agent coordinator")
ai_distribution_stats_parser.add_argument("--coordinator-url", default=ctx.default_coordinator_url)
ai_distribution_stats_parser.set_defaults(handler=ctx.handle_ai_distribution_stats)

View File

@@ -538,6 +538,9 @@ def run_cli(argv, core):
def handle_ai_stats(args):
ai_handlers.handle_ai_stats(args, default_rpc_url, output_format, render_mapping)
def handle_ai_distribution_stats(args):
ai_handlers.handle_ai_distribution_stats(args, default_coordinator_url, output_format, render_mapping)
def handle_ai_service_list(args):
ai_handlers.handle_ai_service_list(args, ai_operations, render_mapping)
@@ -726,6 +729,7 @@ def run_cli(argv, core):
"handle_ai_job": handle_ai_job,
"handle_ai_cancel": handle_ai_cancel,
"handle_ai_stats": handle_ai_stats,
"handle_ai_distribution_stats": handle_ai_distribution_stats,
"handle_ai_service_list": handle_ai_service_list,
"handle_ai_service_status": handle_ai_service_status,
"handle_ai_service_test": handle_ai_service_test,