Files
aitbc/apps/agent-coordinator/test_agent_endpoint.py
aitbc 4ee9705670
Some checks failed
API Endpoint Tests / test-api-endpoints (push) Successful in 15s
Cross-Node Transaction Testing / transaction-test (push) Successful in 5s
Deploy to Testnet / deploy-testnet (push) Successful in 1m9s
Integration Tests / test-service-integration (push) Successful in 2m38s
Multi-Node Stress Testing / stress-test (push) Successful in 2s
Node Failover Simulation / failover-test (push) Successful in 2s
Production Tests / Production Integration Tests (push) Successful in 17s
Python Tests / test-python (push) Failing after 1m5s
Security Scanning / security-scan (push) Failing after 28s
Add datetime serialization to task distribution with send success tracking and failure statistics
- Add send_success return value capture from _send_task_to_agent
- Add conditional tasks_distributed increment only on successful send
- Add tasks_failed increment and warning log on send failure
- Remove _simulate_task_completion call (replaced with event-driven comment)
- Add convert_datetime helper function for recursive datetime to ISO format conversion
- Apply datetime conversion to message payload before HTTP
2026-05-08 17:29:58 +02:00

103 lines
2.5 KiB
Python

"""
Simple test agent endpoint to verify task distribution
Listens on port 9997 and accepts task execution requests
"""
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# ASGI application object: the route decorators in this module register their
# handlers on it, and the __main__ guard passes it to uvicorn.run().
app = FastAPI(title="Test Agent Endpoint")
class TaskMessage(BaseModel):
    """Task message accepted as the request body of ``POST /tasks/execute``.

    The three ``Optional`` fields default to ``None`` so that a sender which
    omits them still passes validation. Under Pydantic v2 an ``Optional[...]``
    annotation without a default is a *required* (nullable) field; under v1
    the explicit default is a no-op, so both versions behave the same.
    """

    # Message identifier; echoed back as TaskResponse.task_id.
    id: str
    # Identifier of the component that sent the message.
    sender_id: str
    receiver_id: Optional[str] = None
    message_type: str
    priority: str
    # Kept as a plain string; presumably an ISO-format datetime produced by
    # the sender -- TODO confirm against the task distributor.
    timestamp: str
    # Free-form body; execute_task reads payload["task_data"]["task_type"].
    payload: Dict[str, Any]
    correlation_id: Optional[str] = None
    reply_to: Optional[str] = None
    # Declared as an int; unit is not shown in this file.
    ttl: int
class TaskResponse(BaseModel):
    """Task execution response returned by the ``/tasks/execute`` handler."""
    # Outcome reported to the caller; execute_task sets it to "success".
    status: str
    # Copied from the id of the incoming TaskMessage.
    task_id: str
    # Identifier of the responding agent ("test-agent-9997" in this file).
    agent_id: str
    # ISO-format timestamp string taken when the response is built.
    executed_at: str
    # Free-form result details; execute_task fills in "status", "output"
    # and "processing_time_ms".
    result: Dict[str, Any]
@app.get("/")
async def root():
    """Report that the agent is running.

    Returns:
        A dict with a fixed ``"running"`` status, this agent's id and the
        current UTC time as an ISO-format string without a UTC offset.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and strip tzinfo so the emitted string keeps its original,
    # offset-free format for existing consumers.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return {
        "status": "running",
        "agent_id": "test-agent-9997",
        "timestamp": now_utc.isoformat(),
    }
@app.get("/health")
async def health():
    """Liveness probe: return a fixed "healthy" status plus this agent's id."""
    health_report = dict(status="healthy", agent_id="test-agent-9997")
    return health_report
def _utc_now():
    """Return the current UTC time as a naive ``datetime``.

    Stand-in for ``datetime.utcnow()``, which is deprecated since Python 3.12.
    tzinfo is stripped so log lines and ``executed_at`` keep the same
    offset-free text format they had before.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


@app.post("/tasks/execute")
async def execute_task(task: TaskMessage):
    """Execute a task sent by the task distributor.

    Logs the incoming message to stdout, simulates processing and reports a
    fixed successful result.

    Args:
        task: The parsed request body.

    Returns:
        A ``TaskResponse`` with status ``"success"``, the incoming task id,
        this agent's id, the execution time and a simulated result dict.

    Raises:
        HTTPException: status 500 carrying the error text if anything in the
            handler fails; the original exception is chained as its cause.
    """
    try:
        print(f"[{_utc_now()}] Received task:")
        print(f" Task ID: {task.id}")
        print(f" From: {task.sender_id}")
        print(f" Type: {task.message_type}")
        print(f" Priority: {task.priority}")
        print(f" Payload: {task.payload}")

        # Simulate task processing.
        # "task_data" may be missing, null or not a mapping; calling .get()
        # on such a value would raise AttributeError and surface as a 500,
        # so anything that is not a dict is treated like an absent entry.
        task_data = task.payload.get("task_data")
        if isinstance(task_data, dict):
            task_type = task_data.get("task_type", "unknown")
        else:
            task_type = "unknown"

        # Simple task simulation: always reports completion.
        result = {
            "status": "completed",
            "output": f"Task {task_type} executed successfully",
            "processing_time_ms": 100,
        }
        response = TaskResponse(
            status="success",
            task_id=task.id,
            agent_id="test-agent-9997",
            executed_at=_utc_now().isoformat(),
            result=result,
        )
        print(f"[{_utc_now()}] Task executed successfully")
        return response
    except Exception as e:
        print(f"[{_utc_now()}] Error executing task: {e}")
        # Chain the cause so the original traceback is kept for debugging.
        raise HTTPException(status_code=500, detail=str(e)) from e
if __name__ == "__main__":
    # Script entry point: announce start-up, then serve `app` with uvicorn
    # on all interfaces at port 9997.
    print("Starting test agent endpoint on port 9997...")
    server_options = {"host": "0.0.0.0", "port": 9997, "log_level": "info"}
    uvicorn.run(app, **server_options)