"""
Comprehensive security tests for AITBC
"""
import base64
import hashlib
import hmac
import json
import time
from datetime import datetime, timedelta
from unittest.mock import Mock, patch

import pytest
from fastapi.testclient import TestClient
from web3 import Web3
@pytest.mark.security
class TestAuthenticationSecurity:
    """Test authentication security measures.

    Each test receives ``coordinator_client`` (presumably a pytest fixture
    defined outside this module -- confirm in conftest) and uses it to issue
    HTTP requests against the ``/v1`` API.
    """

    def test_password_strength_validation(self, coordinator_client):
        """Registering with a weak password must return 400 and say why."""
        weak_passwords = [
            "123456",
            "password",
            "qwerty",
            "abc123",
            "password123",
            "Aa1!",  # Too short
        ]

        for password in weak_passwords:
            response = coordinator_client.post(
                "/v1/auth/register",
                json={
                    "email": "test@example.com",
                    "password": password,
                    "organization": "Test Org",
                },
            )
            assert response.status_code == 400
            assert "password too weak" in response.json()["detail"].lower()

    def test_account_lockout_after_failed_attempts(self, coordinator_client):
        """Five failed logins must lock the account for the next attempt."""
        email = "lockout@test.com"

        # Attempt 5 failed logins
        for i in range(5):
            response = coordinator_client.post(
                "/v1/auth/login",
                json={
                    "email": email,
                    "password": f"wrong_password_{i}",
                },
            )
            assert response.status_code == 401

        # 6th attempt should hit the lock, whatever password is supplied
        response = coordinator_client.post(
            "/v1/auth/login",
            json={
                "email": email,
                "password": "correct_password",
            },
        )
        assert response.status_code == 423
        assert "account locked" in response.json()["detail"].lower()

    def test_session_timeout(self, coordinator_client):
        """A token used 25 hours after login must be rejected as expired."""
        # Login
        response = coordinator_client.post(
            "/v1/auth/login",
            json={
                "email": "session@test.com",
                "password": "SecurePass123!",
            },
        )
        token = response.json()["access_token"]

        # Compute the simulated clock value BEFORE patching: once time.time is
        # replaced, calling it returns a mock object instead of a timestamp,
        # so the arithmetic would not produce a real "25 hours later" value.
        expired_clock = time.time() + 3600 * 25  # 25 hours later

        # Use expired session
        with patch("time.time", return_value=expired_clock):
            response = coordinator_client.get(
                "/v1/jobs",
                headers={"Authorization": f"Bearer {token}"},
            )

        assert response.status_code == 401
        assert "session expired" in response.json()["detail"].lower()

    def test_jwt_token_validation(self, coordinator_client):
        """Malformed tokens and tokens with a bad signature must return 401."""
        # Test malformed token
        response = coordinator_client.get(
            "/v1/jobs",
            headers={"Authorization": "Bearer invalid.jwt.token"},
        )
        assert response.status_code == 401

        # Test token with invalid signature
        header = {"alg": "HS256", "typ": "JWT"}
        payload = {"sub": "user123", "exp": time.time() + 3600}

        # Create token with wrong secret: the token is well-formed, but its
        # HMAC-SHA256 signature is computed with b"wrong_secret".
        token_parts = [
            json.dumps(header).encode(),
            json.dumps(payload).encode(),
        ]
        # URL-safe base64 with the '=' padding stripped from each segment.
        encoded = [
            base64.urlsafe_b64encode(part).rstrip(b"=") for part in token_parts
        ]
        signature = hmac.digest(
            b"wrong_secret", b".".join(encoded), hashlib.sha256
        )
        encoded.append(base64.urlsafe_b64encode(signature).rstrip(b"="))
        invalid_token = b".".join(encoded).decode()

        response = coordinator_client.get(
            "/v1/jobs",
            headers={"Authorization": f"Bearer {invalid_token}"},
        )
        assert response.status_code == 401
@pytest.mark.security
class TestAuthorizationSecurity:
    """Test authorization and access control.

    Each test receives ``coordinator_client`` (presumably a pytest fixture
    defined outside this module -- confirm in conftest) and uses it to issue
    HTTP requests against the ``/v1`` API.
    """

    def test_tenant_data_isolation(self, coordinator_client):
        """A job created under one tenant header is hidden from any other."""
        owner_headers = {"X-Tenant-ID": "tenant-a"}
        intruder_headers = {"X-Tenant-ID": "tenant-b"}

        # Create job for tenant A
        created = coordinator_client.post(
            "/v1/jobs",
            json={"job_type": "test", "parameters": {}},
            headers=owner_headers,
        )
        job_id = created.json()["id"]
        job_path = f"/v1/jobs/{job_id}"

        # Reading with tenant B's context must look like a missing job
        foreign_read = coordinator_client.get(job_path, headers=intruder_headers)
        assert foreign_read.status_code == 404

        # Reading with no tenant header at all is rejected outright
        anonymous_read = coordinator_client.get(job_path)
        assert anonymous_read.status_code == 401

        # Modifying with the wrong tenant must also look like a missing job
        foreign_write = coordinator_client.patch(
            job_path,
            json={"status": "completed"},
            headers=intruder_headers,
        )
        assert foreign_write.status_code == 404

    def test_role_based_access_control(self, coordinator_client):
        """Viewer tokens may list jobs but not create them; admin tokens may."""
        viewer_token = "viewer_jwt_token"
        admin_token = "admin_jwt_token"
        viewer_headers = {"Authorization": f"Bearer {viewer_token}"}
        admin_headers = {"Authorization": f"Bearer {admin_token}"}
        new_job = {"job_type": "test"}

        # Viewer role (read-only) can list jobs
        listing = coordinator_client.get("/v1/jobs", headers=viewer_headers)
        assert listing.status_code == 200

        # Viewer cannot create jobs
        denied = coordinator_client.post(
            "/v1/jobs", json=new_job, headers=viewer_headers
        )
        assert denied.status_code == 403
        assert "insufficient permissions" in denied.json()["detail"].lower()

        # Admin role can create jobs
        accepted = coordinator_client.post(
            "/v1/jobs", json=new_job, headers=admin_headers
        )
        assert accepted.status_code == 201

    def test_api_key_security(self, coordinator_client):
        """The API-key endpoint answers 401 without a valid key, else 200."""
        endpoint = "/v1/api-keys"

        # (headers to send, expected status); None means "send no headers".
        scenarios = [
            (None, 401),                               # no API key
            ({"X-API-Key": "invalid_key_123"}, 401),   # invalid API key
            ({"X-API-Key": "valid_key_456"}, 200),     # valid API key
        ]

        for headers, expected_status in scenarios:
            if headers is None:
                response = coordinator_client.get(endpoint)
            else:
                response = coordinator_client.get(endpoint, headers=headers)
            assert response.status_code == expected_status
@pytest.mark.security
class TestInputValidationSecurity:
"""Test input validation and sanitization"""
def test_sql_injection_prevention(self, coordinator_client):
    """Test SQL injection protection.

    Sends each injection string through three channels -- URL path, query
    parameter, and JSON body -- and checks the status code returned for each.

    Args:
        coordinator_client: HTTP test client used to issue the requests
            (presumably a pytest fixture defined elsewhere -- confirm).
    """
    malicious_inputs = [
        "'; DROP TABLE jobs; --",
        "' OR '1'='1",
        "1; DELETE FROM users WHERE '1'='1",
        "'; INSERT INTO jobs VALUES ('hack'); --",
        "' UNION SELECT * FROM users --",
    ]

    for payload in malicious_inputs:
        # Test in job ID parameter. A 404 already rules out a 500, so no
        # separate "!= 500" assertion is needed here.
        response = coordinator_client.get(f"/v1/jobs/{payload}")
        assert response.status_code == 404

        # Test in query parameters. Pass the payload through ``params`` so the
        # client percent-encodes it; spliced raw into the URL, characters such
        # as ';' can be treated as separators and truncate the value before it
        # reaches the handler.
        response = coordinator_client.get(
            "/v1/jobs",
            params={"search": payload},
        )
        assert response.status_code != 500

        # Test in JSON body
        response = coordinator_client.post(
            "/v1/jobs",
            json={"job_type": payload, "parameters": {}},
        )
        assert response.status_code == 422
def test_xss_prevention(self, coordinator_client):
"""Test XSS protection"""
xss_payloads = [
"",
"javascript:alert('xss')",
"
",
"';alert('xss');//",
"