Files
oib c8be9d7414 feat: add marketplace metrics, privacy features, and service registry endpoints
- Add Prometheus metrics for marketplace API throughput and error rates with new dashboard panels
- Implement confidential transaction models with encryption support and access control
- Add key management system with registration, rotation, and audit logging
- Create services and registry routers for service discovery and management
- Integrate ZK proof generation for privacy-preserving receipts
- Add metrics instru
2025-12-22 10:33:23 +01:00

639 lines
23 KiB
Python

"""
Security validation framework for AITBC SDK certification
"""
import json
import subprocess
import tempfile
import shutil
from pathlib import Path
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
import yaml
@dataclass
class SecurityIssue:
"""Security issue representation"""
tool: str
severity: str # critical, high, medium, low
type: str # vulnerability, dependency, code_issue
description: str
file_path: Optional[str] = None
line_number: Optional[int] = None
cve_id: Optional[str] = None
remediation: Optional[str] = None
@dataclass
class SecurityReport:
"""Security validation report"""
sdk_path: str
sdk_language: str
timestamp: datetime
issues: List[SecurityIssue]
score: float
certification_level: str
blocked: bool
class SecurityValidator:
"""Main security validation orchestrator"""
def __init__(self):
self.tools = {
"python": PythonSecurityValidator(),
"java": JavaSecurityValidator(),
"javascript": JavaScriptSecurityValidator(),
"typescript": TypeScriptSecurityValidator()
}
def validate(self, sdk_path: str, certification_level: str = "bronze") -> SecurityReport:
"""Validate SDK security"""
sdk_path = Path(sdk_path).resolve()
# Detect language
language = self._detect_language(sdk_path)
if language not in self.tools:
raise ValueError(f"Unsupported language: {language}")
# Run validation
validator = self.tools[language]
issues = validator.validate(sdk_path, certification_level)
# Calculate score and determine certification status
score = self._calculate_score(issues, certification_level)
blocked = self._should_block_certification(issues, certification_level)
return SecurityReport(
sdk_path=str(sdk_path),
sdk_language=language,
timestamp=datetime.utcnow(),
issues=issues,
score=score,
certification_level=certification_level,
blocked=blocked
)
def _detect_language(self, path: Path) -> str:
"""Detect SDK programming language"""
# Check for language-specific files
if (path / "setup.py").exists() or (path / "pyproject.toml").exists():
return "python"
elif (path / "pom.xml").exists() or (path / "build.gradle").exists():
return "java"
elif (path / "package.json").exists():
# Check if it's TypeScript
if any(path.rglob("*.ts")):
return "typescript"
return "javascript"
raise ValueError("Could not detect SDK language")
def _calculate_score(self, issues: List[SecurityIssue], level: str) -> float:
"""Calculate security score (0-100)"""
weights = {
"critical": 25,
"high": 15,
"medium": 5,
"low": 1
}
total_deduction = 0
for issue in issues:
total_deduction += weights.get(issue.severity, 0)
score = max(0, 100 - total_deduction)
return score
def _should_block_certification(self, issues: List[SecurityIssue], level: str) -> bool:
"""Determine if issues should block certification"""
if level == "bronze":
# Block for critical or high severity issues
return any(i.severity in ["critical", "high"] for i in issues)
elif level == "silver":
# Block for critical issues
return any(i.severity == "critical" for i in issues)
elif level == "gold":
# Block for any issues
return len(issues) > 0
return False
def export_sarif(self, report: SecurityReport, output_path: str):
"""Export report in SARIF format"""
sarif = {
"version": "2.1.0",
"$schema": "https://json.schemastore.org/sarif-2.1.0.json",
"runs": [
{
"tool": {
"driver": {
"name": "aitbc-security-validator",
"version": "1.0.0",
"informationUri": "https://aitbc.io/security"
}
},
"results": [
{
"ruleId": f"{issue.tool}-{issue.type}",
"level": self._map_severity_to_sarif(issue.severity),
"message": {
"text": issue.description
},
"locations": [
{
"physicalLocation": {
"artifactLocation": {
"uri": issue.file_path or ""
},
"region": {
"startLine": issue.line_number or 1
}
}
}
],
"properties": {
"cve": issue.cve_id,
"remediation": issue.remediation
}
}
for issue in report.issues
]
}
]
}
with open(output_path, 'w') as f:
json.dump(sarif, f, indent=2)
def _map_severity_to_sarif(self, severity: str) -> str:
"""Map severity to SARIF level"""
mapping = {
"critical": "error",
"high": "error",
"medium": "warning",
"low": "note"
}
return mapping.get(severity, "warning")
class PythonSecurityValidator:
"""Python-specific security validation"""
def validate(self, path: Path, level: str) -> List[SecurityIssue]:
"""Run Python security checks"""
issues = []
# Dependency scanning with safety
issues.extend(self._scan_dependencies(path))
# Code analysis with bandit
if level in ["silver", "gold"]:
issues.extend(self._analyze_code(path))
# Check for secrets
if level == "gold":
issues.extend(self._scan_secrets(path))
return issues
def _scan_dependencies(self, path: Path) -> List[SecurityIssue]:
"""Scan Python dependencies for vulnerabilities"""
issues = []
# Find requirements files
req_files = list(path.rglob("requirements*.txt")) + list(path.rglob("pyproject.toml"))
for req_file in req_files:
try:
# Run safety check
result = subprocess.run(
["safety", "check", "--json", "--file", str(req_file)],
capture_output=True,
text=True,
cwd=path
)
if result.returncode == 0:
data = json.loads(result.stdout)
for vuln in data:
issues.append(SecurityIssue(
tool="safety",
severity=self._map_safety_severity(vuln.get("advisory", "")),
type="dependency",
description=vuln.get("advisory", ""),
cve_id=vuln.get("cve"),
remediation=f"Update {vuln.get('package')} to {vuln.get('analyzed_version')}"
))
except (subprocess.CalledProcessError, json.JSONDecodeError, FileNotFoundError):
# Safety not installed or failed
pass
return issues
def _analyze_code(self, path: Path) -> List[SecurityIssue]:
"""Analyze Python code for security issues"""
issues = []
try:
# Run bandit
result = subprocess.run(
["bandit", "-r", str(path), "-f", "json"],
capture_output=True,
text=True
)
if result.stdout:
data = json.loads(result.stdout)
for issue in data.get("results", []):
issues.append(SecurityIssue(
tool="bandit",
severity=issue.get("issue_severity", "medium").lower(),
type="code_issue",
description=issue.get("issue_text", ""),
file_path=issue.get("filename"),
line_number=issue.get("line_number"),
remediation=issue.get("issue_cwe", {}).get("link")
))
except (subprocess.CalledProcessError, json.JSONDecodeError, FileNotFoundError):
# Bandit not installed or failed
pass
return issues
def _scan_secrets(self, path: Path) -> List[SecurityIssue]:
"""Scan for hardcoded secrets"""
issues = []
try:
# Run truffleHog
result = subprocess.run(
["trufflehog", "--json", str(path)],
capture_output=True,
text=True
)
if result.stdout:
for line in result.stdout.strip().split('\n'):
if line:
finding = json.loads(line)
issues.append(SecurityIssue(
tool="trufflehog",
severity="high",
type="code_issue",
description="Hardcoded secret detected",
file_path=finding.get("path"),
line_number=finding.get("line"),
remediation="Remove hardcoded secret and use environment variables"
))
except (subprocess.CalledProcessError, json.JSONDecodeError, FileNotFoundError):
# TruffleHog not installed or failed
pass
return issues
def _map_safety_severity(self, advisory: str) -> str:
"""Map safety advisory to severity"""
advisory_lower = advisory.lower()
if any(word in advisory_lower for word in ["critical", "remote code execution"]):
return "critical"
elif any(word in advisory_lower for word in ["high", "execution", "bypass"]):
return "high"
elif any(word in advisory_lower for word in ["medium"]):
return "medium"
else:
return "low"
class JavaSecurityValidator:
"""Java-specific security validation"""
def validate(self, path: Path, level: str) -> List[SecurityIssue]:
"""Run Java security checks"""
issues = []
# Dependency scanning with OWASP Dependency Check
issues.extend(self._scan_dependencies(path))
# Code analysis with SpotBugs
if level in ["silver", "gold"]:
issues.extend(self._analyze_code(path))
return issues
def _scan_dependencies(self, path: Path) -> List[SecurityIssue]:
"""Scan Java dependencies for vulnerabilities"""
issues = []
# Look for pom.xml or build.gradle
pom_file = path / "pom.xml"
gradle_file = path / "build.gradle"
if pom_file.exists():
# Run Maven dependency check
try:
result = subprocess.run(
["mvn", "org.owasp:dependency-check-maven:check"],
capture_output=True,
text=True,
cwd=path
)
# Parse XML report
report_path = path / "target" / "dependency-check-report.xml"
if report_path.exists():
issues.extend(self._parse_dependency_check_report(report_path))
except subprocess.CalledProcessError:
pass
elif gradle_file.exists():
# Run Gradle dependency check
try:
result = subprocess.run(
["./gradlew", "dependencyCheckAnalyze"],
capture_output=True,
text=True,
cwd=path
)
# Parse XML report
report_path = path / "build" / "reports" / "dependency-check-report.xml"
if report_path.exists():
issues.extend(self._parse_dependency_check_report(report_path))
except subprocess.CalledProcessError:
pass
return issues
def _parse_dependency_check_report(self, report_path: Path) -> List[SecurityIssue]:
"""Parse OWASP Dependency Check XML report"""
import xml.etree.ElementTree as ET
issues = []
try:
tree = ET.parse(report_path)
root = tree.getroot()
for vulnerability in root.findall(".//vulnerability"):
name = vulnerability.get("name")
severity = vulnerability.get("severity")
cve = vulnerability.get("cve")
# Map severity
if severity.upper() in ["CRITICAL", "HIGH"]:
mapped_severity = "high"
elif severity.upper() == "MEDIUM":
mapped_severity = "medium"
else:
mapped_severity = "low"
issues.append(SecurityIssue(
tool="dependency-check",
severity=mapped_severity,
type="dependency",
description=f"Vulnerability in {name}",
cve_id=cve,
remediation="Update dependency to patched version"
))
except ET.ParseError:
pass
return issues
def _analyze_code(self, path: Path) -> List[SecurityIssue]:
"""Analyze Java code with SpotBugs"""
issues = []
try:
# Run SpotBugs
result = subprocess.run(
["spotbugs", "-textui", "-xml:withMessages", "-low", str(path)],
capture_output=True,
text=True
)
# Parse SpotBugs XML report
report_path = path / "spotbugsXml.xml"
if report_path.exists():
issues.extend(self._parse_spotbugs_report(report_path))
except subprocess.CalledProcessError:
pass
return issues
def _parse_spotbugs_report(self, report_path: Path) -> List[SecurityIssue]:
"""Parse SpotBugs XML report"""
import xml.etree.ElementTree as ET
issues = []
try:
tree = ET.parse(report_path)
root = tree.getroot()
for instance in root.findall(".//BugInstance"):
bug_type = instance.get("type")
priority = instance.get("priority")
# Map priority to severity
if priority == "1":
severity = "high"
elif priority == "2":
severity = "medium"
else:
severity = "low"
source_line = instance.find(".//SourceLine")
if source_line is not None:
issues.append(SecurityIssue(
tool="spotbugs",
severity=severity,
type="code_issue",
description=bug_type,
file_path=source_line.get("sourcepath"),
line_number=int(source_line.get("start", 0)),
remediation=f"Fix {bug_type} security issue"
))
except ET.ParseError:
pass
return issues
class JavaScriptSecurityValidator:
"""JavaScript-specific security validation"""
def validate(self, path: Path, level: str) -> List[SecurityIssue]:
"""Run JavaScript security checks"""
issues = []
# Dependency scanning with npm audit
issues.extend(self._scan_dependencies(path))
# Code analysis with ESLint security rules
if level in ["silver", "gold"]:
issues.extend(self._analyze_code(path))
return issues
def _scan_dependencies(self, path: Path) -> List[SecurityIssue]:
"""Scan npm dependencies for vulnerabilities"""
issues = []
package_json = path / "package.json"
if not package_json.exists():
return issues
try:
# Run npm audit
result = subprocess.run(
["npm", "audit", "--json"],
capture_output=True,
text=True,
cwd=path
)
if result.stdout:
data = json.loads(result.stdout)
for advisory_id, advisory in data.get("vulnerabilities", {}).items():
severity = advisory.get("severity", "low")
issues.append(SecurityIssue(
tool="npm-audit",
severity=severity,
type="dependency",
description=advisory.get("title", ""),
cve_id=advisory.get("cwe"),
remediation=f"Run npm audit fix"
))
except (subprocess.CalledProcessError, json.JSONDecodeError, FileNotFoundError):
pass
return issues
def _analyze_code(self, path: Path) -> List[SecurityIssue]:
"""Analyze JavaScript code with ESLint"""
issues = []
try:
# Run ESLint with security plugin
result = subprocess.run(
["npx", "eslint", "--format", "json", str(path)],
capture_output=True,
text=True
)
if result.stdout:
data = json.loads(result.stdout)
for file_result in data:
for message in file_result.get("messages", []):
if "security" in message.get("ruleId", "").lower():
issues.append(SecurityIssue(
tool="eslint",
severity="medium",
type="code_issue",
description=message.get("message"),
file_path=file_result.get("filePath"),
line_number=message.get("line"),
remediation=f"Fix {message.get('ruleId')} issue"
))
except (subprocess.CalledProcessError, json.JSONDecodeError, FileNotFoundError):
pass
return issues
class TypeScriptSecurityValidator(JavaScriptSecurityValidator):
"""TypeScript-specific security validation (inherits from JavaScript)"""
def validate(self, path: Path, level: str) -> List[SecurityIssue]:
"""Run TypeScript security checks"""
# Run JavaScript checks first
issues = super().validate(path, level)
# Additional TypeScript-specific checks
if level == "gold":
issues.extend(self._check_typescript_config(path))
return issues
def _check_typescript_config(self, path: Path) -> List[SecurityIssue]:
"""Check TypeScript configuration for security"""
issues = []
tsconfig = path / "tsconfig.json"
if tsconfig.exists():
try:
with open(tsconfig) as f:
config = json.load(f)
compiler_options = config.get("compilerOptions", {})
# Check for implicit any
if compiler_options.get("noImplicitAny") is not True:
issues.append(SecurityIssue(
tool="typescript-config",
severity="low",
type="code_issue",
description="TypeScript should disable implicit any",
file_path=str(tsconfig),
remediation="Set noImplicitAny to true"
))
# Check for strict mode
if compiler_options.get("strict") is not True:
issues.append(SecurityIssue(
tool="typescript-config",
severity="low",
type="code_issue",
description="TypeScript should use strict mode",
file_path=str(tsconfig),
remediation="Set strict to true"
))
except json.JSONDecodeError:
pass
return issues
def main():
"""CLI entry point"""
import argparse
parser = argparse.ArgumentParser(description="AITBC SDK Security Validator")
parser.add_argument("sdk_path", help="Path to SDK directory")
parser.add_argument("--level", choices=["bronze", "silver", "gold"], default="bronze")
parser.add_argument("--output", help="Output SARIF report path")
parser.add_argument("--format", choices=["json", "sarif"], default="json")
args = parser.parse_args()
# Run validation
validator = SecurityValidator()
report = validator.validate(args.sdk_path, args.level)
# Output results
if args.format == "sarif" and args.output:
validator.export_sarif(report, args.output)
else:
print(json.dumps(asdict(report), indent=2, default=str))
# Exit with error if blocked
if report.blocked:
print(f"\nCERTIFICATION BLOCKED: Security issues found")
for issue in report.issues:
if issue.severity in ["critical", "high"]:
print(f" - {issue.description} ({issue.severity})")
exit(1)
else:
print(f"\nSECURITY CHECK PASSED: Score {report.score}/100")
if __name__ == "__main__":
main()