Refactor agent coordinator integration tests to use TestClient instead of async httpx client
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Successful in 2s
Deploy to Testnet / deploy-testnet (push) Successful in 1m9s
Multi-Node Stress Testing / stress-test (push) Successful in 4s
Node Failover Simulation / failover-test (push) Successful in 3s
Python Tests / test-python (push) Failing after 1m7s

- Replace httpx.AsyncClient with Starlette TestClient for synchronous testing
- Remove pytest_asyncio dependency and @pytest.mark.asyncio decorators
- Add sys.path manipulation to import agent-coordinator app module
- Change coordinator_client fixture from async to sync using create_app()
- Convert all test methods from async to sync by removing async/await keywords
- Remove await calls from all HTTP requests (
This commit is contained in:
aitbc
2026-05-08 14:00:52 +02:00
parent 0b2728c2c0
commit 8ad3d072f5

View File

@@ -1,15 +1,22 @@
"""Integration tests for AITBC Agent Coordinator service."""
import pytest
import httpx
from typing import Dict, Any, Generator
import pytest_asyncio
from starlette.testclient import TestClient
import sys
from pathlib import Path
# Add the agent-coordinator source to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "apps" / "agent-coordinator" / "src"))
from app.main import create_app
@pytest_asyncio.fixture
async def coordinator_client() -> Generator[httpx.AsyncClient, None, None]:
"""Create an HTTP client for coordinator API."""
async with httpx.AsyncClient(base_url="http://localhost:9001", timeout=30) as client:
@pytest.fixture
def coordinator_client() -> Generator[TestClient, None, None]:
"""Create a test client for coordinator API."""
app = create_app()
with TestClient(app) as client:
yield client
@@ -42,67 +49,59 @@ def sample_task_data() -> Dict[str, Any]:
class TestAgentRegistration:
"""Test agent registration endpoints."""
@pytest.mark.asyncio
async def test_register_agent_success(self, coordinator_client: httpx.AsyncClient, sample_agent_data: Dict[str, Any]):
def test_register_agent_success(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any]):
"""Test successful agent registration."""
response = await coordinator_client.post("/agents/register", json=sample_agent_data)
response = coordinator_client.post("/agents/register", json=sample_agent_data)
assert response.status_code in (200, 201)
data = response.json()
assert data["status"] == "success"
assert data["agent_id"] == sample_agent_data["agent_id"]
@pytest.mark.asyncio
async def test_register_agent_duplicate(self, coordinator_client: httpx.AsyncClient, sample_agent_data: Dict[str, Any]):
def test_register_agent_duplicate(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any]):
"""Test registering duplicate agent."""
await coordinator_client.post("/agents/register", json=sample_agent_data)
response = await coordinator_client.post("/agents/register", json=sample_agent_data)
coordinator_client.post("/agents/register", json=sample_agent_data)
response = coordinator_client.post("/agents/register", json=sample_agent_data)
assert response.status_code in (200, 201, 409)
@pytest.mark.asyncio
async def test_register_agent_invalid_data(self, coordinator_client: httpx.AsyncClient):
def test_register_agent_invalid_data(self, coordinator_client: TestClient):
"""Test registration with invalid data."""
invalid_data = {"agent_id": "invalid"}
response = await coordinator_client.post("/agents/register", json=invalid_data)
response = coordinator_client.post("/agents/register", json=invalid_data)
assert response.status_code == 422
@pytest.mark.asyncio
async def test_register_agent_missing_agent_id(self, coordinator_client: httpx.AsyncClient):
def test_register_agent_missing_agent_id(self, coordinator_client: TestClient):
"""Test registration without agent ID."""
invalid_data = {"agent_type": "worker"}
response = await coordinator_client.post("/agents/register", json=invalid_data)
response = coordinator_client.post("/agents/register", json=invalid_data)
assert response.status_code == 422
class TestAgentDiscovery:
"""Test agent discovery endpoints."""
@pytest.mark.asyncio
async def test_discover_all_agents(self, coordinator_client: httpx.AsyncClient, sample_agent_data: Dict[str, Any]):
def test_discover_all_agents(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any]):
"""Test discovering all agents."""
await coordinator_client.post("/agents/register", json=sample_agent_data)
response = await coordinator_client.post("/agents/discover", json={})
coordinator_client.post("/agents/register", json=sample_agent_data)
response = coordinator_client.post("/agents/discover", json={})
assert response.status_code == 200
data = response.json()
assert "agents" in data
@pytest.mark.asyncio
async def test_discover_by_status(self, coordinator_client: httpx.AsyncClient, sample_agent_data: Dict[str, Any]):
def test_discover_by_status(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any]):
"""Test discovering agents by status."""
await coordinator_client.post("/agents/register", json=sample_agent_data)
response = await coordinator_client.post("/agents/discover", json={"status": "active"})
coordinator_client.post("/agents/register", json=sample_agent_data)
response = coordinator_client.post("/agents/discover", json={"status": "active"})
assert response.status_code == 200
@pytest.mark.asyncio
async def test_discover_by_type(self, coordinator_client: httpx.AsyncClient, sample_agent_data: Dict[str, Any]):
def test_discover_by_type(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any]):
"""Test discovering agents by type."""
await coordinator_client.post("/agents/register", json=sample_agent_data)
response = await coordinator_client.post("/agents/discover", json={"agent_type": "worker"})
coordinator_client.post("/agents/register", json=sample_agent_data)
response = coordinator_client.post("/agents/discover", json={"agent_type": "worker"})
assert response.status_code == 200
@pytest.mark.asyncio
async def test_discover_empty_result(self, coordinator_client: httpx.AsyncClient):
def test_discover_empty_result(self, coordinator_client: TestClient):
"""Test discovering with no results."""
response = await coordinator_client.post("/agents/discover", json={"agent_type": "nonexistent"})
response = coordinator_client.post("/agents/discover", json={"agent_type": "nonexistent"})
assert response.status_code == 200
data = response.json()
assert len(data.get("agents", [])) == 0
@@ -111,34 +110,30 @@ class TestAgentDiscovery:
class TestAgentStatus:
"""Test agent status endpoints."""
@pytest.mark.asyncio
async def test_get_agent_info(self, coordinator_client: httpx.AsyncClient, sample_agent_data: Dict[str, Any]):
def test_get_agent_info(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any]):
"""Test getting agent information."""
await coordinator_client.post("/agents/register", json=sample_agent_data)
response = await coordinator_client.get(f"/agents/{sample_agent_data['agent_id']}")
coordinator_client.post("/agents/register", json=sample_agent_data)
response = coordinator_client.get(f"/agents/{sample_agent_data['agent_id']}")
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
assert data["agent"]["agent_id"] == sample_agent_data["agent_id"]
@pytest.mark.asyncio
async def test_get_agent_not_found(self, coordinator_client: httpx.AsyncClient):
def test_get_agent_not_found(self, coordinator_client: TestClient):
"""Test getting non-existent agent."""
response = await coordinator_client.get("/agents/nonexistent-agent")
response = coordinator_client.get("/agents/nonexistent-agent")
assert response.status_code == 404
@pytest.mark.asyncio
async def test_update_agent_status(self, coordinator_client: httpx.AsyncClient, sample_agent_data: Dict[str, Any]):
def test_update_agent_status(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any]):
"""Test updating agent status."""
await coordinator_client.post("/agents/register", json=sample_agent_data)
response = await coordinator_client.put(f"/agents/{sample_agent_data['agent_id']}/status", json={"status": "inactive"})
coordinator_client.post("/agents/register", json=sample_agent_data)
response = coordinator_client.put(f"/agents/{sample_agent_data['agent_id']}/status", json={"status": "inactive"})
assert response.status_code == 200
@pytest.mark.asyncio
async def test_update_agent_status_invalid(self, coordinator_client: httpx.AsyncClient, sample_agent_data: Dict[str, Any]):
def test_update_agent_status_invalid(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any]):
"""Test updating agent status with invalid data."""
await coordinator_client.post("/agents/register", json=sample_agent_data)
response = await coordinator_client.put(f"/agents/{sample_agent_data['agent_id']}/status", json={"status": "invalid"})
coordinator_client.post("/agents/register", json=sample_agent_data)
response = coordinator_client.put(f"/agents/{sample_agent_data['agent_id']}/status", json={"status": "invalid"})
# API returns 500 for invalid status (not 422)
assert response.status_code in (400, 422, 500)
@@ -146,29 +141,26 @@ class TestAgentStatus:
class TestTaskDistribution:
"""Test task distribution endpoints."""
@pytest.mark.asyncio
async def test_submit_task_success(self, coordinator_client: httpx.AsyncClient, sample_task_data: Dict[str, Any]):
def test_submit_task_success(self, coordinator_client: TestClient, sample_task_data: Dict[str, Any]):
"""Test successful task submission."""
response = await coordinator_client.post("/tasks/submit", json=sample_task_data)
response = coordinator_client.post("/tasks/submit", json=sample_task_data)
assert response.status_code in (200, 201)
data = response.json()
assert "task_id" in data or "status" in data
@pytest.mark.asyncio
async def test_submit_task_invalid_priority(self, coordinator_client: httpx.AsyncClient):
def test_submit_task_invalid_priority(self, coordinator_client: TestClient):
"""Test task submission with invalid priority."""
invalid_data = {
"task_data": {"model": "llama2"},
"priority": "invalid"
}
response = await coordinator_client.post("/tasks/submit", json=invalid_data)
response = coordinator_client.post("/tasks/submit", json=invalid_data)
# API returns 400 for invalid priority (not 422)
assert response.status_code in (400, 422)
@pytest.mark.asyncio
async def test_task_distribution_stats(self, coordinator_client: httpx.AsyncClient):
def test_task_distribution_stats(self, coordinator_client: TestClient):
"""Test getting task distribution statistics."""
response = await coordinator_client.get("/tasks/status")
response = coordinator_client.get("/tasks/status")
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
@@ -178,20 +170,18 @@ class TestTaskDistribution:
assert "load_balancer_stats" in data["stats"]
assert "active_agents" in data["stats"]["load_balancer_stats"]
@pytest.mark.asyncio
async def test_task_assignment_with_active_agent(self, coordinator_client: httpx.AsyncClient, sample_agent_data: Dict[str, Any], sample_task_data: Dict[str, Any]):
def test_task_assignment_with_active_agent(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any], sample_task_data: Dict[str, Any]):
"""Test task assignment with active agent."""
await coordinator_client.post("/agents/register", json=sample_agent_data)
await coordinator_client.put(f"/agents/{sample_agent_data['agent_id']}/status", json={"status": "active"})
response = await coordinator_client.post("/tasks/submit", json=sample_task_data)
coordinator_client.post("/agents/register", json=sample_agent_data)
coordinator_client.put(f"/agents/{sample_agent_data['agent_id']}/status", json={"status": "active"})
response = coordinator_client.post("/tasks/submit", json=sample_task_data)
assert response.status_code in (200, 201)
class TestLoadBalancing:
"""Test load balancing strategies."""
@pytest.mark.asyncio
async def test_least_connections_strategy(self, coordinator_client: httpx.AsyncClient):
def test_least_connections_strategy(self, coordinator_client: TestClient):
"""Test least connections strategy."""
agents = []
for i in range(3):
@@ -202,33 +192,31 @@ class TestLoadBalancing:
"services": ["task-execution"],
"endpoints": {"http": f"http://localhost:{9002+i}"}
}
await coordinator_client.post("/agents/register", json=agent_data)
coordinator_client.post("/agents/register", json=agent_data)
agents.append(agent_data)
for agent in agents:
await coordinator_client.put(f"/agents/{agent['agent_id']}/status", json={"status": "active"})
coordinator_client.put(f"/agents/{agent['agent_id']}/status", json={"status": "active"})
task_data = {
"task_data": {"model": "llama2", "prompt": "test"},
"priority": "normal"
}
response = await coordinator_client.post("/tasks/submit", json=task_data)
response = coordinator_client.post("/tasks/submit", json=task_data)
assert response.status_code in (200, 201)
@pytest.mark.asyncio
async def test_no_eligible_agents(self, coordinator_client: httpx.AsyncClient, sample_task_data: Dict[str, Any]):
def test_no_eligible_agents(self, coordinator_client: TestClient, sample_task_data: Dict[str, Any]):
"""Test task submission with no eligible agents."""
response = await coordinator_client.post("/tasks/submit", json=sample_task_data)
response = coordinator_client.post("/tasks/submit", json=sample_task_data)
assert response.status_code in (200, 201, 503)
class TestQueueManagement:
"""Test queue management endpoints."""
@pytest.mark.asyncio
async def test_get_queue_sizes(self, coordinator_client: httpx.AsyncClient):
def test_get_queue_sizes(self, coordinator_client: TestClient):
"""Test getting queue sizes."""
response = await coordinator_client.get("/tasks/queues")
response = coordinator_client.get("/tasks/queues")
# Queue endpoints might not be registered in running coordinator
if response.status_code == 404:
pytest.skip("Queue endpoints not registered in running coordinator")
@@ -237,27 +225,24 @@ class TestQueueManagement:
assert data["status"] == "success"
assert "queue_sizes" in data
@pytest.mark.asyncio
async def test_clear_queue(self, coordinator_client: httpx.AsyncClient, sample_task_data: Dict[str, Any]):
def test_clear_queue(self, coordinator_client: TestClient, sample_task_data: Dict[str, Any]):
"""Test clearing a queue."""
# First submit a task to have something in the queue
await coordinator_client.post("/tasks/submit", json=sample_task_data)
response = await coordinator_client.post("/tasks/queues/normal/clear")
coordinator_client.post("/tasks/submit", json=sample_task_data)
response = coordinator_client.post("/tasks/queues/normal/clear")
if response.status_code == 404:
pytest.skip("Queue endpoints not registered in running coordinator")
assert response.status_code in (200, 204)
@pytest.mark.asyncio
async def test_clear_invalid_queue(self, coordinator_client: httpx.AsyncClient):
def test_clear_invalid_queue(self, coordinator_client: TestClient):
"""Test clearing invalid queue."""
response = await coordinator_client.post("/tasks/queues/invalid/clear")
response = coordinator_client.post("/tasks/queues/invalid/clear")
# API returns 400 for invalid priority (not 404)
assert response.status_code in (400, 404)
@pytest.mark.asyncio
async def test_get_queue_stats(self, coordinator_client: httpx.AsyncClient):
def test_get_queue_stats(self, coordinator_client: TestClient):
"""Test getting queue statistics."""
response = await coordinator_client.get("/tasks/queues/stats")
response = coordinator_client.get("/tasks/queues/stats")
if response.status_code == 404:
pytest.skip("Queue endpoints not registered in running coordinator")
assert response.status_code == 200
@@ -269,32 +254,29 @@ class TestQueueManagement:
class TestHeartbeat:
"""Test agent heartbeat endpoint."""
@pytest.mark.asyncio
async def test_agent_heartbeat(self, coordinator_client: httpx.AsyncClient, sample_agent_data: Dict[str, Any]):
def test_agent_heartbeat(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any]):
"""Test agent heartbeat."""
# Register agent first
await coordinator_client.post("/agents/register", json=sample_agent_data)
response = await coordinator_client.post(f"/agents/{sample_agent_data['agent_id']}/heartbeat")
coordinator_client.post("/agents/register", json=sample_agent_data)
response = coordinator_client.post(f"/agents/{sample_agent_data['agent_id']}/heartbeat")
if response.status_code == 404:
pytest.skip("Heartbeat endpoint not registered in running coordinator")
assert response.status_code == 200
data = response.json()
assert data["status"] == "success"
@pytest.mark.asyncio
async def test_heartbeat_nonexistent_agent(self, coordinator_client: httpx.AsyncClient):
def test_heartbeat_nonexistent_agent(self, coordinator_client: TestClient):
"""Test heartbeat for non-existent agent."""
response = await coordinator_client.post("/agents/nonexistent/heartbeat")
response = coordinator_client.post("/agents/nonexistent/heartbeat")
assert response.status_code == 404
class TestHealthCheck:
"""Test health check endpoint."""
@pytest.mark.asyncio
async def test_health_check(self, coordinator_client: httpx.AsyncClient):
def test_health_check(self, coordinator_client: TestClient):
"""Test health check."""
response = await coordinator_client.get("/health")
response = coordinator_client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "healthy"