""" Comprehensive security tests for AITBC """ import pytest import json import hashlib import hmac import time from datetime import datetime, timedelta from unittest.mock import Mock, patch from fastapi.testclient import TestClient from web3 import Web3 @pytest.mark.security class TestAuthenticationSecurity: """Test authentication security measures""" def test_password_strength_validation(self, coordinator_client): """Test password strength requirements""" weak_passwords = [ "123456", "password", "qwerty", "abc123", "password123", "Aa1!" # Too short ] for password in weak_passwords: response = coordinator_client.post( "/v1/auth/register", json={ "email": "test@example.com", "password": password, "organization": "Test Org" } ) assert response.status_code == 400 assert "password too weak" in response.json()["detail"].lower() def test_account_lockout_after_failed_attempts(self, coordinator_client): """Test account lockout after multiple failed attempts""" email = "lockout@test.com" # Attempt 5 failed logins for i in range(5): response = coordinator_client.post( "/v1/auth/login", json={ "email": email, "password": f"wrong_password_{i}" } ) assert response.status_code == 401 # 6th attempt should lock account response = coordinator_client.post( "/v1/auth/login", json={ "email": email, "password": "correct_password" } ) assert response.status_code == 423 assert "account locked" in response.json()["detail"].lower() def test_session_timeout(self, coordinator_client): """Test session timeout functionality""" # Login response = coordinator_client.post( "/v1/auth/login", json={ "email": "session@test.com", "password": "SecurePass123!" } ) token = response.json()["access_token"] # Use expired session with patch('time.time') as mock_time: mock_time.return_value = time.time() + 3600 * 25 # 25 hours later response = coordinator_client.get( "/v1/jobs", headers={"Authorization": f"Bearer {token}"} ) assert response.status_code == 401 assert "session expired" in response.json()["detail"].lower() def test_jwt_token_validation(self, coordinator_client): """Test JWT token validation""" # Test malformed token response = coordinator_client.get( "/v1/jobs", headers={"Authorization": "Bearer invalid.jwt.token"} ) assert response.status_code == 401 # Test token with invalid signature header = {"alg": "HS256", "typ": "JWT"} payload = {"sub": "user123", "exp": time.time() + 3600} # Create token with wrong secret token_parts = [ json.dumps(header).encode(), json.dumps(payload).encode() ] encoded = [base64.urlsafe_b64encode(part).rstrip(b'=') for part in token_parts] signature = hmac.digest(b"wrong_secret", b".".join(encoded), hashlib.sha256) encoded.append(base64.urlsafe_b64encode(signature).rstrip(b'=')) invalid_token = b".".join(encoded).decode() response = coordinator_client.get( "/v1/jobs", headers={"Authorization": f"Bearer {invalid_token}"} ) assert response.status_code == 401 @pytest.mark.security class TestAuthorizationSecurity: """Test authorization and access control""" def test_tenant_data_isolation(self, coordinator_client): """Test strict tenant data isolation""" # Create job for tenant A response = coordinator_client.post( "/v1/jobs", json={"job_type": "test", "parameters": {}}, headers={"X-Tenant-ID": "tenant-a"} ) job_id = response.json()["id"] # Try to access with tenant B's context response = coordinator_client.get( f"/v1/jobs/{job_id}", headers={"X-Tenant-ID": "tenant-b"} ) assert response.status_code == 404 # Try to access with no tenant response = coordinator_client.get(f"/v1/jobs/{job_id}") assert response.status_code == 401 # Try to modify with wrong tenant response = coordinator_client.patch( f"/v1/jobs/{job_id}", json={"status": "completed"}, headers={"X-Tenant-ID": "tenant-b"} ) assert response.status_code == 404 def test_role_based_access_control(self, coordinator_client): """Test RBAC permissions""" # Test with viewer role (read-only) viewer_token = "viewer_jwt_token" response = coordinator_client.get( "/v1/jobs", headers={"Authorization": f"Bearer {viewer_token}"} ) assert response.status_code == 200 # Viewer cannot create jobs response = coordinator_client.post( "/v1/jobs", json={"job_type": "test"}, headers={"Authorization": f"Bearer {viewer_token}"} ) assert response.status_code == 403 assert "insufficient permissions" in response.json()["detail"].lower() # Test with admin role admin_token = "admin_jwt_token" response = coordinator_client.post( "/v1/jobs", json={"job_type": "test"}, headers={"Authorization": f"Bearer {admin_token}"} ) assert response.status_code == 201 def test_api_key_security(self, coordinator_client): """Test API key authentication""" # Test without API key response = coordinator_client.get("/v1/api-keys") assert response.status_code == 401 # Test with invalid API key response = coordinator_client.get( "/v1/api-keys", headers={"X-API-Key": "invalid_key_123"} ) assert response.status_code == 401 # Test with valid API key response = coordinator_client.get( "/v1/api-keys", headers={"X-API-Key": "valid_key_456"} ) assert response.status_code == 200 @pytest.mark.security class TestInputValidationSecurity: """Test input validation and sanitization""" def test_sql_injection_prevention(self, coordinator_client): """Test SQL injection protection""" malicious_inputs = [ "'; DROP TABLE jobs; --", "' OR '1'='1", "1; DELETE FROM users WHERE '1'='1", "'; INSERT INTO jobs VALUES ('hack'); --", "' UNION SELECT * FROM users --" ] for payload in malicious_inputs: # Test in job ID parameter response = coordinator_client.get(f"/v1/jobs/{payload}") assert response.status_code == 404 assert response.status_code != 500 # Test in query parameters response = coordinator_client.get( f"/v1/jobs?search={payload}" ) assert response.status_code != 500 # Test in JSON body response = coordinator_client.post( "/v1/jobs", json={"job_type": payload, "parameters": {}} ) assert response.status_code == 422 def test_xss_prevention(self, coordinator_client): """Test XSS protection""" xss_payloads = [ "", "javascript:alert('xss')", "", "';alert('xss');//", "" ] for payload in xss_payloads: # Test in job name response = coordinator_client.post( "/v1/jobs", json={ "job_type": "test", "parameters": {}, "name": payload } ) if response.status_code == 201: # Verify XSS is sanitized in response assert "