diff --git a/apps/agent-coordinator/src/app/lifespan.py b/apps/agent-coordinator/src/app/lifespan.py index 1f335376..d029e2f3 100644 --- a/apps/agent-coordinator/src/app/lifespan.py +++ b/apps/agent-coordinator/src/app/lifespan.py @@ -18,11 +18,12 @@ async def lifespan(app: FastAPI): from .protocols.message_types import MessageProcessor from .routing.agent_discovery import AgentDiscoveryService, AgentRegistry from .routing.load_balancer import LoadBalancer, LoadBalancingStrategy, TaskDistributor + from .storage.message_storage import MessageStorage, PeerStorage # Get Redis URL from environment variable redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/1") logger.info(f"Using Redis URL: {redis_url}") - + state.agent_registry = AgentRegistry(redis_url=redis_url) await state.agent_registry.start() @@ -32,6 +33,10 @@ async def lifespan(app: FastAPI): state.task_distributor = TaskDistributor(state.load_balancer) state.communication_manager = CommunicationManager("agent-coordinator") state.message_processor = MessageProcessor("agent-coordinator") + state.message_storage = MessageStorage(redis_url=redis_url) + state.peer_storage = PeerStorage(redis_url=redis_url) + await state.message_storage.start() + await state.peer_storage.start() asyncio.create_task(state.task_distributor.start_distribution()) asyncio.create_task(state.message_processor.start_processing()) @@ -43,4 +48,8 @@ async def lifespan(app: FastAPI): logger.info("Shutting down AITBC Agent Coordinator...") if state.agent_registry: await state.agent_registry.stop() + if state.message_storage: + await state.message_storage.stop() + if state.peer_storage: + await state.peer_storage.stop() logger.info("Agent Coordinator shut down") diff --git a/apps/agent-coordinator/src/app/models.py b/apps/agent-coordinator/src/app/models.py index f169d162..64d93974 100644 --- a/apps/agent-coordinator/src/app/models.py +++ b/apps/agent-coordinator/src/app/models.py @@ -28,3 +28,12 @@ class MessageRequest(BaseModel): 
message_type: str = Field(..., description="Message type") payload: Dict[str, Any] = Field(..., description="Message payload") priority: str = Field("normal", description="Message priority") + protocol: str = Field("hierarchical", description="Communication protocol (hierarchical, peer_to_peer, broadcast)") + + +class BroadcastRequest(BaseModel): + message_type: str = Field(..., description="Message type") + payload: Dict[str, Any] = Field(..., description="Message payload") + priority: str = Field("normal", description="Message priority") + agent_type: Optional[str] = Field(None, description="Filter by agent type") + capabilities: Optional[List[str]] = Field(None, description="Filter by capabilities") diff --git a/apps/agent-coordinator/src/app/routers/messages.py b/apps/agent-coordinator/src/app/routers/messages.py index d0d29a9c..e00ee75f 100644 --- a/apps/agent-coordinator/src/app/routers/messages.py +++ b/apps/agent-coordinator/src/app/routers/messages.py @@ -12,7 +12,7 @@ from ..auth.permissions import Permission, Role, permission_manager from ..ai.advanced_ai import ai_integration from ..ai.realtime_learning import learning_system from ..consensus.distributed_consensus import distributed_consensus -from ..models import AgentRegistrationRequest, AgentStatusUpdate, MessageRequest, TaskSubmission +from ..models import AgentRegistrationRequest, AgentStatusUpdate, MessageRequest, TaskSubmission, BroadcastRequest from ..monitoring.alerting import alert_manager from ..monitoring.prometheus_metrics import metrics_registry, performance_monitor from ..protocols.communication import MessageType, create_protocol @@ -30,21 +30,27 @@ async def send_message(request: MessageRequest): try: if not state.communication_manager: raise HTTPException(status_code=503, detail="Communication manager not available") - + from ..protocols.communication import AgentMessage, Priority - + + # Validate protocol + valid_protocols = ["hierarchical", "peer_to_peer", "broadcast"] + protocol = 
request.protocol.lower() + if protocol not in valid_protocols: + raise HTTPException(status_code=400, detail=f"Invalid protocol: {request.protocol}. Valid protocols: {', '.join(valid_protocols)}") + # Convert message type try: message_type = MessageType(request.message_type) except ValueError: raise HTTPException(status_code=400, detail=f"Invalid message type: {request.message_type}") - + # Convert priority try: priority = Priority(request.priority.lower()) except ValueError: raise HTTPException(status_code=400, detail=f"Invalid priority: {request.priority}") - + # Create message message = AgentMessage( sender_id="agent-coordinator", @@ -53,25 +59,175 @@ async def send_message(request: MessageRequest): priority=priority, payload=request.payload ) - - # Send message - success = await state.communication_manager.send_message("hierarchical", message) - + + # Send message with specified protocol + success = await state.communication_manager.send_message(protocol, message) + if success: + # Store message in Redis for history + if state.message_storage: + message_data = { + "message_id": message.id, + "sender_id": message.sender_id, + "receiver_id": message.receiver_id, + "message_type": message.message_type.value, + "priority": message.priority.value, + "payload": json.dumps(message.payload), + "protocol": protocol, + "timestamp": datetime.now(timezone.utc).isoformat() + } + await state.message_storage.store_message(message.id, message_data) + return { "status": "success", "message": "Message sent successfully", "message_id": message.id, "receiver_id": request.receiver_id, + "protocol": protocol, "sent_at": datetime.now(timezone.utc).isoformat() } else: raise HTTPException(status_code=500, detail="Failed to send message") - + + except HTTPException: + raise except Exception as e: logger.error(f"Error sending message: {e}") raise HTTPException(status_code=500, detail=str(e)) +# Broadcast message +@router.post("/messages/broadcast") +async def broadcast_message(request: 
BroadcastRequest): + """Broadcast message to multiple agents""" + try: + if not state.communication_manager: + raise HTTPException(status_code=503, detail="Communication manager not available") + + if not state.agent_registry: + raise HTTPException(status_code=503, detail="Agent registry not available") + + from ..protocols.communication import AgentMessage, Priority + + # Convert message type + try: + message_type = MessageType(request.message_type) + except ValueError: + raise HTTPException(status_code=400, detail=f"Invalid message type: {request.message_type}") + + # Convert priority + try: + priority = Priority(request.priority.lower()) + except ValueError: + raise HTTPException(status_code=400, detail=f"Invalid priority: {request.priority}") + + # Build discovery query for filtering + query = {} + if request.agent_type: + query["agent_type"] = request.agent_type + if request.capabilities: + query["capabilities"] = request.capabilities + + # Discover target agents + agents = await state.agent_registry.discover_agents(query) + + if not agents: + return { + "status": "success", + "message": "No matching agents found", + "recipients": [], + "count": 0, + "broadcast_at": datetime.now(timezone.utc).isoformat() + } + + # Send broadcast to each agent + recipients = [] + for agent in agents: + message = AgentMessage( + sender_id="agent-coordinator", + receiver_id=agent.agent_id, + message_type=message_type, + priority=priority, + payload=request.payload + ) + + success = await state.communication_manager.send_message("broadcast", message) + if success: + recipients.append(agent.agent_id) + + return { + "status": "success", + "message": f"Broadcast sent to {len(recipients)} agents", + "recipients": recipients, + "count": len(recipients), + "broadcast_at": datetime.now(timezone.utc).isoformat() + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error broadcasting message: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +# Get 
message history +@router.get("/messages/history") +async def get_message_history( + sender_id: Optional[str] = Query(None, description="Filter by sender ID"), + receiver_id: Optional[str] = Query(None, description="Filter by receiver ID"), + limit: int = Query(100, ge=1, description="Maximum number of messages"), + offset: int = Query(0, ge=0, description="Offset for pagination") +): + """Get message history; sender_id takes precedence over receiver_id when both are given""" + try: + if not state.message_storage: + raise HTTPException(status_code=503, detail="Message storage not available") + + if sender_id: + messages = await state.message_storage.get_messages_by_sender(sender_id, limit, offset) + elif receiver_id: + messages = await state.message_storage.get_messages_by_receiver(receiver_id, limit, offset) + else: + messages = await state.message_storage.get_all_messages(limit, offset) + + return { + "status": "success", + "messages": messages, + "count": len(messages), + "limit": limit, + "offset": offset, + "timestamp": datetime.now(timezone.utc).isoformat() + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error retrieving message history: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +# Get specific message +@router.get("/messages/{message_id}") +async def get_message(message_id: str): + """Get a specific message by ID""" + try: + if not state.message_storage: + raise HTTPException(status_code=503, detail="Message storage not available") + + message = await state.message_storage.get_message(message_id) + + if not message: + raise HTTPException(status_code=404, detail=f"Message {message_id} not found") + + return { + "status": "success", + "message": message, + "timestamp": datetime.now(timezone.utc).isoformat() + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error retrieving message {message_id}: {e}") + raise HTTPException(status_code=500, detail=str(e)) + # Load balancer statistics @router.get("/load-balancer/stats") async def 
get_load_balancer_stats(): @@ -183,3 +339,112 @@ async def set_load_balancing_strategy(strategy: str = Query(..., description="Lo except Exception as e: logger.error(f"Error setting load balancing strategy: {e}") raise HTTPException(status_code=500, detail=str(e)) + +# Peer management endpoints +@router.post("/peers/add") +async def add_peer(agent_id: str = Query(..., description="Agent ID"), peer_id: str = Query(..., description="Peer agent ID")): + """Add a peer connection for an agent""" + try: + from ..storage.message_storage import PeerStorage + + if not state.peer_storage: + raise HTTPException(status_code=503, detail="Peer storage not available") + + success = await state.peer_storage.add_peer(agent_id, peer_id, {"connected_at": datetime.now(timezone.utc).isoformat()}) + + if success: + return { + "status": "success", + "message": f"Peer {peer_id} added for agent {agent_id}", + "agent_id": agent_id, + "peer_id": peer_id, + "connected_at": datetime.now(timezone.utc).isoformat() + } + else: + raise HTTPException(status_code=500, detail="Failed to add peer") + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error adding peer: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@router.post("/peers/remove") +async def remove_peer(agent_id: str = Query(..., description="Agent ID"), peer_id: str = Query(..., description="Peer agent ID")): + """Remove a peer connection for an agent""" + try: + from ..storage.message_storage import PeerStorage + + if not state.peer_storage: + raise HTTPException(status_code=503, detail="Peer storage not available") + + success = await state.peer_storage.remove_peer(agent_id, peer_id) + + if success: + return { + "status": "success", + "message": f"Peer {peer_id} removed for agent {agent_id}", + "agent_id": agent_id, + "peer_id": peer_id, + "removed_at": datetime.now(timezone.utc).isoformat() + } + else: + raise HTTPException(status_code=500, detail="Failed to remove peer") + + except 
HTTPException: + raise + except Exception as e: + logger.error(f"Error removing peer: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/peers/{agent_id}") +async def get_agent_peers(agent_id: str): + """Get all peers for a specific agent""" + try: + from ..storage.message_storage import PeerStorage + + if not state.peer_storage: + raise HTTPException(status_code=503, detail="Peer storage not available") + + peers = await state.peer_storage.get_agent_peers(agent_id) + + return { + "status": "success", + "agent_id": agent_id, + "peers": peers, + "count": len(peers), + "timestamp": datetime.now(timezone.utc).isoformat() + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error retrieving peers for agent {agent_id}: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/peers") +async def get_all_peers(): + """Get all peer connections in the system""" + try: + from ..storage.message_storage import PeerStorage + + if not state.peer_storage: + raise HTTPException(status_code=503, detail="Peer storage not available") + + connections = await state.peer_storage.get_all_peer_connections() + + total_peers = sum(len(peers) for peers in connections.values()) + + return { + "status": "success", + "connections": connections, + "total_agents": len(connections), + "total_peers": total_peers, + "timestamp": datetime.now(timezone.utc).isoformat() + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error retrieving all peer connections: {e}") + raise HTTPException(status_code=500, detail=str(e)) diff --git a/apps/agent-coordinator/src/app/storage/message_storage.py b/apps/agent-coordinator/src/app/storage/message_storage.py new file mode 100644 index 00000000..ad2b7a21 --- /dev/null +++ b/apps/agent-coordinator/src/app/storage/message_storage.py @@ -0,0 +1,277 @@ +""" +Message storage layer for persisting agent communication messages in Redis +""" + +import json +from datetime 
import datetime, timezone +from typing import Any, Dict, List, Optional + +from aitbc import get_logger + +logger = get_logger(__name__) + + +class MessageStorage: + """Redis-based message storage for agent communication history""" + + def __init__(self, redis_url: str): + """Initialize message storage with Redis connection""" + import redis.asyncio as redis + self.redis_url = redis_url + self.redis: Optional[redis.Redis] = None + + async def start(self): + """Connect to Redis""" + import redis.asyncio as redis + self.redis = await redis.from_url(self.redis_url, decode_responses=True) + logger.info("Message storage connected to Redis") + + async def stop(self): + """Close Redis connection""" + if self.redis: + await self.redis.close() + logger.info("Message storage disconnected from Redis") + + async def store_message(self, message_id: str, message_data: Dict[str, Any]) -> bool: + """Store a message hash and index it by sender, receiver and time; return False on any error""" + try: + # Store message data + await self.redis.hset(f"message:{message_id}", mapping=message_data) + + # Index by sender + sender_id = message_data.get("sender_id") + if sender_id: + await self.redis.sadd(f"messages:sender:{sender_id}", message_id) + + # Index by receiver + receiver_id = message_data.get("receiver_id") + if receiver_id: + await self.redis.sadd(f"messages:receiver:{receiver_id}", message_id) + + # Index by timestamp; ZADD scores must be numeric, so ISO-8601 strings are converted to epoch seconds + timestamp = message_data.get("timestamp", datetime.now(timezone.utc).isoformat()) + await self.redis.zadd("messages:timestamp", {message_id: timestamp if isinstance(timestamp, (int, float)) else datetime.fromisoformat(timestamp).timestamp()}) + + logger.debug(f"Stored message {message_id} in Redis") + return True + + except Exception as e: + logger.error(f"Error storing message {message_id}: {e}") + return False + + async def get_message(self, message_id: str) -> Optional[Dict[str, Any]]: + """Retrieve a specific message by ID""" + try: + message_data = await self.redis.hgetall(f"message:{message_id}") + if message_data: + # Parse JSON fields + if "payload" in message_data: + 
message_data["payload"] = json.loads(message_data["payload"]) + return message_data + return None + + except Exception as e: + logger.error(f"Error retrieving message {message_id}: {e}") + return None + + async def get_messages_by_sender( + self, + sender_id: str, + limit: int = 100, + offset: int = 0 + ) -> List[Dict[str, Any]]: + """Get messages sent by a specific agent""" + try: + # Get message IDs for sender (SMEMBERS is unordered; sort so pages are stable) + message_ids = await self.redis.smembers(f"messages:sender:{sender_id}") + message_ids = sorted(message_ids) + + # Apply pagination + message_ids = message_ids[offset:offset + limit] + + # Retrieve messages + messages = [] + for message_id in message_ids: + message_data = await self.get_message(message_id) + if message_data: + messages.append(message_data) + + return messages + + except Exception as e: + logger.error(f"Error retrieving messages for sender {sender_id}: {e}") + return [] + + async def get_messages_by_receiver( + self, + receiver_id: str, + limit: int = 100, + offset: int = 0 + ) -> List[Dict[str, Any]]: + """Get messages received by a specific agent""" + try: + # Get message IDs for receiver (SMEMBERS is unordered; sort so pages are stable) + message_ids = await self.redis.smembers(f"messages:receiver:{receiver_id}") + message_ids = sorted(message_ids) + + # Apply pagination + message_ids = message_ids[offset:offset + limit] + + # Retrieve messages + messages = [] + for message_id in message_ids: + message_data = await self.get_message(message_id) + if message_data: + messages.append(message_data) + + return messages + + except Exception as e: + logger.error(f"Error retrieving messages for receiver {receiver_id}: {e}") + return [] + + async def get_all_messages( + self, + limit: int = 100, + offset: int = 0 + ) -> List[Dict[str, Any]]: + """Get all messages with pagination""" + try: + # Get message IDs by timestamp (most recent first); guard limit because a stop index of -1 means "to the end" + message_ids = (await self.redis.zrevrange("messages:timestamp", offset, offset + limit - 1)) if limit > 0 else [] + + # Retrieve messages + messages = [] + for message_id in 
message_ids: + message_data = await self.get_message(message_id) + if message_data: + messages.append(message_data) + + return messages + + except Exception as e: + logger.error(f"Error retrieving all messages: {e}") + return [] + + async def delete_message(self, message_id: str) -> bool: + """Delete a specific message""" + try: + # Get message data before deletion + message_data = await self.get_message(message_id) + if not message_data: + return False + + # Remove from indexes + sender_id = message_data.get("sender_id") + if sender_id: + await self.redis.srem(f"messages:sender:{sender_id}", message_id) + + receiver_id = message_data.get("receiver_id") + if receiver_id: + await self.redis.srem(f"messages:receiver:{receiver_id}", message_id) + + # Remove from timestamp index + await self.redis.zrem(f"messages:timestamp", message_id) + + # Delete message data + await self.redis.delete(f"message:{message_id}") + + logger.debug(f"Deleted message {message_id} from Redis") + return True + + except Exception as e: + logger.error(f"Error deleting message {message_id}: {e}") + return False + + +class PeerStorage: + """Redis-based peer storage for persisting peer connections across restarts""" + + def __init__(self, redis_url: str): + """Initialize peer storage with Redis connection""" + import redis.asyncio as redis + self.redis_url = redis_url + self.redis: Optional[redis.Redis] = None + + async def start(self): + """Connect to Redis""" + import redis.asyncio as redis + self.redis = await redis.from_url(self.redis_url, decode_responses=True) + logger.info("Peer storage connected to Redis") + + async def stop(self): + """Close Redis connection""" + if self.redis: + await self.redis.close() + logger.info("Peer storage disconnected from Redis") + + async def add_peer(self, agent_id: str, peer_id: str, metadata: Optional[Dict[str, Any]] = None) -> bool: + """Add a peer connection for an agent""" + try: + # Add peer to agent's peer set + await 
self.redis.sadd(f"peers:{agent_id}", peer_id) + + # Store peer metadata + if metadata: + await self.redis.hset(f"peer_connection:{agent_id}:{peer_id}", mapping=metadata) + + logger.debug(f"Added peer {peer_id} for agent {agent_id}") + return True + + except Exception as e: + logger.error(f"Error adding peer {peer_id} for agent {agent_id}: {e}") + return False + + async def remove_peer(self, agent_id: str, peer_id: str) -> bool: + """Remove a peer connection for an agent""" + try: + # Remove peer from agent's peer set + await self.redis.srem(f"peers:{agent_id}", peer_id) + + # Remove peer metadata + await self.redis.delete(f"peer_connection:{agent_id}:{peer_id}") + + logger.debug(f"Removed peer {peer_id} for agent {agent_id}") + return True + + except Exception as e: + logger.error(f"Error removing peer {peer_id} for agent {agent_id}: {e}") + return False + + async def get_agent_peers(self, agent_id: str) -> List[str]: + """Get all peers for a specific agent""" + try: + peer_ids = await self.redis.smembers(f"peers:{agent_id}") + return list(peer_ids) + + except Exception as e: + logger.error(f"Error retrieving peers for agent {agent_id}: {e}") + return [] + + async def get_peer_metadata(self, agent_id: str, peer_id: str) -> Optional[Dict[str, Any]]: + """Get metadata for a specific peer connection""" + try: + metadata = await self.redis.hgetall(f"peer_connection:{agent_id}:{peer_id}") + return metadata if metadata else None + + except Exception as e: + logger.error(f"Error retrieving peer metadata for {agent_id}:{peer_id}: {e}") + return None + + async def get_all_peer_connections(self) -> Dict[str, List[str]]: + """Get all peer connections in the system, keyed by agent ID""" + try: + # Collect peer set keys with incremental SCAN rather than the blocking KEYS command + peer_keys = [key async for key in self.redis.scan_iter(match="peers:*")] + connections = {} + + for key in peer_keys: + agent_id = key[len("peers:"):] + peer_ids = await self.redis.smembers(key) + connections[agent_id] = list(peer_ids) + + return connections + + except Exception as e: + 
logger.error(f"Error retrieving all peer connections: {e}") + return {} diff --git a/docs/agent-coordinator/API.md b/docs/agent-coordinator/API.md index 7c1c8572..c09b395e 100644 --- a/docs/agent-coordinator/API.md +++ b/docs/agent-coordinator/API.md @@ -389,6 +389,349 @@ Get task distribution statistics and load balancer metrics. curl http://localhost:9001/tasks/status ``` +## Message Management API + +### Send Message + +Send a message to a specific agent using a specified communication protocol. + +**Endpoint:** `POST /messages/send` + +**Request Body:** +```json +{ + "receiver_id": "string (required)", + "message_type": "string (required)", + "payload": {"string": "any"}, + "priority": "string (default: normal)", + "protocol": "string (default: hierarchical)" +} +``` + +**Parameters:** +- `receiver_id` (required): Target agent ID +- `message_type` (required): Message type (direct, broadcast, hierarchical, peer_to_peer, etc.) +- `payload` (required): Message data +- `priority` (optional): Message priority (low, normal, high, critical) +- `protocol` (optional): Communication protocol (hierarchical, peer_to_peer, broadcast) + +**Response (200 OK):** +```json +{ + "status": "success", + "message": "Message sent successfully", + "message_id": "UUID string", + "receiver_id": "string", + "protocol": "string", + "sent_at": "ISO 8601 timestamp" +} +``` + +**Response (400 Bad Request):** +```json +{ + "detail": "Invalid protocol: {protocol}. 
Valid protocols: hierarchical, peer_to_peer, broadcast" +} +``` + +**Response (503 Service Unavailable):** +```json +{ + "detail": "Communication manager not available" +} +``` + +**Example:** +```bash +curl -X POST http://localhost:9001/messages/send \ + -H "Content-Type: application/json" \ + -d '{ + "receiver_id": "hermes-agent", + "message_type": "direct", + "payload": {"task": "process_data"}, + "priority": "normal", + "protocol": "hierarchical" + }' +``` + +### Broadcast Message + +Broadcast a message to multiple agents with optional filtering. + +**Endpoint:** `POST /messages/broadcast` + +**Request Body:** +```json +{ + "message_type": "string (required)", + "payload": {"string": "any"}, + "priority": "string (default: normal)", + "agent_type": "string (optional)", + "capabilities": ["string (optional)"] +} +``` + +**Parameters:** +- `message_type` (required): Message type +- `payload` (required): Message data +- `priority` (optional): Message priority (low, normal, high, critical) +- `agent_type` (optional): Filter by agent type +- `capabilities` (optional): Filter by capabilities + +**Response (200 OK):** +```json +{ + "status": "success", + "message": "Broadcast sent to {count} agents", + "recipients": ["string"], + "count": 0, + "broadcast_at": "ISO 8601 timestamp" +} +``` + +**Response (503 Service Unavailable):** +```json +{ + "detail": "Communication manager not available" +} +``` + +**Example:** +```bash +curl -X POST http://localhost:9001/messages/broadcast \ + -H "Content-Type: application/json" \ + -d '{ + "message_type": "broadcast", + "payload": {"announcement": "system_update"}, + "agent_type": "worker" + }' +``` + +### Get Message History + +Retrieve message history with optional filtering. 
+ +**Endpoint:** `GET /messages/history` + +**Query Parameters:** +- `sender_id` (optional): Filter by sender ID +- `receiver_id` (optional): Filter by receiver ID +- `limit` (optional): Maximum number of messages (default: 100) +- `offset` (optional): Pagination offset (default: 0) + +**Response (200 OK):** +```json +{ + "status": "success", + "messages": [ + { + "message_id": "string", + "sender_id": "string", + "receiver_id": "string", + "message_type": "string", + "priority": "string", + "payload": {"string": "any"}, + "protocol": "string", + "timestamp": "ISO 8601 timestamp" + } + ], + "count": 0, + "limit": 100, + "offset": 0, + "timestamp": "ISO 8601 timestamp" +} +``` + +**Response (503 Service Unavailable):** +```json +{ + "detail": "Message storage not available" +} +``` + +**Example:** +```bash +curl "http://localhost:9001/messages/history?sender_id=agent-1&limit=50" +``` + +### Get Specific Message + +Retrieve a specific message by ID. + +**Endpoint:** `GET /messages/{message_id}` + +**URL Parameters:** +- `message_id` (required): The unique message identifier + +**Response (200 OK):** +```json +{ + "status": "success", + "message": { + "message_id": "string", + "sender_id": "string", + "receiver_id": "string", + "message_type": "string", + "priority": "string", + "payload": {"string": "any"}, + "protocol": "string", + "timestamp": "ISO 8601 timestamp" + }, + "timestamp": "ISO 8601 timestamp" +} +``` + +**Response (404 Not Found):** +```json +{ + "detail": "Message {message_id} not found" +} +``` + +**Response (503 Service Unavailable):** +```json +{ + "detail": "Message storage not available" +} +``` + +**Example:** +```bash +curl http://localhost:9001/messages/{message_id} +``` + +## Peer Management API + +### Add Peer Connection + +Add a peer connection for an agent. 
+ +**Endpoint:** `POST /peers/add` + +**Query Parameters:** +- `agent_id` (required): Agent ID +- `peer_id` (required): Peer agent ID + +**Response (200 OK):** +```json +{ + "status": "success", + "message": "Peer {peer_id} added for agent {agent_id}", + "agent_id": "string", + "peer_id": "string", + "connected_at": "ISO 8601 timestamp" +} +``` + +**Response (503 Service Unavailable):** +```json +{ + "detail": "Peer storage not available" +} +``` + +**Example:** +```bash +curl -X POST "http://localhost:9001/peers/add?agent_id=agent-1&peer_id=agent-2" +``` + +### Remove Peer Connection + +Remove a peer connection for an agent. + +**Endpoint:** `POST /peers/remove` + +**Query Parameters:** +- `agent_id` (required): Agent ID +- `peer_id` (required): Peer agent ID + +**Response (200 OK):** +```json +{ + "status": "success", + "message": "Peer {peer_id} removed for agent {agent_id}", + "agent_id": "string", + "peer_id": "string", + "removed_at": "ISO 8601 timestamp" +} +``` + +**Response (503 Service Unavailable):** +```json +{ + "detail": "Peer storage not available" +} +``` + +**Example:** +```bash +curl -X POST "http://localhost:9001/peers/remove?agent_id=agent-1&peer_id=agent-2" +``` + +### Get Agent Peers + +Get all peers for a specific agent. + +**Endpoint:** `GET /peers/{agent_id}` + +**URL Parameters:** +- `agent_id` (required): Agent ID + +**Response (200 OK):** +```json +{ + "status": "success", + "agent_id": "string", + "peers": ["string"], + "count": 0, + "timestamp": "ISO 8601 timestamp" +} +``` + +**Response (503 Service Unavailable):** +```json +{ + "detail": "Peer storage not available" +} +``` + +**Example:** +```bash +curl http://localhost:9001/peers/agent-1 +``` + +### Get All Peer Connections + +Get all peer connections in the system. + +**Endpoint:** `GET /peers` + +**Response (200 OK):** +```json +{ + "status": "success", + "connections": { + "agent_id": ["peer_id", ...], + ... 
+ }, + "total_agents": 0, + "total_peers": 0, + "timestamp": "ISO 8601 timestamp" +} +``` + +**Response (503 Service Unavailable):** +```json +{ + "detail": "Peer storage not available" +} +``` + +**Example:** +```bash +curl http://localhost:9001/peers +``` + ## Health Check ### Service Health diff --git a/docs/agent-coordinator/ARCHITECTURE.md b/docs/agent-coordinator/ARCHITECTURE.md index c056cd8b..7c969b18 100644 --- a/docs/agent-coordinator/ARCHITECTURE.md +++ b/docs/agent-coordinator/ARCHITECTURE.md @@ -135,8 +135,26 @@ AgentMessage: 5. TTL ensures expired messages are discarded 6. Priority levels ensure important messages are processed first -**Current Implementation:** -The coordinator provides REST APIs for agent registration, discovery, and status updates. Agent communication is currently implemented via HTTP endpoints on the coordinator service (port 9001). The communication protocols are defined in the code but require additional implementation for full peer-to-peer and hierarchical messaging. +**Current Implementation Status:** + +**Implemented:** +- `POST /messages/send` - Send messages using the `hierarchical` (default), `peer_to_peer`, or `broadcast` protocol +- `GET /load-balancer/stats` - Load balancer statistics +- `GET /registry/stats` - Agent registry statistics +- `GET /agents/service/{service}` - Find agents by service +- `GET /agents/capability/{capability}` - Find agents by capability +- `PUT /load-balancer/strategy` - Change load balancing strategy + +**Message and peer endpoints:** +1. `POST /messages/send` accepts a `protocol` field (default `hierarchical`) and also supports: + - `peer_to_peer` protocol + - `broadcast` protocol + - Other protocols defined in MessageType enum are still not supported +2. Broadcast endpoint - `POST /messages/broadcast` sends a message to every agent matching the optional filters +3. Message history/storage - Messages sent via `POST /messages/send` are persisted in Redis and exposed by `GET /messages/history` and `GET /messages/{message_id}` +4. 
Peer management endpoints - `POST /peers/add`, `POST /peers/remove`, `GET /peers/{agent_id}`, and `GET /peers` + +**Note:** The protocols (Hierarchical, P2P, Broadcast) are well-implemented in `communication.py`, and the API layer (`messages.py`) exposes them through the endpoints listed above. ## Service Initialization