Add HTTP task distribution implementation to TaskDistributor with agent endpoint lookup and error handling
Some checks failed
API Endpoint Tests / test-api-endpoints (push) Successful in 18s
Cross-Node Transaction Testing / transaction-test (push) Successful in 2s
Deploy to Testnet / deploy-testnet (push) Successful in 1m7s
Integration Tests / test-service-integration (push) Successful in 2m38s
Multi-Node Stress Testing / stress-test (push) Successful in 2s
Node Failover Simulation / failover-test (push) Successful in 2s
Production Tests / Production Integration Tests (push) Successful in 20s
Python Tests / test-python (push) Failing after 1m7s
Security Scanning / security-scan (push) Successful in 29s
Some checks failed
API Endpoint Tests / test-api-endpoints (push) Successful in 18s
Cross-Node Transaction Testing / transaction-test (push) Successful in 2s
Deploy to Testnet / deploy-testnet (push) Successful in 1m7s
Integration Tests / test-service-integration (push) Successful in 2m38s
Multi-Node Stress Testing / stress-test (push) Successful in 2s
Node Failover Simulation / failover-test (push) Successful in 2s
Production Tests / Production Integration Tests (push) Successful in 20s
Python Tests / test-python (push) Failing after 1m7s
Security Scanning / security-scan (push) Successful in 29s
- Uncomment _send_task_to_agent call in distribute_tasks method - Add _send_task_to_agent method with HTTP POST to agent /tasks/execute endpoint - Add agent registry lookup for HTTP endpoint retrieval - Add httpx client with 5s timeout for task delivery - Add response status code validation (200, 201, 202) and error logging
This commit is contained in:
@@ -653,7 +653,7 @@ class TaskDistributor:
|
||||
)
|
||||
|
||||
# Send task to agent (implementation depends on communication system)
|
||||
# await self._send_task_to_agent(agent_id, task_message)
|
||||
await self._send_task_to_agent(agent_id, task_message)
|
||||
|
||||
self.distribution_stats["tasks_distributed"] += 1
|
||||
|
||||
@@ -677,6 +677,40 @@ class TaskDistributor:
|
||||
if total_distributed > 0 else distribution_time
|
||||
)
|
||||
|
||||
async def _send_task_to_agent(self, agent_id: str, task_message: AgentMessage):
|
||||
"""Send task to agent via HTTP"""
|
||||
try:
|
||||
# Get agent info from registry to find endpoint
|
||||
agent_info = await self.load_balancer.registry.get_agent_by_id(agent_id)
|
||||
if not agent_info:
|
||||
logger.error(f"Agent {agent_id} not found in registry")
|
||||
return False
|
||||
|
||||
# Get HTTP endpoint
|
||||
http_endpoint = agent_info.endpoints.get("http")
|
||||
if not http_endpoint:
|
||||
logger.error(f"Agent {agent_id} has no HTTP endpoint")
|
||||
return False
|
||||
|
||||
# Send task to agent via HTTP POST
|
||||
import httpx
|
||||
async with httpx.AsyncClient(timeout=5.0) as client:
|
||||
response = await client.post(
|
||||
f"{http_endpoint}/tasks/execute",
|
||||
json=task_message.to_dict()
|
||||
)
|
||||
|
||||
if response.status_code in (200, 201, 202):
|
||||
logger.info(f"Task sent successfully to agent {agent_id}")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"Failed to send task to agent {agent_id}: {response.status_code}")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error sending task to agent {agent_id}: {e}")
|
||||
return False
|
||||
|
||||
async def _simulate_task_completion(self, task_info: Dict[str, Any], agent_id: str):
|
||||
"""Simulate task completion (for testing)"""
|
||||
# Simulate task processing time
|
||||
|
||||
Reference in New Issue
Block a user