diff --git a/tests/integration/test_agent_coordinator.py b/tests/integration/test_agent_coordinator.py index 05798053..56505c09 100644 --- a/tests/integration/test_agent_coordinator.py +++ b/tests/integration/test_agent_coordinator.py @@ -2775,3 +2775,180 @@ class TestAdvancedFeatures: # Check advanced features status coordinator_client.get("/advanced-features/status") + + +class TestLoadTesting: + """Load and stress testing with limited agent count.""" + + def test_concurrent_agent_registration(self, coordinator_client: TestClient): + """Test registering 10 agents concurrently.""" + for i in range(10): + agent_data = { + "agent_id": f"load-agent-{i}", + "agent_type": "worker", + "capabilities": ["data-processing", "gpu-compute"], + "services": ["task-execution"], + "endpoints": {"http": f"http://localhost:900{i % 10 + 1}"} + } + response = coordinator_client.post("/agents/register", json=agent_data) + assert response.status_code in (200, 201, 409) + + def test_concurrent_task_submission(self, coordinator_client: TestClient): + """Test submitting tasks to 10 agents under load.""" + # First register agents + for i in range(10): + agent_data = { + "agent_id": f"load-task-agent-{i}", + "agent_type": "worker", + "capabilities": ["data-processing"], + "services": ["task-execution"], + "endpoints": {"http": f"http://localhost:900{i % 10 + 1}"} + } + coordinator_client.post("/agents/register", json=agent_data) + + # Submit tasks concurrently + for i in range(10): + task_data = { + "task_data": {"model": "llama2", "prompt": f"test task {i}"}, + "priority": "normal" + } + response = coordinator_client.post("/tasks/submit", json=task_data) + assert response.status_code in (200, 201, 503) + + def test_concurrent_message_sending(self, coordinator_client: TestClient): + """Test sending messages between 10 agents under load.""" + # Register agents + for i in range(10): + agent_data = { + "agent_id": f"load-msg-agent-{i}", + "agent_type": "worker", + "capabilities": ["communication"], + "services": ["message-handling"], + "endpoints": {"http": f"http://localhost:900{i % 10 + 1}"} + } + coordinator_client.post("/agents/register", json=agent_data) + + # Send messages between agents + for i in range(10): + for j in range(10): + if i != j: + message_data = { + "receiver_id": f"load-msg-agent-{j}", + "message_type": "task", + "priority": "normal", + "protocol": "hierarchical", + "payload": {"from": f"load-msg-agent-{i}"} + } + response = coordinator_client.post("/messages/send", json=message_data) + assert response.status_code in (200, 201, 503) + + def test_load_balancing_under_load(self, coordinator_client: TestClient): + """Test load balancer with 10 agents and multiple tasks.""" + # Register 10 agents + for i in range(10): + agent_data = { + "agent_id": f"load-lb-agent-{i}", + "agent_type": "worker", + "capabilities": ["compute"], + "services": ["task-execution"], + "endpoints": {"http": f"http://localhost:900{i % 10 + 1}"} + } + coordinator_client.post("/agents/register", json=agent_data) + coordinator_client.put(f"/agents/load-lb-agent-{i}/status", json={"status": "active"}) + + # Test different load balancer strategies + strategies = ["round_robin", "least_connections", "resource_based"] + for strategy in strategies: + coordinator_client.put("/load-balancer/strategy", params={"strategy": strategy}) + # Submit tasks under each strategy + for i in range(5): + task_data = { + "task_data": {"model": "llama2", "prompt": f"load test {i}"}, + "priority": "normal" + } + response = coordinator_client.post("/tasks/submit", json=task_data) + assert response.status_code in (200, 201, 503) + + def test_concurrent_agent_discovery(self, coordinator_client: TestClient): + """Test agent discovery with 10 agents registered.""" + # Register 10 agents + for i in range(10): + agent_data = { + "agent_id": f"load-discovery-agent-{i}", + "agent_type": "worker", + "capabilities": ["compute", "storage"], + "services": ["task-execution"], + "endpoints": {"http": f"http://localhost:900{i % 10 + 1}"} + } + coordinator_client.post("/agents/register", json=agent_data) + + # Discover agents with different filters + coordinator_client.post("/agents/discover", json={}) + coordinator_client.post("/agents/discover", json={"agent_type": "worker"}) + coordinator_client.post("/agents/discover", json={"capabilities": ["compute"]}) + coordinator_client.post("/agents/discover", json={"status": "active"}) + + def test_swarm_coordination_under_load(self, coordinator_client: TestClient): + """Test swarm coordination with 10 agents.""" + # Register 10 agents + for i in range(10): + agent_data = { + "agent_id": f"load-swarm-agent-{i}", + "agent_type": "worker", + "capabilities": ["distributed-compute"], + "services": ["coordination"], + "endpoints": {"http": f"http://localhost:900{i % 10 + 1}"} + } + coordinator_client.post("/agents/register", json=agent_data) + + # Join swarm + for i in range(10): + coordinator_client.post("/swarm/join", json={ + "role": "worker", + "capability": "distributed-compute", + "priority": "normal" + }) + + # Coordinate tasks + coordinator_client.post("/swarm/coordinate", json={ + "task": "distributed-task", + "collaborators": 5, + "strategy": "distributed", + "timeout_seconds": 300 + }) + + def test_concurrent_status_updates(self, coordinator_client: TestClient): + """Test concurrent status updates on 10 agents.""" + # Register 10 agents + for i in range(10): + agent_data = { + "agent_id": f"load-status-agent-{i}", + "agent_type": "worker", + "capabilities": ["compute"], + "services": ["task-execution"], + "endpoints": {"http": f"http://localhost:900{i % 10 + 1}"} + } + coordinator_client.post("/agents/register", json=agent_data) + + # Update statuses concurrently + statuses = ["active", "inactive", "maintenance", "degraded"] + for i in range(10): + for status in statuses: + response = coordinator_client.put(f"/agents/load-status-agent-{i}/status", json={"status": status}) + assert response.status_code in (200, 500) + + def test_concurrent_auth_operations(self, coordinator_client: TestClient): + """Test concurrent authentication operations.""" + # Test multiple login attempts + for i in range(10): + login_data = {"username": "admin", "password": "admin123"} + response = coordinator_client.post("/auth/login", json=login_data) + assert response.status_code in (200, 401) + + # Test token validation + login_response = coordinator_client.post("/auth/login", json={"username": "admin", "password": "admin123"}) + if login_response.status_code == 200: + token = login_response.json()["access_token"] + for i in range(10): + response = coordinator_client.post("/auth/validate", json={"token": token}) + assert response.status_code in (200, 401)