diff --git a/apps/agent-coordinator/src/app/routing/load_balancer.py b/apps/agent-coordinator/src/app/routing/load_balancer.py index 9dbd5099..5a2389d6 100644 --- a/apps/agent-coordinator/src/app/routing/load_balancer.py +++ b/apps/agent-coordinator/src/app/routing/load_balancer.py @@ -652,13 +652,16 @@ class TaskDistributor: task_data=task_info["task_data"] ) - # Send task to agent (implementation depends on communication system) - await self._send_task_to_agent(agent_id, task_message) + # Send task to agent + send_success = await self._send_task_to_agent(agent_id, task_message) - self.distribution_stats["tasks_distributed"] += 1 - - # Simulate task completion (in real implementation, this would be event-driven) - asyncio.create_task(self._simulate_task_completion(task_info, agent_id)) + if send_success: + self.distribution_stats["tasks_distributed"] += 1 + # In real implementation, task completion would be event-driven when agent responds + # For now, just mark as distributed + else: + logger.warning(f"Failed to send task to agent {agent_id}") + self.distribution_stats["tasks_failed"] += 1 else: logger.warning(f"Failed to distribute task: no suitable agent found") @@ -692,12 +695,27 @@ class TaskDistributor: logger.error(f"Agent {agent_id} has no HTTP endpoint") return False + # Convert message to dict and handle datetime serialization + message_dict = task_message.to_dict() + + # Ensure payload is JSON serializable (convert datetime objects) + def convert_datetime(obj): + if isinstance(obj, datetime): + return obj.isoformat() + elif isinstance(obj, dict): + return {k: convert_datetime(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [convert_datetime(item) for item in obj] + return obj + + message_dict["payload"] = convert_datetime(message_dict["payload"]) + # Send task to agent via HTTP POST import httpx async with httpx.AsyncClient(timeout=5.0) as client: response = await client.post( f"{http_endpoint}/tasks/execute", - json=task_message.to_dict() + json=message_dict ) if response.status_code in (200, 201, 202): diff --git a/apps/agent-coordinator/test_agent_endpoint.py b/apps/agent-coordinator/test_agent_endpoint.py new file mode 100644 index 00000000..b851c340 --- /dev/null +++ b/apps/agent-coordinator/test_agent_endpoint.py @@ -0,0 +1,102 @@ +""" +Simple test agent endpoint to verify task distribution +Listens on port 9997 and accepts task execution requests +""" + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel +from typing import Dict, Any, Optional +import uvicorn +from datetime import datetime + +app = FastAPI(title="Test Agent Endpoint") + + +class TaskMessage(BaseModel): + """Task message structure""" + id: str + sender_id: str + receiver_id: Optional[str] + message_type: str + priority: str + timestamp: str + payload: Dict[str, Any] + correlation_id: Optional[str] + reply_to: Optional[str] + ttl: int + + +class TaskResponse(BaseModel): + """Task execution response""" + status: str + task_id: str + agent_id: str + executed_at: str + result: Dict[str, Any] + + +@app.get("/") +async def root(): + """Health check endpoint""" + return { + "status": "running", + "agent_id": "test-agent-9997", + "timestamp": datetime.utcnow().isoformat() + } + + +@app.get("/health") +async def health(): + """Health check endpoint""" + return { + "status": "healthy", + "agent_id": "test-agent-9997" + } + + +@app.post("/tasks/execute") +async def execute_task(task: TaskMessage): + """Execute a task sent by the task distributor""" + try: + print(f"[{datetime.utcnow()}] Received task:") + print(f" Task ID: {task.id}") + print(f" From: {task.sender_id}") + print(f" Type: {task.message_type}") + print(f" Priority: {task.priority}") + print(f" Payload: {task.payload}") + + # Simulate task processing + task_data = task.payload.get("task_data", {}) + task_type = task_data.get("task_type", "unknown") + + # Simple task simulation + result = { + "status": "completed", + "output": f"Task {task_type} executed successfully", + "processing_time_ms": 100 + } + + response = TaskResponse( + status="success", + task_id=task.id, + agent_id="test-agent-9997", + executed_at=datetime.utcnow().isoformat(), + result=result + ) + + print(f"[{datetime.utcnow()}] Task executed successfully") + return response + + except Exception as e: + print(f"[{datetime.utcnow()}] Error executing task: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +if __name__ == "__main__": + print("Starting test agent endpoint on port 9997...") + uvicorn.run( + app, + host="0.0.0.0", + port=9997, + log_level="info" + )