This commit is contained in:
aitbc1
2026-05-07 19:00:16 +02:00
19 changed files with 3098 additions and 97 deletions

View File

@@ -0,0 +1,174 @@
# AITBC Training - Agent Coordinator Architecture
## Important: Real Coordinator Location
The actual AITBC Agent Coordinator service is located at:
- **Path:** `/opt/aitbc/apps/agent-coordinator/src/app/`
- **Port:** 9001
- **Service:** `aitbc-agent-coordinator.service`
## DO NOT Use
- **Wrong location:** `/opt/aitbc/apps/agent-services/agent-coordinator/src/coordinator.py`
- This is a different/older implementation and is NOT the active service
## Key Components
### Core Files
- `agent_discovery.py` - Redis-backed agent registry with persistence
- `load_balancer.py` - Load balancer with multiple strategies (least_connections, round_robin, etc.)
- `routers/agents.py` - Agent management REST API endpoints
- `routers/tasks.py` - Task submission and distribution API endpoints
- `lifespan.py` - Service initialization and component startup
- `state.py` - Global state management for coordinator components
### Service Initialization
The service initializes in `lifespan.py`:
1. Creates `AgentRegistry()` with Redis backing
2. Starts registry Redis connection
3. Creates `LoadBalancer(registry)` with least_connections strategy
4. Creates `TaskDistributor(balancer)` with priority queues
5. Starts background task distribution loop
## Agent Registration
### API Endpoint
```
POST /agents/register
```
### Example
```bash
curl -X POST http://localhost:9001/agents/register \
-H "Content-Type: application/json" \
-d '{
"agent_id": "hermes-agent",
"agent_type": "worker",
"capabilities": ["data-processing", "analysis", "general"],
"services": ["task-execution", "analysis"],
"endpoints": {"http": "http://localhost:9002"},
"metadata": {"version": "1.0.0"}
}'
```
### Response
```json
{
"status": "success",
"message": "Agent hermes-agent registered successfully",
"agent_id": "hermes-agent",
"registered_at": "2026-05-07T16:26:55.464178+00:00"
}
```
## Task Distribution
### API Endpoint
```
POST /tasks/submit
```
### Example
```bash
curl -X POST http://localhost:9001/tasks/submit \
-H "Content-Type: application/json" \
-d '{
"task_data": {
"model": "llama2",
"prompt": "test prompt"
},
"priority": "normal",
"requirements": {}
}'
```
### Distribution Flow
1. Task submitted to `TaskDistributor`
2. Distributor finds eligible agents via `AgentRegistry.discover_agents()`
3. Load balancer selects agent using configured strategy (default: least_connections)
4. Task assigned to selected agent
5. Agent metrics updated in Redis
## Task Status
### API Endpoint
```
GET /tasks/status
```
### Example
```bash
curl http://localhost:9001/tasks/status
```
### Response
```json
{
"status": "success",
"stats": {
"tasks_distributed": 1,
"tasks_completed": 1,
"tasks_failed": 0,
"load_balancer_stats": {
"strategy": "least_connections",
"active_agents": 1,
"total_assignments": 1,
"avg_agent_load": 1
}
}
}
```
## Agent Discovery
### API Endpoint
```
POST /agents/discover
```
### Example
```bash
curl -X POST http://localhost:9001/agents/discover \
-H "Content-Type: application/json" \
-d '{
"status": "active",
"agent_type": "worker"
}'
```
## Redis Persistence
The agent registry uses Redis for persistence:
- Agent data stored as hashes: `agent:{agent_id}`
- Active agents indexed in set: `agents:active`
- Load metrics tracked per agent
- Health scores calculated from heartbeats
## Service Status
### Health Check
```bash
curl http://localhost:9001/health
```
### Service Management
```bash
systemctl status aitbc-agent-coordinator.service
systemctl restart aitbc-agent-coordinator.service
journalctl -u aitbc-agent-coordinator.service -f
```
## Cross-Node Distribution
For multi-node setups, register agents on each node:
```bash
# Register agent on aitbc1
curl -X POST http://aitbc1:9001/agents/register \
-d '{"agent_id":"aitbc1-agent", ...}'
# Submit task on localhost
curl -X POST http://localhost:9001/tasks/submit \
-d '{"task_data":{...}}'
# Task will be distributed to any active agent across nodes
```

View File

@@ -140,3 +140,36 @@ async def update_agent_status(agent_id: str, request: AgentStatusUpdate):
except Exception as e:
logger.error(f"Error updating agent status: {e}")
raise HTTPException(status_code=500, detail=str(e))
# Agent heartbeat
@router.post("/agents/{agent_id}/heartbeat")
async def agent_heartbeat(agent_id: str):
"""Receive heartbeat from agent"""
try:
if not state.agent_registry:
raise HTTPException(status_code=503, detail="Agent registry not available")
from ..routing.agent_discovery import AgentStatus
# Update heartbeat timestamp and mark as active
success = await state.agent_registry.update_agent_status(
agent_id,
AgentStatus.ACTIVE,
{}
)
if success:
return {
"status": "success",
"message": f"Heartbeat received from {agent_id}",
"agent_id": agent_id,
"heartbeat_at": datetime.now(timezone.utc).isoformat()
}
else:
raise HTTPException(status_code=404, detail="Agent not found")
except HTTPException:
raise
except Exception as e:
logger.error(f"Error processing heartbeat: {e}")
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -78,3 +78,78 @@ async def get_task_status():
except Exception as e:
logger.error(f"Error getting task status: {e}")
raise HTTPException(status_code=500, detail=str(e))
# Task queue management
@router.get("/tasks/queues")
async def get_queue_sizes():
"""Get task queue sizes"""
try:
if not state.task_distributor:
raise HTTPException(status_code=503, detail="Task distributor not available")
queue_sizes = state.task_distributor.get_queue_sizes()
return {
"status": "success",
"queue_sizes": queue_sizes,
"timestamp": datetime.now(timezone.utc).isoformat()
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting queue sizes: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/tasks/queues/{priority}/clear")
async def clear_queue(priority: str):
"""Clear a priority queue"""
try:
if not state.task_distributor:
raise HTTPException(status_code=503, detail="Task distributor not available")
from ..routing.load_balancer import TaskPriority
try:
priority_enum = TaskPriority(priority)
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid priority: {priority}")
cleared_count = await state.task_distributor.clear_queue(priority_enum)
return {
"status": "success",
"message": f"Cleared {cleared_count} tasks from {priority} queue",
"priority": priority,
"cleared_count": cleared_count,
"timestamp": datetime.now(timezone.utc).isoformat()
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error clearing queue: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/tasks/queues/stats")
async def get_queue_stats():
"""Get detailed queue statistics"""
try:
if not state.task_distributor:
raise HTTPException(status_code=503, detail="Task distributor not available")
queue_sizes = state.task_distributor.get_queue_sizes()
distribution_stats = state.task_distributor.get_distribution_stats()
return {
"status": "success",
"queue_sizes": queue_sizes,
"distribution_stats": distribution_stats,
"timestamp": datetime.now(timezone.utc).isoformat()
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting queue stats: {e}")
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -684,6 +684,29 @@ class TaskDistributor:
for priority, queue in self.priority_queues.items()
}
}
def get_queue_sizes(self) -> Dict[str, int]:
"""Get sizes of all priority queues"""
return {
priority.value: queue.qsize()
for priority, queue in self.priority_queues.items()
}
async def clear_queue(self, priority: TaskPriority) -> int:
"""Clear all tasks from a priority queue"""
queue = self.priority_queues[priority]
cleared_count = 0
# Drain the queue
while not queue.empty():
try:
queue.get_nowait()
cleared_count += 1
except asyncio.QueueEmpty:
break
logger.info(f"Cleared {cleared_count} tasks from {priority.value} queue")
return cleared_count
# Example usage
async def example_usage():

View File

@@ -0,0 +1,17 @@
#!/usr/bin/env python3
"""
Main entry point for AITBC Agent Coordinator Service
"""
import sys
import os
from pathlib import Path
# Add parent directory to path to import coordinator
sys.path.insert(0, str(Path(__file__).parent.parent))
from coordinator import app
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=9001)

View File

@@ -9,11 +9,17 @@ from pydantic import BaseModel
from typing import List, Optional, Dict, Any
import json
import uuid
import os
from datetime import datetime, UTC
import sqlite3
from contextlib import contextmanager
from contextlib import asynccontextmanager
# Use absolute path for database in /var/lib/aitbc for persistence
DB_DIR = "/var/lib/aitbc"
os.makedirs(DB_DIR, exist_ok=True)
DB_PATH = os.path.join(DB_DIR, "agent_coordinator.db")
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
@@ -26,7 +32,7 @@ app = FastAPI(title="AITBC Agent Coordinator API", version="1.0.0", lifespan=lif
# Database setup
def get_db():
conn = sqlite3.connect('agent_coordinator.db')
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
@@ -41,6 +47,7 @@ def get_db_connection():
# Initialize database
def init_db():
with get_db_connection() as conn:
# Tasks table (existing)
conn.execute('''
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
@@ -54,6 +61,41 @@ def init_db():
result TEXT
)
''')
# Agents table (new)
conn.execute('''
CREATE TABLE IF NOT EXISTS agents (
id TEXT PRIMARY KEY,
agent_type TEXT NOT NULL,
status TEXT NOT NULL,
capabilities TEXT NOT NULL,
services TEXT NOT NULL,
endpoints TEXT NOT NULL,
metadata TEXT,
last_heartbeat TIMESTAMP,
registration_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
load_metrics TEXT,
health_score REAL DEFAULT 1.0
)
''')
# Agent assignments table (new)
conn.execute('''
CREATE TABLE IF NOT EXISTS agent_assignments (
id TEXT PRIMARY KEY,
task_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
assigned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP,
status TEXT DEFAULT 'pending',
response_time REAL,
success BOOLEAN DEFAULT 0,
error_message TEXT
)
''')
# Commit the transaction
conn.commit()
# Models
class Task(BaseModel):
@@ -71,11 +113,23 @@ class TaskCreation(BaseModel):
required_capabilities: List[str]
priority: str = "normal"
class AgentRegistrationRequest(BaseModel):
agent_id: str
agent_type: str
capabilities: List[str]
services: List[str]
endpoints: Dict[str, str]
metadata: Optional[Dict[str, Any]] = {}
class AgentStatusUpdate(BaseModel):
status: str
load_metrics: Optional[Dict[str, float]] = {}
# API Endpoints
@app.post("/api/tasks", response_model=Task)
async def create_task(task: TaskCreation):
"""Create a new task"""
"""Create a new task and attempt to assign it to an agent"""
task_id = str(uuid.uuid4())
with get_db_connection() as conn:
@@ -87,13 +141,22 @@ async def create_task(task: TaskCreation):
json.dumps(task.required_capabilities), task.priority, "pending"
))
# Attempt to assign task to an agent
assigned_agent_id = assign_task_to_agent(task_id, task.required_capabilities)
if assigned_agent_id:
print(f"Task {task_id} assigned to agent {assigned_agent_id}")
else:
print(f"Task {task_id} - no eligible agents found")
return Task(
id=task_id,
task_type=task.task_type,
payload=task.payload,
required_capabilities=task.required_capabilities,
priority=task.priority,
status="pending"
status="assigned" if assigned_agent_id else "pending",
assigned_agent_id=assigned_agent_id
)
@app.get("/api/tasks", response_model=List[Task])
@@ -125,7 +188,318 @@ async def list_tasks(status: Optional[str] = None):
@app.get("/api/health")
async def health_check():
"""Health check endpoint"""
return {"status": "ok", "timestamp": datetime.now(datetime.UTC)}
return {"status": "ok", "timestamp": datetime.now(timezone.utc)}
@app.get("/tasks/status")
async def get_task_status():
"""Get task distribution statistics including active agents"""
print(f"DEBUG: Querying tasks/status, DB_PATH={DB_PATH}")
with get_db_connection() as conn:
# Get task statistics
tasks = conn.execute("SELECT * FROM tasks").fetchall()
tasks_distributed = len([t for t in tasks if t["assigned_agent_id"]])
tasks_completed = len([t for t in tasks if t["status"] == "completed"])
tasks_failed = len([t for t in tasks if t["status"] == "failed"])
# Get active agents count
agents = conn.execute("SELECT * FROM agents WHERE status = ?", ("active",)).fetchall()
print(f"DEBUG: Found {len(agents)} active agents")
active_agents = len(agents)
# Calculate load balancer stats
agent_weights = len(agents)
total_assignments = len(tasks_distributed)
successful_assignments = tasks_completed
failed_assignments = tasks_failed
# Calculate average agent load
total_load = 0
for agent in agents:
load_metrics = json.loads(agent["load_metrics"]) if agent["load_metrics"] else {}
total_load += load_metrics.get("active_connections", 0) + load_metrics.get("pending_tasks", 0)
avg_agent_load = total_load / active_agents if active_agents > 0 else 0
# Get queue sizes (simulated from pending tasks)
queue_sizes = {
"urgent": 0,
"critical": 0,
"high": 0,
"normal": len([t for t in tasks if t["status"] == "pending" and t["priority"] == "normal"]),
"low": 0
}
return {
"status": "success",
"stats": {
"tasks_distributed": tasks_distributed,
"tasks_completed": tasks_completed,
"tasks_failed": tasks_failed,
"avg_distribution_time": 0.0,
"load_balancer_stats": {
"strategy": "least_connections",
"total_assignments": total_assignments,
"successful_assignments": successful_assignments,
"failed_assignments": failed_assignments,
"success_rate": successful_assignments / max(1, total_assignments),
"active_agents": active_agents,
"agent_weights": agent_weights,
"avg_agent_load": avg_agent_load
},
"queue_sizes": queue_sizes
},
"timestamp": datetime.now(datetime.UTC).isoformat()
}
# Agent Management Endpoints
@app.post("/agents/register")
async def register_agent(request: AgentRegistrationRequest):
"""Register a new agent"""
try:
print(f"DEBUG: Attempting to register agent {request.agent_id}")
print(f"DEBUG: Database path: {DB_PATH}")
conn = get_db()
try:
print(f"DEBUG: Database connection established")
conn.execute('''
INSERT INTO agents (id, agent_type, status, capabilities, services, endpoints, metadata, last_heartbeat, health_score)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
request.agent_id,
request.agent_type,
"active",
json.dumps(request.capabilities),
json.dumps(request.services),
json.dumps(request.endpoints),
json.dumps(request.metadata),
datetime.now(datetime.UTC),
1.0
))
conn.commit()
print(f"DEBUG: Agent {request.agent_id} inserted and committed")
finally:
conn.close()
return {
"status": "success",
"message": f"Agent {request.agent_id} registered successfully",
"agent_id": request.agent_id,
"registered_at": datetime.now(datetime.UTC).isoformat()
}
except Exception as e:
print(f"ERROR: Failed to register agent: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to register agent: {str(e)}")
@app.post("/agents/discover")
async def discover_agents(query: Dict[str, Any]):
"""Discover agents based on criteria"""
try:
with get_db_connection() as conn:
# Build query
sql = "SELECT * FROM agents WHERE status = ?"
params = ["active"]
if "agent_type" in query:
sql += " AND agent_type = ?"
params.append(query["agent_type"])
agents = conn.execute(sql, params).fetchall()
# Filter by capabilities if specified
if "capabilities" in query:
required_capabilities = set(query["capabilities"])
filtered_agents = []
for agent in agents:
agent_capabilities = set(json.loads(agent["capabilities"]))
if required_capabilities.issubset(agent_capabilities):
filtered_agents.append(agent)
agents = filtered_agents
# Filter by services if specified
if "services" in query:
required_services = set(query["services"])
filtered_agents = []
for agent in agents:
agent_services = set(json.loads(agent["services"]))
if required_services.issubset(agent_services):
filtered_agents.append(agent)
agents = filtered_agents
# Sort by health score (highest first)
agents = sorted(agents, key=lambda a: a["health_score"], reverse=True)
return {
"status": "success",
"query": query,
"agents": [
{
"agent_id": agent["id"],
"agent_type": agent["agent_type"],
"status": agent["status"],
"capabilities": json.loads(agent["capabilities"]),
"services": json.loads(agent["services"]),
"endpoints": json.loads(agent["endpoints"]),
"health_score": agent["health_score"],
"last_heartbeat": agent["last_heartbeat"]
}
for agent in agents
],
"count": len(agents),
"timestamp": datetime.now(datetime.UTC).isoformat()
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error discovering agents: {str(e)}")
@app.get("/agents/{agent_id}")
async def get_agent(agent_id: str):
"""Get agent information by ID"""
try:
with get_db_connection() as conn:
agent = conn.execute("SELECT * FROM agents WHERE id = ?", (agent_id,)).fetchone()
if not agent:
raise HTTPException(status_code=404, detail="Agent not found")
return {
"status": "success",
"agent": {
"agent_id": agent["id"],
"agent_type": agent["agent_type"],
"status": agent["status"],
"capabilities": json.loads(agent["capabilities"]),
"services": json.loads(agent["services"]),
"endpoints": json.loads(agent["endpoints"]),
"metadata": json.loads(agent["metadata"]) if agent["metadata"] else {},
"last_heartbeat": agent["last_heartbeat"],
"registration_time": agent["registration_time"],
"health_score": agent["health_score"]
},
"timestamp": datetime.now(datetime.UTC).isoformat()
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error getting agent: {str(e)}")
@app.put("/agents/{agent_id}/status")
async def update_agent_status(agent_id: str, request: AgentStatusUpdate):
"""Update agent status"""
try:
with get_db_connection() as conn:
# Update status and heartbeat
conn.execute('''
UPDATE agents SET status = ?, last_heartbeat = ?
WHERE id = ?
''', (request.status, datetime.now(datetime.UTC), agent_id))
# Update load metrics if provided
if request.load_metrics:
conn.execute('''
UPDATE agents SET load_metrics = ?
WHERE id = ?
''', (json.dumps(request.load_metrics), agent_id))
return {
"status": "success",
"message": f"Agent {agent_id} status updated",
"agent_id": agent_id,
"new_status": request.status,
"updated_at": datetime.now(datetime.UTC).isoformat()
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error updating agent status: {str(e)}")
@app.post("/agents/{agent_id}/heartbeat")
async def agent_heartbeat(agent_id: str):
"""Agent heartbeat endpoint"""
try:
with get_db_connection() as conn:
conn.execute('''
UPDATE agents SET last_heartbeat = ?
WHERE id = ?
''', (datetime.now(datetime.UTC), agent_id))
return {
"status": "success",
"message": f"Heartbeat received for agent {agent_id}",
"timestamp": datetime.now(datetime.UTC).isoformat()
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error updating heartbeat: {str(e)}")
# Agent Matching and Task Distribution
def find_eligible_agents(required_capabilities: List[str], agent_type: Optional[str] = None) -> List[Dict[str, Any]]:
"""Find eligible agents for task"""
with get_db_connection() as conn:
# Build query
sql = "SELECT * FROM agents WHERE status = ?"
params = ["active"]
if agent_type:
sql += " AND agent_type = ?"
params.append(agent_type)
agents = conn.execute(sql, params).fetchall()
# Filter by capabilities
if required_capabilities:
required_set = set(required_capabilities)
eligible_agents = []
for agent in agents:
agent_capabilities = set(json.loads(agent["capabilities"]))
if required_set.issubset(agent_capabilities):
eligible_agents.append(agent)
agents = eligible_agents
# Sort by health score (highest first)
agents = sorted(agents, key=lambda a: a["health_score"], reverse=True)
return [
{
"agent_id": agent["id"],
"agent_type": agent["agent_type"],
"health_score": agent["health_score"],
"load_metrics": json.loads(agent["load_metrics"]) if agent["load_metrics"] else {}
}
for agent in agents
]
def assign_task_to_agent(task_id: str, required_capabilities: List[str], agent_type: Optional[str] = None) -> Optional[str]:
"""Assign task to best available agent using least_connections strategy"""
# Find eligible agents
eligible_agents = find_eligible_agents(required_capabilities, agent_type)
if not eligible_agents:
return None
# Select agent with least connections (load balancing)
selected_agent = min(eligible_agents, key=lambda a: a["load_metrics"].get("active_connections", 0))
# Record assignment
assignment_id = str(uuid.uuid4())
with get_db_connection() as conn:
conn.execute('''
INSERT INTO agent_assignments (id, task_id, agent_id, status)
VALUES (?, ?, ?, ?)
''', (assignment_id, task_id, selected_agent["agent_id"], "assigned"))
# Update task with assigned agent
conn.execute('''
UPDATE tasks SET assigned_agent_id = ?, status = ?
WHERE id = ?
''', (selected_agent["agent_id"], "assigned", task_id))
# Update agent load metrics
load_metrics = selected_agent["load_metrics"]
load_metrics["active_connections"] = load_metrics.get("active_connections", 0) + 1
load_metrics["pending_tasks"] = load_metrics.get("pending_tasks", 0) + 1
conn.execute('''
UPDATE agents SET load_metrics = ?
WHERE id = ?
''', (json.dumps(load_metrics), selected_agent["agent_id"]))
return selected_agent["agent_id"]
if __name__ == "__main__":
import uvicorn

View File

@@ -203,6 +203,28 @@ def handle_ai_stats(args, default_rpc_url, output_format, render_mapping):
sys.exit(1)
def handle_ai_distribution_stats(args, default_coordinator_url, output_format, render_mapping):
"""Handle task distribution statistics query from agent coordinator."""
coordinator_url = getattr(args, 'coordinator_url', None) or default_coordinator_url
print(f"Getting task distribution statistics from {coordinator_url}...")
try:
response = requests.get(f"{coordinator_url}/tasks/status", timeout=10)
if response.status_code == 200:
stats = response.json()
if output_format(args) == "json":
print(json.dumps(stats, indent=2))
else:
render_mapping("Task distribution statistics:", stats)
else:
print(f"Query failed: {response.status_code}")
print(f"Error: {response.text}")
sys.exit(1)
except Exception as e:
print(f"Error getting distribution stats: {e}")
sys.exit(1)
def handle_ai_service_list(args, ai_operations, render_mapping):
"""Handle AI service list command."""
result = ai_operations("service_list")

View File

@@ -128,45 +128,145 @@ def handle_agent_sdk_action(args, render_mapping):
print(f"Agent SDK created: {name}")
render_mapping("Agent SDK:", sdk_data)
elif action == "update-status":
agent_id = getattr(args, "agent_id", None)
status = getattr(args, "status", None)
load_metrics = getattr(args, "load_metrics", {})
coordinator_url = getattr(args, "coordinator_url", "http://localhost:9001")
if not agent_id or not status:
print("Error: --agent-id and --status are required")
sys.exit(1)
status_update_request = {
"status": status,
"load_metrics": load_metrics if isinstance(load_metrics, dict) else {}
}
print(f"Updating agent {agent_id} status to {status}...")
try:
import requests
response = requests.put(
f"{coordinator_url}/agents/{agent_id}/status",
json=status_update_request,
timeout=30
)
if response.status_code == 200:
result = response.json()
print(f"Agent status updated successfully")
render_mapping("Status Update:", result)
else:
print(f"Status update failed: {response.status_code}")
print(f"Error: {response.text}")
sys.exit(1)
except Exception as e:
print(f"Error updating agent status: {e}")
sys.exit(1)
elif action == "register":
agent_id = getattr(args, "agent_id", None)
agent_type = getattr(args, "type", "worker")
capabilities = getattr(args, "capabilities", [])
services = getattr(args, "services", [])
endpoints = getattr(args, "endpoints", {})
metadata = getattr(args, "metadata", {})
coordinator_url = getattr(args, "coordinator_url", "http://localhost:9001")
registration_data = {
# Build registration request
registration_request = {
"agent_id": agent_id,
"registered": True,
"coordinator": getattr(args, "coordinator_url", "http://localhost:9001"),
"timestamp": __import__('datetime').datetime.now().isoformat()
"agent_type": agent_type,
"capabilities": capabilities if isinstance(capabilities, list) else (capabilities.split(",") if capabilities else []),
"services": services if isinstance(services, list) else (services.split(",") if services else []),
"endpoints": endpoints if isinstance(endpoints, dict) else (json.loads(endpoints) if endpoints else {}),
"metadata": metadata if isinstance(metadata, dict) else (json.loads(metadata) if metadata else {})
}
print(f"Agent registered: {agent_id}")
render_mapping("Registration:", registration_data)
print(f"Registering agent {agent_id} with coordinator at {coordinator_url}...")
try:
import requests
response = requests.post(
f"{coordinator_url}/agents/register",
json=registration_request,
timeout=30
)
if response.status_code in (200, 201):
result = response.json()
print(f"Agent registered successfully")
render_mapping("Registration:", result)
else:
print(f"Registration failed: {response.status_code}")
print(f"Error: {response.text}")
sys.exit(1)
except Exception as e:
print(f"Error registering agent: {e}")
sys.exit(1)
elif action == "list":
agents_data = {
"agents": [
{"agent_id": "agent_1", "name": "data-analyzer", "status": "active"},
{"agent_id": "agent_2", "name": "trading-bot", "status": "completed"},
{"agent_id": "agent_3", "name": "content-generator", "status": "failed"}
],
"total_count": 3
}
# Agent discovery via coordinator
coordinator_url = getattr(args, "coordinator_url", "http://localhost:9001")
status = getattr(args, "status", None)
agent_type = getattr(args, "agent_type", None)
print("Local agents listed")
render_mapping("Agents:", agents_data)
query = {}
if status:
query["status"] = status
if agent_type:
query["agent_type"] = agent_type
print(f"Discovering agents from coordinator at {coordinator_url}...")
try:
import requests
response = requests.post(
f"{coordinator_url}/agents/discover",
json=query,
timeout=30
)
if response.status_code == 200:
result = response.json()
print(f"Found {result.get('count', 0)} agents")
render_mapping("Agents:", result)
else:
print(f"Discovery failed: {response.status_code}")
print(f"Error: {response.text}")
sys.exit(1)
except Exception as e:
print(f"Error discovering agents: {e}")
sys.exit(1)
elif action == "status":
agent_id = getattr(args, "agent_id", None)
coordinator_url = getattr(args, "coordinator_url", "http://localhost:9001")
status_data = {
"agent_id": agent_id,
"status": "active",
"uptime": "2h 30m",
"jobs_completed": 15,
"success_rate": "93%"
}
print(f"Getting agent info for {agent_id} from coordinator at {coordinator_url}...")
print(f"Agent status: {agent_id}")
render_mapping("Status:", status_data)
try:
import requests
response = requests.get(
f"{coordinator_url}/agents/{agent_id}",
timeout=30
)
if response.status_code == 200:
result = response.json()
print(f"Agent info retrieved")
render_mapping("Agent:", result)
elif response.status_code == 404:
print(f"Agent not found: {agent_id}")
sys.exit(1)
else:
print(f"Query failed: {response.status_code}")
print(f"Error: {response.text}")
sys.exit(1)
except Exception as e:
print(f"Error getting agent info: {e}")
sys.exit(1)
elif action == "capabilities":
caps_data = {

View File

@@ -72,6 +72,11 @@ def register(subparsers: argparse._SubParsersAction, ctx: ParserContext) -> None
# agent sdk register
agent_sdk_register_parser = agent_sdk_subparsers.add_parser("register", help="Register agent with coordinator")
agent_sdk_register_parser.add_argument("--agent-id", required=True, help="Agent ID")
agent_sdk_register_parser.add_argument("--type", choices=["provider", "consumer", "general", "worker"], default="worker", help="Agent type")
agent_sdk_register_parser.add_argument("--capabilities", help="Comma-separated agent capabilities")
agent_sdk_register_parser.add_argument("--services", help="Comma-separated available services")
agent_sdk_register_parser.add_argument("--endpoints", help="JSON string of service endpoints")
agent_sdk_register_parser.add_argument("--metadata", help="JSON string of metadata")
agent_sdk_register_parser.add_argument("--coordinator-url", default="http://localhost:9001", help="Coordinator URL")
agent_sdk_register_parser.set_defaults(handler=ctx.handle_agent_sdk_action, agent_sdk_action="register")
@@ -83,8 +88,17 @@ def register(subparsers: argparse._SubParsersAction, ctx: ParserContext) -> None
# agent sdk status
agent_sdk_status_parser = agent_sdk_subparsers.add_parser("status", help="Get agent status")
agent_sdk_status_parser.add_argument("--agent-id", required=True, help="Agent ID")
agent_sdk_status_parser.add_argument("--coordinator-url", default="http://localhost:9001", help="Coordinator URL")
agent_sdk_status_parser.set_defaults(handler=ctx.handle_agent_sdk_action, agent_sdk_action="status")
# agent sdk update-status
agent_sdk_update_status_parser = agent_sdk_subparsers.add_parser("update-status", help="Update agent status")
agent_sdk_update_status_parser.add_argument("--agent-id", required=True, help="Agent ID")
agent_sdk_update_status_parser.add_argument("--status", required=True, help="New status (active, inactive, busy)")
agent_sdk_update_status_parser.add_argument("--load-metrics", help="JSON string of load metrics")
agent_sdk_update_status_parser.add_argument("--coordinator-url", default="http://localhost:9001", help="Coordinator URL")
agent_sdk_update_status_parser.set_defaults(handler=ctx.handle_agent_sdk_action, agent_sdk_action="update-status")
# agent sdk capabilities
agent_sdk_caps_parser = agent_sdk_subparsers.add_parser("capabilities", help="Show system capabilities")
agent_sdk_caps_parser.set_defaults(handler=ctx.handle_agent_sdk_action, agent_sdk_action="capabilities")

View File

@@ -78,3 +78,7 @@ def register(subparsers: argparse._SubParsersAction, ctx: ParserContext) -> None
ai_stats_parser.add_argument("--chain-id", help="Chain ID")
ai_stats_parser.add_argument("--rpc-url", default=ctx.default_rpc_url)
ai_stats_parser.set_defaults(handler=ctx.handle_ai_stats)
ai_distribution_stats_parser = ai_subparsers.add_parser("distribution-stats", help="Task distribution statistics from agent coordinator")
ai_distribution_stats_parser.add_argument("--coordinator-url", default=ctx.default_coordinator_url)
ai_distribution_stats_parser.set_defaults(handler=ctx.handle_ai_distribution_stats)

View File

@@ -538,6 +538,9 @@ def run_cli(argv, core):
def handle_ai_stats(args):
ai_handlers.handle_ai_stats(args, default_rpc_url, output_format, render_mapping)
def handle_ai_distribution_stats(args):
ai_handlers.handle_ai_distribution_stats(args, default_coordinator_url, output_format, render_mapping)
def handle_ai_service_list(args):
ai_handlers.handle_ai_service_list(args, ai_operations, render_mapping)
@@ -726,6 +729,7 @@ def run_cli(argv, core):
"handle_ai_job": handle_ai_job,
"handle_ai_cancel": handle_ai_cancel,
"handle_ai_stats": handle_ai_stats,
"handle_ai_distribution_stats": handle_ai_distribution_stats,
"handle_ai_service_list": handle_ai_service_list,
"handle_ai_service_status": handle_ai_service_status,
"handle_ai_service_test": handle_ai_service_test,

View File

@@ -0,0 +1,448 @@
# AITBC Agent Coordinator - API Reference
## Base URL
```
http://localhost:9001
```
## Authentication
Currently, the API does not require authentication. Future versions may support API key authentication and JWT tokens.
## Agent Management API
### Register Agent
Register a new agent with the coordinator.
**Endpoint:** `POST /agents/register`
**Request Body:**
```json
{
"agent_id": "string (required)",
"agent_type": "string (required)",
"capabilities": ["string"],
"services": ["string"],
"endpoints": {"string": "string"},
"metadata": {"string": "any"}
}
```
**Parameters:**
- `agent_id` (required): Unique identifier for the agent
- `agent_type` (required): Type of agent (worker, provider, consumer, general)
- `capabilities` (optional): Array of agent capabilities
- `services` (optional): Array of available services
- `endpoints` (optional): Object mapping service names to URLs
- `metadata` (optional): Additional metadata as key-value pairs
**Response (200 OK):**
```json
{
"status": "success",
"message": "Agent {agent_id} registered successfully",
"agent_id": "string",
"registered_at": "ISO 8601 timestamp"
}
```
**Response (422 Unprocessable Entity):**
```json
{
"detail": "Validation error message"
}
```
**Response (500 Internal Server Error):**
```json
{
"detail": "Failed to register agent: {error message}"
}
```
**Example:**
```bash
curl -X POST http://localhost:9001/agents/register \
-H "Content-Type: application/json" \
-d '{
"agent_id": "hermes-agent",
"agent_type": "worker",
"capabilities": ["data-processing", "analysis"],
"services": ["task-execution"],
"endpoints": {"http": "http://localhost:9002"},
"metadata": {"version": "1.0.0"}
}'
```
### Discover Agents
Discover agents based on filtering criteria.
**Endpoint:** `POST /agents/discover`
**Request Body:**
```json
{
"status": "string (optional)",
"agent_type": "string (optional)",
"capabilities": ["string (optional)"],
"services": ["string (optional)"]
}
```
**Parameters:**
- `status` (optional): Filter by agent status (active, inactive, busy, stale)
- `agent_type` (optional): Filter by agent type
- `capabilities` (optional): Filter by required capabilities
- `services` (optional): Filter by available services
**Response (200 OK):**
```json
{
"status": "success",
"query": {},
"agents": [
{
"agent_id": "string",
"agent_type": "string",
"status": "string",
"capabilities": ["string"],
"services": ["string"],
"endpoints": {"string": "string"},
"metadata": {"string": "any"},
"last_heartbeat": "ISO 8601 timestamp",
"registration_time": "ISO 8601 timestamp",
"load_metrics": {"string": "number"},
"health_score": 0.0-1.0,
"version": "string",
"tags": ["string"]
}
],
"count": 0,
"timestamp": "ISO 8601 timestamp"
}
```
**Response (500 Internal Server Error):**
```json
{
"detail": "Error discovering agents: {error message}"
}
```
**Example:**
```bash
curl -X POST http://localhost:9001/agents/discover \
-H "Content-Type: application/json" \
-d '{
"status": "active",
"agent_type": "worker"
}'
```
### Get Agent Information
Retrieve detailed information about a specific agent.
**Endpoint:** `GET /agents/{agent_id}`
**URL Parameters:**
- `agent_id` (required): The unique identifier of the agent
**Response (200 OK):**
```json
{
"status": "success",
"agent": {
"agent_id": "string",
"agent_type": "string",
"status": "string",
"capabilities": ["string"],
"services": ["string"],
"endpoints": {"string": "string"},
"metadata": {"string": "any"},
"last_heartbeat": "ISO 8601 timestamp",
"registration_time": "ISO 8601 timestamp",
"load_metrics": {"string": "number"},
"health_score": 0.0-1.0,
"version": "string",
"tags": ["string"]
},
"timestamp": "ISO 8601 timestamp"
}
```
**Response (404 Not Found):**
```json
{
"detail": "Agent not found"
}
```
**Response (500 Internal Server Error):**
```json
{
"detail": "Error getting agent: {error message}"
}
```
**Example:**
```bash
curl http://localhost:9001/agents/hermes-agent
```
### Update Agent Status
Update the status and load metrics of an agent.
**Endpoint:** `PUT /agents/{agent_id}/status`
**URL Parameters:**
- `agent_id` (required): The unique identifier of the agent
**Request Body:**
```json
{
"status": "string (required)",
"load_metrics": {
"active_connections": 0,
"pending_tasks": 0,
"cpu_usage": 0.0,
"memory_usage": 0.0
}
}
```
**Parameters:**
- `status` (required): New agent status (active, inactive, busy, stale)
- `load_metrics` (optional): Object containing load metrics
**Response (200 OK):**
```json
{
"status": "success",
"message": "Agent {agent_id} status updated",
"agent_id": "string",
"new_status": "string",
"updated_at": "ISO 8601 timestamp"
}
```
**Response (422 Unprocessable Entity):**
```json
{
"detail": "Validation error message"
}
```
**Response (500 Internal Server Error):**
```json
{
"detail": "Error updating agent status: {error message}"
}
```
**Example:**
```bash
curl -X PUT http://localhost:9001/agents/hermes-agent/status \
-H "Content-Type: application/json" \
-d '{
"status": "busy",
"load_metrics": {
"active_connections": 5,
"pending_tasks": 2
}
}'
```
## Task Management API
### Submit Task
Submit a task for distribution to agents.
**Endpoint:** `POST /tasks/submit`
**Request Body:**
```json
{
"task_data": {
"task_type": "string",
"model": "string",
"prompt": "string",
"parameters": {"string": "any"}
},
"priority": "string (required)",
"requirements": {
"capabilities": ["string"],
"agent_type": "string"
}
}
```
**Parameters:**
- `task_data` (required): Object containing task information
- `priority` (required): Task priority (urgent, critical, high, normal, low)
- `requirements` (optional): Object specifying agent requirements
**Response (200 OK):**
```json
{
"status": "success",
"message": "Task submitted successfully",
"task_id": "UUID string",
"priority": "string",
"submitted_at": "ISO 8601 timestamp"
}
```
**Response (400 Bad Request):**
```json
{
"detail": "Invalid priority: {priority}"
}
```
**Response (503 Service Unavailable):**
```json
{
"detail": "Task distributor not available"
}
```
**Response (500 Internal Server Error):**
```json
{
"detail": "Error submitting task: {error message}"
}
```
**Example:**
```bash
curl -X POST http://localhost:9001/tasks/submit \
-H "Content-Type: application/json" \
-d '{
"task_data": {
"model": "llama2",
"prompt": "test prompt"
},
"priority": "normal",
"requirements": {}
}'
```
### Get Task Status
Get task distribution statistics and load balancer metrics.
**Endpoint:** `GET /tasks/status`
**Response (200 OK):**
```json
{
"status": "success",
"stats": {
"tasks_distributed": 0,
"tasks_completed": 0,
"tasks_failed": 0,
"avg_distribution_time": 0.0,
"load_balancer_stats": {
"strategy": "least_connections",
"total_assignments": 0,
"successful_assignments": 0,
"failed_assignments": 0,
"success_rate": 0.0,
"active_agents": 0,
"agent_weights": 0,
"avg_agent_load": 0
},
"queue_sizes": {
"urgent": 0,
"critical": 0,
"high": 0,
"normal": 0,
"low": 0
}
},
"timestamp": "ISO 8601 timestamp"
}
```
**Response (503 Service Unavailable):**
```json
{
"detail": "Task distributor not available"
}
```
**Response (500 Internal Server Error):**
```json
{
"detail": "Error getting task status: {error message}"
}
```
**Example:**
```bash
curl http://localhost:9001/tasks/status
```
## Health Check
### Service Health
Check the health of the agent coordinator service.
**Endpoint:** `GET /health`
**Response (200 OK):**
```json
{
"status": "healthy",
"version": "string",
"timestamp": "ISO 8601 timestamp"
}
```
**Example:**
```bash
curl http://localhost:9001/health
```
## Error Codes
| Status Code | Description |
|-------------|-------------|
| 200 | Success |
| 400 | Bad Request - Invalid parameters |
| 404 | Not Found - Resource not found |
| 422 | Unprocessable Entity - Validation error |
| 500 | Internal Server Error |
| 503 | Service Unavailable - Component not ready |
## Rate Limiting
Currently, rate limiting is not implemented. Future versions may include rate limiting to prevent abuse.
## WebSocket Support
WebSocket support is planned for future releases to provide real-time updates on:
- Agent status changes
- Task distribution events
- Load balancer metrics updates
## OpenAPI Specification
The API follows OpenAPI 3.0 specification. An OpenAPI JSON schema can be generated from the FastAPI application by visiting:
```
http://localhost:9001/openapi.json
```
Interactive API documentation is available at:
```
http://localhost:9001/docs
```

View File

@@ -0,0 +1,288 @@
# AITBC Agent Coordinator - Architecture Documentation
## System Overview
The AITBC Agent Coordinator is a distributed task distribution system that manages AI agents, coordinates task assignment, and provides load balancing across multiple agent instances. The system uses Redis for persistence and FastAPI for REST API endpoints.
## Service Location
**Actual Service:** `/opt/aitbc/apps/agent-coordinator/src/app/`
**Port:** 9001
**Systemd Service:** `aitbc-agent-coordinator.service`
**DO NOT USE:** `/opt/aitbc/apps/agent-services/agent-coordinator/src/coordinator.py` (this is an older/incorrect implementation)
## Core Components
### 1. Agent Registry (`agent_discovery.py`)
The Agent Registry is the central component for managing agent lifecycle and discovery.
**Key Features:**
- Redis-backed persistence for agent data
- Agent registration and deregistration
- Agent discovery with filtering (by type, status, capabilities)
- Health score calculation based on heartbeat frequency
- Load metrics tracking (active connections, pending tasks)
**Data Model:**
- Agent data stored as Redis hashes: `agent:{agent_id}`
- Active agents indexed in Redis set: `agents:active`
- Agent status tracked: active, inactive, busy, stale
**Key Classes:**
- `AgentInfo` - Dataclass representing agent information
- `AgentRegistry` - Main registry class with Redis integration
- `AgentDiscoveryService` - Service for discovering agents with criteria
### 2. Load Balancer (`load_balancer.py`)
The Load Balancer distributes tasks across eligible agents using configurable strategies.
**Load Balancing Strategies:**
- `LEAST_CONNECTIONS` - Selects agent with fewest active connections (default)
- `ROUND_ROBIN` - Distributes tasks in circular order
- `WEIGHTED_ROUND_ROBIN` - Based on agent performance weights
- `RESOURCE_BASED` - Based on CPU/memory metrics
- `GEOGRAPHIC` - Based on agent location
- `RANDOM` - For testing purposes
**Key Classes:**
- `LoadBalancer` - Main load balancer class
- `TaskDistributor` - Manages task priority queues and distribution
- `TaskPriority` - Enum for task priorities (urgent, critical, high, normal, low)
**Task Distribution Flow:**
1. Task submitted to `TaskDistributor.submit_task()`
2. Task placed in appropriate priority queue
3. Background distribution loop processes queues
4. Load balancer finds eligible agents via `find_eligible_agents()`
5. Agent selected using configured strategy
6. Task assigned and agent metrics updated
### 3. REST API Routers
#### Agent Management (`routers/agents.py`)
**Endpoints:**
- `POST /agents/register` - Register new agent
- `POST /agents/discover` - Discover agents with filtering
- `GET /agents/{agent_id}` - Get agent information
- `PUT /agents/{agent_id}/status` - Update agent status
#### Task Management (`routers/tasks.py`)
**Endpoints:**
- `POST /tasks/submit` - Submit task for distribution
- `GET /tasks/status` - Get task distribution statistics
## Service Initialization
The service initializes in `lifespan.py` during FastAPI startup:
```python
async def lifespan(app: FastAPI):
# Create AgentRegistry with Redis backing
state.agent_registry = AgentRegistry()
await state.agent_registry.start()
# Create LoadBalancer with registry
state.load_balancer = LoadBalancer(state.agent_registry)
state.load_balancer.set_strategy(LoadBalancingStrategy.LEAST_CONNECTIONS)
# Create TaskDistributor
state.task_distributor = TaskDistributor(state.load_balancer)
# Start background tasks
asyncio.create_task(state.task_distributor.start_distribution())
asyncio.create_task(state.message_processor.start_processing())
```
## Redis Persistence Model
### Agent Data Structure
**Hash Key:** `agent:{agent_id}`
**Fields:**
- `agent_id` - Unique identifier
- `agent_type` - Type (worker, provider, consumer, general)
- `status` - Current status (active, inactive, busy, stale)
- `capabilities` - JSON array of capabilities
- `services` - JSON array of available services
- `endpoints` - JSON object of service endpoints
- `metadata` - JSON object of additional metadata
- `last_heartbeat` - Timestamp of last heartbeat
- `registration_time` - Timestamp of registration
- `load_metrics` - JSON object of load metrics
- `health_score` - Calculated health score (0.0-1.0)
- `version` - Agent version
- `tags` - JSON array of tags
### Indexes
**Set Key:** `agents:active` - Contains IDs of all active agents
## Agent Lifecycle
### Registration
1. Agent sends POST /agents/register with agent information
2. Coordinator validates agent data
3. Agent info stored in Redis
4. Agent added to active agents set
5. Success response returned
### Heartbeat
1. Agent sends heartbeat (not yet implemented as endpoint)
2. Last heartbeat timestamp updated
3. Health score recalculated
4. Stale agents marked as inactive (configurable timeout)
### Status Update
1. Agent sends PUT /agents/{agent_id}/status
2. Status and load metrics updated
3. Load balancer uses updated metrics for task assignment
### Deregistration
1. Agent marked as inactive
2. Removed from active agents set
3. Data retained in Redis for historical purposes
## Task Distribution Flow
### Task Submission
```mermaid
sequenceDiagram
participant Client
participant Coordinator
participant LoadBalancer
participant AgentRegistry
participant Redis
participant Agent
Client->>Coordinator: POST /tasks/submit
Coordinator->>TaskDistributor: submit_task()
TaskDistributor->>TaskDistributor: add to priority queue
TaskDistributor->>LoadBalancer: find_eligible_agents()
LoadBalancer->>AgentRegistry: discover_agents(criteria)
AgentRegistry->>Redis: query active agents
Redis-->>AgentRegistry: agent data
AgentRegistry-->>LoadBalancer: eligible agents
LoadBalancer->>LoadBalancer: select_agent(strategy)
LoadBalancer->>Redis: update agent metrics
LoadBalancer-->>TaskDistributor: selected agent
TaskDistributor->>Agent: assign task
Coordinator-->>Client: task submitted
```
### Load Balancing
The load balancer uses the following criteria to select agents:
1. Agent status must be "active"
2. Agent must have required capabilities
3. Agent type must match requirements
4. Health score must be above threshold
5. Load metrics must be within limits
## Configuration
### Environment Variables
- `AITBC_REDIS_URL` - Redis connection URL (default: redis://localhost:6379)
- `AITBC_COORDINATOR_PORT` - Coordinator service port (default: 9001)
- `AITBC_LOG_LEVEL` - Logging level (default: INFO)
### Load Balancing Configuration
- Default strategy: LEAST_CONNECTIONS
- Strategy can be changed via LoadBalancer.set_strategy()
- Priority queues: urgent, critical, high, normal, low
### Health Check Configuration
- Heartbeat timeout: 300 seconds (configurable)
- Health score threshold: 0.5 (configurable)
- Stale agent detection: enabled by default
## Monitoring
### Metrics Available
- Active agents count
- Tasks distributed/completed/failed
- Average distribution time
- Load balancer success rate
- Agent load distribution
- Queue sizes per priority
### Monitoring Endpoints
- `GET /tasks/status` - Task distribution statistics
- `GET /health` - Service health check
- Future: Prometheus metrics endpoint
## Security
### Authentication
- API key authentication via middleware (optional)
- JWT token support (optional)
- Role-based access control (optional)
### Rate Limiting
- Not currently implemented
- Can be added via FastAPI middleware
## Scalability
### Horizontal Scaling
- Multiple coordinator instances can run behind a load balancer
- Redis provides shared state across instances
- Agent registry is distributed via Redis
### Performance Considerations
- Redis operations are O(1) or O(log N)
- Task distribution is asynchronous
- Priority queues prevent starvation
- Load balancing strategies can be tuned
## Troubleshooting
### Common Issues
**No active agents:**
- Check Redis connection
- Verify agents are registered
- Check agent status (may be inactive/stale)
**Tasks not distributing:**
- Check task distributor is running
- Verify eligible agents exist
- Check load balancer strategy
- Review task requirements
**Agent not discovered:**
- Verify agent registration succeeded
- Check agent status is active
- Verify capabilities match query
- Check Redis connection
### Debug Commands
```bash
# Check service status
systemctl status aitbc-agent-coordinator.service
# View logs
journalctl -u aitbc-agent-coordinator.service -f
# Check Redis
redis-cli
> KEYS agent:*
> SMEMBERS agents:active
# Test API
curl http://localhost:9001/health
curl http://localhost:9001/tasks/status
```
## Future Enhancements
Planned improvements (see Phase 3):
- Agent heartbeat mechanism
- Additional load balancing strategies
- Task priority queue management
- Agent metrics dashboard
- WebSocket support for real-time updates

View File

@@ -0,0 +1,471 @@
# AITBC Agent Coordinator - CLI Reference
The AITBC CLI provides commands for interacting with the Agent Coordinator service for agent management and task distribution.
## Agent SDK Commands
### Register Agent
Register a new agent with the coordinator service.
**Command:**
```bash
aitbc-cli agent sdk register --agent-id <ID> [OPTIONS]
```
**Required Arguments:**
- `--agent-id`: Unique identifier for the agent
**Optional Arguments:**
- `--type`: Agent type (provider, consumer, general, worker) - default: worker
- `--capabilities`: Comma-separated list of agent capabilities
- `--services`: Comma-separated list of available services
- `--endpoints`: JSON string of service endpoints
- `--metadata`: JSON string of metadata
- `--coordinator-url`: Coordinator URL - default: http://localhost:9001
**Examples:**
```bash
# Basic registration
aitbc-cli agent sdk register --agent-id hermes-agent --type worker
# Full registration with all parameters
aitbc-cli agent sdk register \
--agent-id hermes-agent \
--type worker \
--capabilities "data-processing,analysis,debugging" \
--services "task-execution,coordination" \
--endpoints '{"http":"http://localhost:9002"}' \
--metadata '{"version":"1.0.0","owner":"aitbc"}'
```
**Output:**
```
Registering agent hermes-agent with coordinator at http://localhost:9001...
Agent registered successfully
Registration:
Status: success
Message: Agent hermes-agent registered successfully
Agent Id: hermes-agent
Registered At: 2026-05-07T16:26:55.464178+00:00
```
### List Agents
Discover and list agents from the coordinator.
**Command:**
```bash
aitbc-cli agent sdk list [OPTIONS]
```
**Optional Arguments:**
- `--status`: Filter by agent status (active, inactive, busy, stale)
- `--agent-type`: Filter by agent type
- `--coordinator-url`: Coordinator URL - default: http://localhost:9001
**Examples:**
```bash
# List all agents
aitbc-cli agent sdk list
# List only active agents
aitbc-cli agent sdk list --status active
# List worker type agents
aitbc-cli agent sdk list --agent-type worker
```
**Output:**
```
Discovering agents from coordinator at http://localhost:9001...
Found 2 agents
Agents:
Status: success
Query: {}
Agents:
- Agent details...
Count: 2
Timestamp: 2026-05-07T16:39:34.254450+00:00
```
### Get Agent Status
Retrieve detailed information about a specific agent.
**Command:**
```bash
aitbc-cli agent sdk status --agent-id <ID> [OPTIONS]
```
**Required Arguments:**
- `--agent-id`: Unique identifier of the agent
**Optional Arguments:**
- `--coordinator-url`: Coordinator URL - default: http://localhost:9001
**Examples:**
```bash
aitbc-cli agent sdk status --agent-id hermes-agent
```
**Output:**
```
Getting agent info for hermes-agent from coordinator at http://localhost:9001...
Agent info retrieved
Agent:
Status: success
Agent: Agent details...
Timestamp: 2026-05-07T16:39:42.744729+00:00
```
### Update Agent Status
Update the status and load metrics of an agent.
**Command:**
```bash
aitbc-cli agent sdk update-status --agent-id <ID> --status <STATUS> [OPTIONS]
```
**Required Arguments:**
- `--agent-id`: Unique identifier of the agent
- `--status`: New status (active, inactive, busy, stale)
**Optional Arguments:**
- `--load-metrics`: JSON string of load metrics
- `--coordinator-url`: Coordinator URL - default: http://localhost:9001
**Examples:**
```bash
# Mark agent as busy
aitbc-cli agent sdk update-status --agent-id hermes-agent --status busy
# Update status with load metrics
aitbc-cli agent sdk update-status \
--agent-id hermes-agent \
--status busy \
--load-metrics '{"active_connections":5,"pending_tasks":2}'
```
**Output:**
```
Updating agent hermes-agent status to busy...
Agent status updated successfully
Status Update:
Status: success
Message: Agent hermes-agent status updated
Agent Id: hermes-agent
New Status: busy
Updated At: 2026-05-07T16:40:03.536877+00:00
```
## AI Commands
### Submit AI Job
Submit an AI job to the coordinator for distribution.
**Command:**
```bash
aitbc-cli ai submit --wallet <WALLET> --type <TYPE> --prompt <PROMPT> [OPTIONS]
```
**Required Arguments:**
- `--wallet`: Wallet name for the transaction
- `--type`: Job type or model name
- `--prompt`: Prompt for the AI job
**Optional Arguments:**
- `--payment`: Payment amount
- `--password`: Wallet password
- `--password-file`: Path to password file
- `--chain-id`: Chain ID
- `--rpc-url`: RPC URL
- `--coordinator-url`: Coordinator URL - default: http://localhost:9001
**Examples:**
```bash
aitbc-cli ai submit \
--wallet openclaw-trainee \
--type llama2 \
--prompt "Explain quantum computing"
```
**Output:**
```
Submitting AI job to http://localhost:9001...
AI job submitted successfully
Job: Job details...
```
### Get Task Distribution Statistics
Get task distribution statistics from the agent coordinator.
**Command:**
```bash
aitbc-cli ai distribution-stats [OPTIONS]
```
**Optional Arguments:**
- `--coordinator-url`: Coordinator URL - default: http://localhost:9001
**Examples:**
```bash
aitbc-cli ai distribution-stats
```
**Output:**
```
Getting task distribution statistics from http://localhost:9001...
Task distribution statistics:
Status: success
Stats:
tasks_distributed: 1
tasks_completed: 1
tasks_failed: 0
load_balancer_stats:
strategy: least_connections
active_agents: 1
total_assignments: 1
...
Timestamp: 2026-05-07T16:38:40.722733+00:00
```
### AI Service Status
Check the status of AI services (agent coordinator + blockchain AI).
**Command:**
```bash
aitbc-cli ai status [OPTIONS]
```
**Optional Arguments:**
- `--coordinator-url`: Coordinator URL
- `--rpc-url`: RPC URL
- `--chain-id`: Chain ID
**Examples:**
```bash
aitbc-cli ai status
```
**Output:**
```
Checking Agent Coordinator at http://localhost:9001...
Agent Coordinator: healthy (v1.0.0)
Checking Blockchain AI stats at http://localhost:8006...
Blockchain AI Stats: Available
Overall Status: operational
Agent Coordinator: Operational
Blockchain AI: Operational
```
## Common Options
### Output Format
All CLI commands support different output formats:
```bash
aitbc-cli --output json agent sdk list
aitbc-cli --output yaml agent sdk status --agent-id hermes-agent
aitbc-cli --output table ai distribution-stats
```
### Verbose Mode
Enable verbose output for debugging:
```bash
aitbc-cli --verbose agent sdk register --agent-id test-agent
```
### Debug Mode
Enable debug mode for detailed troubleshooting:
```bash
aitbc-cli --debug agent sdk list
```
## Workflows
### Register and Verify Agent
```bash
# Register agent
aitbc-cli agent sdk register \
--agent-id my-agent \
--type worker \
--capabilities "data-processing,analysis"
# Verify registration
aitbc-cli agent sdk status --agent-id my-agent
# Check if agent appears in discovery
aitbc-cli agent sdk list --status active
```
### Submit and Monitor Task
```bash
# Submit task
aitbc-cli ai submit \
--wallet openclaw-trainee \
--type llama2 \
--prompt "test prompt"
# Check distribution stats
aitbc-cli ai distribution-stats
# Monitor active agents
aitbc-cli agent sdk list --status active
```
### Update Agent Load
```bash
# Mark agent as busy
aitbc-cli agent sdk update-status \
--agent-id my-agent \
--status busy \
--load-metrics '{"active_connections":10,"pending_tasks":5}'
# Mark agent as available again
aitbc-cli agent sdk update-status \
--agent-id my-agent \
--status active \
--load-metrics '{"active_connections":0,"pending_tasks":0}'
```
## Error Handling
### Common Errors
**Agent not found:**
```
Agent not found: my-agent
```
Solution: Verify the agent ID is correct and the agent is registered.
**Coordinator unavailable:**
```
Error registering agent: Connection refused
```
Solution: Check that the coordinator service is running on port 9001.
**Invalid parameters:**
```
Error: --agent-id and --status are required
```
Solution: Provide all required arguments.
### Troubleshooting
**Check service status:**
```bash
systemctl status aitbc-agent-coordinator.service
```
**View service logs:**
```bash
journalctl -u aitbc-agent-coordinator.service -f
```
**Test coordinator health:**
```bash
curl http://localhost:9001/health
```
**Test coordinator API directly:**
```bash
curl http://localhost:9001/tasks/status
```
## Environment Variables
The CLI respects the following environment variables:
- `AITBC_COORDINATOR_URL`: Default coordinator URL
- `AITBC_RPC_URL`: Default RPC URL
- `AITBC_CHAIN_ID`: Default chain ID
Example:
```bash
export AITBC_COORDINATOR_URL=http://localhost:9001
aitbc-cli agent sdk list
```
## Configuration
The CLI configuration is stored in:
- `~/.aitbc/config.json` - User-specific configuration
- `/etc/aitbc/config.json` - System-wide configuration
Configuration file format:
```json
{
"coordinator_url": "http://localhost:9001",
"rpc_url": "http://localhost:8006",
"chain_id": "ait-mainnet",
"default_wallet": "openclaw-trainee"
}
```
## Integration with Other CLI Commands
The agent coordinator CLI integrates with other AITBC CLI commands:
- `aitbc-cli wallet` - For wallet management
- `aitbc-cli blockchain` - For blockchain operations
- `aitbc-cli ai` - For AI job submission and monitoring
- `aitbc-cli system` - For system status and operations
## Advanced Usage
### Batch Agent Registration
```bash
#!/bin/bash
# Register multiple agents
for i in {1..5}; do
aitbc-cli agent sdk register \
--agent-id "agent-$i" \
--type worker \
--capabilities "data-processing"
done
```
### Monitoring Script
```bash
#!/bin/bash
# Monitor agent coordinator
while true; do
clear
echo "=== Agent Coordinator Status ==="
aitbc-cli ai distribution-stats
echo ""
echo "=== Active Agents ==="
aitbc-cli agent sdk list --status active
sleep 5
done
```
### Load Testing
```bash
#!/bin/bash
# Submit multiple tasks
for i in {1..10}; do
aitbc-cli ai submit \
--wallet openclaw-trainee \
--type llama2 \
--prompt "Test task $i" &
done
wait
```

View File

@@ -0,0 +1,645 @@
# AITBC Agent Coordinator - Operator Guide
This guide provides operators with the knowledge to deploy, configure, monitor, and troubleshoot the AITBC Agent Coordinator service.
## Service Deployment
### Prerequisites
- Redis server running on localhost or remote host
- Python 3.13+
- Systemd (for service management)
- AITBC blockchain node (optional, for blockchain integration)
### Installation
1. **Install dependencies:**
```bash
cd /opt/aitbc/apps/agent-coordinator
pip install -r requirements.txt
```
2. **Configure environment:**
```bash
# Edit /etc/aitbc/.env
export AITBC_REDIS_URL=redis://localhost:6379
export AITBC_COORDINATOR_PORT=9001
export AITBC_LOG_LEVEL=INFO
```
3. **Start Redis:**
```bash
systemctl start redis
systemctl enable redis
```
4. **Start coordinator service:**
```bash
systemctl start aitbc-agent-coordinator.service
systemctl enable aitbc-agent-coordinator.service
```
### Service Configuration
**Service file location:** `/etc/systemd/system/aitbc-agent-coordinator.service`
**Key configuration parameters:**
- `PYTHONPATH=apps/agent-coordinator/src` - Python module path
- `uvicorn app.main:app` - FastAPI application entry point
- `--host 0.0.0.0` - Bind to all interfaces
- `--port 9001` - Service port
### Redis Configuration
**Connection URL:** `redis://localhost:6379/0`
**Redis data persistence:**
- Agent data: `agent:{agent_id}` (hash)
- Active agents: `agents:active` (set)
- Load metrics: Stored in agent hash
**Redis monitoring:**
```bash
redis-cli
> KEYS agent:*
> SMEMBERS agents:active
> HGETALL agent:hermes-agent
```
## Agent Registration Procedures
### Manual Registration via CLI
**Basic registration:**
```bash
aitbc-cli agent sdk register \
--agent-id my-agent \
--type worker \
--coordinator-url http://localhost:9001
```
**Full registration with capabilities:**
```bash
aitbc-cli agent sdk register \
--agent-id my-agent \
--type worker \
--capabilities "data-processing,analysis,debugging" \
--services "task-execution,coordination" \
--endpoints '{"http":"http://my-host:9002"}' \
--metadata '{"version":"1.0.0","owner":"my-team"}' \
--coordinator-url http://localhost:9001
```
### Automated Registration Script
```bash
#!/bin/bash
# register_agents.sh
COORDINATOR_URL="http://localhost:9001"
register_agent() {
local agent_id=$1
local agent_type=$2
local capabilities=$3
aitbc-cli agent sdk register \
--agent-id "$agent_id" \
--type "$agent_type" \
--capabilities "$capabilities" \
--coordinator-url "$COORDINATOR_URL"
}
# Register agents
register_agent "worker-1" "worker" "data-processing,analysis"
register_agent "worker-2" "worker" "data-processing,analysis"
register_agent "worker-3" "worker" "inference,training"
```
### Cross-Node Registration
Register agents on multiple nodes for distributed task distribution:
```bash
# Register agent on aitbc1
curl -X POST http://aitbc1:9001/agents/register \
-H "Content-Type: application/json" \
-d '{
"agent_id": "aitbc1-worker",
"agent_type": "worker",
"capabilities": ["data-processing"],
"endpoints": {"http": "http://aitbc1:9002"}
}'
# Register agent on aitbc2
curl -X POST http://aitbc2:9001/agents/register \
-H "Content-Type: application/json" \
-d '{
"agent_id": "aitbc2-worker",
"agent_type": "worker",
"capabilities": ["inference"],
"endpoints": {"http": "http://aitbc2:9002"}
}'
```
## Monitoring and Troubleshooting
### Health Checks
**Service health:**
```bash
curl http://localhost:9001/health
```
**Expected response:**
```json
{
"status": "healthy",
"version": "1.0.0",
"timestamp": "2026-05-07T16:00:00.000000+00:00"
}
```
**Task distribution stats:**
```bash
curl http://localhost:9001/tasks/status
```
**CLI health check:**
```bash
aitbc-cli ai status
```
### Service Status
**Check systemd service:**
```bash
systemctl status aitbc-agent-coordinator.service
```
**View service logs:**
```bash
journalctl -u aitbc-agent-coordinator.service -f
```
**View recent logs:**
```bash
journalctl -u aitbc-agent-coordinator.service -n 100
```
### Agent Monitoring
**List all agents:**
```bash
aitbc-cli agent sdk list
```
**List active agents only:**
```bash
aitbc-cli agent sdk list --status active
```
**Check specific agent:**
```bash
aitbc-cli agent sdk status --agent-id my-agent
```
**Monitor distribution stats:**
```bash
aitbc-cli ai distribution-stats
```
### Redis Monitoring
**Check Redis connection:**
```bash
redis-cli ping
```
**View all registered agents:**
```bash
redis-cli
> KEYS agent:*
```
**View active agents:**
```bash
redis-cli
> SMEMBERS agents:active
```
**View agent details:**
```bash
redis-cli
> HGETALL agent:my-agent
```
**Monitor Redis memory:**
```bash
redis-cli INFO memory
```
### Common Issues and Solutions
#### Service won't start
**Symptoms:**
```
Failed to start aitbc-agent-coordinator.service
```
**Solutions:**
1. Check Redis is running:
```bash
systemctl status redis
```
2. Check Redis connection:
```bash
redis-cli ping
```
3. Check service logs:
```bash
journalctl -u aitbc-agent-coordinator.service -n 50
```
4. Verify PYTHONPATH:
```bash
echo $PYTHONPATH
# Should include: /opt/aitbc/apps/agent-coordinator/src
```
#### No agents discovered
**Symptoms:**
```bash
aitbc-cli agent sdk list
Found 0 agents
```
**Solutions:**
1. Check if agents are registered:
```bash
redis-cli SMEMBERS agents:active
```
2. Register an agent:
```bash
aitbc-cli agent sdk register --agent-id test-agent --type worker
```
3. Check agent status:
```bash
aitbc-cli agent sdk status --agent-id test-agent
```
#### Tasks not distributing
**Symptoms:**
- Tasks submitted but not assigned
- `tasks_distributed` count not increasing
**Solutions:**
1. Check for active agents:
```bash
aitbc-cli agent sdk list --status active
```
2. Check task distributor status:
```bash
curl http://localhost:9001/tasks/status
```
3. Verify agent capabilities match task requirements
4. Check load balancer strategy
5. Review service logs for errors
#### Agent marked as stale
**Symptoms:**
- Agent status changes from active to stale
- Agent not receiving new tasks
**Solutions:**
1. Update agent status:
```bash
aitbc-cli agent sdk update-status --agent-id my-agent --status active
```
2. Check heartbeat mechanism (if implemented)
3. Verify agent is still running
4. Check network connectivity
#### Redis connection errors
**Symptoms:**
```
Error connecting to Redis
```
**Solutions:**
1. Check Redis service:
```bash
systemctl status redis
```
2. Restart Redis:
```bash
systemctl restart redis
```
3. Check Redis configuration:
```bash
redis-cli INFO server
```
4. Verify Redis URL in environment:
```bash
echo $AITBC_REDIS_URL
```
## Performance Tuning
### Load Balancing Strategies
**Current default:** `LEAST_CONNECTIONS`
**Available strategies:**
- `LEAST_CONNECTIONS` - Fewest active connections
- `ROUND_ROBIN` - Circular distribution
- `WEIGHTED_ROUND_ROBIN` - Performance-based
- `RESOURCE_BASED` - CPU/memory metrics
- `GEOGRAPHIC` - Location-based
- `RANDOM` - Random selection (testing)
**Changing strategy:** (requires code modification in `lifespan.py`)
### Priority Queue Configuration
**Priority levels:**
1. urgent
2. critical
3. high
4. normal
5. low
**Queue sizing:** Configured in `TaskDistributor` class
**Monitoring queue sizes:**
```bash
curl http://localhost:9001/tasks/status | jq .stats.queue_sizes
```
### Resource Limits
**Redis memory limits:**
```bash
redis-cli CONFIG SET maxmemory 1gb
redis-cli CONFIG SET maxmemory-policy allkeys-lru
```
**Service memory limits:** (configure in systemd service file)
```
MemoryLimit=2G
MemorySwap=2G
```
**Connection limits:** (configure in uvicorn startup)
```
--limit-concurrency 100
```
## Security Considerations
### Network Security
**Bind to specific interface:**
```bash
# In service file, change --host 0.0.0.0 to --host 127.0.0.1 for local only
--host 127.0.0.1
```
**Use firewall:**
```bash
# Allow only specific IPs
ufw allow from 192.168.1.0/24 to any port 9001
```
### Authentication
**Future implementation:** API key authentication and JWT tokens
**Current status:** No authentication (open access)
**Recommendation:** Deploy behind reverse proxy with authentication
### Data Encryption
**Redis encryption:** Configure Redis with TLS
**API encryption:** Use HTTPS in production
## Backup and Recovery
### Redis Backup
**Manual backup:**
```bash
redis-cli SAVE
cp /var/lib/redis/dump.rdb /backup/redis-$(date +%Y%m%d).rdb
```
**Automated backup:**
```bash
#!/bin/bash
# backup_redis.sh
redis-cli BGSAVE
sleep 5
cp /var/lib/redis/dump.rdb /backup/redis-$(date +%Y%m%d-%H%M%S).rdb
# Keep last 7 days
find /backup -name "redis-*.rdb" -mtime +7 -delete
```
**Restore from backup:**
```bash
systemctl stop redis
cp /backup/redis-20260507.rdb /var/lib/redis/dump.rdb
chown redis:redis /var/lib/redis/dump.rdb
systemctl start redis
```
### Service Configuration Backup
**Backup service file:**
```bash
cp /etc/systemd/system/aitbc-agent-coordinator.service /backup/
```
**Backup environment:**
```bash
cp /etc/aitbc/.env /backup/
```
## Scaling
### Horizontal Scaling
**Multiple coordinator instances:**
1. Deploy multiple coordinator instances behind load balancer
2. Use shared Redis instance
3. Configure consistent PYTHONPATH across instances
**Load balancer configuration:**
```nginx
upstream coordinator {
server localhost:9001;
server localhost:9002;
server localhost:9003;
}
server {
listen 80;
location / {
proxy_pass http://coordinator;
}
}
```
### Redis Clustering
**For high availability:**
- Use Redis Sentinel for failover
- Use Redis Cluster for sharding
- Configure coordinator to use Redis Sentinel
## Maintenance
### Regular Maintenance Tasks
**Daily:**
- Monitor service health
- Check task distribution stats
- Review error logs
**Weekly:**
- Backup Redis data
- Review agent registrations
- Clean up stale agents
**Monthly:**
- Review performance metrics
- Update software dependencies
- Audit security configurations
### Agent Cleanup
**Remove inactive agents:**
```bash
redis-cli
> SREM agents:active "stale-agent-id"
> DEL agent:stale-agent-id
```
**Bulk cleanup script:**
```bash
#!/bin/bash
# cleanup_stale_agents.sh
redis-cli --scan --pattern "agent:*" | while read key; do
status=$(redis-cli HGET "$key" status)
if [ "$status" = "stale" ]; then
agent_id=$(echo "$key" | cut -d: -f2)
redis-cli SREM agents:active "$agent_id"
redis-cli DEL "$key"
echo "Removed stale agent: $agent_id"
fi
done
```
### Service Restart
**Graceful restart:**
```bash
systemctl reload aitbc-agent-coordinator.service
```
**Force restart:**
```bash
systemctl restart aitbc-agent-coordinator.service
```
**Rolling restart (multiple instances):**
```bash
for i in {1..3}; do
systemctl restart aitbc-agent-coordinator@$i.service
sleep 10
done
```
## Alerting
### Recommended Alerts
**Service alerts:**
- Service down (health check fails)
- High error rate (> 5%)
- High response time (> 5s)
**Agent alerts:**
- No active agents
- Agent registration failures
- Agent stale count increasing
**Task alerts:**
- Task queue backlog (> 100 tasks)
- Task failure rate (> 10%)
- Distribution time increasing
**Redis alerts:**
- Redis connection failures
- Redis memory usage > 80%
- Redis latency > 100ms
### Monitoring Tools
**Prometheus metrics:** (future implementation)
- Export metrics at `/metrics` endpoint
- Use Grafana for visualization
**Log aggregation:**
- Send logs to ELK stack
- Use Loki for log storage
- Configure alerting based on log patterns
## Troubleshooting Checklist
When issues occur, check in this order:
1. **Service status**
- [ ] Service running?
- [ ] Health check passing?
- [ ] Logs showing errors?
2. **Redis status**
- [ ] Redis running?
- [ ] Connection successful?
- [ ] Memory usage normal?
3. **Agent status**
- [ ] Agents registered?
- [ ] Agents active?
- [ ] Agent capabilities valid?
4. **Task status**
- [ ] Tasks submitting?
- [ ] Tasks distributing?
- [ ] Tasks completing?
5. **Network**
- [ ] Connectivity to Redis?
- [ ] Connectivity to agents?
- [ ] Firewall rules correct?
6. **Configuration**
- [ ] Environment variables set?
- [ ] PYTHONPATH correct?
- [ ] Port available?

View File

@@ -1,68 +0,0 @@
{
"stage": 1,
"title": "Foundation Wallets & Accounts",
"prerequisites": [
"AITBC node running",
"Genesis wallet funded (999,999,890 AIT)"
],
"commands": [
{
"cmd": "wallet create",
"args": ["training-w1", "--password", "abc123"],
"exit_code": 0
},
{
"cmd": "wallet list",
"args": [],
"re": "training-w1"
},
{
"cmd": "wallet",
"args": ["--wallet-name", "genesis", "send", "proposer", "1000000", "--fee", "10", "--password", ""],
"exit_code": 0
},
{
"cmd": "mining",
"args": ["start"],
"exit_code": 0
},
{
"cmd": "sleep",
"args": ["15"],
"exit_code": 0
},
{
"cmd": "wallet",
"args": ["--wallet-name", "proposer", "send", "training-w1", "100", "--fee", "10", "--password", ""],
"exit_code": 0
},
{
"cmd": "mining",
"args": ["start"],
"exit_code": 0
},
{
"cmd": "sleep",
"args": ["15"],
"exit_code": 0
},
{
"cmd": "wallet balance",
"args": ["training-w1"],
"re": "100"
}
],
"expected": {
"wallet_exists": {
"type": "value",
"value": true
},
"balance": {
"type": "value",
"value": {
"symbol": "AIT",
"amount": 100
}
}
}
}

View File

@@ -0,0 +1,29 @@
#!/bin/bash
# Register Hermes Agent with AITBC Agent Coordinator
COORDINATOR_URL="http://localhost:9001"
AGENT_ID="hermes-agent"
AGENT_TYPE="worker"
CAPABILITIES='["data-processing", "analysis", "general", "debugging", "planning"]'
SERVICES='["task-execution", "analysis", "coordination"]'
ENDPOINTS='{"http": "http://localhost:9002", "callback": "http://localhost:9002/callback"}'
echo "Registering Hermes Agent with coordinator at $COORDINATOR_URL"
curl -X POST "$COORDINATOR_URL/agents/register" \
-H "Content-Type: application/json" \
-d "{
\"agent_id\": \"$AGENT_ID\",
\"agent_type\": \"$AGENT_TYPE\",
\"capabilities\": $CAPABILITIES,
\"services\": $SERVICES,
\"endpoints\": $ENDPOINTS,
\"metadata\": {
\"version\": \"1.0.0\",
\"owner\": \"aitbc\",
\"description\": \"Hermes AI agent for task coordination and analysis\"
}
}"
echo ""
echo "Hermes Agent registration complete"

View File

@@ -0,0 +1,348 @@
"""Integration tests for AITBC Agent Coordinator service."""
import pytest
import asyncio
import httpx
from typing import Dict, Any
@pytest.fixture
async def coordinator_client():
"""Create an HTTP client for coordinator API."""
async with httpx.AsyncClient(base_url="http://localhost:9001", timeout=30) as client:
yield client
@pytest.fixture
def sample_agent_data():
"""Sample agent registration data."""
return {
"agent_id": "test-integration-agent",
"agent_type": "worker",
"capabilities": ["data-processing", "analysis"],
"services": ["task-execution"],
"endpoints": {"http": "http://localhost:9002"},
"metadata": {"version": "1.0.0", "test": True}
}
@pytest.fixture
def sample_task_data():
"""Sample task submission data."""
return {
"task_data": {
"model": "llama2",
"prompt": "test prompt"
},
"priority": "normal",
"requirements": {}
}
class TestAgentRegistration:
"""Test agent registration endpoints."""
@pytest.mark.asyncio
async def test_register_agent_success(self, coordinator_client, sample_agent_data):
"""Test successful agent registration."""
response = await coordinator_client.post("/agents/register", json=sample_agent_data)
assert response.status_code in (200, 201)
data = response.json()
assert data["status"] == "success"
assert data["agent_id"] == sample_agent_data["agent_id"]
@pytest.mark.asyncio
async def test_register_agent_duplicate(self, coordinator_client, sample_agent_data):
"""Test registering duplicate agent."""
# Register first time
await coordinator_client.post("/agents/register", json=sample_agent_data)
# Try to register again
response = await coordinator_client.post("/agents/register", json=sample_agent_data)
# Should succeed (update existing) or fail depending on implementation
assert response.status_code in (200, 201, 409)
@pytest.mark.asyncio
async def test_register_agent_invalid_data(self, coordinator_client):
"""Test registration with invalid data."""
invalid_data = {"agent_id": "invalid"} # Missing required fields
response = await coordinator_client.post("/agents/register", json=invalid_data)
assert response.status_code == 422
@pytest.mark.asyncio
async def test_register_agent_missing_agent_id(self, coordinator_client):
"""Test registration without agent ID."""
invalid_data = {"agent_type": "worker"}
response = await coordinator_client.post("/agents/register", json=invalid_data)
assert response.status_code == 422
class TestAgentDiscovery:
"""Test agent discovery endpoints."""
@pytest.mark.asyncio
async def test_discover_all_agents(self, coordinator_client, sample_agent_data):
"""Test discovering all agents."""
# Register an agent first
await coordinator_client.post("/agents/register", json=sample_agent_data)
# Discover all agents
response = await coordinator_client.post("/agents/discover", json={})
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
assert "agents" in data
assert "count" in data
@pytest.mark.asyncio
async def test_discover_by_status(self, coordinator_client, sample_agent_data):
"""Test discovering agents by status."""
# Register an agent
await coordinator_client.post("/agents/register", json=sample_agent_data)
# Discover active agents
response = await coordinator_client.post("/agents/discover", json={"status": "active"})
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
@pytest.mark.asyncio
async def test_discover_by_type(self, coordinator_client, sample_agent_data):
"""Test discovering agents by type."""
# Register an agent
await coordinator_client.post("/agents/register", json=sample_agent_data)
# Discover worker agents
response = await coordinator_client.post("/agents/discover", json={"agent_type": "worker"})
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
@pytest.mark.asyncio
async def test_discover_empty_result(self, coordinator_client):
"""Test discovering agents with no matches."""
# Search for non-existent type
response = await coordinator_client.post("/agents/discover", json={"agent_type": "nonexistent"})
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
assert data["count"] == 0
class TestAgentStatus:
"""Test agent status endpoints."""
@pytest.mark.asyncio
async def test_get_agent_info(self, coordinator_client, sample_agent_data):
"""Test getting agent information."""
# Register an agent
await coordinator_client.post("/agents/register", json=sample_agent_data)
# Get agent info
response = await coordinator_client.get(f"/agents/{sample_agent_data['agent_id']}")
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
assert data["agent"]["agent_id"] == sample_agent_data["agent_id"]
@pytest.mark.asyncio
async def test_get_agent_not_found(self, coordinator_client):
"""Test getting non-existent agent."""
response = await coordinator_client.get("/agents/nonexistent-agent")
assert response.status_code == 404
@pytest.mark.asyncio
async def test_update_agent_status(self, coordinator_client, sample_agent_data):
"""Test updating agent status."""
# Register an agent
await coordinator_client.post("/agents/register", json=sample_agent_data)
# Update status
response = await coordinator_client.put(
f"/agents/{sample_agent_data['agent_id']}/status",
json={"status": "busy", "load_metrics": {"active_connections": 5}}
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
assert data["new_status"] == "busy"
@pytest.mark.asyncio
async def test_update_agent_status_invalid(self, coordinator_client, sample_agent_data):
"""Test updating with invalid status."""
# Register an agent
await coordinator_client.post("/agents/register", json=sample_agent_data)
# Try invalid status
response = await coordinator_client.put(
f"/agents/{sample_agent_data['agent_id']}/status",
json={"status": "invalid_status"}
)
assert response.status_code in (400, 422)
class TestTaskDistribution:
"""Test task distribution endpoints."""
@pytest.mark.asyncio
async def test_submit_task_success(self, coordinator_client, sample_task_data):
"""Test successful task submission."""
response = await coordinator_client.post("/tasks/submit", json=sample_task_data)
assert response.status_code in (200, 201)
data = response.json()
assert data["status"] == "success"
assert "task_id" in data
@pytest.mark.asyncio
async def test_submit_task_invalid_priority(self, coordinator_client):
"""Test task submission with invalid priority."""
invalid_data = {
"task_data": {"model": "llama2", "prompt": "test"},
"priority": "invalid_priority",
"requirements": {}
}
response = await coordinator_client.post("/tasks/submit", json=invalid_data)
assert response.status_code == 400
@pytest.mark.asyncio
async def test_task_distribution_stats(self, coordinator_client):
"""Test getting task distribution statistics."""
response = await coordinator_client.get("/tasks/status")
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
assert "stats" in data
assert "load_balancer_stats" in data["stats"]
@pytest.mark.asyncio
async def test_task_assignment_with_active_agent(self, coordinator_client, sample_agent_data, sample_task_data):
"""Test task assignment to active agent."""
# Register an agent
await coordinator_client.post("/agents/register", json=sample_agent_data)
# Submit task
response = await coordinator_client.post("/tasks/submit", json=sample_task_data)
assert response.status_code in (200, 201)
# Check stats
await asyncio.sleep(1) # Give time for distribution
stats_response = await coordinator_client.get("/tasks/status")
stats_data = stats_response.json()
assert stats_data["stats"]["tasks_distributed"] >= 1
class TestLoadBalancing:
"""Test load balancing functionality."""
@pytest.mark.asyncio
async def test_least_connections_strategy(self, coordinator_client):
"""Test least connections load balancing strategy."""
# Register multiple agents
agents = [
{"agent_id": "agent-1", "agent_type": "worker"},
{"agent_id": "agent-2", "agent_type": "worker"},
{"agent_id": "agent-3", "agent_type": "worker"}
]
for agent in agents:
await coordinator_client.post("/agents/register", json=agent)
# Submit multiple tasks
for i in range(5):
await coordinator_client.post("/tasks/submit", json={
"task_data": {"task": f"task-{i}"},
"priority": "normal",
"requirements": {}
})
# Check distribution
await asyncio.sleep(2)
stats_response = await coordinator_client.get("/tasks/status")
stats_data = stats_response.json()
assert stats_data["stats"]["load_balancer_stats"]["active_agents"] >= 3
@pytest.mark.asyncio
async def test_no_eligible_agents(self, coordinator_client, sample_task_data):
"""Test task submission when no eligible agents exist."""
# Submit task without any agents registered
response = await coordinator_client.post("/tasks/submit", json=sample_task_data)
# Should succeed (task queued) or fail depending on implementation
assert response.status_code in (200, 201, 503)
class TestQueueManagement:
"""Test task queue management endpoints."""
@pytest.mark.asyncio
async def test_get_queue_sizes(self, coordinator_client):
"""Test getting queue sizes."""
response = await coordinator_client.get("/tasks/queues")
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
assert "queue_sizes" in data
@pytest.mark.asyncio
async def test_clear_queue(self, coordinator_client, sample_task_data):
"""Test clearing a priority queue."""
# Submit some tasks
for i in range(3):
await coordinator_client.post("/tasks/submit", json=sample_task_data)
# Clear normal priority queue
response = await coordinator_client.post("/tasks/queues/normal/clear")
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
assert "cleared_count" in data
@pytest.mark.asyncio
async def test_clear_invalid_queue(self, coordinator_client):
"""Test clearing with invalid priority."""
response = await coordinator_client.post("/tasks/queues/invalid/clear")
assert response.status_code == 400
@pytest.mark.asyncio
async def test_get_queue_stats(self, coordinator_client):
"""Test getting detailed queue statistics."""
response = await coordinator_client.get("/tasks/queues/stats")
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
assert "queue_sizes" in data
assert "distribution_stats" in data
class TestHeartbeat:
"""Test agent heartbeat functionality."""
@pytest.mark.asyncio
async def test_agent_heartbeat(self, coordinator_client, sample_agent_data):
"""Test agent heartbeat endpoint."""
# Register an agent
await coordinator_client.post("/agents/register", json=sample_agent_data)
# Send heartbeat
response = await coordinator_client.post(f"/agents/{sample_agent_data['agent_id']}/heartbeat")
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
assert "heartbeat_at" in data
@pytest.mark.asyncio
async def test_heartbeat_nonexistent_agent(self, coordinator_client):
"""Test heartbeat for non-existent agent."""
response = await coordinator_client.post("/agents/nonexistent/heartbeat")
assert response.status_code == 404
class TestHealthCheck:
"""Test health check endpoints."""
@pytest.mark.asyncio
async def test_health_check(self, coordinator_client):
"""Test service health check."""
response = await coordinator_client.get("/health")
assert response.status_code == 200
data = response.json()
assert "status" in data