Add agent heartbeat and task queue management endpoints to coordinator API
Some checks failed
API Endpoint Tests / test-api-endpoints (push) Has been cancelled
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Integration Tests / test-service-integration (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Node Failover Simulation / failover-test (push) Has been cancelled
Production Tests / Production Integration Tests (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled
CLI Tests / test-cli (push) Has been cancelled
Documentation Validation / validate-docs (push) Has been cancelled
Documentation Validation / validate-policies-strict (push) Has been cancelled

- Add /agents/{agent_id}/heartbeat endpoint to receive and process agent heartbeats
- Add /tasks/queues endpoint to retrieve task queue sizes across all priorities
- Add /tasks/queues/{priority}/clear endpoint to clear specific priority queues
- Add /tasks/queues/stats endpoint to get detailed queue and distribution statistics
- Implement get_queue_sizes() method in TaskDistributor to return queue sizes by priority
- Implement clear_queue() method in TaskDistributor to drain a priority queue and return the number of cleared tasks
This commit is contained in:
aitbc
2026-05-07 18:49:17 +02:00
parent 5343d20f6d
commit a9e727dac8
8 changed files with 2331 additions and 0 deletions

View File

@@ -140,3 +140,36 @@ async def update_agent_status(agent_id: str, request: AgentStatusUpdate):
except Exception as e: except Exception as e:
logger.error(f"Error updating agent status: {e}") logger.error(f"Error updating agent status: {e}")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
# Agent heartbeat
@router.post("/agents/{agent_id}/heartbeat")
async def agent_heartbeat(agent_id: str):
    """Record a heartbeat for ``agent_id`` by marking the agent as active.

    Returns a confirmation payload with the time the heartbeat was handled.

    Raises:
        HTTPException: 503 when no agent registry is configured, 404 when
            ``update_agent_status`` returns a falsy result, and 500 for any
            other failure.
    """
    try:
        if not state.agent_registry:
            raise HTTPException(status_code=503, detail="Agent registry not available")

        from ..routing.agent_discovery import AgentStatus

        # NOTE(review): every heartbeat forces ACTIVE and passes an empty
        # metrics dict; presumably this could override a "busy" status or
        # previously reported load metrics -- confirm against the registry.
        updated = await state.agent_registry.update_agent_status(
            agent_id, AgentStatus.ACTIVE, {}
        )
        if not updated:
            raise HTTPException(status_code=404, detail="Agent not found")

        return {
            "status": "success",
            "message": f"Heartbeat received from {agent_id}",
            "agent_id": agent_id,
            "heartbeat_at": datetime.now(timezone.utc).isoformat()
        }
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Error processing heartbeat: {e}")
        raise HTTPException(status_code=500, detail=str(e))

View File

@@ -78,3 +78,78 @@ async def get_task_status():
except Exception as e: except Exception as e:
logger.error(f"Error getting task status: {e}") logger.error(f"Error getting task status: {e}")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
# Task queue management
@router.get("/tasks/queues")
async def get_queue_sizes():
    """Report the current size of every task priority queue.

    Raises:
        HTTPException: 503 when no task distributor is configured, 500 for
            any other failure.
    """
    try:
        distributor = state.task_distributor
        if not distributor:
            raise HTTPException(status_code=503, detail="Task distributor not available")

        payload = {"status": "success"}
        payload["queue_sizes"] = distributor.get_queue_sizes()
        payload["timestamp"] = datetime.now(timezone.utc).isoformat()
        return payload
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Error getting queue sizes: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/tasks/queues/{priority}/clear")
async def clear_queue(priority: str):
    """Drop every task waiting in the queue named by ``priority``.

    ``priority`` is converted with ``TaskPriority(priority)``; the response
    echoes the raw path value together with the number of tasks removed.

    Raises:
        HTTPException: 503 when no task distributor is configured, 400 when
            ``priority`` is not a valid ``TaskPriority`` value, and 500 for
            any other failure.
    """
    try:
        distributor = state.task_distributor
        if not distributor:
            raise HTTPException(status_code=503, detail="Task distributor not available")

        from ..routing.load_balancer import TaskPriority

        try:
            target = TaskPriority(priority)
        except ValueError:
            raise HTTPException(status_code=400, detail=f"Invalid priority: {priority}")

        cleared_count = await distributor.clear_queue(target)
        summary = f"Cleared {cleared_count} tasks from {priority} queue"
        return {
            "status": "success",
            "message": summary,
            "priority": priority,
            "cleared_count": cleared_count,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
    except HTTPException:
        # Deliberate HTTP errors (400/503) pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Error clearing queue: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/tasks/queues/stats")
async def get_queue_stats():
    """Return queue sizes together with the distributor's distribution stats.

    Raises:
        HTTPException: 503 when no task distributor is configured, 500 for
            any other failure.
    """
    try:
        distributor = state.task_distributor
        if not distributor:
            raise HTTPException(status_code=503, detail="Task distributor not available")

        # Sizes are read before the distribution stats, as two separate calls.
        sizes = distributor.get_queue_sizes()
        stats = distributor.get_distribution_stats()
        return {
            "status": "success",
            "queue_sizes": sizes,
            "distribution_stats": stats,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Error getting queue stats: {e}")
        raise HTTPException(status_code=500, detail=str(e))

View File

@@ -684,6 +684,29 @@ class TaskDistributor:
for priority, queue in self.priority_queues.items() for priority, queue in self.priority_queues.items()
} }
} }
def get_queue_sizes(self) -> Dict[str, int]:
    """Map each priority's ``value`` to the current ``qsize()`` of its queue."""
    sizes: Dict[str, int] = {}
    for level, pending in self.priority_queues.items():
        sizes[level.value] = pending.qsize()
    return sizes
async def clear_queue(self, priority: TaskPriority) -> int:
    """Discard every task currently waiting in one priority queue.

    Args:
        priority: Key of the queue in ``self.priority_queues`` to drain.

    Returns:
        The number of tasks that were removed.

    Raises:
        KeyError: If ``priority`` has no entry in ``self.priority_queues``.
    """
    queue = self.priority_queues[priority]
    cleared_count = 0

    # get_nowait() alone decides when the queue is empty; a separate
    # empty() pre-check would only duplicate the QueueEmpty handling.
    while True:
        try:
            queue.get_nowait()
        except asyncio.QueueEmpty:
            break
        # Pair each get with task_done() so the queue's unfinished-task
        # counter drops; otherwise queue.join() would wait forever on
        # tasks that were discarded here.
        queue.task_done()
        cleared_count += 1

    logger.info(f"Cleared {cleared_count} tasks from {priority.value} queue")
    return cleared_count
# Example usage # Example usage
async def example_usage(): async def example_usage():

View File

@@ -0,0 +1,448 @@
# AITBC Agent Coordinator - API Reference
## Base URL
```
http://localhost:9001
```
## Authentication
Currently, the API does not require authentication. Future versions may support API key authentication and JWT tokens.
## Agent Management API
### Register Agent
Register a new agent with the coordinator.
**Endpoint:** `POST /agents/register`
**Request Body:**
```json
{
"agent_id": "string (required)",
"agent_type": "string (required)",
"capabilities": ["string"],
"services": ["string"],
"endpoints": {"string": "string"},
"metadata": {"string": "any"}
}
```
**Parameters:**
- `agent_id` (required): Unique identifier for the agent
- `agent_type` (required): Type of agent (worker, provider, consumer, general)
- `capabilities` (optional): Array of agent capabilities
- `services` (optional): Array of available services
- `endpoints` (optional): Object mapping service names to URLs
- `metadata` (optional): Additional metadata as key-value pairs
**Response (200 OK):**
```json
{
"status": "success",
"message": "Agent {agent_id} registered successfully",
"agent_id": "string",
"registered_at": "ISO 8601 timestamp"
}
```
**Response (422 Unprocessable Entity):**
```json
{
"detail": "Validation error message"
}
```
**Response (500 Internal Server Error):**
```json
{
"detail": "Failed to register agent: {error message}"
}
```
**Example:**
```bash
curl -X POST http://localhost:9001/agents/register \
-H "Content-Type: application/json" \
-d '{
"agent_id": "hermes-agent",
"agent_type": "worker",
"capabilities": ["data-processing", "analysis"],
"services": ["task-execution"],
"endpoints": {"http": "http://localhost:9002"},
"metadata": {"version": "1.0.0"}
}'
```
### Discover Agents
Discover agents based on filtering criteria.
**Endpoint:** `POST /agents/discover`
**Request Body:**
```json
{
"status": "string (optional)",
"agent_type": "string (optional)",
"capabilities": ["string (optional)"],
"services": ["string (optional)"]
}
```
**Parameters:**
- `status` (optional): Filter by agent status (active, inactive, busy, stale)
- `agent_type` (optional): Filter by agent type
- `capabilities` (optional): Filter by required capabilities
- `services` (optional): Filter by available services
**Response (200 OK):**
```json
{
"status": "success",
"query": {},
"agents": [
{
"agent_id": "string",
"agent_type": "string",
"status": "string",
"capabilities": ["string"],
"services": ["string"],
"endpoints": {"string": "string"},
"metadata": {"string": "any"},
"last_heartbeat": "ISO 8601 timestamp",
"registration_time": "ISO 8601 timestamp",
"load_metrics": {"string": "number"},
"health_score": "number (0.0-1.0)",
"version": "string",
"tags": ["string"]
}
],
"count": 0,
"timestamp": "ISO 8601 timestamp"
}
```
**Response (500 Internal Server Error):**
```json
{
"detail": "Error discovering agents: {error message}"
}
```
**Example:**
```bash
curl -X POST http://localhost:9001/agents/discover \
-H "Content-Type: application/json" \
-d '{
"status": "active",
"agent_type": "worker"
}'
```
### Get Agent Information
Retrieve detailed information about a specific agent.
**Endpoint:** `GET /agents/{agent_id}`
**URL Parameters:**
- `agent_id` (required): The unique identifier of the agent
**Response (200 OK):**
```json
{
"status": "success",
"agent": {
"agent_id": "string",
"agent_type": "string",
"status": "string",
"capabilities": ["string"],
"services": ["string"],
"endpoints": {"string": "string"},
"metadata": {"string": "any"},
"last_heartbeat": "ISO 8601 timestamp",
"registration_time": "ISO 8601 timestamp",
"load_metrics": {"string": "number"},
"health_score": "number (0.0-1.0)",
"version": "string",
"tags": ["string"]
},
"timestamp": "ISO 8601 timestamp"
}
```
**Response (404 Not Found):**
```json
{
"detail": "Agent not found"
}
```
**Response (500 Internal Server Error):**
```json
{
"detail": "Error getting agent: {error message}"
}
```
**Example:**
```bash
curl http://localhost:9001/agents/hermes-agent
```
### Update Agent Status
Update the status and load metrics of an agent.
**Endpoint:** `PUT /agents/{agent_id}/status`
**URL Parameters:**
- `agent_id` (required): The unique identifier of the agent
**Request Body:**
```json
{
"status": "string (required)",
"load_metrics": {
"active_connections": 0,
"pending_tasks": 0,
"cpu_usage": 0.0,
"memory_usage": 0.0
}
}
```
**Parameters:**
- `status` (required): New agent status (active, inactive, busy, stale)
- `load_metrics` (optional): Object containing load metrics
**Response (200 OK):**
```json
{
"status": "success",
"message": "Agent {agent_id} status updated",
"agent_id": "string",
"new_status": "string",
"updated_at": "ISO 8601 timestamp"
}
```
**Response (422 Unprocessable Entity):**
```json
{
"detail": "Validation error message"
}
```
**Response (500 Internal Server Error):**
```json
{
"detail": "Error updating agent status: {error message}"
}
```
**Example:**
```bash
curl -X PUT http://localhost:9001/agents/hermes-agent/status \
-H "Content-Type: application/json" \
-d '{
"status": "busy",
"load_metrics": {
"active_connections": 5,
"pending_tasks": 2
}
}'
```
## Task Management API
### Submit Task
Submit a task for distribution to agents.
**Endpoint:** `POST /tasks/submit`
**Request Body:**
```json
{
"task_data": {
"task_type": "string",
"model": "string",
"prompt": "string",
"parameters": {"string": "any"}
},
"priority": "string (required)",
"requirements": {
"capabilities": ["string"],
"agent_type": "string"
}
}
```
**Parameters:**
- `task_data` (required): Object containing task information
- `priority` (required): Task priority (urgent, critical, high, normal, low)
- `requirements` (optional): Object specifying agent requirements
**Response (200 OK):**
```json
{
"status": "success",
"message": "Task submitted successfully",
"task_id": "UUID string",
"priority": "string",
"submitted_at": "ISO 8601 timestamp"
}
```
**Response (400 Bad Request):**
```json
{
"detail": "Invalid priority: {priority}"
}
```
**Response (503 Service Unavailable):**
```json
{
"detail": "Task distributor not available"
}
```
**Response (500 Internal Server Error):**
```json
{
"detail": "Error submitting task: {error message}"
}
```
**Example:**
```bash
curl -X POST http://localhost:9001/tasks/submit \
-H "Content-Type: application/json" \
-d '{
"task_data": {
"model": "llama2",
"prompt": "test prompt"
},
"priority": "normal",
"requirements": {}
}'
```
### Get Task Status
Get task distribution statistics and load balancer metrics.
**Endpoint:** `GET /tasks/status`
**Response (200 OK):**
```json
{
"status": "success",
"stats": {
"tasks_distributed": 0,
"tasks_completed": 0,
"tasks_failed": 0,
"avg_distribution_time": 0.0,
"load_balancer_stats": {
"strategy": "least_connections",
"total_assignments": 0,
"successful_assignments": 0,
"failed_assignments": 0,
"success_rate": 0.0,
"active_agents": 0,
"agent_weights": 0,
"avg_agent_load": 0
},
"queue_sizes": {
"urgent": 0,
"critical": 0,
"high": 0,
"normal": 0,
"low": 0
}
},
"timestamp": "ISO 8601 timestamp"
}
```
**Response (503 Service Unavailable):**
```json
{
"detail": "Task distributor not available"
}
```
**Response (500 Internal Server Error):**
```json
{
"detail": "Error getting task status: {error message}"
}
```
**Example:**
```bash
curl http://localhost:9001/tasks/status
```
## Health Check
### Service Health
Check the health of the agent coordinator service.
**Endpoint:** `GET /health`
**Response (200 OK):**
```json
{
"status": "healthy",
"version": "string",
"timestamp": "ISO 8601 timestamp"
}
```
**Example:**
```bash
curl http://localhost:9001/health
```
## Error Codes
| Status Code | Description |
|-------------|-------------|
| 200 | Success |
| 400 | Bad Request - Invalid parameters |
| 404 | Not Found - Resource not found |
| 422 | Unprocessable Entity - Validation error |
| 500 | Internal Server Error |
| 503 | Service Unavailable - Component not ready |
## Rate Limiting
Currently, rate limiting is not implemented. Future versions may include rate limiting to prevent abuse.
## WebSocket Support
WebSocket support is planned for future releases to provide real-time updates on:
- Agent status changes
- Task distribution events
- Load balancer metrics updates
## OpenAPI Specification
The API follows OpenAPI 3.0 specification. An OpenAPI JSON schema can be generated from the FastAPI application by visiting:
```
http://localhost:9001/openapi.json
```
Interactive API documentation is available at:
```
http://localhost:9001/docs
```

View File

@@ -0,0 +1,288 @@
# AITBC Agent Coordinator - Architecture Documentation
## System Overview
The AITBC Agent Coordinator is a distributed task distribution system that manages AI agents, coordinates task assignment, and provides load balancing across multiple agent instances. The system uses Redis for persistence and FastAPI for REST API endpoints.
## Service Location
**Actual Service:** `/opt/aitbc/apps/agent-coordinator/src/app/`
**Port:** 9001
**Systemd Service:** `aitbc-agent-coordinator.service`
**DO NOT USE:** `/opt/aitbc/apps/agent-services/agent-coordinator/src/coordinator.py` (this is an older/incorrect implementation)
## Core Components
### 1. Agent Registry (`agent_discovery.py`)
The Agent Registry is the central component for managing agent lifecycle and discovery.
**Key Features:**
- Redis-backed persistence for agent data
- Agent registration and deregistration
- Agent discovery with filtering (by type, status, capabilities)
- Health score calculation based on heartbeat frequency
- Load metrics tracking (active connections, pending tasks)
**Data Model:**
- Agent data stored as Redis hashes: `agent:{agent_id}`
- Active agents indexed in Redis set: `agents:active`
- Agent status tracked: active, inactive, busy, stale
**Key Classes:**
- `AgentInfo` - Dataclass representing agent information
- `AgentRegistry` - Main registry class with Redis integration
- `AgentDiscoveryService` - Service for discovering agents with criteria
### 2. Load Balancer (`load_balancer.py`)
The Load Balancer distributes tasks across eligible agents using configurable strategies.
**Load Balancing Strategies:**
- `LEAST_CONNECTIONS` - Selects agent with fewest active connections (default)
- `ROUND_ROBIN` - Distributes tasks in circular order
- `WEIGHTED_ROUND_ROBIN` - Based on agent performance weights
- `RESOURCE_BASED` - Based on CPU/memory metrics
- `GEOGRAPHIC` - Based on agent location
- `RANDOM` - For testing purposes
**Key Classes:**
- `LoadBalancer` - Main load balancer class
- `TaskDistributor` - Manages task priority queues and distribution
- `TaskPriority` - Enum for task priorities (urgent, critical, high, normal, low)
**Task Distribution Flow:**
1. Task submitted to `TaskDistributor.submit_task()`
2. Task placed in appropriate priority queue
3. Background distribution loop processes queues
4. Load balancer finds eligible agents via `find_eligible_agents()`
5. Agent selected using configured strategy
6. Task assigned and agent metrics updated
### 3. REST API Routers
#### Agent Management (`routers/agents.py`)
**Endpoints:**
- `POST /agents/register` - Register new agent
- `POST /agents/discover` - Discover agents with filtering
- `GET /agents/{agent_id}` - Get agent information
- `PUT /agents/{agent_id}/status` - Update agent status
- `POST /agents/{agent_id}/heartbeat` - Receive agent heartbeat (marks the agent active)
#### Task Management (`routers/tasks.py`)
**Endpoints:**
- `POST /tasks/submit` - Submit task for distribution
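- `GET /tasks/status` - Get task distribution statistics
- `GET /tasks/queues` - Get task queue sizes for all priorities
- `POST /tasks/queues/{priority}/clear` - Clear the queue for one priority
- `GET /tasks/queues/stats` - Get queue sizes together with distribution statistics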
## Service Initialization
The service initializes in `lifespan.py` during FastAPI startup:
```python
async def lifespan(app: FastAPI):
# Create AgentRegistry with Redis backing
state.agent_registry = AgentRegistry()
await state.agent_registry.start()
# Create LoadBalancer with registry
state.load_balancer = LoadBalancer(state.agent_registry)
state.load_balancer.set_strategy(LoadBalancingStrategy.LEAST_CONNECTIONS)
# Create TaskDistributor
state.task_distributor = TaskDistributor(state.load_balancer)
# Start background tasks
asyncio.create_task(state.task_distributor.start_distribution())
asyncio.create_task(state.message_processor.start_processing())
```
## Redis Persistence Model
### Agent Data Structure
**Hash Key:** `agent:{agent_id}`
**Fields:**
- `agent_id` - Unique identifier
- `agent_type` - Type (worker, provider, consumer, general)
- `status` - Current status (active, inactive, busy, stale)
- `capabilities` - JSON array of capabilities
- `services` - JSON array of available services
- `endpoints` - JSON object of service endpoints
- `metadata` - JSON object of additional metadata
- `last_heartbeat` - Timestamp of last heartbeat
- `registration_time` - Timestamp of registration
- `load_metrics` - JSON object of load metrics
- `health_score` - Calculated health score (0.0-1.0)
- `version` - Agent version
- `tags` - JSON array of tags
### Indexes
**Set Key:** `agents:active` - Contains IDs of all active agents
## Agent Lifecycle
### Registration
1. Agent sends POST /agents/register with agent information
2. Coordinator validates agent data
3. Agent info stored in Redis
4. Agent added to active agents set
5. Success response returned
### Heartbeat
1. Agent sends POST /agents/{agent_id}/heartbeat
2. Last heartbeat timestamp updated
3. Health score recalculated
4. Stale agents marked as inactive (configurable timeout)
### Status Update
1. Agent sends PUT /agents/{agent_id}/status
2. Status and load metrics updated
3. Load balancer uses updated metrics for task assignment
### Deregistration
1. Agent marked as inactive
2. Removed from active agents set
3. Data retained in Redis for historical purposes
## Task Distribution Flow
### Task Submission
```mermaid
sequenceDiagram
participant Client
participant Coordinator
participant LoadBalancer
participant AgentRegistry
participant Redis
participant Agent
Client->>Coordinator: POST /tasks/submit
Coordinator->>TaskDistributor: submit_task()
TaskDistributor->>TaskDistributor: add to priority queue
TaskDistributor->>LoadBalancer: find_eligible_agents()
LoadBalancer->>AgentRegistry: discover_agents(criteria)
AgentRegistry->>Redis: query active agents
Redis-->>AgentRegistry: agent data
AgentRegistry-->>LoadBalancer: eligible agents
LoadBalancer->>LoadBalancer: select_agent(strategy)
LoadBalancer->>Redis: update agent metrics
LoadBalancer-->>TaskDistributor: selected agent
TaskDistributor->>Agent: assign task
Coordinator-->>Client: task submitted
```
### Load Balancing
The load balancer uses the following criteria to select agents:
1. Agent status must be "active"
2. Agent must have required capabilities
3. Agent type must match requirements
4. Health score must be above threshold
5. Load metrics must be within limits
## Configuration
### Environment Variables
- `AITBC_REDIS_URL` - Redis connection URL (default: redis://localhost:6379)
- `AITBC_COORDINATOR_PORT` - Coordinator service port (default: 9001)
- `AITBC_LOG_LEVEL` - Logging level (default: INFO)
### Load Balancing Configuration
- Default strategy: LEAST_CONNECTIONS
- Strategy can be changed via LoadBalancer.set_strategy()
- Priority queues: urgent, critical, high, normal, low
### Health Check Configuration
- Heartbeat timeout: 300 seconds (configurable)
- Health score threshold: 0.5 (configurable)
- Stale agent detection: enabled by default
## Monitoring
### Metrics Available
- Active agents count
- Tasks distributed/completed/failed
- Average distribution time
- Load balancer success rate
- Agent load distribution
- Queue sizes per priority
### Monitoring Endpoints
- `GET /tasks/status` - Task distribution statistics
- `GET /health` - Service health check
- Future: Prometheus metrics endpoint
## Security
### Authentication
- API key authentication via middleware (optional)
- JWT token support (optional)
- Role-based access control (optional)
### Rate Limiting
- Not currently implemented
- Can be added via FastAPI middleware
## Scalability
### Horizontal Scaling
- Multiple coordinator instances can run behind a load balancer
- Redis provides shared state across instances
- Agent registry is distributed via Redis
### Performance Considerations
- Redis operations are O(1) or O(log N)
- Task distribution is asynchronous
- Priority queues prevent starvation
- Load balancing strategies can be tuned
## Troubleshooting
### Common Issues
**No active agents:**
- Check Redis connection
- Verify agents are registered
- Check agent status (may be inactive/stale)
**Tasks not distributing:**
- Check task distributor is running
- Verify eligible agents exist
- Check load balancer strategy
- Review task requirements
**Agent not discovered:**
- Verify agent registration succeeded
- Check agent status is active
- Verify capabilities match query
- Check Redis connection
### Debug Commands
```bash
# Check service status
systemctl status aitbc-agent-coordinator.service
# View logs
journalctl -u aitbc-agent-coordinator.service -f
# Check Redis
redis-cli
> KEYS agent:*
> SMEMBERS agents:active
# Test API
curl http://localhost:9001/health
curl http://localhost:9001/tasks/status
```
## Future Enhancements
Planned improvements (see Phase 3):
- Agent heartbeat mechanism
- Additional load balancing strategies
- Task priority queue management
- Agent metrics dashboard
- WebSocket support for real-time updates

View File

@@ -0,0 +1,471 @@
# AITBC Agent Coordinator - CLI Reference
The AITBC CLI provides commands for interacting with the Agent Coordinator service for agent management and task distribution.
## Agent SDK Commands
### Register Agent
Register a new agent with the coordinator service.
**Command:**
```bash
aitbc-cli agent sdk register --agent-id <ID> [OPTIONS]
```
**Required Arguments:**
- `--agent-id`: Unique identifier for the agent
**Optional Arguments:**
- `--type`: Agent type (provider, consumer, general, worker) - default: worker
- `--capabilities`: Comma-separated list of agent capabilities
- `--services`: Comma-separated list of available services
- `--endpoints`: JSON string of service endpoints
- `--metadata`: JSON string of metadata
- `--coordinator-url`: Coordinator URL - default: http://localhost:9001
**Examples:**
```bash
# Basic registration
aitbc-cli agent sdk register --agent-id hermes-agent --type worker
# Full registration with all parameters
aitbc-cli agent sdk register \
--agent-id hermes-agent \
--type worker \
--capabilities "data-processing,analysis,debugging" \
--services "task-execution,coordination" \
--endpoints '{"http":"http://localhost:9002"}' \
--metadata '{"version":"1.0.0","owner":"aitbc"}'
```
**Output:**
```
Registering agent hermes-agent with coordinator at http://localhost:9001...
Agent registered successfully
Registration:
Status: success
Message: Agent hermes-agent registered successfully
Agent Id: hermes-agent
Registered At: 2026-05-07T16:26:55.464178+00:00
```
### List Agents
Discover and list agents from the coordinator.
**Command:**
```bash
aitbc-cli agent sdk list [OPTIONS]
```
**Optional Arguments:**
- `--status`: Filter by agent status (active, inactive, busy, stale)
- `--agent-type`: Filter by agent type
- `--coordinator-url`: Coordinator URL - default: http://localhost:9001
**Examples:**
```bash
# List all agents
aitbc-cli agent sdk list
# List only active agents
aitbc-cli agent sdk list --status active
# List worker type agents
aitbc-cli agent sdk list --agent-type worker
```
**Output:**
```
Discovering agents from coordinator at http://localhost:9001...
Found 2 agents
Agents:
Status: success
Query: {}
Agents:
- Agent details...
Count: 2
Timestamp: 2026-05-07T16:39:34.254450+00:00
```
### Get Agent Status
Retrieve detailed information about a specific agent.
**Command:**
```bash
aitbc-cli agent sdk status --agent-id <ID> [OPTIONS]
```
**Required Arguments:**
- `--agent-id`: Unique identifier of the agent
**Optional Arguments:**
- `--coordinator-url`: Coordinator URL - default: http://localhost:9001
**Examples:**
```bash
aitbc-cli agent sdk status --agent-id hermes-agent
```
**Output:**
```
Getting agent info for hermes-agent from coordinator at http://localhost:9001...
Agent info retrieved
Agent:
Status: success
Agent: Agent details...
Timestamp: 2026-05-07T16:39:42.744729+00:00
```
### Update Agent Status
Update the status and load metrics of an agent.
**Command:**
```bash
aitbc-cli agent sdk update-status --agent-id <ID> --status <STATUS> [OPTIONS]
```
**Required Arguments:**
- `--agent-id`: Unique identifier of the agent
- `--status`: New status (active, inactive, busy, stale)
**Optional Arguments:**
- `--load-metrics`: JSON string of load metrics
- `--coordinator-url`: Coordinator URL - default: http://localhost:9001
**Examples:**
```bash
# Mark agent as busy
aitbc-cli agent sdk update-status --agent-id hermes-agent --status busy
# Update status with load metrics
aitbc-cli agent sdk update-status \
--agent-id hermes-agent \
--status busy \
--load-metrics '{"active_connections":5,"pending_tasks":2}'
```
**Output:**
```
Updating agent hermes-agent status to busy...
Agent status updated successfully
Status Update:
Status: success
Message: Agent hermes-agent status updated
Agent Id: hermes-agent
New Status: busy
Updated At: 2026-05-07T16:40:03.536877+00:00
```
## AI Commands
### Submit AI Job
Submit an AI job to the coordinator for distribution.
**Command:**
```bash
aitbc-cli ai submit --wallet <WALLET> --type <TYPE> --prompt <PROMPT> [OPTIONS]
```
**Required Arguments:**
- `--wallet`: Wallet name for the transaction
- `--type`: Job type or model name
- `--prompt`: Prompt for the AI job
**Optional Arguments:**
- `--payment`: Payment amount
- `--password`: Wallet password
- `--password-file`: Path to password file
- `--chain-id`: Chain ID
- `--rpc-url`: RPC URL
- `--coordinator-url`: Coordinator URL - default: http://localhost:9001
**Examples:**
```bash
aitbc-cli ai submit \
--wallet openclaw-trainee \
--type llama2 \
--prompt "Explain quantum computing"
```
**Output:**
```
Submitting AI job to http://localhost:9001...
AI job submitted successfully
Job: Job details...
```
### Get Task Distribution Statistics
Get task distribution statistics from the agent coordinator.
**Command:**
```bash
aitbc-cli ai distribution-stats [OPTIONS]
```
**Optional Arguments:**
- `--coordinator-url`: Coordinator URL - default: http://localhost:9001
**Examples:**
```bash
aitbc-cli ai distribution-stats
```
**Output:**
```
Getting task distribution statistics from http://localhost:9001...
Task distribution statistics:
Status: success
Stats:
tasks_distributed: 1
tasks_completed: 1
tasks_failed: 0
load_balancer_stats:
strategy: least_connections
active_agents: 1
total_assignments: 1
...
Timestamp: 2026-05-07T16:38:40.722733+00:00
```
### AI Service Status
Check the status of AI services (agent coordinator + blockchain AI).
**Command:**
```bash
aitbc-cli ai status [OPTIONS]
```
**Optional Arguments:**
- `--coordinator-url`: Coordinator URL
- `--rpc-url`: RPC URL
- `--chain-id`: Chain ID
**Examples:**
```bash
aitbc-cli ai status
```
**Output:**
```
Checking Agent Coordinator at http://localhost:9001...
Agent Coordinator: healthy (v1.0.0)
Checking Blockchain AI stats at http://localhost:8006...
Blockchain AI Stats: Available
Overall Status: operational
Agent Coordinator: Operational
Blockchain AI: Operational
```
## Common Options
### Output Format
All CLI commands support different output formats:
```bash
aitbc-cli --output json agent sdk list
aitbc-cli --output yaml agent sdk status --agent-id hermes-agent
aitbc-cli --output table ai distribution-stats
```
### Verbose Mode
Enable verbose output for debugging:
```bash
aitbc-cli --verbose agent sdk register --agent-id test-agent
```
### Debug Mode
Enable debug mode for detailed troubleshooting:
```bash
aitbc-cli --debug agent sdk list
```
## Workflows
### Register and Verify Agent
```bash
# Register agent
aitbc-cli agent sdk register \
--agent-id my-agent \
--type worker \
--capabilities "data-processing,analysis"
# Verify registration
aitbc-cli agent sdk status --agent-id my-agent
# Check if agent appears in discovery
aitbc-cli agent sdk list --status active
```
### Submit and Monitor Task
```bash
# Submit task
aitbc-cli ai submit \
--wallet openclaw-trainee \
--type llama2 \
--prompt "test prompt"
# Check distribution stats
aitbc-cli ai distribution-stats
# Monitor active agents
aitbc-cli agent sdk list --status active
```
### Update Agent Load
```bash
# Mark agent as busy
aitbc-cli agent sdk update-status \
--agent-id my-agent \
--status busy \
--load-metrics '{"active_connections":10,"pending_tasks":5}'
# Mark agent as available again
aitbc-cli agent sdk update-status \
--agent-id my-agent \
--status active \
--load-metrics '{"active_connections":0,"pending_tasks":0}'
```
## Error Handling
### Common Errors
**Agent not found:**
```
Agent not found: my-agent
```
Solution: Verify the agent ID is correct and the agent is registered.
**Coordinator unavailable:**
```
Error registering agent: Connection refused
```
Solution: Check that the coordinator service is running on port 9001.
**Invalid parameters:**
```
Error: --agent-id and --status are required
```
Solution: Provide all required arguments.
### Troubleshooting
**Check service status:**
```bash
systemctl status aitbc-agent-coordinator.service
```
**View service logs:**
```bash
journalctl -u aitbc-agent-coordinator.service -f
```
**Test coordinator health:**
```bash
curl http://localhost:9001/health
```
**Test coordinator API directly:**
```bash
curl http://localhost:9001/tasks/status
```
## Environment Variables
The CLI respects the following environment variables:
- `AITBC_COORDINATOR_URL`: Default coordinator URL
- `AITBC_RPC_URL`: Default RPC URL
- `AITBC_CHAIN_ID`: Default chain ID
Example:
```bash
export AITBC_COORDINATOR_URL=http://localhost:9001
aitbc-cli agent sdk list
```
## Configuration
The CLI configuration is stored in:
- `~/.aitbc/config.json` - User-specific configuration
- `/etc/aitbc/config.json` - System-wide configuration
Configuration file format:
```json
{
"coordinator_url": "http://localhost:9001",
"rpc_url": "http://localhost:8006",
"chain_id": "ait-mainnet",
"default_wallet": "openclaw-trainee"
}
```
## Integration with Other CLI Commands
The agent coordinator CLI integrates with other AITBC CLI commands:
- `aitbc-cli wallet` - For wallet management
- `aitbc-cli blockchain` - For blockchain operations
- `aitbc-cli ai` - For AI job submission and monitoring
- `aitbc-cli system` - For system status and operations
## Advanced Usage
### Batch Agent Registration
```bash
#!/bin/bash
# Register multiple agents
for i in {1..5}; do
aitbc-cli agent sdk register \
--agent-id "agent-$i" \
--type worker \
--capabilities "data-processing"
done
```
### Monitoring Script
```bash
#!/bin/bash
# Monitor agent coordinator
while true; do
clear
echo "=== Agent Coordinator Status ==="
aitbc-cli ai distribution-stats
echo ""
echo "=== Active Agents ==="
aitbc-cli agent sdk list --status active
sleep 5
done
```
### Load Testing
```bash
#!/bin/bash
# Submit multiple tasks
for i in {1..10}; do
aitbc-cli ai submit \
--wallet openclaw-trainee \
--type llama2 \
--prompt "Test task $i" &
done
wait
```

View File

@@ -0,0 +1,645 @@
# AITBC Agent Coordinator - Operator Guide
This guide provides operators with the knowledge to deploy, configure, monitor, and troubleshoot the AITBC Agent Coordinator service.
## Service Deployment
### Prerequisites
- Redis server running on localhost or remote host
- Python 3.13+
- Systemd (for service management)
- AITBC blockchain node (optional, for blockchain integration)
### Installation
1. **Install dependencies:**
```bash
cd /opt/aitbc/apps/agent-coordinator
pip install -r requirements.txt
```
2. **Configure environment:**
```bash
# Edit /etc/aitbc/.env
export AITBC_REDIS_URL=redis://localhost:6379
export AITBC_COORDINATOR_PORT=9001
export AITBC_LOG_LEVEL=INFO
```
3. **Start Redis:**
```bash
systemctl start redis
systemctl enable redis
```
4. **Start coordinator service:**
```bash
systemctl start aitbc-agent-coordinator.service
systemctl enable aitbc-agent-coordinator.service
```
### Service Configuration
**Service file location:** `/etc/systemd/system/aitbc-agent-coordinator.service`
**Key configuration parameters:**
- `PYTHONPATH=apps/agent-coordinator/src` - Python module path
- `uvicorn app.main:app` - FastAPI application entry point
- `--host 0.0.0.0` - Bind to all interfaces
- `--port 9001` - Service port
### Redis Configuration
**Connection URL:** `redis://localhost:6379/0`
**Redis data persistence:**
- Agent data: `agent:{agent_id}` (hash)
- Active agents: `agents:active` (set)
- Load metrics: Stored in agent hash
**Redis monitoring:**
```bash
redis-cli
> KEYS agent:*
> SMEMBERS agents:active
> HGETALL agent:hermes-agent
```
## Agent Registration Procedures
### Manual Registration via CLI
**Basic registration:**
```bash
aitbc-cli agent sdk register \
--agent-id my-agent \
--type worker \
--coordinator-url http://localhost:9001
```
**Full registration with capabilities:**
```bash
aitbc-cli agent sdk register \
--agent-id my-agent \
--type worker \
--capabilities "data-processing,analysis,debugging" \
--services "task-execution,coordination" \
--endpoints '{"http":"http://my-host:9002"}' \
--metadata '{"version":"1.0.0","owner":"my-team"}' \
--coordinator-url http://localhost:9001
```
### Automated Registration Script
```bash
#!/bin/bash
# register_agents.sh
COORDINATOR_URL="http://localhost:9001"
register_agent() {
local agent_id=$1
local agent_type=$2
local capabilities=$3
aitbc-cli agent sdk register \
--agent-id "$agent_id" \
--type "$agent_type" \
--capabilities "$capabilities" \
--coordinator-url "$COORDINATOR_URL"
}
# Register agents
register_agent "worker-1" "worker" "data-processing,analysis"
register_agent "worker-2" "worker" "data-processing,analysis"
register_agent "worker-3" "worker" "inference,training"
```
### Cross-Node Registration
Register agents on multiple nodes for distributed task distribution:
```bash
# Register agent on aitbc1
curl -X POST http://aitbc1:9001/agents/register \
-H "Content-Type: application/json" \
-d '{
"agent_id": "aitbc1-worker",
"agent_type": "worker",
"capabilities": ["data-processing"],
"endpoints": {"http": "http://aitbc1:9002"}
}'
# Register agent on aitbc2
curl -X POST http://aitbc2:9001/agents/register \
-H "Content-Type: application/json" \
-d '{
"agent_id": "aitbc2-worker",
"agent_type": "worker",
"capabilities": ["inference"],
"endpoints": {"http": "http://aitbc2:9002"}
}'
```
## Monitoring and Troubleshooting
### Health Checks
**Service health:**
```bash
curl http://localhost:9001/health
```
**Expected response:**
```json
{
"status": "healthy",
"version": "1.0.0",
"timestamp": "2026-05-07T16:00:00.000000+00:00"
}
```
**Task distribution stats:**
```bash
curl http://localhost:9001/tasks/status
```
**CLI health check:**
```bash
aitbc-cli ai status
```
### Service Status
**Check systemd service:**
```bash
systemctl status aitbc-agent-coordinator.service
```
**View service logs:**
```bash
journalctl -u aitbc-agent-coordinator.service -f
```
**View recent logs:**
```bash
journalctl -u aitbc-agent-coordinator.service -n 100
```
### Agent Monitoring
**List all agents:**
```bash
aitbc-cli agent sdk list
```
**List active agents only:**
```bash
aitbc-cli agent sdk list --status active
```
**Check specific agent:**
```bash
aitbc-cli agent sdk status --agent-id my-agent
```
**Monitor distribution stats:**
```bash
aitbc-cli ai distribution-stats
```
### Redis Monitoring
**Check Redis connection:**
```bash
redis-cli ping
```
**View all registered agents:**
```bash
redis-cli
> KEYS agent:*
```
**View active agents:**
```bash
redis-cli
> SMEMBERS agents:active
```
**View agent details:**
```bash
redis-cli
> HGETALL agent:my-agent
```
**Monitor Redis memory:**
```bash
redis-cli INFO memory
```
### Common Issues and Solutions
#### Service won't start
**Symptoms:**
```
Failed to start aitbc-agent-coordinator.service
```
**Solutions:**
1. Check Redis is running:
```bash
systemctl status redis
```
2. Check Redis connection:
```bash
redis-cli ping
```
3. Check service logs:
```bash
journalctl -u aitbc-agent-coordinator.service -n 50
```
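4. Verify the PYTHONPATH configured for the service (it is set in the unit file, so `echo $PYTHONPATH` in your own shell will not show it):
```bash
systemctl cat aitbc-agent-coordinator.service | grep PYTHONPATH
# Should include: /opt/aitbc/apps/agent-coordinator/src
```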
#### No agents discovered
**Symptoms:**
```bash
aitbc-cli agent sdk list
Found 0 agents
```
**Solutions:**
1. Check if agents are registered:
```bash
redis-cli SMEMBERS agents:active
```
2. Register an agent:
```bash
aitbc-cli agent sdk register --agent-id test-agent --type worker
```
3. Check agent status:
```bash
aitbc-cli agent sdk status --agent-id test-agent
```
#### Tasks not distributing
**Symptoms:**
- Tasks submitted but not assigned
- `tasks_distributed` count not increasing
**Solutions:**
1. Check for active agents:
```bash
aitbc-cli agent sdk list --status active
```
2. Check task distributor status:
```bash
curl http://localhost:9001/tasks/status
```
3. Verify agent capabilities match task requirements
4. Check load balancer strategy
5. Review service logs for errors
#### Agent marked as stale
**Symptoms:**
- Agent status changes from active to stale
- Agent not receiving new tasks
**Solutions:**
1. Update agent status:
```bash
aitbc-cli agent sdk update-status --agent-id my-agent --status active
```
2. Check that the agent is still sending heartbeats to `POST /agents/{agent_id}/heartbeat`
3. Verify agent is still running
4. Check network connectivity
#### Redis connection errors
**Symptoms:**
```
Error connecting to Redis
```
**Solutions:**
1. Check Redis service:
```bash
systemctl status redis
```
2. Restart Redis:
```bash
systemctl restart redis
```
3. Check Redis configuration:
```bash
redis-cli INFO server
```
4. Verify the Redis URL configured for the service:
```bash
grep AITBC_REDIS_URL /etc/aitbc/.env
```
## Performance Tuning
### Load Balancing Strategies
**Current default:** `LEAST_CONNECTIONS`
**Available strategies:**
- `LEAST_CONNECTIONS` - Fewest active connections
- `ROUND_ROBIN` - Circular distribution
- `WEIGHTED_ROUND_ROBIN` - Performance-based
- `RESOURCE_BASED` - CPU/memory metrics
- `GEOGRAPHIC` - Location-based
- `RANDOM` - Random selection (testing)
**Changing strategy:** (requires code modification in `lifespan.py`)
### Priority Queue Configuration
**Priority levels:**
1. urgent
2. critical
3. high
4. normal
5. low
**Queue sizing:** Configured in `TaskDistributor` class
**Monitoring queue sizes:**
```bash
curl http://localhost:9001/tasks/status | jq .stats.queue_sizes
```
### Resource Limits
**Redis memory limits:**
```bash
redis-cli CONFIG SET maxmemory 1gb
redis-cli CONFIG SET maxmemory-policy allkeys-lru
```
**Service memory limits:** (configure in the `[Service]` section of the systemd service file; `MemorySwapMax=` limits swap usage separately from `MemoryMax=`)
```
MemoryMax=2G
MemorySwapMax=2G
```
**Connection limits:** (configure in uvicorn startup)
```
--limit-concurrency 100
```
## Security Considerations
### Network Security
**Bind to specific interface:**
```bash
# In service file, change --host 0.0.0.0 to --host 127.0.0.1 for local only
--host 127.0.0.1
```
**Use firewall:**
```bash
# Allow only specific IPs
ufw allow from 192.168.1.0/24 to any port 9001
```
### Authentication
**Future implementation:** API key authentication and JWT tokens
**Current status:** No authentication (open access)
**Recommendation:** Deploy behind reverse proxy with authentication
### Data Encryption
**Redis encryption:** Configure Redis with TLS
**API encryption:** Use HTTPS in production
## Backup and Recovery
### Redis Backup
**Manual backup:**
```bash
redis-cli SAVE
cp /var/lib/redis/dump.rdb /backup/redis-$(date +%Y%m%d).rdb
```
**Automated backup:**
```bash
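#!/bin/bash
# backup_redis.sh
# BGSAVE returns immediately, so wait (up to 60s) until LASTSAVE advances
# before copying; a fixed sleep can copy the previous dump on large datasets.
previous_save=$(redis-cli LASTSAVE)
redis-cli BGSAVE
for attempt in $(seq 1 60); do
    [ "$(redis-cli LASTSAVE)" != "$previous_save" ] && break
    sleep 1
done
cp /var/lib/redis/dump.rdb /backup/redis-$(date +%Y%m%d-%H%M%S).rdb
# Keep last 7 days
find /backup -name "redis-*.rdb" -mtime +7 -delete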
```
**Restore from backup:**
```bash
systemctl stop redis
cp /backup/redis-20260507.rdb /var/lib/redis/dump.rdb
chown redis:redis /var/lib/redis/dump.rdb
systemctl start redis
```
### Service Configuration Backup
**Backup service file:**
```bash
cp /etc/systemd/system/aitbc-agent-coordinator.service /backup/
```
**Backup environment:**
```bash
cp /etc/aitbc/.env /backup/
```
## Scaling
### Horizontal Scaling
**Multiple coordinator instances:**
1. Deploy multiple coordinator instances behind load balancer
2. Use shared Redis instance
3. Configure consistent PYTHONPATH across instances
**Load balancer configuration:**
```nginx
upstream coordinator {
server localhost:9001;
server localhost:9002;
server localhost:9003;
}
server {
listen 80;
location / {
proxy_pass http://coordinator;
}
}
```
### Redis Clustering
**For high availability:**
- Use Redis Sentinel for failover
- Use Redis Cluster for sharding
- Configure coordinator to use Redis Sentinel
## Maintenance
### Regular Maintenance Tasks
**Daily:**
- Monitor service health
- Check task distribution stats
- Review error logs
**Weekly:**
- Backup Redis data
- Review agent registrations
- Clean up stale agents
**Monthly:**
- Review performance metrics
- Update software dependencies
- Audit security configurations
### Agent Cleanup
**Remove inactive agents:**
```bash
redis-cli
> SREM agents:active "stale-agent-id"
> DEL agent:stale-agent-id
```
**Bulk cleanup script:**
```bash
#!/bin/bash
# cleanup_stale_agents.sh
redis-cli --scan --pattern "agent:*" | while IFS= read -r key; do
    status=$(redis-cli HGET "$key" status)
    if [ "$status" = "stale" ]; then
        # Strip only the "agent:" prefix so IDs that contain ":" stay intact
        agent_id="${key#agent:}"
        redis-cli SREM agents:active "$agent_id"
        redis-cli DEL "$key"
        echo "Removed stale agent: $agent_id"
    fi
done
```
### Service Restart
**Graceful restart:**
```bash
systemctl reload aitbc-agent-coordinator.service
```
**Force restart:**
```bash
systemctl restart aitbc-agent-coordinator.service
```
**Rolling restart (multiple instances):**
```bash
for i in {1..3}; do
systemctl restart aitbc-agent-coordinator@$i.service
sleep 10
done
```
## Alerting
### Recommended Alerts
**Service alerts:**
- Service down (health check fails)
- High error rate (> 5%)
- High response time (> 5s)
**Agent alerts:**
- No active agents
- Agent registration failures
- Agent stale count increasing
**Task alerts:**
- Task queue backlog (> 100 tasks)
- Task failure rate (> 10%)
- Distribution time increasing
**Redis alerts:**
- Redis connection failures
- Redis memory usage > 80%
- Redis latency > 100ms
### Monitoring Tools
**Prometheus metrics:** (future implementation)
- Export metrics at `/metrics` endpoint
- Use Grafana for visualization
**Log aggregation:**
- Send logs to ELK stack
- Use Loki for log storage
- Configure alerting based on log patterns
## Troubleshooting Checklist
When issues occur, check in this order:
1. **Service status**
- [ ] Service running?
- [ ] Health check passing?
- [ ] Logs showing errors?
2. **Redis status**
- [ ] Redis running?
- [ ] Connection successful?
- [ ] Memory usage normal?
3. **Agent status**
- [ ] Agents registered?
- [ ] Agents active?
- [ ] Agent capabilities valid?
4. **Task status**
- [ ] Tasks submitting?
- [ ] Tasks distributing?
- [ ] Tasks completing?
5. **Network**
- [ ] Connectivity to Redis?
- [ ] Connectivity to agents?
- [ ] Firewall rules correct?
6. **Configuration**
- [ ] Environment variables set?
- [ ] PYTHONPATH correct?
- [ ] Port available?

View File

@@ -0,0 +1,348 @@
"""Integration tests for AITBC Agent Coordinator service."""
import pytest
import asyncio
import httpx
from typing import Dict, Any
@pytest.fixture
async def coordinator_client():
    """Yield an async HTTP client bound to the coordinator API.

    The base URL is taken from the ``AITBC_COORDINATOR_URL`` environment
    variable and falls back to ``http://localhost:9001`` when the variable
    is unset or empty, so the suite can target a non-local coordinator
    without code changes. The client is closed once the requesting test
    has finished.
    """
    # NOTE(review): a plain ``@pytest.fixture`` on an async generator relies
    # on the async test plugin resolving it (e.g. pytest-asyncio auto mode)
    # -- confirm against the project's pytest configuration.
    import os  # function-scope import keeps this fixture self-contained

    base_url = os.environ.get("AITBC_COORDINATOR_URL") or "http://localhost:9001"
    async with httpx.AsyncClient(base_url=base_url, timeout=30) as client:
        yield client
@pytest.fixture
def sample_agent_data():
    """Return a fully populated registration payload for one worker agent."""
    capabilities = ["data-processing", "analysis"]
    services = ["task-execution"]
    endpoints = {"http": "http://localhost:9002"}
    metadata = {"version": "1.0.0", "test": True}

    return {
        "agent_id": "test-integration-agent",
        "agent_type": "worker",
        "capabilities": capabilities,
        "services": services,
        "endpoints": endpoints,
        "metadata": metadata,
    }
@pytest.fixture
def sample_task_data():
    """Return a normal-priority task submission payload with no requirements."""
    task_body = {"model": "llama2", "prompt": "test prompt"}

    return {
        "task_data": task_body,
        "priority": "normal",
        "requirements": {},
    }
class TestAgentRegistration:
    """Exercise ``POST /agents/register`` with valid, repeated and malformed payloads."""

    @pytest.mark.asyncio
    async def test_register_agent_success(self, coordinator_client, sample_agent_data):
        """A complete payload is accepted and the agent ID is echoed back."""
        resp = await coordinator_client.post("/agents/register", json=sample_agent_data)
        assert resp.status_code in (200, 201)

        body = resp.json()
        assert body["status"] == "success"
        assert body["agent_id"] == sample_agent_data["agent_id"]

    @pytest.mark.asyncio
    async def test_register_agent_duplicate(self, coordinator_client, sample_agent_data):
        """Registering the same agent twice either updates it or is rejected with 409."""
        # First registration only sets the stage; its outcome is not asserted.
        await coordinator_client.post("/agents/register", json=sample_agent_data)

        # The second attempt may upsert (200/201) or conflict (409),
        # depending on how the coordinator treats duplicates.
        second = await coordinator_client.post("/agents/register", json=sample_agent_data)
        assert second.status_code in (200, 201, 409)

    @pytest.mark.asyncio
    async def test_register_agent_invalid_data(self, coordinator_client):
        """A payload carrying only ``agent_id`` fails request validation."""
        incomplete_payload = {"agent_id": "invalid"}  # required fields left out
        resp = await coordinator_client.post("/agents/register", json=incomplete_payload)
        assert resp.status_code == 422

    @pytest.mark.asyncio
    async def test_register_agent_missing_agent_id(self, coordinator_client):
        """A payload without ``agent_id`` fails request validation."""
        payload_without_id = {"agent_type": "worker"}
        resp = await coordinator_client.post("/agents/register", json=payload_without_id)
        assert resp.status_code == 422
class TestAgentDiscovery:
    """Exercise ``POST /agents/discover`` with and without filters."""

    @pytest.mark.asyncio
    async def test_discover_all_agents(self, coordinator_client, sample_agent_data):
        """An empty filter returns a success envelope with ``agents`` and ``count``."""
        # Make sure at least one agent exists before querying.
        await coordinator_client.post("/agents/register", json=sample_agent_data)

        resp = await coordinator_client.post("/agents/discover", json={})
        assert resp.status_code == 200

        body = resp.json()
        assert body["status"] == "success"
        for expected_key in ("agents", "count"):
            assert expected_key in body

    @pytest.mark.asyncio
    async def test_discover_by_status(self, coordinator_client, sample_agent_data):
        """Filtering on ``status`` = active is accepted."""
        await coordinator_client.post("/agents/register", json=sample_agent_data)

        status_filter = {"status": "active"}
        resp = await coordinator_client.post("/agents/discover", json=status_filter)
        assert resp.status_code == 200
        assert resp.json()["status"] == "success"

    @pytest.mark.asyncio
    async def test_discover_by_type(self, coordinator_client, sample_agent_data):
        """Filtering on ``agent_type`` = worker is accepted."""
        await coordinator_client.post("/agents/register", json=sample_agent_data)

        type_filter = {"agent_type": "worker"}
        resp = await coordinator_client.post("/agents/discover", json=type_filter)
        assert resp.status_code == 200
        assert resp.json()["status"] == "success"

    @pytest.mark.asyncio
    async def test_discover_empty_result(self, coordinator_client):
        """A filter that matches nothing still succeeds and reports a zero count."""
        unmatched_filter = {"agent_type": "nonexistent"}
        resp = await coordinator_client.post("/agents/discover", json=unmatched_filter)
        assert resp.status_code == 200

        body = resp.json()
        assert body["status"] == "success"
        assert body["count"] == 0
class TestAgentStatus:
    """Exercise ``GET /agents/{id}`` and ``PUT /agents/{id}/status``."""

    @pytest.mark.asyncio
    async def test_get_agent_info(self, coordinator_client, sample_agent_data):
        """A registered agent can be fetched by ID."""
        await coordinator_client.post("/agents/register", json=sample_agent_data)

        agent_id = sample_agent_data["agent_id"]
        resp = await coordinator_client.get(f"/agents/{agent_id}")
        assert resp.status_code == 200

        body = resp.json()
        assert body["status"] == "success"
        assert body["agent"]["agent_id"] == agent_id

    @pytest.mark.asyncio
    async def test_get_agent_not_found(self, coordinator_client):
        """Fetching an unknown agent ID yields 404."""
        resp = await coordinator_client.get("/agents/nonexistent-agent")
        assert resp.status_code == 404

    @pytest.mark.asyncio
    async def test_update_agent_status(self, coordinator_client, sample_agent_data):
        """Setting a registered agent to ``busy`` is reflected in the response."""
        await coordinator_client.post("/agents/register", json=sample_agent_data)

        agent_id = sample_agent_data["agent_id"]
        update = {"status": "busy", "load_metrics": {"active_connections": 5}}
        resp = await coordinator_client.put(f"/agents/{agent_id}/status", json=update)
        assert resp.status_code == 200

        body = resp.json()
        assert body["status"] == "success"
        assert body["new_status"] == "busy"

    @pytest.mark.asyncio
    async def test_update_agent_status_invalid(self, coordinator_client, sample_agent_data):
        """An unrecognised status value is rejected with 400 or 422."""
        await coordinator_client.post("/agents/register", json=sample_agent_data)

        agent_id = sample_agent_data["agent_id"]
        bad_update = {"status": "invalid_status"}
        resp = await coordinator_client.put(f"/agents/{agent_id}/status", json=bad_update)
        assert resp.status_code in (400, 422)
class TestTaskDistribution:
    """Exercise ``POST /tasks/submit`` and ``GET /tasks/status``."""

    @pytest.mark.asyncio
    async def test_submit_task_success(self, coordinator_client, sample_task_data):
        """A valid task is accepted and a ``task_id`` is returned."""
        response = await coordinator_client.post("/tasks/submit", json=sample_task_data)
        assert response.status_code in (200, 201)
        data = response.json()
        assert data["status"] == "success"
        assert "task_id" in data

    @pytest.mark.asyncio
    async def test_submit_task_invalid_priority(self, coordinator_client):
        """An unknown priority value is rejected with 400."""
        invalid_data = {
            "task_data": {"model": "llama2", "prompt": "test"},
            "priority": "invalid_priority",
            "requirements": {},
        }
        response = await coordinator_client.post("/tasks/submit", json=invalid_data)
        assert response.status_code == 400

    @pytest.mark.asyncio
    async def test_task_distribution_stats(self, coordinator_client):
        """The status endpoint exposes distribution and load-balancer statistics."""
        response = await coordinator_client.get("/tasks/status")
        assert response.status_code == 200
        data = response.json()
        assert data["status"] == "success"
        assert "stats" in data
        assert "load_balancer_stats" in data["stats"]

    @pytest.mark.asyncio
    async def test_task_assignment_with_active_agent(
        self, coordinator_client, sample_agent_data, sample_task_data
    ):
        """A submitted task is counted as distributed once an agent is registered."""
        # Register an agent so the distributor has a candidate.
        await coordinator_client.post("/agents/register", json=sample_agent_data)

        # Submit task
        response = await coordinator_client.post("/tasks/submit", json=sample_task_data)
        assert response.status_code in (200, 201)

        # Poll the stats instead of sleeping for a fixed second: the test
        # finishes as soon as distribution is visible and tolerates a slow
        # distributor for up to ~5 seconds.
        tasks_distributed = 0
        for _ in range(20):
            stats_response = await coordinator_client.get("/tasks/status")
            # Fail with a clear status mismatch rather than a KeyError below.
            assert stats_response.status_code == 200
            tasks_distributed = stats_response.json()["stats"]["tasks_distributed"]
            if tasks_distributed >= 1:
                break
            await asyncio.sleep(0.25)
        assert tasks_distributed >= 1
class TestLoadBalancing:
    """Exercise task submission against several registered agents, and against none."""

    @pytest.mark.asyncio
    async def test_least_connections_strategy(self, coordinator_client):
        """Three registered workers are reported as active by the load balancer."""
        # Register multiple agents
        agents = [
            {"agent_id": "agent-1", "agent_type": "worker"},
            {"agent_id": "agent-2", "agent_type": "worker"},
            {"agent_id": "agent-3", "agent_type": "worker"},
        ]
        for agent in agents:
            await coordinator_client.post("/agents/register", json=agent)

        # Submit multiple tasks
        for i in range(5):
            await coordinator_client.post(
                "/tasks/submit",
                json={
                    "task_data": {"task": f"task-{i}"},
                    "priority": "normal",
                    "requirements": {},
                },
            )

        # Give the distributor time to process the submissions.
        await asyncio.sleep(2)
        stats_response = await coordinator_client.get("/tasks/status")
        # Check the status first, as the other stats tests do, so a failing
        # endpoint is reported as such instead of as a KeyError on the JSON.
        assert stats_response.status_code == 200
        stats_data = stats_response.json()
        assert stats_data["stats"]["load_balancer_stats"]["active_agents"] >= 3

    @pytest.mark.asyncio
    async def test_no_eligible_agents(self, coordinator_client, sample_task_data):
        """Submitting without registering an agent is either queued or refused with 503."""
        # This test registers no agent itself.
        # NOTE(review): agents registered by earlier tests may still exist on
        # the shared coordinator -- confirm whether isolation is required.
        response = await coordinator_client.post("/tasks/submit", json=sample_task_data)
        # Should succeed (task queued) or fail depending on implementation
        assert response.status_code in (200, 201, 503)
class TestQueueManagement:
    """Exercise the ``/tasks/queues`` family of endpoints."""

    @pytest.mark.asyncio
    async def test_get_queue_sizes(self, coordinator_client):
        """The queue listing succeeds and carries ``queue_sizes``."""
        resp = await coordinator_client.get("/tasks/queues")
        assert resp.status_code == 200

        body = resp.json()
        assert body["status"] == "success"
        assert "queue_sizes" in body

    @pytest.mark.asyncio
    async def test_clear_queue(self, coordinator_client, sample_task_data):
        """Clearing the normal-priority queue reports how many tasks were removed."""
        # Seed the queue with a few submissions first.
        for _ in range(3):
            await coordinator_client.post("/tasks/submit", json=sample_task_data)

        resp = await coordinator_client.post("/tasks/queues/normal/clear")
        assert resp.status_code == 200

        body = resp.json()
        assert body["status"] == "success"
        assert "cleared_count" in body

    @pytest.mark.asyncio
    async def test_clear_invalid_queue(self, coordinator_client):
        """Clearing an unknown priority is rejected with 400."""
        resp = await coordinator_client.post("/tasks/queues/invalid/clear")
        assert resp.status_code == 400

    @pytest.mark.asyncio
    async def test_get_queue_stats(self, coordinator_client):
        """The stats endpoint returns both queue sizes and distribution statistics."""
        resp = await coordinator_client.get("/tasks/queues/stats")
        assert resp.status_code == 200

        body = resp.json()
        assert body["status"] == "success"
        for expected_key in ("queue_sizes", "distribution_stats"):
            assert expected_key in body
class TestHeartbeat:
    """Exercise ``POST /agents/{id}/heartbeat``."""

    @pytest.mark.asyncio
    async def test_agent_heartbeat(self, coordinator_client, sample_agent_data):
        """A heartbeat for a registered agent succeeds and is timestamped."""
        await coordinator_client.post("/agents/register", json=sample_agent_data)

        agent_id = sample_agent_data["agent_id"]
        resp = await coordinator_client.post(f"/agents/{agent_id}/heartbeat")
        assert resp.status_code == 200

        body = resp.json()
        assert body["status"] == "success"
        assert "heartbeat_at" in body

    @pytest.mark.asyncio
    async def test_heartbeat_nonexistent_agent(self, coordinator_client):
        """A heartbeat for an unknown agent ID yields 404."""
        resp = await coordinator_client.post("/agents/nonexistent/heartbeat")
        assert resp.status_code == 404
class TestHealthCheck:
"""Test health check endpoints."""
@pytest.mark.asyncio
async def test_health_check(self, coordinator_client):
"""Test service health check."""
response = await coordinator_client.get("/health")
assert response.status_code == 200
data = response.json()
assert "status" in data