Add message storage, broadcast, and peer management features to agent coordinator
Some checks failed
API Endpoint Tests / test-api-endpoints (push) Has been cancelled
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Integration Tests / test-service-integration (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Node Failover Simulation / failover-test (push) Has been cancelled
Production Tests / Production Integration Tests (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled
Documentation Validation / validate-docs (push) Has been cancelled
Documentation Validation / validate-policies-strict (push) Has been cancelled

- Import MessageStorage and PeerStorage in lifespan
- Initialize message_storage and peer_storage with Redis URL
- Add start/stop lifecycle management for storage services
- Add protocol field to MessageRequest model with validation
- Add BroadcastRequest model with agent_type and capabilities filters
- Store sent messages in Redis with metadata (message_id, sender, receiver, type, priority, protocol, timestamp)
- Add /
This commit is contained in:
aitbc
2026-05-07 20:16:50 +02:00
parent a41a1c0005
commit 10a595a788
6 changed files with 934 additions and 13 deletions

View File

@@ -389,6 +389,349 @@ Get task distribution statistics and load balancer metrics.
curl http://localhost:9001/tasks/status
```
## Message Management API
### Send Message
Send a message to a specific agent using a specified communication protocol.
**Endpoint:** `POST /messages/send`
**Request Body:**
```json
{
"receiver_id": "string (required)",
"message_type": "string (required)",
"payload": {"string": "any"},
"priority": "string (default: normal)",
"protocol": "string (default: hierarchical)"
}
```
**Parameters:**
- `receiver_id` (required): Target agent ID
- `message_type` (required): Message type (direct, broadcast, hierarchical, peer_to_peer, etc.)
- `payload` (required): Message data
- `priority` (optional): Message priority (low, normal, high, critical)
- `protocol` (optional): Communication protocol (hierarchical, peer_to_peer, broadcast)
**Response (200 OK):**
```json
{
"status": "success",
"message": "Message sent successfully",
"message_id": "UUID string",
"receiver_id": "string",
"protocol": "string",
"sent_at": "ISO 8601 timestamp"
}
```
**Response (400 Bad Request):**
```json
{
"detail": "Invalid protocol: {protocol}. Valid protocols: hierarchical, peer_to_peer, broadcast"
}
```
**Response (503 Service Unavailable):**
```json
{
"detail": "Communication manager not available"
}
```
**Example:**
```bash
curl -X POST http://localhost:9001/messages/send \
-H "Content-Type: application/json" \
-d '{
"receiver_id": "hermes-agent",
"message_type": "direct",
"payload": {"task": "process_data"},
"priority": "normal",
"protocol": "hierarchical"
}'
```
### Broadcast Message
Broadcast a message to multiple agents with optional filtering.
**Endpoint:** `POST /messages/broadcast`
**Request Body:**
```json
{
"message_type": "string (required)",
"payload": {"string": "any"},
"priority": "string (default: normal)",
"agent_type": "string (optional)",
"capabilities": ["string (optional)"]
}
```
**Parameters:**
- `message_type` (required): Message type
- `payload` (required): Message data
- `priority` (optional): Message priority (low, normal, high, critical)
- `agent_type` (optional): Filter by agent type
- `capabilities` (optional): Filter by capabilities
**Response (200 OK):**
```json
{
"status": "success",
"message": "Broadcast sent to {count} agents",
"recipients": ["string"],
"count": 0,
"broadcast_at": "ISO 8601 timestamp"
}
```
**Response (503 Service Unavailable):**
```json
{
"detail": "Communication manager not available"
}
```
**Example:**
```bash
curl -X POST http://localhost:9001/messages/broadcast \
-H "Content-Type: application/json" \
-d '{
"message_type": "broadcast",
"payload": {"announcement": "system_update"},
"agent_type": "worker"
}'
```
### Get Message History
Retrieve message history with optional filtering.
**Endpoint:** `GET /messages/history`
**Query Parameters:**
- `sender_id` (optional): Filter by sender ID
- `receiver_id` (optional): Filter by receiver ID
- `limit` (optional): Maximum number of messages (default: 100)
- `offset` (optional): Pagination offset (default: 0)
**Response (200 OK):**
```json
{
"status": "success",
"messages": [
{
"message_id": "string",
"sender_id": "string",
"receiver_id": "string",
"message_type": "string",
"priority": "string",
"payload": {"string": "any"},
"protocol": "string",
"timestamp": "ISO 8601 timestamp"
}
],
"count": 0,
"limit": 100,
"offset": 0,
"timestamp": "ISO 8601 timestamp"
}
```
**Response (503 Service Unavailable):**
```json
{
"detail": "Message storage not available"
}
```
**Example:**
```bash
curl "http://localhost:9001/messages/history?sender_id=agent-1&limit=50"
```
### Get Specific Message
Retrieve a specific message by ID.
**Endpoint:** `GET /messages/{message_id}`
**URL Parameters:**
- `message_id` (required): The unique message identifier
**Response (200 OK):**
```json
{
"status": "success",
"message": {
"message_id": "string",
"sender_id": "string",
"receiver_id": "string",
"message_type": "string",
"priority": "string",
"payload": {"string": "any"},
"protocol": "string",
"timestamp": "ISO 8601 timestamp"
},
"timestamp": "ISO 8601 timestamp"
}
```
**Response (404 Not Found):**
```json
{
"detail": "Message {message_id} not found"
}
```
**Response (503 Service Unavailable):**
```json
{
"detail": "Message storage not available"
}
```
**Example:**
```bash
curl http://localhost:9001/messages/{message_id}
```
## Peer Management API
### Add Peer Connection
Add a peer connection for an agent.
**Endpoint:** `POST /peers/add`
**Query Parameters:**
- `agent_id` (required): Agent ID
- `peer_id` (required): Peer agent ID
**Response (200 OK):**
```json
{
"status": "success",
"message": "Peer {peer_id} added for agent {agent_id}",
"agent_id": "string",
"peer_id": "string",
"connected_at": "ISO 8601 timestamp"
}
```
**Response (503 Service Unavailable):**
```json
{
"detail": "Peer storage not available"
}
```
**Example:**
```bash
curl -X POST "http://localhost:9001/peers/add?agent_id=agent-1&peer_id=agent-2"
```
### Remove Peer Connection
Remove a peer connection for an agent.
**Endpoint:** `POST /peers/remove`
**Query Parameters:**
- `agent_id` (required): Agent ID
- `peer_id` (required): Peer agent ID
**Response (200 OK):**
```json
{
"status": "success",
"message": "Peer {peer_id} removed for agent {agent_id}",
"agent_id": "string",
"peer_id": "string",
"removed_at": "ISO 8601 timestamp"
}
```
**Response (503 Service Unavailable):**
```json
{
"detail": "Peer storage not available"
}
```
**Example:**
```bash
curl -X POST "http://localhost:9001/peers/remove?agent_id=agent-1&peer_id=agent-2"
```
### Get Agent Peers
Get all peers for a specific agent.
**Endpoint:** `GET /peers/{agent_id}`
**URL Parameters:**
- `agent_id` (required): Agent ID
**Response (200 OK):**
```json
{
"status": "success",
"agent_id": "string",
"peers": ["string"],
"count": 0,
"timestamp": "ISO 8601 timestamp"
}
```
**Response (503 Service Unavailable):**
```json
{
"detail": "Peer storage not available"
}
```
**Example:**
```bash
curl http://localhost:9001/peers/agent-1
```
### Get All Peer Connections
Get all peer connections in the system.
**Endpoint:** `GET /peers`
**Response (200 OK):**
```json
{
"status": "success",
"connections": {
"agent_id": ["peer_id", ...],
...
},
"total_agents": 0,
"total_peers": 0,
"timestamp": "ISO 8601 timestamp"
}
```
**Response (503 Service Unavailable):**
```json
{
"detail": "Peer storage not available"
}
```
**Example:**
```bash
curl http://localhost:9001/peers
```
## Health Check
### Service Health

View File

@@ -135,8 +135,26 @@ AgentMessage:
5. TTL ensures expired messages are discarded
6. Priority levels ensure important messages are processed first
**Current Implementation:**
The coordinator provides REST APIs for agent registration, discovery, and status updates. Agent communication is currently implemented via HTTP endpoints on the coordinator service (port 9001). The communication protocols are defined in the code but require additional implementation for full peer-to-peer and hierarchical messaging.
**Current Implementation Status:**
**Implemented:**
- `POST /messages/send` - Send messages (hardcoded to "hierarchical" protocol only)
- `GET /load-balancer/stats` - Load balancer statistics
- `GET /registry/stats` - Agent registry statistics
- `GET /agents/service/{service}` - Find agents by service
- `GET /agents/capability/{capability}` - Find agents by capability
- `PUT /load-balancer/strategy` - Change load balancing strategy
**Missing / Incomplete:**
1. `POST /messages/send` only uses "hierarchical" protocol - doesn't support:
- `peer_to_peer` protocol
- `broadcast` protocol
- Other protocols defined in MessageType enum
2. No broadcast endpoint - Can't send broadcast messages via API
3. No message history/storage - Messages aren't persisted
4. No peer management endpoints - Can't add/remove peers via API
**Note:** The protocols (Hierarchical, P2P, Broadcast) are well-implemented in `communication.py`, but the API layer (`messages.py`) doesn't fully expose them yet.
## Service Initialization