Files
aitbc/scripts/security_audit.py
AITBC System dda703de10 feat: implement v0.2.0 release features - agent-first evolution
 v0.2 Release Preparation:
- Update version to 0.2.0 in pyproject.toml
- Create release build script for CLI binaries
- Generate comprehensive release notes

 OpenClaw DAO Governance:
- Implement complete on-chain voting system
- Create DAO smart contract with Governor framework
- Add comprehensive CLI commands for DAO operations
- Support for multiple proposal types and voting mechanisms

 GPU Acceleration CI:
- Complete GPU benchmark CI workflow
- Comprehensive performance testing suite
- Automated benchmark reports and comparison
- GPU optimization monitoring and alerts

 Agent SDK Documentation:
- Complete SDK documentation with examples
- Computing agent and oracle agent examples
- Comprehensive API reference and guides
- Security best practices and deployment guides

 Production Security Audit:
- Comprehensive security audit framework
- Detailed security assessment (72.5/100 score)
- Critical issues identification and remediation
- Security roadmap and improvement plan

 Mobile Wallet & One-Click Miner:
- Complete mobile wallet architecture design
- One-click miner implementation plan
- Cross-platform integration strategy
- Security and user experience considerations

 Documentation Updates:
- Add roadmap badge to README
- Update project status and achievements
- Comprehensive feature documentation
- Production readiness indicators

🚀 Ready for v0.2.0 release with agent-first architecture
2026-03-18 20:17:23 +01:00

663 lines
25 KiB
Python
Executable File

#!/usr/bin/env python3
"""
AITBC Production Security Audit Script
Comprehensive security assessment for production deployment
"""
import os
import sys
import json
import subprocess
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Tuple
import hashlib
import re
# Configure the root logger once at import time (timestamp - level - message)
# and expose the module-level logger that every audit step writes to.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class SecurityAudit:
    """Comprehensive security audit for AITBC production.

    The audit runs eight weighted check categories against ``project_root``.
    Every ``check_*`` method returns ``(points, issues)`` where ``points`` lies
    between 0 and the weight of its category and ``issues`` is a list of dicts
    that always carry ``"type"`` and ``"severity"`` keys. ``run_full_audit``
    combines the points into a 0-100 score and stores everything in
    ``self.results``, which ``save_report`` serialises as JSON.
    """

    # Directory names skipped while scanning source files: they contain
    # third-party or generated code that the project does not control and that
    # would otherwise drown the report in findings.
    EXCLUDED_DIRS = frozenset(
        {".git", "node_modules", "venv", ".venv", "__pycache__", "site-packages"}
    )

    # (compiled pattern, issue type) pairs used to spot hard-coded secrets.
    _SECRET_PATTERNS = tuple(
        (re.compile(pattern, re.IGNORECASE), issue_type)
        for pattern, issue_type in (
            (r'password\s*=\s*["\'][^"\']+["\']', "hardcoded_password"),
            (r'api_key\s*=\s*["\'][^"\']+["\']', "hardcoded_api_key"),
            (r'secret\s*=\s*["\'][^"\']+["\']', "hardcoded_secret"),
            (r'token\s*=\s*["\'][^"\']+["\']', "hardcoded_token"),
            (r'private_key\s*=\s*["\'][^"\']+["\']', "hardcoded_private_key"),
            (r'0x[a-fA-F0-9]{40}', "ethereum_address"),
            (r'sk-[a-zA-Z0-9]{48}', "openai_api_key"),
        )
    )

    # Tracked files whose extension suggests they hold secrets.
    _TRACKED_SECRET_RE = re.compile(r"\.(?:env|key|pem)$")

    # (reported name, compiled pattern) pairs for risky constructs. The
    # patterns match real imports/calls only, so identifiers such as
    # ``evaluate``, ``execute`` or ``secure_pickle`` are not flagged.
    _DANGEROUS_USAGE = (
        ("pickle", re.compile(
            r"^\s*(?:import\s+[^\n#]*\bpickle\b|from\s+pickle\b)", re.MULTILINE)),
        ("eval", re.compile(r"(?<![\w.])eval\s*\(")),
        ("exec", re.compile(r"(?<![\w.])exec\s*\(")),
        ("subprocess.call", re.compile(r"(?<![\w.])subprocess\.call\s*\(")),
        ("os.system", re.compile(r"(?<![\w.])os\.system\s*\(")),
    )

    # Heuristics for SQL statements assembled by string concatenation.
    _SQL_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r'execute\s*\(\s*["\'][^"\']*\+',
            r'format.*SELECT.*\+',
            r'%s.*SELECT.*\+',
        )
    )

    # Oldest release of each package that is considered safe; anything whose
    # declared lower bound is below this is reported.
    _MIN_SAFE_VERSIONS = {
        "requests": "2.25.0",
        "urllib3": "1.26.0",
        "cryptography": "3.4.0",
        "pyyaml": "5.4.0",
    }

    # One version clause such as ``>=2.31``, ``^1.4.0`` or ``==3.*`` (the
    # wildcard form is rejected by the negative look-ahead).
    _VERSION_SPEC_RE = re.compile(
        r"(?P<op>===|[~=!<>]=|[<>~^])?\s*(?P<ver>\d+(?:\.\d+)*)(?!\.\*)"
    )

    # A URL with an explicit port. This single pattern also covers the
    # localhost / 127.0.0.1 forms, so each occurrence is counted once.
    _ENDPOINT_RE = re.compile(r'https?://[^/\s]+:\d+')

    # ``User=`` directive of a systemd unit (whitespace around '=' is ignored
    # by systemd; commented-out lines do not match).
    _SERVICE_USER_RE = re.compile(r"^\s*User\s*=\s*(\S+)", re.MULTILINE)

    def __init__(self, project_root: str = "/opt/aitbc"):
        """Create an audit for the project located at ``project_root``."""
        # Local import: ``timezone`` is not part of the file-level imports.
        from datetime import timezone

        self.project_root = Path(project_root)
        self.results: Dict[str, Any] = {
            # Naive UTC ISO timestamp (same format ``datetime.utcnow()`` gave,
            # without relying on that deprecated call).
            "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            "audit_version": "v0.2.0",
            "findings": [],
            "score": 0,
            "max_score": 100,
            "critical_issues": [],
            "warnings": [],
            "recommendations": []
        }

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    def _relative(self, path: Path) -> str:
        """Return ``path`` relative to the project root as a string."""
        return str(path.relative_to(self.project_root))

    def _source_files(self, *suffixes: str) -> List[Path]:
        """Return project files ending in ``suffixes``, skipping EXCLUDED_DIRS."""
        found = []
        for suffix in suffixes:
            for path in self.project_root.glob(f"**/*{suffix}"):
                parent_parts = path.relative_to(self.project_root).parts[:-1]
                if self.EXCLUDED_DIRS.isdisjoint(parent_parts) and path.is_file():
                    found.append(path)
        return found

    @staticmethod
    def _read_text(path: Path) -> str:
        """Return the UTF-8 text of ``path``, or ``""`` when it is unreadable."""
        try:
            return path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as exc:
            logger.debug("Skipping unreadable file %s: %s", path, exc)
            return ""

    @staticmethod
    def _version_tuple(version: str) -> Tuple[int, ...]:
        """Convert ``"2.25"`` to ``(2, 25, 0)`` so versions compare numerically."""
        parts = [int(part) for part in version.split(".")]
        # Pad to three components; otherwise (2, 25) < (2, 25, 0) would hold.
        parts += [0] * (3 - len(parts))
        return tuple(parts)

    @classmethod
    def _declared_lower_bounds(cls, content: str, package: str) -> List[str]:
        """Return the versions that ``content`` declares as floor/pin for ``package``.

        Understands PEP 508 strings (``"requests>=2.31,<3"``) and Poetry tables
        (``requests = "^2.31"`` / ``requests = {version = "^2.31"}``). Upper
        bounds and exclusions (``<``, ``<=``, ``!=``) are ignored.
        """
        name_re = re.compile(
            r"(?<![\w.-])" + re.escape(package) + r"(?![\w.-])"
            r"(?:\[[^\]\n]*\])?"                               # optional extras
            r"\s*(?:=\s*(?:\{[^}\n]*?version\s*=\s*)?[\"'])?"  # Poetry assignment
            r"(?P<spec>[^\"'\n#;]*)",                          # constraint text
            re.IGNORECASE,
        )
        versions = []
        for name_match in name_re.finditer(content):
            for clause in cls._VERSION_SPEC_RE.finditer(name_match.group("spec")):
                if clause.group("op") in ("<", "<=", "!="):
                    continue
                versions.append(clause.group("ver"))
        return versions

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    def run_full_audit(self) -> Dict[str, Any]:
        """Run every check category and return the populated results dict.

        A check that raises is recorded with zero points and a critical
        ``check_error`` issue instead of aborting the audit.
        """
        logger.info("Starting AITBC Production Security Audit...")
        # (display name, check method, weight). Each check returns points on
        # a 0..weight scale.
        categories = [
            ("File Permissions", self.check_file_permissions, 15),
            ("Secret Management", self.check_secret_management, 20),
            ("Code Security", self.check_code_security, 15),
            ("Dependencies", self.check_dependencies, 10),
            ("Network Security", self.check_network_security, 10),
            ("Access Control", self.check_access_control, 10),
            ("Data Protection", self.check_data_protection, 10),
            ("Infrastructure", self.check_infrastructure_security, 10)
        ]
        earned_points = 0.0
        total_weight = 0
        for category_name, check_function, weight in categories:
            logger.info("Checking %s...", category_name)
            total_weight += weight
            try:
                category_score, issues = check_function()
            except Exception as e:  # boundary: one broken check must not stop the audit
                logger.error("Error in %s check: %s", category_name, e)
                category_score = 0
                issues = [{"type": "check_error", "message": str(e), "severity": "critical"}]
            # Points are already expressed in weight units, so they are summed
            # directly; multiplying by the weight again would inflate the
            # final score far beyond 100.
            earned_points += category_score
            self.results["findings"].append({
                "category": category_name,
                "score": category_score,
                "weight": weight,
                "issues": issues
            })
            # Categorize issues (including check errors) by severity.
            for issue in issues:
                if issue["severity"] == "critical":
                    self.results["critical_issues"].append(issue)
                elif issue["severity"] == "warning":
                    self.results["warnings"].append(issue)
        # Final score: share of achievable points, as a percentage.
        self.results["score"] = (earned_points / total_weight) * 100 if total_weight > 0 else 0
        self.generate_recommendations()
        logger.info("Audit completed. Final score: %.1f/100", self.results["score"])
        return self.results

    # ------------------------------------------------------------------
    # Checks
    # ------------------------------------------------------------------
    def check_file_permissions(self) -> Tuple[float, List[Dict]]:
        """Check permissions of sensitive files and look for world-writable paths.

        Returns:
            ``(points, issues)`` with points in the range 0-15.
        """
        issues = []
        score = 15.0
        # (glob relative to the project root, expected permission bits)
        sensitive_files = [
            ("*.key", 0o600),         # Private keys
            ("*.pem", 0o600),         # Certificates
            ("config/*.env", 0o600),  # Environment files
            ("keystore/*", 0o600),    # Keystore files
            ("*.sh", 0o755),          # Shell scripts
        ]
        for pattern, expected_mode in sensitive_files:
            try:
                for file_path in self.project_root.glob(pattern):
                    if not file_path.is_file():
                        continue
                    current_mode = file_path.stat().st_mode & 0o777
                    if current_mode == expected_mode:
                        continue
                    # Bits granted on top of the expected mode. Extra access
                    # for group/others is critical; anything else (e.g. a
                    # stricter mode) is only a deviation worth a warning.
                    excess = current_mode & ~expected_mode
                    issues.append({
                        "type": "file_permission",
                        "file": self._relative(file_path),
                        "current": format(current_mode, "03o"),
                        "expected": format(expected_mode, "03o"),
                        "severity": "critical" if excess & 0o077 else "warning"
                    })
                    score -= 1
            except (OSError, ValueError) as e:
                logger.warning("Could not check %s: %s", pattern, e)
        # Check for world-writable files (symlinks excluded: their mode bits
        # are meaningless).
        try:
            result = subprocess.run(
                ["find", str(self.project_root), "-perm", "-o+w", "!", "-type", "l"],
                capture_output=True, text=True, timeout=30
            )
            writable_files = result.stdout.strip().split('\n') if result.stdout.strip() else []
            if writable_files:
                issues.append({
                    "type": "world_writable_files",
                    "count": len(writable_files),
                    "files": writable_files[:5],  # Limit output
                    "severity": "critical"
                })
                score -= min(5, len(writable_files))
        except (OSError, subprocess.SubprocessError) as e:
            logger.warning("Could not check world-writable files: %s", e)
        return max(0, score), issues

    def check_secret_management(self) -> Tuple[float, List[Dict]]:
        """Look for hard-coded secrets, secrets tracked by git and plain keystores.

        Returns:
            ``(points, issues)`` with points in the range 0-20.
        """
        issues = []
        score = 20.0
        # Hard-coded secrets in Python / JavaScript sources.
        for file_path in self._source_files(".py", ".js"):
            content = self._read_text(file_path)
            if not content:
                continue
            for pattern, issue_type in self._SECRET_PATTERNS:
                match_count = len(pattern.findall(content))
                if match_count:
                    issues.append({
                        "type": issue_type,
                        "file": self._relative(file_path),
                        "count": match_count,
                        "severity": "critical"
                    })
                    score -= 2
        # Secret-bearing files tracked by git. The file list is filtered in
        # Python: passing a pipeline as list elements together with
        # shell=True would execute plain ``git`` and never run the filter.
        try:
            result = subprocess.run(
                ["git", "ls-files", "-z"],
                cwd=self.project_root, capture_output=True, text=True, timeout=30
            )
            if result.returncode == 0:
                tracked_secrets = [
                    name for name in result.stdout.split("\0")
                    if name and self._TRACKED_SECRET_RE.search(name)
                ]
                if tracked_secrets:
                    issues.append({
                        "type": "secrets_in_git",
                        "files": tracked_secrets,
                        "severity": "critical"
                    })
                    score -= 5
            else:
                logger.warning("Could not check git for secrets: %s", result.stderr.strip())
        except (OSError, subprocess.SubprocessError) as e:
            logger.warning("Could not check git for secrets: %s", e)
        # Keystore files that are readable text and mention a private key are
        # treated as unencrypted.
        keystore_dir = self.project_root / "keystore"
        if keystore_dir.is_dir():
            for keystore_file in keystore_dir.glob("*"):
                if not keystore_file.is_file():
                    continue
                try:
                    text = keystore_file.read_bytes().decode('utf-8')
                except UnicodeDecodeError:
                    # File is binary/encrypted, which is good
                    continue
                except OSError:
                    continue
                if "private_key" in text.lower():
                    issues.append({
                        "type": "unencrypted_keystore",
                        "file": self._relative(keystore_file),
                        "severity": "critical"
                    })
                    score -= 3
        return max(0, score), issues

    def check_code_security(self) -> Tuple[float, List[Dict]]:
        """Scan Python sources for risky constructs and SQL string building.

        Returns:
            ``(points, issues)`` with points in the range 0-15.
        """
        issues = []
        score = 15.0
        for file_path in self._source_files(".py"):
            content = self._read_text(file_path)
            if not content:
                continue
            relative_name = self._relative(file_path)
            for dangerous, pattern in self._DANGEROUS_USAGE:
                if pattern.search(content):
                    issues.append({
                        "type": "dangerous_import",
                        "file": relative_name,
                        "import": dangerous,
                        "severity": "warning"
                    })
                    score -= 0.5
            for pattern in self._SQL_PATTERNS:
                if pattern.search(content):
                    issues.append({
                        "type": "sql_injection_risk",
                        "file": relative_name,
                        "severity": "warning"
                    })
                    score -= 1
        # Positive check: known hardening modules earn a point each.
        input_validation_files = [
            "apps/coordinator-api/src/app/services/secure_pickle.py",
            "apps/coordinator-api/src/app/middleware/security.py"
        ]
        for validation_file in input_validation_files:
            if (self.project_root / validation_file).exists():
                score += 1
        return max(0, min(15, score)), issues

    def check_dependencies(self) -> Tuple[float, List[Dict]]:
        """Check declared dependency versions and the presence of a lock file.

        A package is reported only when pyproject.toml declares a lower bound
        or pin that is older than the minimum safe version.

        Returns:
            ``(points, issues)`` with points in the range 0-10.
        """
        issues = []
        score = 10.0
        pyproject_file = self.project_root / "pyproject.toml"
        if pyproject_file.exists():
            content = self._read_text(pyproject_file)
            for package, minimum in self._MIN_SAFE_VERSIONS.items():
                floor = self._version_tuple(minimum)
                outdated = [
                    version
                    for version in self._declared_lower_bounds(content, package)
                    if self._version_tuple(version) < floor
                ]
                if outdated:
                    issues.append({
                        "type": "potentially_vulnerable_dependency",
                        "package": package,
                        "declared_version": outdated[0],
                        "recommended_version": f">={minimum}",
                        "severity": "warning"
                    })
                    score -= 1
        # Check for poetry.lock or requirements.txt
        lock_files = ["poetry.lock", "requirements.txt"]
        if not any((self.project_root / name).exists() for name in lock_files):
            issues.append({
                "type": "no_dependency_lock_file",
                "severity": "warning"
            })
            score -= 2
        return max(0, score), issues

    def check_network_security(self) -> Tuple[float, List[Dict]]:
        """Check for hard-coded endpoints and for SSL/TLS configuration.

        Returns:
            ``(points, issues)`` with points in the range 0-10.
        """
        issues = []
        score = 10.0
        hardcoded_urls: List[str] = []
        for file_path in self._source_files(".py", ".js"):
            hardcoded_urls.extend(self._ENDPOINT_RE.findall(self._read_text(file_path)))
        if hardcoded_urls:
            issues.append({
                "type": "hardcoded_network_endpoints",
                "count": len(hardcoded_urls),
                # First three distinct endpoints, in discovery order.
                "examples": list(dict.fromkeys(hardcoded_urls))[:3],
                "severity": "warning"
            })
            score -= min(3, len(hardcoded_urls))
        # SSL/TLS is considered configured when a known config module
        # mentions it.
        ssl_config_files = [
            "apps/coordinator-api/src/app/config.py",
            "apps/blockchain-node/src/aitbc_chain/config.py"
        ]
        ssl_enabled = False
        for config_file in ssl_config_files:
            file_path = self.project_root / config_file
            if not file_path.exists():
                continue
            lowered = self._read_text(file_path).lower()
            if "ssl" in lowered or "tls" in lowered:
                ssl_enabled = True
                break
        if not ssl_enabled:
            issues.append({
                "type": "no_ssl_configuration",
                "severity": "warning"
            })
            score -= 2
        return max(0, score), issues

    def check_access_control(self) -> Tuple[float, List[Dict]]:
        """Check for authentication modules and role-based access control.

        Returns:
            ``(points, issues)`` with points in the range 0-10.
        """
        issues = []
        score = 10.0
        auth_files = [
            "apps/coordinator-api/src/app/auth/",
            "apps/coordinator-api/src/app/middleware/auth.py"
        ]
        if not any((self.project_root / name).exists() for name in auth_files):
            issues.append({
                "type": "no_authentication_mechanism",
                "severity": "critical"
            })
            score -= 5
        # Keyword heuristic: any Python file mentioning one of these terms
        # counts as evidence of role-based access control.
        rbac_patterns = ["role", "permission", "authorization"]
        rbac_found = False
        for file_path in self._source_files(".py"):
            lowered = self._read_text(file_path).lower()
            if any(keyword in lowered for keyword in rbac_patterns):
                rbac_found = True
                break
        if not rbac_found:
            issues.append({
                "type": "no_role_based_access_control",
                "severity": "warning"
            })
            score -= 2
        return max(0, score), issues

    def check_data_protection(self) -> Tuple[float, List[Dict]]:
        """Check for encryption and data-validation code (keyword heuristic).

        Returns:
            ``(points, issues)`` with points in the range 0-10.
        """
        issues = []
        score = 10.0
        encryption_patterns = ["encrypt", "decrypt", "cipher", "hash"]
        validation_patterns = ["validate", "sanitize", "clean"]
        encryption_found = False
        validation_found = False
        # One pass over the sources answers both questions.
        for file_path in self._source_files(".py"):
            lowered = self._read_text(file_path).lower()
            if not encryption_found:
                encryption_found = any(word in lowered for word in encryption_patterns)
            if not validation_found:
                validation_found = any(word in lowered for word in validation_patterns)
            if encryption_found and validation_found:
                break
        if not encryption_found:
            issues.append({
                "type": "no_encryption_mechanism",
                "severity": "warning"
            })
            score -= 3
        if not validation_found:
            issues.append({
                "type": "no_data_validation",
                "severity": "warning"
            })
            score -= 2
        return max(0, score), issues

    def check_infrastructure_security(self) -> Tuple[float, List[Dict]]:
        """Check systemd units, Docker usage and firewall configuration.

        Returns:
            ``(points, issues)`` with points in the range 0-10.
        """
        issues = []
        score = 10.0
        for service_file in self.project_root.glob("**/*.service"):
            content = self._read_text(service_file)
            if not content:
                continue
            declared_users = self._SERVICE_USER_RE.findall(content)
            # A unit without a User= directive is treated as running as root
            # (the systemd default for system services); when the directive
            # is repeated, the last assignment is the effective one.
            effective_user = declared_users[-1] if declared_users else "root"
            if effective_user in ("root", "0"):
                issues.append({
                    "type": "service_running_as_root",
                    "file": self._relative(service_file),
                    "severity": "warning"
                })
                score -= 1
        # Check for Docker usage (should be none per policy)
        docker_files = (
            list(self.project_root.glob("**/Dockerfile*"))
            + list(self.project_root.glob("**/docker-compose*"))
        )
        if docker_files:
            issues.append({
                "type": "docker_usage_detected",
                "files": [self._relative(path) for path in docker_files],
                "severity": "warning"
            })
            score -= 2
        # Check for firewall configuration
        firewall_configs = (
            list(self.project_root.glob("**/firewall*"))
            + list(self.project_root.glob("**/ufw*"))
        )
        if not firewall_configs:
            issues.append({
                "type": "no_firewall_configuration",
                "severity": "warning"
            })
            score -= 1
        return max(0, score), issues

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------
    def generate_recommendations(self):
        """Derive recommendations from the collected issues and the score.

        Reads ``critical_issues``, ``warnings`` and ``score`` from
        ``self.results`` and overwrites ``self.results["recommendations"]``.
        """
        # Local import: ``shlex`` is not part of the file-level imports.
        import shlex

        recommendations = []
        critical_types = {issue["type"] for issue in self.results["critical_issues"]}
        if "hardcoded_password" in critical_types:
            recommendations.append({
                "priority": "critical",
                "action": "Remove all hardcoded passwords and use environment variables or secret management",
                "files": [issue["file"] for issue in self.results["critical_issues"] if issue["type"] == "hardcoded_password"]
            })
        if "secrets_in_git" in critical_types:
            recommendations.append({
                "priority": "critical",
                "action": "Remove secrets from git history and configure .gitignore properly",
                "details": "Use git filter-branch to remove sensitive data from history"
            })
        if "unencrypted_keystore" in critical_types:
            recommendations.append({
                "priority": "critical",
                "action": "Encrypt all keystore files using AES-GCM encryption",
                "implementation": "Use the existing keystore.py encryption mechanisms"
            })
        if "world_writable_files" in critical_types:
            recommendations.append({
                "priority": "critical",
                "action": "Fix world-writable file permissions immediately",
                # Targets the audited root and only clears the offending bit,
                # so executables and directories keep their other permissions.
                "command": (
                    f"find {shlex.quote(str(self.project_root))} "
                    "-perm -o+w ! -type l -exec chmod o-w {} \\;"
                )
            })
        warning_types = {issue["type"] for issue in self.results["warnings"]}
        if "dangerous_import" in warning_types:
            recommendations.append({
                "priority": "high",
                "action": "Replace dangerous imports with secure alternatives",
                "details": "Use json instead of pickle, subprocess.run with shell=False"
            })
        if "no_ssl_configuration" in warning_types:
            recommendations.append({
                "priority": "high",
                "action": "Implement SSL/TLS configuration for all network services",
                "implementation": "Configure SSL certificates and HTTPS endpoints"
            })
        if "no_authentication_mechanism" in critical_types:
            recommendations.append({
                "priority": "critical",
                "action": "Implement proper authentication and authorization",
                "implementation": "Add JWT-based authentication with role-based access control"
            })
        # General recommendation for a weak overall posture.
        if self.results["score"] < 70:
            recommendations.append({
                "priority": "medium",
                "action": "Conduct regular security audits and implement security testing in CI/CD",
                "implementation": "Add automated security scanning to GitHub Actions"
            })
        self.results["recommendations"] = recommendations

    def save_report(self, output_file: str):
        """Write ``self.results`` to ``output_file`` as indented JSON.

        Raises:
            OSError: if the file cannot be written.
        """
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, indent=2)
        logger.info("Security audit report saved to: %s", output_file)
def main():
    """Run the security audit from the command line and exit with a status code.

    Optional arguments (both default to the original hard-coded locations):
        --project-root PATH   project to audit (default: /opt/aitbc)
        --output FILE         report path (default:
                              <project-root>/security_audit_report.json)

    Exit status: 0 for a score of at least 70, 1 for a score below 70,
    2 for a score below 50.
    """
    # Local import: argparse is only needed by the command-line entry point.
    import argparse

    parser = argparse.ArgumentParser(description="AITBC production security audit")
    parser.add_argument("--project-root", default="/opt/aitbc",
                        help="root directory of the project to audit")
    parser.add_argument("--output", default=None,
                        help="path of the JSON report to write")
    # Unknown arguments are ignored, as the script previously accepted (and
    # ignored) any command line.
    args, _unknown = parser.parse_known_args()

    audit = SecurityAudit(args.project_root)
    # Run full audit
    results = audit.run_full_audit()
    # Save report; a write failure must not hide the summary or the exit code.
    report_file = args.output or str(Path(args.project_root) / "security_audit_report.json")
    try:
        audit.save_report(report_file)
    except OSError as e:
        logger.error("Could not write report to %s: %s", report_file, e)
        report_file = None
    # Print summary
    print(f"\n{'='*60}")
    print("AITBC PRODUCTION SECURITY AUDIT REPORT")
    print(f"{'='*60}")
    print(f"Overall Score: {results['score']:.1f}/100")
    print(f"Critical Issues: {len(results['critical_issues'])}")
    print(f"Warnings: {len(results['warnings'])}")
    print(f"Recommendations: {len(results['recommendations'])}")
    if results['critical_issues']:
        print("\n🚨 CRITICAL ISSUES:")
        for issue in results['critical_issues'][:5]:
            # Most issues carry a file (or file list) rather than a message.
            detail = (
                issue.get('message')
                or issue.get('file')
                or ", ".join(issue.get('files', [])[:3])
                or 'N/A'
            )
            print(f" - {issue['type']}: {detail}")
    if results['recommendations']:
        print("\n💡 TOP RECOMMENDATIONS:")
        for rec in results['recommendations'][:3]:
            print(f" - [{rec['priority'].upper()}] {rec['action']}")
    if report_file is not None:
        print(f"\n📄 Full report: {report_file}")
    # Exit with appropriate code
    if results['score'] < 50:
        sys.exit(2)  # Critical security issues
    elif results['score'] < 70:
        sys.exit(1)  # Security concerns
    else:
        sys.exit(0)  # Acceptable security posture


if __name__ == "__main__":
    main()