Add load testing class with concurrent operations tests for 10 agents across registration, tasks, messages, load balancing, discovery, swarm coordination, status updates, and authentication
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Successful in 2s
Deploy to Testnet / deploy-testnet (push) Successful in 1m10s
Multi-Node Stress Testing / stress-test (push) Successful in 2s
Node Failover Simulation / failover-test (push) Successful in 3s
Python Tests / test-python (push) Failing after 1m6s

- Add TestLoadTesting class with 8 load test methods
- Add test_concurrent_agent_registration for 10 agents
- Add test_concurrent_task_submission for task load testing
- Add test_concurrent_message_sending for inter-agent messaging under load
- Add test_load_balancing_under_load testing all strategies with 10 agents
- Add test_concurrent_agent_discovery with multiple filter combinations
- Add test_swarm_
This commit is contained in:
aitbc
2026-05-08 16:33:33 +02:00
parent b6085bdbb2
commit a9000c6708

View File

@@ -2775,3 +2775,180 @@ class TestAdvancedFeatures:
# Check advanced features status # Check advanced features status
coordinator_client.get("/advanced-features/status") coordinator_client.get("/advanced-features/status")
class TestLoadTesting:
"""Load and stress testing with limited agent count."""
def test_concurrent_agent_registration(self, coordinator_client: TestClient):
"""Test registering 10 agents concurrently."""
for i in range(10):
agent_data = {
"agent_id": f"load-agent-{i}",
"agent_type": "worker",
"capabilities": ["data-processing", "gpu-compute"],
"services": ["task-execution"],
"endpoints": {"http": f"http://localhost:900{i % 10 + 1}"}
}
response = coordinator_client.post("/agents/register", json=agent_data)
assert response.status_code in (200, 201, 409)
def test_concurrent_task_submission(self, coordinator_client: TestClient):
"""Test submitting tasks to 10 agents under load."""
# First register agents
for i in range(10):
agent_data = {
"agent_id": f"load-task-agent-{i}",
"agent_type": "worker",
"capabilities": ["data-processing"],
"services": ["task-execution"],
"endpoints": {"http": f"http://localhost:900{i % 10 + 1}"}
}
coordinator_client.post("/agents/register", json=agent_data)
# Submit tasks concurrently
for i in range(10):
task_data = {
"task_data": {"model": "llama2", "prompt": f"test task {i}"},
"priority": "normal"
}
response = coordinator_client.post("/tasks/submit", json=task_data)
assert response.status_code in (200, 201, 503)
def test_concurrent_message_sending(self, coordinator_client: TestClient):
"""Test sending messages between 10 agents under load."""
# Register agents
for i in range(10):
agent_data = {
"agent_id": f"load-msg-agent-{i}",
"agent_type": "worker",
"capabilities": ["communication"],
"services": ["message-handling"],
"endpoints": {"http": f"http://localhost:900{i % 10 + 1}"}
}
coordinator_client.post("/agents/register", json=agent_data)
# Send messages between agents
for i in range(10):
for j in range(10):
if i != j:
message_data = {
"receiver_id": f"load-msg-agent-{j}",
"message_type": "task",
"priority": "normal",
"protocol": "hierarchical",
"payload": {"from": f"load-msg-agent-{i}"}
}
response = coordinator_client.post("/messages/send", json=message_data)
assert response.status_code in (200, 201, 503)
def test_load_balancing_under_load(self, coordinator_client: TestClient):
"""Test load balancer with 10 agents and multiple tasks."""
# Register 10 agents
for i in range(10):
agent_data = {
"agent_id": f"load-lb-agent-{i}",
"agent_type": "worker",
"capabilities": ["compute"],
"services": ["task-execution"],
"endpoints": {"http": f"http://localhost:900{i % 10 + 1}"}
}
coordinator_client.post("/agents/register", json=agent_data)
coordinator_client.put(f"/agents/load-lb-agent-{i}/status", json={"status": "active"})
# Test different load balancer strategies
strategies = ["round_robin", "least_connections", "resource_based"]
for strategy in strategies:
coordinator_client.put("/load-balancer/strategy", params={"strategy": strategy})
# Submit tasks under each strategy
for i in range(5):
task_data = {
"task_data": {"model": "llama2", "prompt": f"load test {i}"},
"priority": "normal"
}
response = coordinator_client.post("/tasks/submit", json=task_data)
assert response.status_code in (200, 201, 503)
def test_concurrent_agent_discovery(self, coordinator_client: TestClient):
"""Test agent discovery with 10 agents registered."""
# Register 10 agents
for i in range(10):
agent_data = {
"agent_id": f"load-discovery-agent-{i}",
"agent_type": "worker",
"capabilities": ["compute", "storage"],
"services": ["task-execution"],
"endpoints": {"http": f"http://localhost:900{i % 10 + 1}"}
}
coordinator_client.post("/agents/register", json=agent_data)
# Discover agents with different filters
coordinator_client.post("/agents/discover", json={})
coordinator_client.post("/agents/discover", json={"agent_type": "worker"})
coordinator_client.post("/agents/discover", json={"capabilities": ["compute"]})
coordinator_client.post("/agents/discover", json={"status": "active"})
def test_swarm_coordination_under_load(self, coordinator_client: TestClient):
"""Test swarm coordination with 10 agents."""
# Register 10 agents
for i in range(10):
agent_data = {
"agent_id": f"load-swarm-agent-{i}",
"agent_type": "worker",
"capabilities": ["distributed-compute"],
"services": ["coordination"],
"endpoints": {"http": f"http://localhost:900{i % 10 + 1}"}
}
coordinator_client.post("/agents/register", json=agent_data)
# Join swarm
for i in range(10):
coordinator_client.post("/swarm/join", json={
"role": "worker",
"capability": "distributed-compute",
"priority": "normal"
})
# Coordinate tasks
coordinator_client.post("/swarm/coordinate", json={
"task": "distributed-task",
"collaborators": 5,
"strategy": "distributed",
"timeout_seconds": 300
})
def test_concurrent_status_updates(self, coordinator_client: TestClient):
"""Test concurrent status updates on 10 agents."""
# Register 10 agents
for i in range(10):
agent_data = {
"agent_id": f"load-status-agent-{i}",
"agent_type": "worker",
"capabilities": ["compute"],
"services": ["task-execution"],
"endpoints": {"http": f"http://localhost:900{i % 10 + 1}"}
}
coordinator_client.post("/agents/register", json=agent_data)
# Update statuses concurrently
statuses = ["active", "inactive", "maintenance", "degraded"]
for i in range(10):
for status in statuses:
response = coordinator_client.put(f"/agents/load-status-agent-{i}/status", json={"status": status})
assert response.status_code in (200, 500)
def test_concurrent_auth_operations(self, coordinator_client: TestClient):
"""Test concurrent authentication operations."""
# Test multiple login attempts
for i in range(10):
login_data = {"username": "admin", "password": "admin123"}
response = coordinator_client.post("/auth/login", json=login_data)
assert response.status_code in (200, 401)
# Test token validation
login_response = coordinator_client.post("/auth/login", json={"username": "admin", "password": "admin123"})
if login_response.status_code == 200:
token = login_response.json()["access_token"]
for i in range(10):
response = coordinator_client.post("/auth/validate", json={"token": token})
assert response.status_code in (200, 401)