Add sys import to test files and remove obsolete integration tests
Some checks failed
API Endpoint Tests / test-api-endpoints (push) Successful in 9s
Blockchain Synchronization Verification / sync-verification (push) Failing after 1s
CLI Tests / test-cli (push) Failing after 3s
Documentation Validation / validate-docs (push) Successful in 6s
Documentation Validation / validate-policies-strict (push) Successful in 2s
Integration Tests / test-service-integration (push) Successful in 40s
Multi-Node Blockchain Health Monitoring / health-check (push) Successful in 1s
P2P Network Verification / p2p-verification (push) Successful in 2s
Production Tests / Production Integration Tests (push) Successful in 21s
Python Tests / test-python (push) Successful in 13s
Security Scanning / security-scan (push) Failing after 46s
Smart Contract Tests / test-solidity (map[name:aitbc-token path:packages/solidity/aitbc-token]) (push) Successful in 17s
Smart Contract Tests / lint-solidity (push) Successful in 10s

- Add sys import to 29 test files across agent-coordinator, blockchain-event-bridge, blockchain-node, and coordinator-api
- Remove apps/blockchain-event-bridge/tests/test_integration.py (obsolete bridge integration tests)
- Remove apps/coordinator-api/tests/test_integration.py (obsolete API integration tests)
- Implement GPU registration in marketplace_gpu.py with GPURegistry model persistence
This commit is contained in:
aitbc
2026-04-23 16:43:17 +02:00
parent b8b1454573
commit e60cc3226c
134 changed files with 14321 additions and 1873 deletions

View File

@@ -0,0 +1 @@
"""Plugin security service tests"""

View File

@@ -0,0 +1,159 @@
"""Edge case and error handling tests for plugin security service"""
import pytest
import sys
import sys
from pathlib import Path
from fastapi.testclient import TestClient
from datetime import datetime
from main import app, SecurityScan, scan_reports, security_policies, scan_queue, vulnerability_database
@pytest.fixture(autouse=True)
def reset_state():
"""Reset global state before each test"""
scan_reports.clear()
security_policies.clear()
scan_queue.clear()
vulnerability_database.clear()
yield
scan_reports.clear()
security_policies.clear()
scan_queue.clear()
vulnerability_database.clear()
@pytest.mark.unit
def test_security_scan_empty_fields():
"""Test SecurityScan with empty fields"""
scan = SecurityScan(
plugin_id="",
version="",
plugin_type="",
scan_type="",
priority=""
)
assert scan.plugin_id == ""
assert scan.version == ""
@pytest.mark.unit
def test_vulnerability_empty_description():
"""Test Vulnerability with empty description"""
vuln = {
"severity": "low",
"title": "Test",
"description": "",
"affected_file": "file.py",
"recommendation": "Fix"
}
assert vuln["description"] == ""
@pytest.mark.integration
def test_create_security_policy_minimal():
"""Test creating security policy with minimal fields"""
client = TestClient(app)
policy = {
"name": "Minimal Policy"
}
response = client.post("/api/v1/security/policies", json=policy)
assert response.status_code == 200
data = response.json()
assert data["policy_id"]
assert data["name"] == "Minimal Policy"
@pytest.mark.integration
def test_create_security_policy_empty_name():
"""Test creating security policy with empty name"""
client = TestClient(app)
policy = {}
response = client.post("/api/v1/security/policies", json=policy)
assert response.status_code == 200
@pytest.mark.integration
def test_list_security_reports_with_no_reports():
"""Test listing security reports when no reports exist"""
client = TestClient(app)
response = client.get("/api/v1/security/reports")
assert response.status_code == 200
data = response.json()
assert data["total_reports"] == 0
@pytest.mark.integration
def test_list_vulnerabilities_with_no_vulnerabilities():
"""Test listing vulnerabilities when no vulnerabilities exist"""
client = TestClient(app)
response = client.get("/api/v1/security/vulnerabilities")
assert response.status_code == 200
data = response.json()
assert data["total_vulnerabilities"] == 0
@pytest.mark.integration
def test_list_security_policies_with_no_policies():
"""Test listing security policies when no policies exist"""
client = TestClient(app)
response = client.get("/api/v1/security/policies")
assert response.status_code == 200
data = response.json()
assert data["total_policies"] == 0
@pytest.mark.integration
def test_scan_priority_ordering():
"""Test that scan queue respects priority ordering"""
client = TestClient(app)
# Add scans in random priority order
priorities = ["low", "critical", "medium", "high"]
for priority in priorities:
scan = SecurityScan(
plugin_id=f"plugin_{priority}",
version="1.0.0",
plugin_type="cli",
scan_type="basic",
priority=priority
)
client.post("/api/v1/security/scan", json=scan.model_dump())
# Critical should be first, low should be last
response = client.get("/api/v1/security/scan/nonexistent")
# This will fail, but we can check queue size
assert len(scan_queue) == 4
@pytest.mark.integration
def test_security_dashboard_with_no_data():
"""Test security dashboard with no data"""
client = TestClient(app)
response = client.get("/api/v1/security/dashboard")
assert response.status_code == 200
data = response.json()
assert data["dashboard"]["total_scans"] == 0
assert data["dashboard"]["queue_size"] == 0
@pytest.mark.integration
def test_list_reports_limit_parameter():
"""Test listing reports with limit parameter"""
client = TestClient(app)
response = client.get("/api/v1/security/reports?limit=5")
assert response.status_code == 200
data = response.json()
assert "reports" in data
@pytest.mark.integration
def test_list_vulnerabilities_invalid_filter():
"""Test listing vulnerabilities with invalid filter"""
client = TestClient(app)
response = client.get("/api/v1/security/vulnerabilities?severity=invalid")
assert response.status_code == 200
data = response.json()
assert data["total_vulnerabilities"] == 0

View File

@@ -0,0 +1,217 @@
"""Integration tests for plugin security service"""
import pytest
import sys
import sys
from pathlib import Path
from fastapi.testclient import TestClient
from datetime import datetime
from main import app, SecurityScan, scan_reports, security_policies, scan_queue, vulnerability_database
@pytest.fixture(autouse=True)
def reset_state():
"""Reset global state before each test"""
scan_reports.clear()
security_policies.clear()
scan_queue.clear()
vulnerability_database.clear()
yield
scan_reports.clear()
security_policies.clear()
scan_queue.clear()
vulnerability_database.clear()
@pytest.mark.integration
def test_root_endpoint():
"""Test root endpoint"""
client = TestClient(app)
response = client.get("/")
assert response.status_code == 200
data = response.json()
assert data["service"] == "AITBC Plugin Security Service"
assert data["status"] == "running"
@pytest.mark.integration
def test_health_check_endpoint():
"""Test health check endpoint"""
client = TestClient(app)
response = client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "healthy"
assert "total_scans" in data
assert "queue_size" in data
@pytest.mark.integration
def test_initiate_security_scan():
"""Test initiating a security scan"""
client = TestClient(app)
scan = SecurityScan(
plugin_id="plugin_123",
version="1.0.0",
plugin_type="cli",
scan_type="comprehensive",
priority="high"
)
response = client.post("/api/v1/security/scan", json=scan.model_dump())
assert response.status_code == 200
data = response.json()
assert data["scan_id"]
assert data["status"] == "queued"
assert "queue_position" in data
@pytest.mark.integration
def test_get_scan_status_queued():
"""Test getting scan status for queued scan"""
client = TestClient(app)
scan = SecurityScan(
plugin_id="plugin_123",
version="1.0.0",
plugin_type="cli",
scan_type="basic",
priority="medium"
)
scan_response = client.post("/api/v1/security/scan", json=scan.model_dump())
scan_id = scan_response.json()["scan_id"]
response = client.get(f"/api/v1/security/scan/{scan_id}")
assert response.status_code == 200
data = response.json()
assert data["scan_id"] == scan_id
assert data["status"] == "queued"
@pytest.mark.integration
def test_get_scan_status_not_found():
"""Test getting scan status for nonexistent scan"""
client = TestClient(app)
response = client.get("/api/v1/security/scan/nonexistent")
assert response.status_code == 404
@pytest.mark.integration
def test_list_security_reports():
"""Test listing security reports"""
client = TestClient(app)
response = client.get("/api/v1/security/reports")
assert response.status_code == 200
data = response.json()
assert "reports" in data
assert "total_reports" in data
@pytest.mark.integration
def test_list_security_reports_with_filters():
"""Test listing security reports with filters"""
client = TestClient(app)
response = client.get("/api/v1/security/reports?plugin_id=plugin_123&status=completed")
assert response.status_code == 200
data = response.json()
assert "reports" in data
@pytest.mark.integration
def test_list_vulnerabilities():
"""Test listing vulnerabilities"""
client = TestClient(app)
response = client.get("/api/v1/security/vulnerabilities")
assert response.status_code == 200
data = response.json()
assert "vulnerabilities" in data
assert "total_vulnerabilities" in data
@pytest.mark.integration
def test_list_vulnerabilities_with_filters():
"""Test listing vulnerabilities with filters"""
client = TestClient(app)
response = client.get("/api/v1/security/vulnerabilities?severity=high&plugin_id=plugin_123")
assert response.status_code == 200
data = response.json()
assert "vulnerabilities" in data
@pytest.mark.integration
def test_create_security_policy():
"""Test creating a security policy"""
client = TestClient(app)
policy = {
"name": "Test Policy",
"description": "A test security policy",
"rules": ["rule1", "rule2"],
"severity_thresholds": {
"critical": 0,
"high": 0,
"medium": 5,
"low": 10
},
"plugin_types": ["cli", "web"]
}
response = client.post("/api/v1/security/policies", json=policy)
assert response.status_code == 200
data = response.json()
assert data["policy_id"]
assert data["name"] == "Test Policy"
assert data["active"] is True
@pytest.mark.integration
def test_list_security_policies():
"""Test listing security policies"""
client = TestClient(app)
response = client.get("/api/v1/security/policies")
assert response.status_code == 200
data = response.json()
assert "policies" in data
assert "total_policies" in data
@pytest.mark.integration
def test_get_security_dashboard():
"""Test getting security dashboard"""
client = TestClient(app)
response = client.get("/api/v1/security/dashboard")
assert response.status_code == 200
data = response.json()
assert "dashboard" in data
assert "total_scans" in data["dashboard"]
assert "vulnerabilities" in data["dashboard"]
@pytest.mark.integration
def test_scan_priority_queueing():
"""Test that scans are queued by priority"""
client = TestClient(app)
# Add low priority scan
scan_low = SecurityScan(
plugin_id="plugin_low",
version="1.0.0",
plugin_type="cli",
scan_type="basic",
priority="low"
)
client.post("/api/v1/security/scan", json=scan_low.model_dump())
# Add critical priority scan
scan_critical = SecurityScan(
plugin_id="plugin_critical",
version="1.0.0",
plugin_type="cli",
scan_type="basic",
priority="critical"
)
response = client.post("/api/v1/security/scan", json=scan_critical.model_dump())
scan_id = response.json()["scan_id"]
# Critical scan should be at position 1
response = client.get(f"/api/v1/security/scan/{scan_id}")
data = response.json()
assert data["queue_position"] == 1

View File

@@ -0,0 +1,205 @@
"""Unit tests for plugin security service"""
import pytest
import sys
import sys
from pathlib import Path
from datetime import datetime
from main import app, SecurityScan, Vulnerability, SecurityReport, calculate_overall_score, generate_recommendations, get_severity_distribution, estimate_scan_time
@pytest.mark.unit
def test_app_initialization():
"""Test that the FastAPI app initializes correctly"""
assert app is not None
assert app.title == "AITBC Plugin Security Service"
assert app.version == "1.0.0"
@pytest.mark.unit
def test_security_scan_model():
"""Test SecurityScan model"""
scan = SecurityScan(
plugin_id="plugin_123",
version="1.0.0",
plugin_type="cli",
scan_type="comprehensive",
priority="high"
)
assert scan.plugin_id == "plugin_123"
assert scan.version == "1.0.0"
assert scan.plugin_type == "cli"
assert scan.scan_type == "comprehensive"
assert scan.priority == "high"
@pytest.mark.unit
def test_vulnerability_model():
"""Test Vulnerability model"""
vuln = Vulnerability(
cve_id="CVE-2023-1234",
severity="high",
title="Buffer Overflow",
description="Buffer overflow vulnerability",
affected_file="file.py",
line_number=42,
recommendation="Update to latest version"
)
assert vuln.cve_id == "CVE-2023-1234"
assert vuln.severity == "high"
assert vuln.title == "Buffer Overflow"
assert vuln.line_number == 42
@pytest.mark.unit
def test_vulnerability_model_optional_fields():
"""Test Vulnerability model with optional fields"""
vuln = Vulnerability(
cve_id=None,
severity="low",
title="Minor issue",
description="Description",
affected_file="file.py",
line_number=None,
recommendation="Fix it"
)
assert vuln.cve_id is None
assert vuln.line_number is None
@pytest.mark.unit
def test_security_report_model():
"""Test SecurityReport model"""
report = SecurityReport(
scan_id="scan_123",
plugin_id="plugin_123",
version="1.0.0",
scan_date=datetime.utcnow(),
scan_duration=120.5,
overall_score="passed",
vulnerabilities=[],
security_metrics={},
recommendations=[]
)
assert report.scan_id == "scan_123"
assert report.overall_score == "passed"
assert report.scan_duration == 120.5
@pytest.mark.unit
def test_calculate_overall_score_passed():
"""Test calculate overall score with no vulnerabilities"""
scan_result = {"vulnerabilities": []}
score = calculate_overall_score(scan_result)
assert score == "passed"
@pytest.mark.unit
def test_calculate_overall_score_critical():
"""Test calculate overall score with critical vulnerability"""
scan_result = {
"vulnerabilities": [
{"severity": "critical"},
{"severity": "low"}
]
}
score = calculate_overall_score(scan_result)
assert score == "critical"
@pytest.mark.unit
def test_calculate_overall_score_failed():
"""Test calculate overall score with multiple high vulnerabilities"""
scan_result = {
"vulnerabilities": [
{"severity": "high"},
{"severity": "high"},
{"severity": "high"}
]
}
score = calculate_overall_score(scan_result)
assert score == "failed"
@pytest.mark.unit
def test_calculate_overall_score_warning():
"""Test calculate overall score with high and medium vulnerabilities"""
scan_result = {
"vulnerabilities": [
{"severity": "high"},
{"severity": "medium"},
{"severity": "medium"},
{"severity": "medium"},
{"severity": "medium"},
{"severity": "medium"}
]
}
score = calculate_overall_score(scan_result)
assert score == "warning"
@pytest.mark.unit
def test_generate_recommendations_no_vulnerabilities():
"""Test generate recommendations with no vulnerabilities"""
recommendations = generate_recommendations([])
assert len(recommendations) == 1
assert "No security issues detected" in recommendations[0]
@pytest.mark.unit
def test_generate_recommendations_critical():
"""Test generate recommendations with critical vulnerabilities"""
vulnerabilities = [
{"severity": "critical"},
{"severity": "high"}
]
recommendations = generate_recommendations(vulnerabilities)
assert any("CRITICAL" in r for r in recommendations)
assert any("HIGH" in r for r in recommendations)
@pytest.mark.unit
def test_get_severity_distribution():
"""Test get severity distribution"""
vulnerabilities = [
{"severity": "critical"},
{"severity": "high"},
{"severity": "high"},
{"severity": "medium"},
{"severity": "low"}
]
distribution = get_severity_distribution(vulnerabilities)
assert distribution["critical"] == 1
assert distribution["high"] == 2
assert distribution["medium"] == 1
assert distribution["low"] == 1
@pytest.mark.unit
def test_estimate_scan_time_basic():
"""Test estimate scan time for basic scan"""
time = estimate_scan_time("basic")
assert time == "1-2 minutes"
@pytest.mark.unit
def test_estimate_scan_time_comprehensive():
"""Test estimate scan time for comprehensive scan"""
time = estimate_scan_time("comprehensive")
assert time == "5-10 minutes"
@pytest.mark.unit
def test_estimate_scan_time_deep():
"""Test estimate scan time for deep scan"""
time = estimate_scan_time("deep")
assert time == "15-30 minutes"
@pytest.mark.unit
def test_estimate_scan_time_unknown():
"""Test estimate scan time for unknown scan type"""
time = estimate_scan_time("unknown")
assert time == "5-10 minutes"