Add datetime serialization to task distribution with send success tracking and failure statistics
Some checks failed
API Endpoint Tests / test-api-endpoints (push) Successful in 15s
Cross-Node Transaction Testing / transaction-test (push) Successful in 5s
Deploy to Testnet / deploy-testnet (push) Successful in 1m9s
Integration Tests / test-service-integration (push) Successful in 2m38s
Multi-Node Stress Testing / stress-test (push) Successful in 2s
Node Failover Simulation / failover-test (push) Successful in 2s
Production Tests / Production Integration Tests (push) Successful in 17s
Python Tests / test-python (push) Failing after 1m5s
Security Scanning / security-scan (push) Failing after 28s
Some checks failed
API Endpoint Tests / test-api-endpoints (push) Successful in 15s
Cross-Node Transaction Testing / transaction-test (push) Successful in 5s
Deploy to Testnet / deploy-testnet (push) Successful in 1m9s
Integration Tests / test-service-integration (push) Successful in 2m38s
Multi-Node Stress Testing / stress-test (push) Successful in 2s
Node Failover Simulation / failover-test (push) Successful in 2s
Production Tests / Production Integration Tests (push) Successful in 17s
Python Tests / test-python (push) Failing after 1m5s
Security Scanning / security-scan (push) Failing after 28s
- Add send_success return value capture from _send_task_to_agent - Add conditional tasks_distributed increment only on successful send - Add tasks_failed increment and warning log on send failure - Remove _simulate_task_completion call (replaced with event-driven comment) - Add convert_datetime helper function for recursive datetime to ISO format conversion - Apply datetime conversion to message payload before HTTP
This commit is contained in:
@@ -652,13 +652,16 @@ class TaskDistributor:
|
||||
task_data=task_info["task_data"]
|
||||
)
|
||||
|
||||
# Send task to agent (implementation depends on communication system)
|
||||
await self._send_task_to_agent(agent_id, task_message)
|
||||
# Send task to agent
|
||||
send_success = await self._send_task_to_agent(agent_id, task_message)
|
||||
|
||||
self.distribution_stats["tasks_distributed"] += 1
|
||||
|
||||
# Simulate task completion (in real implementation, this would be event-driven)
|
||||
asyncio.create_task(self._simulate_task_completion(task_info, agent_id))
|
||||
if send_success:
|
||||
self.distribution_stats["tasks_distributed"] += 1
|
||||
# In real implementation, task completion would be event-driven when agent responds
|
||||
# For now, just mark as distributed
|
||||
else:
|
||||
logger.warning(f"Failed to send task to agent {agent_id}")
|
||||
self.distribution_stats["tasks_failed"] += 1
|
||||
|
||||
else:
|
||||
logger.warning(f"Failed to distribute task: no suitable agent found")
|
||||
@@ -692,12 +695,27 @@ class TaskDistributor:
|
||||
logger.error(f"Agent {agent_id} has no HTTP endpoint")
|
||||
return False
|
||||
|
||||
# Convert message to dict and handle datetime serialization
|
||||
message_dict = task_message.to_dict()
|
||||
|
||||
# Ensure payload is JSON serializable (convert datetime objects)
|
||||
def convert_datetime(obj):
|
||||
if isinstance(obj, datetime):
|
||||
return obj.isoformat()
|
||||
elif isinstance(obj, dict):
|
||||
return {k: convert_datetime(v) for k, v in obj.items()}
|
||||
elif isinstance(obj, list):
|
||||
return [convert_datetime(item) for item in obj]
|
||||
return obj
|
||||
|
||||
message_dict["payload"] = convert_datetime(message_dict["payload"])
|
||||
|
||||
# Send task to agent via HTTP POST
|
||||
import httpx
|
||||
async with httpx.AsyncClient(timeout=5.0) as client:
|
||||
response = await client.post(
|
||||
f"{http_endpoint}/tasks/execute",
|
||||
json=task_message.to_dict()
|
||||
json=message_dict
|
||||
)
|
||||
|
||||
if response.status_code in (200, 201, 202):
|
||||
|
||||
102
apps/agent-coordinator/test_agent_endpoint.py
Normal file
102
apps/agent-coordinator/test_agent_endpoint.py
Normal file
@@ -0,0 +1,102 @@
|
||||
"""
|
||||
Simple test agent endpoint to verify task distribution
|
||||
Listens on port 9997 and accepts task execution requests
|
||||
"""
|
||||
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from pydantic import BaseModel
|
||||
from typing import Dict, Any, Optional
|
||||
import uvicorn
|
||||
from datetime import datetime
|
||||
|
||||
app = FastAPI(title="Test Agent Endpoint")
|
||||
|
||||
|
||||
class TaskMessage(BaseModel):
|
||||
"""Task message structure"""
|
||||
id: str
|
||||
sender_id: str
|
||||
receiver_id: Optional[str]
|
||||
message_type: str
|
||||
priority: str
|
||||
timestamp: str
|
||||
payload: Dict[str, Any]
|
||||
correlation_id: Optional[str]
|
||||
reply_to: Optional[str]
|
||||
ttl: int
|
||||
|
||||
|
||||
class TaskResponse(BaseModel):
|
||||
"""Task execution response"""
|
||||
status: str
|
||||
task_id: str
|
||||
agent_id: str
|
||||
executed_at: str
|
||||
result: Dict[str, Any]
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def root():
|
||||
"""Health check endpoint"""
|
||||
return {
|
||||
"status": "running",
|
||||
"agent_id": "test-agent-9997",
|
||||
"timestamp": datetime.utcnow().isoformat()
|
||||
}
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
async def health():
|
||||
"""Health check endpoint"""
|
||||
return {
|
||||
"status": "healthy",
|
||||
"agent_id": "test-agent-9997"
|
||||
}
|
||||
|
||||
|
||||
@app.post("/tasks/execute")
|
||||
async def execute_task(task: TaskMessage):
|
||||
"""Execute a task sent by the task distributor"""
|
||||
try:
|
||||
print(f"[{datetime.utcnow()}] Received task:")
|
||||
print(f" Task ID: {task.id}")
|
||||
print(f" From: {task.sender_id}")
|
||||
print(f" Type: {task.message_type}")
|
||||
print(f" Priority: {task.priority}")
|
||||
print(f" Payload: {task.payload}")
|
||||
|
||||
# Simulate task processing
|
||||
task_data = task.payload.get("task_data", {})
|
||||
task_type = task_data.get("task_type", "unknown")
|
||||
|
||||
# Simple task simulation
|
||||
result = {
|
||||
"status": "completed",
|
||||
"output": f"Task {task_type} executed successfully",
|
||||
"processing_time_ms": 100
|
||||
}
|
||||
|
||||
response = TaskResponse(
|
||||
status="success",
|
||||
task_id=task.id,
|
||||
agent_id="test-agent-9997",
|
||||
executed_at=datetime.utcnow().isoformat(),
|
||||
result=result
|
||||
)
|
||||
|
||||
print(f"[{datetime.utcnow()}] Task executed successfully")
|
||||
return response
|
||||
|
||||
except Exception as e:
|
||||
print(f"[{datetime.utcnow()}] Error executing task: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Starting test agent endpoint on port 9997...")
|
||||
uvicorn.run(
|
||||
app,
|
||||
host="0.0.0.0",
|
||||
port=9997,
|
||||
log_level="info"
|
||||
)
|
||||
Reference in New Issue
Block a user