Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Successful in 2s
Deploy to Testnet / deploy-testnet (push) Successful in 1m4s
Multi-Node Stress Testing / stress-test (push) Successful in 2s
Node Failover Simulation / failover-test (push) Successful in 2s
Python Tests / test-python (push) Has been cancelled
- Add TestAuthentication class with login, token refresh, and token validation tests - Add TestAlerts class with unauthorized access tests for alerts, stats, rules, SLA, and system status endpoints - Add TestUsers class with unauthorized access tests for user role management and permissions endpoints - Add TestConsensus class with tests for node registration, proposal creation, statistics, algorithm configuration, and advanced features status
458 lines
20 KiB
Python
458 lines
20 KiB
Python
"""Integration tests for AITBC Agent Coordinator service."""
|
|
|
|
import pytest
|
|
from typing import Dict, Any, Generator
|
|
from starlette.testclient import TestClient
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Add the agent-coordinator source to path
|
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "apps" / "agent-coordinator" / "src"))
|
|
|
|
from app.main import create_app
|
|
|
|
|
|
@pytest.fixture
|
|
def coordinator_client() -> Generator[TestClient, None, None]:
|
|
"""Create a test client for coordinator API."""
|
|
app = create_app()
|
|
with TestClient(app) as client:
|
|
yield client
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_agent_data() -> Dict[str, Any]:
|
|
"""Sample agent registration data."""
|
|
return {
|
|
"agent_id": "test-integration-agent",
|
|
"agent_type": "worker",
|
|
"capabilities": ["data-processing", "analysis"],
|
|
"services": ["task-execution"],
|
|
"endpoints": {"http": "http://localhost:9002"},
|
|
"metadata": {"version": "1.0.0", "test": True}
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_task_data() -> Dict[str, Any]:
|
|
"""Sample task submission data."""
|
|
return {
|
|
"task_data": {
|
|
"model": "llama2",
|
|
"prompt": "test prompt"
|
|
},
|
|
"priority": "normal",
|
|
"requirements": {}
|
|
}
|
|
|
|
|
|
class TestAgentRegistration:
|
|
"""Test agent registration endpoints."""
|
|
|
|
def test_register_agent_success(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any]):
|
|
"""Test successful agent registration."""
|
|
response = coordinator_client.post("/agents/register", json=sample_agent_data)
|
|
assert response.status_code in (200, 201)
|
|
data = response.json()
|
|
assert data["status"] == "success"
|
|
assert data["agent_id"] == sample_agent_data["agent_id"]
|
|
|
|
def test_register_agent_duplicate(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any]):
|
|
"""Test registering duplicate agent."""
|
|
coordinator_client.post("/agents/register", json=sample_agent_data)
|
|
response = coordinator_client.post("/agents/register", json=sample_agent_data)
|
|
assert response.status_code in (200, 201, 409)
|
|
|
|
def test_register_agent_invalid_data(self, coordinator_client: TestClient):
|
|
"""Test registration with invalid data."""
|
|
invalid_data = {"agent_id": "invalid"}
|
|
response = coordinator_client.post("/agents/register", json=invalid_data)
|
|
assert response.status_code == 422
|
|
|
|
def test_register_agent_missing_agent_id(self, coordinator_client: TestClient):
|
|
"""Test registration without agent ID."""
|
|
invalid_data = {"agent_type": "worker"}
|
|
response = coordinator_client.post("/agents/register", json=invalid_data)
|
|
assert response.status_code == 422
|
|
|
|
|
|
class TestAgentDiscovery:
|
|
"""Test agent discovery endpoints."""
|
|
|
|
def test_discover_all_agents(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any]):
|
|
"""Test discovering all agents."""
|
|
coordinator_client.post("/agents/register", json=sample_agent_data)
|
|
response = coordinator_client.post("/agents/discover", json={})
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "agents" in data
|
|
|
|
def test_discover_by_status(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any]):
|
|
"""Test discovering agents by status."""
|
|
coordinator_client.post("/agents/register", json=sample_agent_data)
|
|
response = coordinator_client.post("/agents/discover", json={"status": "active"})
|
|
assert response.status_code == 200
|
|
|
|
def test_discover_by_type(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any]):
|
|
"""Test discovering agents by type."""
|
|
coordinator_client.post("/agents/register", json=sample_agent_data)
|
|
response = coordinator_client.post("/agents/discover", json={"agent_type": "worker"})
|
|
assert response.status_code == 200
|
|
|
|
def test_discover_empty_result(self, coordinator_client: TestClient):
|
|
"""Test discovering with no results."""
|
|
response = coordinator_client.post("/agents/discover", json={"agent_type": "nonexistent"})
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert len(data.get("agents", [])) == 0
|
|
|
|
|
|
class TestAgentStatus:
|
|
"""Test agent status endpoints."""
|
|
|
|
def test_get_agent_info(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any]):
|
|
"""Test getting agent information."""
|
|
coordinator_client.post("/agents/register", json=sample_agent_data)
|
|
response = coordinator_client.get(f"/agents/{sample_agent_data['agent_id']}")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["status"] == "success"
|
|
assert data["agent"]["agent_id"] == sample_agent_data["agent_id"]
|
|
|
|
def test_get_agent_not_found(self, coordinator_client: TestClient):
|
|
"""Test getting non-existent agent."""
|
|
response = coordinator_client.get("/agents/nonexistent-agent")
|
|
assert response.status_code == 404
|
|
|
|
def test_update_agent_status(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any]):
|
|
"""Test updating agent status."""
|
|
coordinator_client.post("/agents/register", json=sample_agent_data)
|
|
response = coordinator_client.put(f"/agents/{sample_agent_data['agent_id']}/status", json={"status": "inactive"})
|
|
assert response.status_code == 200
|
|
|
|
def test_update_agent_status_invalid(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any]):
|
|
"""Test updating agent status with invalid data."""
|
|
coordinator_client.post("/agents/register", json=sample_agent_data)
|
|
response = coordinator_client.put(f"/agents/{sample_agent_data['agent_id']}/status", json={"status": "invalid"})
|
|
# API returns 500 for invalid status (not 422)
|
|
assert response.status_code in (400, 422, 500)
|
|
|
|
|
|
class TestTaskDistribution:
|
|
"""Test task distribution endpoints."""
|
|
|
|
def test_submit_task_success(self, coordinator_client: TestClient, sample_task_data: Dict[str, Any]):
|
|
"""Test successful task submission."""
|
|
response = coordinator_client.post("/tasks/submit", json=sample_task_data)
|
|
assert response.status_code in (200, 201)
|
|
data = response.json()
|
|
assert "task_id" in data or "status" in data
|
|
|
|
def test_submit_task_invalid_priority(self, coordinator_client: TestClient):
|
|
"""Test task submission with invalid priority."""
|
|
invalid_data = {
|
|
"task_data": {"model": "llama2"},
|
|
"priority": "invalid"
|
|
}
|
|
response = coordinator_client.post("/tasks/submit", json=invalid_data)
|
|
# API returns 400 for invalid priority (not 422)
|
|
assert response.status_code in (400, 422)
|
|
|
|
def test_task_distribution_stats(self, coordinator_client: TestClient):
|
|
"""Test getting task distribution statistics."""
|
|
response = coordinator_client.get("/tasks/status")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["status"] == "success"
|
|
assert "stats" in data
|
|
# tasks_distributed is inside stats
|
|
assert "tasks_distributed" in data["stats"]
|
|
assert "load_balancer_stats" in data["stats"]
|
|
assert "active_agents" in data["stats"]["load_balancer_stats"]
|
|
|
|
def test_task_assignment_with_active_agent(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any], sample_task_data: Dict[str, Any]):
|
|
"""Test task assignment with active agent."""
|
|
coordinator_client.post("/agents/register", json=sample_agent_data)
|
|
coordinator_client.put(f"/agents/{sample_agent_data['agent_id']}/status", json={"status": "active"})
|
|
response = coordinator_client.post("/tasks/submit", json=sample_task_data)
|
|
assert response.status_code in (200, 201)
|
|
|
|
|
|
class TestLoadBalancing:
|
|
"""Test load balancing strategies."""
|
|
|
|
def test_least_connections_strategy(self, coordinator_client: TestClient):
|
|
"""Test least connections strategy."""
|
|
agents = []
|
|
for i in range(3):
|
|
agent_data = {
|
|
"agent_id": f"test-agent-{i}",
|
|
"agent_type": "worker",
|
|
"capabilities": ["data-processing"],
|
|
"services": ["task-execution"],
|
|
"endpoints": {"http": f"http://localhost:{9002+i}"}
|
|
}
|
|
coordinator_client.post("/agents/register", json=agent_data)
|
|
agents.append(agent_data)
|
|
|
|
for agent in agents:
|
|
coordinator_client.put(f"/agents/{agent['agent_id']}/status", json={"status": "active"})
|
|
|
|
task_data = {
|
|
"task_data": {"model": "llama2", "prompt": "test"},
|
|
"priority": "normal"
|
|
}
|
|
response = coordinator_client.post("/tasks/submit", json=task_data)
|
|
assert response.status_code in (200, 201)
|
|
|
|
def test_no_eligible_agents(self, coordinator_client: TestClient, sample_task_data: Dict[str, Any]):
|
|
"""Test task submission with no eligible agents."""
|
|
response = coordinator_client.post("/tasks/submit", json=sample_task_data)
|
|
assert response.status_code in (200, 201, 503)
|
|
|
|
|
|
class TestQueueManagement:
|
|
"""Test queue management endpoints."""
|
|
|
|
def test_get_queue_sizes(self, coordinator_client: TestClient):
|
|
"""Test getting queue sizes."""
|
|
response = coordinator_client.get("/tasks/queues")
|
|
# Queue endpoints might not be registered in running coordinator
|
|
if response.status_code == 404:
|
|
pytest.skip("Queue endpoints not registered in running coordinator")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["status"] == "success"
|
|
assert "queue_sizes" in data
|
|
|
|
def test_clear_queue(self, coordinator_client: TestClient, sample_task_data: Dict[str, Any]):
|
|
"""Test clearing a queue."""
|
|
# First submit a task to have something in the queue
|
|
coordinator_client.post("/tasks/submit", json=sample_task_data)
|
|
response = coordinator_client.post("/tasks/queues/normal/clear")
|
|
if response.status_code == 404:
|
|
pytest.skip("Queue endpoints not registered in running coordinator")
|
|
assert response.status_code in (200, 204)
|
|
|
|
def test_clear_invalid_queue(self, coordinator_client: TestClient):
|
|
"""Test clearing invalid queue."""
|
|
response = coordinator_client.post("/tasks/queues/invalid/clear")
|
|
# API returns 400 for invalid priority (not 404)
|
|
assert response.status_code in (400, 404)
|
|
|
|
def test_get_queue_stats(self, coordinator_client: TestClient):
|
|
"""Test getting queue statistics."""
|
|
response = coordinator_client.get("/tasks/queues/stats")
|
|
if response.status_code == 404:
|
|
pytest.skip("Queue endpoints not registered in running coordinator")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["status"] == "success"
|
|
assert "queue_sizes" in data
|
|
|
|
|
|
class TestHeartbeat:
|
|
"""Test agent heartbeat endpoint."""
|
|
|
|
def test_agent_heartbeat(self, coordinator_client: TestClient, sample_agent_data: Dict[str, Any]):
|
|
"""Test agent heartbeat."""
|
|
# Register agent first
|
|
coordinator_client.post("/agents/register", json=sample_agent_data)
|
|
response = coordinator_client.post(f"/agents/{sample_agent_data['agent_id']}/heartbeat")
|
|
if response.status_code == 404:
|
|
pytest.skip("Heartbeat endpoint not registered in running coordinator")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["status"] == "success"
|
|
|
|
def test_heartbeat_nonexistent_agent(self, coordinator_client: TestClient):
|
|
"""Test heartbeat for non-existent agent."""
|
|
response = coordinator_client.post("/agents/nonexistent/heartbeat")
|
|
assert response.status_code == 404
|
|
|
|
|
|
class TestHealthCheck:
|
|
"""Test health check endpoint."""
|
|
|
|
def test_health_check(self, coordinator_client: TestClient):
|
|
"""Test health check."""
|
|
response = coordinator_client.get("/health")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["status"] == "healthy"
|
|
|
|
|
|
class TestAuthentication:
|
|
"""Test authentication endpoints."""
|
|
|
|
def test_login_admin_success(self, coordinator_client: TestClient):
|
|
"""Test successful admin login."""
|
|
login_data = {"username": "admin", "password": "admin123"}
|
|
response = coordinator_client.post("/auth/login", json=login_data)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["status"] == "success"
|
|
assert data["username"] == "admin"
|
|
assert data["role"] == "admin"
|
|
assert "access_token" in data
|
|
assert "refresh_token" in data
|
|
|
|
def test_login_invalid_credentials(self, coordinator_client: TestClient):
|
|
"""Test login with invalid credentials."""
|
|
login_data = {"username": "admin", "password": "wrongpassword"}
|
|
response = coordinator_client.post("/auth/login", json=login_data)
|
|
assert response.status_code == 401
|
|
|
|
def test_login_missing_fields(self, coordinator_client: TestClient):
|
|
"""Test login with missing username or password."""
|
|
login_data = {"username": "admin"}
|
|
response = coordinator_client.post("/auth/login", json=login_data)
|
|
assert response.status_code == 422
|
|
|
|
def test_refresh_token_success(self, coordinator_client: TestClient):
|
|
"""Test successful token refresh."""
|
|
# First login to get a refresh token
|
|
login_data = {"username": "admin", "password": "admin123"}
|
|
login_response = coordinator_client.post("/auth/login", json=login_data)
|
|
refresh_token = login_response.json()["refresh_token"]
|
|
|
|
# Now refresh the token
|
|
refresh_data = {"refresh_token": refresh_token}
|
|
response = coordinator_client.post("/auth/refresh", json=refresh_data)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["status"] == "success"
|
|
assert "access_token" in data
|
|
|
|
def test_validate_token_success(self, coordinator_client: TestClient):
|
|
"""Test successful token validation."""
|
|
# First login to get a token
|
|
login_data = {"username": "admin", "password": "admin123"}
|
|
login_response = coordinator_client.post("/auth/login", json=login_data)
|
|
token = login_response.json()["access_token"]
|
|
|
|
# Now validate the token
|
|
validate_data = {"token": token}
|
|
response = coordinator_client.post("/auth/validate", json=validate_data)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["valid"] is True
|
|
|
|
def test_validate_token_invalid(self, coordinator_client: TestClient):
|
|
"""Test validation with invalid token."""
|
|
validate_data = {"token": "invalid_token"}
|
|
response = coordinator_client.post("/auth/validate", json=validate_data)
|
|
assert response.status_code == 401
|
|
|
|
|
|
class TestAlerts:
|
|
"""Test alerting endpoints."""
|
|
|
|
def test_get_alerts_unauthorized(self, coordinator_client: TestClient):
|
|
"""Test getting alerts without authentication."""
|
|
response = coordinator_client.get("/alerts")
|
|
assert response.status_code in (401, 403)
|
|
|
|
def test_get_alert_stats_unauthorized(self, coordinator_client: TestClient):
|
|
"""Test getting alert stats without authentication."""
|
|
response = coordinator_client.get("/alerts/stats")
|
|
assert response.status_code in (401, 403)
|
|
|
|
def test_get_alert_rules_unauthorized(self, coordinator_client: TestClient):
|
|
"""Test getting alert rules without authentication."""
|
|
response = coordinator_client.get("/alerts/rules")
|
|
assert response.status_code in (401, 403)
|
|
|
|
def test_get_sla_status_unauthorized(self, coordinator_client: TestClient):
|
|
"""Test getting SLA status without authentication."""
|
|
response = coordinator_client.get("/sla")
|
|
assert response.status_code in (401, 403)
|
|
|
|
def test_get_system_status_unauthorized(self, coordinator_client: TestClient):
|
|
"""Test getting system status without authentication."""
|
|
response = coordinator_client.get("/system/status")
|
|
assert response.status_code in (401, 403)
|
|
|
|
|
|
class TestUsers:
|
|
"""Test user management endpoints."""
|
|
|
|
def test_assign_user_role_unauthorized(self, coordinator_client: TestClient):
|
|
"""Test assigning user role without authentication."""
|
|
response = coordinator_client.post("/users/test_user/role", json={"role": "admin"})
|
|
assert response.status_code in (401, 403)
|
|
|
|
def test_get_user_role_unauthorized(self, coordinator_client: TestClient):
|
|
"""Test getting user role without authentication."""
|
|
response = coordinator_client.get("/users/test_user/role")
|
|
assert response.status_code in (401, 403)
|
|
|
|
def test_get_user_permissions_unauthorized(self, coordinator_client: TestClient):
|
|
"""Test getting user permissions without authentication."""
|
|
response = coordinator_client.get("/users/test_user/permissions")
|
|
assert response.status_code in (401, 403)
|
|
|
|
def test_list_roles_unauthorized(self, coordinator_client: TestClient):
|
|
"""Test listing roles without authentication."""
|
|
response = coordinator_client.get("/roles")
|
|
assert response.status_code in (401, 403)
|
|
|
|
def test_get_role_permissions_unauthorized(self, coordinator_client: TestClient):
|
|
"""Test getting role permissions without authentication."""
|
|
response = coordinator_client.get("/roles/admin")
|
|
assert response.status_code in (401, 403)
|
|
|
|
def test_protected_admin_unauthorized(self, coordinator_client: TestClient):
|
|
"""Test protected admin endpoint without authentication."""
|
|
response = coordinator_client.get("/protected/admin")
|
|
assert response.status_code in (401, 403)
|
|
|
|
def test_protected_operator_unauthorized(self, coordinator_client: TestClient):
|
|
"""Test protected operator endpoint without authentication."""
|
|
response = coordinator_client.get("/protected/operator")
|
|
assert response.status_code in (401, 403)
|
|
|
|
|
|
class TestConsensus:
|
|
"""Test consensus endpoints."""
|
|
|
|
def test_register_consensus_node(self, coordinator_client: TestClient):
|
|
"""Test registering a consensus node."""
|
|
node_data = {
|
|
"node_id": "test-node-001",
|
|
"address": "http://localhost:9003",
|
|
"stake": 1000
|
|
}
|
|
response = coordinator_client.post("/consensus/node/register", json=node_data)
|
|
# Should work or return appropriate error
|
|
assert response.status_code in (200, 201, 500)
|
|
|
|
def test_create_consensus_proposal(self, coordinator_client: TestClient):
|
|
"""Test creating a consensus proposal."""
|
|
proposal_data = {
|
|
"proposal_id": "prop-001",
|
|
"proposer": "test-node-001",
|
|
"content": {"action": "upgrade", "version": "2.0"}
|
|
}
|
|
response = coordinator_client.post("/consensus/proposal/create", json=proposal_data)
|
|
# Should work or return appropriate error
|
|
assert response.status_code in (200, 201, 500)
|
|
|
|
def test_get_consensus_statistics(self, coordinator_client: TestClient):
|
|
"""Test getting consensus statistics."""
|
|
response = coordinator_client.get("/consensus/statistics")
|
|
# Should work or return appropriate error
|
|
assert response.status_code in (200, 500)
|
|
|
|
def test_set_consensus_algorithm(self, coordinator_client: TestClient):
|
|
"""Test setting consensus algorithm."""
|
|
response = coordinator_client.put("/consensus/algorithm", params={"algorithm": "majority_vote"})
|
|
# Should work or return appropriate error
|
|
assert response.status_code in (200, 500)
|
|
|
|
def test_get_advanced_features_status(self, coordinator_client: TestClient):
|
|
"""Test getting advanced features status."""
|
|
response = coordinator_client.get("/advanced-features/status")
|
|
# Should work or return appropriate error
|
|
assert response.status_code in (200, 500)
|