- Changed bare except clauses to specific exception types in web3_utils.py, testing.py, messages.py, and message_storage.py - Replaced print() calls with logger in testing.py, agent_discovery.py, compliance_agent.py, coordinator.py, trading_agent.py, keys.py, escrow.py, persistent_spending_tracker.py, sync_cli.py, and client.py - Added logger initialization using get_logger(__name__) in compliance_agent.py, coordinator.py, trading_agent.py, keys.py, escrow.py, persistent_spending_tracker.py, and client.py - Removed hardcoded secret
160 lines
4.7 KiB
Python
160 lines
4.7 KiB
Python
"""Edge case and error handling tests for plugin security service"""
|
|
|
|
import pytest
|
|
import sys
|
|
import sys
|
|
from pathlib import Path
|
|
from fastapi.testclient import TestClient
|
|
from datetime import datetime
|
|
|
|
|
|
from main import app, SecurityScan, scan_reports, security_policies, scan_queue, vulnerability_database
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_state():
|
|
"""Reset global state before each test"""
|
|
scan_reports.clear()
|
|
security_policies.clear()
|
|
scan_queue.clear()
|
|
vulnerability_database.clear()
|
|
yield
|
|
scan_reports.clear()
|
|
security_policies.clear()
|
|
scan_queue.clear()
|
|
vulnerability_database.clear()
|
|
|
|
|
|
@pytest.mark.unit
|
|
def test_security_scan_empty_fields():
|
|
"""Test SecurityScan with empty fields"""
|
|
scan = SecurityScan(
|
|
plugin_id="",
|
|
version="",
|
|
plugin_type="",
|
|
scan_type="",
|
|
priority=""
|
|
)
|
|
assert scan.plugin_id == ""
|
|
assert scan.version == ""
|
|
|
|
|
|
@pytest.mark.unit
|
|
def test_vulnerability_empty_description():
|
|
"""Test Vulnerability with empty description"""
|
|
vuln = {
|
|
"severity": "low",
|
|
"title": "Test",
|
|
"description": "",
|
|
"affected_file": "file.py",
|
|
"recommendation": "Fix"
|
|
}
|
|
assert vuln["description"] == ""
|
|
|
|
|
|
@pytest.mark.integration
|
|
def test_create_security_policy_minimal():
|
|
"""Test creating security policy with minimal fields"""
|
|
client = TestClient(app)
|
|
policy = {
|
|
"name": "Minimal Policy"
|
|
}
|
|
response = client.post("/api/v1/security/policies", json=policy)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["policy_id"]
|
|
assert data["name"] == "Minimal Policy"
|
|
|
|
|
|
@pytest.mark.integration
|
|
def test_create_security_policy_empty_name():
|
|
"""Test creating security policy with empty name"""
|
|
client = TestClient(app)
|
|
policy = {}
|
|
response = client.post("/api/v1/security/policies", json=policy)
|
|
assert response.status_code == 200
|
|
|
|
|
|
@pytest.mark.integration
|
|
def test_list_security_reports_with_no_reports():
|
|
"""Test listing security reports when no reports exist"""
|
|
client = TestClient(app)
|
|
response = client.get("/api/v1/security/reports")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["total_reports"] == 0
|
|
|
|
|
|
@pytest.mark.integration
|
|
def test_list_vulnerabilities_with_no_vulnerabilities():
|
|
"""Test listing vulnerabilities when no vulnerabilities exist"""
|
|
client = TestClient(app)
|
|
response = client.get("/api/v1/security/vulnerabilities")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["total_vulnerabilities"] == 0
|
|
|
|
|
|
@pytest.mark.integration
|
|
def test_list_security_policies_with_no_policies():
|
|
"""Test listing security policies when no policies exist"""
|
|
client = TestClient(app)
|
|
response = client.get("/api/v1/security/policies")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["total_policies"] == 0
|
|
|
|
|
|
@pytest.mark.integration
|
|
def test_scan_priority_ordering():
|
|
"""Test that scan queue respects priority ordering"""
|
|
client = TestClient(app)
|
|
|
|
# Add scans in random priority order
|
|
priorities = ["low", "critical", "medium", "high"]
|
|
for priority in priorities:
|
|
scan = SecurityScan(
|
|
plugin_id=f"plugin_{priority}",
|
|
version="1.0.0",
|
|
plugin_type="cli",
|
|
scan_type="basic",
|
|
priority=priority
|
|
)
|
|
client.post("/api/v1/security/scan", json=scan.model_dump())
|
|
|
|
# Critical should be first, low should be last
|
|
response = client.get("/api/v1/security/scan/nonexistent")
|
|
# This will fail, but we can check queue size
|
|
assert len(scan_queue) == 4
|
|
|
|
|
|
@pytest.mark.integration
|
|
def test_security_dashboard_with_no_data():
|
|
"""Test security dashboard with no data"""
|
|
client = TestClient(app)
|
|
response = client.get("/api/v1/security/dashboard")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["dashboard"]["total_scans"] == 0
|
|
assert data["dashboard"]["queue_size"] == 0
|
|
|
|
|
|
@pytest.mark.integration
|
|
def test_list_reports_limit_parameter():
|
|
"""Test listing reports with limit parameter"""
|
|
client = TestClient(app)
|
|
response = client.get("/api/v1/security/reports?limit=5")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "reports" in data
|
|
|
|
|
|
@pytest.mark.integration
|
|
def test_list_vulnerabilities_invalid_filter():
|
|
"""Test listing vulnerabilities with invalid filter"""
|
|
client = TestClient(app)
|
|
response = client.get("/api/v1/security/vulnerabilities?severity=invalid")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["total_vulnerabilities"] == 0
|