Files
aitbc/scripts/security/security_audit.py
aitbc 3b8249d299 refactor: comprehensive scripts directory reorganization by functionality
Scripts Directory Reorganization - Complete:
 FUNCTIONAL ORGANIZATION: Scripts sorted into 8 logical categories
- github/: GitHub and Git operations (6 files)
- sync/: Synchronization and data replication (4 files)
- security/: Security and audit operations (2 files)
- monitoring/: System and service monitoring (6 files)
- maintenance/: System maintenance and cleanup (4 files)
- deployment/: Deployment and provisioning (11 files)
- testing/: Testing and quality assurance (13 files)
- utils/: Utility scripts and helpers (47 files)

 ROOT DIRECTORY CLEANED: Only documentation files remain in scripts root
- scripts/README.md: Main documentation
- scripts/SCRIPTS_ORGANIZATION.md: Complete organization guide
- All functional scripts moved to appropriate subdirectories

 SCRIPTS CATEGORIZATION:
📁 GitHub Operations: PR resolution, repository management, Git workflows
📁 Synchronization: Bulk sync, fast sync, sync detection, SystemD sync
📁 Security: Security audits, monitoring, vulnerability scanning
📁 Monitoring: Health checks, log monitoring, network monitoring, production monitoring
📁 Maintenance: Cleanup operations, performance tuning, weekly maintenance
📁 Deployment: Release building, node provisioning, DAO deployment, production deployment
📁 Testing: E2E testing, workflow testing, QA cycles, service testing
📁 Utilities: System management, setup scripts, helpers, tools

 ORGANIZATION BENEFITS:
- Better Navigation: Scripts grouped by functionality
- Easier Maintenance: Related scripts grouped together
- Scalable Structure: Easy to add new scripts to appropriate categories
- Clear Documentation: Comprehensive organization guide with descriptions
- Improved Workflow: Quick access to relevant scripts by category

 DOCUMENTATION ENHANCED:
- SCRIPTS_ORGANIZATION.md: Complete directory structure and usage guide
- Quick Reference: Common script usage examples
- Script Descriptions: Purpose and functionality for each script
- Maintenance Guidelines: How to keep organization current

DIRECTORY STRUCTURE:
📁 scripts/
├── README.md (Main documentation)
├── SCRIPTS_ORGANIZATION.md (Organization guide)
├── github/ (6 files - GitHub operations)
├── sync/ (4 files - Synchronization)
├── security/ (2 files - Security)
├── monitoring/ (6 files - Monitoring)
├── maintenance/ (4 files - Maintenance)
├── deployment/ (11 files - Deployment)
├── testing/ (13 files - Testing)
├── utils/ (47 files - Utilities)
├── ci/ (existing - CI/CD)
├── deployment/ (existing - legacy deployment)
├── development/ (existing - Development tools)
├── monitoring/ (existing - Legacy monitoring)
├── services/ (existing - Service management)
├── testing/ (existing - Legacy testing)
├── utils/ (existing - Legacy utilities)
├── workflow/ (existing - Workflow automation)
└── workflow-openclaw/ (existing - OpenClaw workflows)

RESULT: Successfully reorganized 27 unorganized scripts into 8 functional categories, creating a clean, maintainable, and well-documented scripts directory structure with comprehensive organization guide.
2026-03-30 17:13:27 +02:00

663 lines
25 KiB
Python
Executable File

#!/usr/bin/env python3
"""
AITBC Production Security Audit Script
Comprehensive security assessment for production deployment
"""
import os
import sys
import json
import subprocess
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Tuple
import hashlib
import re
# Configure root logging at import time: INFO level, with each record
# formatted as "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger named after this module; used by the audit code below.
logger = logging.getLogger(__name__)
class SecurityAudit:
    """Comprehensive security audit for AITBC production.

    The audit runs eight heuristic checks over the files below
    ``project_root``.  Every ``check_*`` method returns ``(score, issues)``:

    * ``score`` starts at the category's weight (the weights add up to 100)
      and is reduced for each finding, never dropping below 0;
    * ``issues`` is a list of dicts that carry at least ``type`` and
      ``severity`` (``"critical"`` or ``"warning"``).

    ``run_full_audit`` combines the category scores into ``results["score"]``
    on a 0-100 scale and fills ``results["recommendations"]``.
    """

    # (glob pattern relative to the project root, expected permission bits)
    _SENSITIVE_FILE_MODES = (
        ("*.key", 0o600),         # Private keys
        ("*.pem", 0o600),         # Certificates
        ("config/*.env", 0o600),  # Environment files
        ("keystore/*", 0o600),    # Keystore files
        ("*.sh", 0o755),          # Shell scripts
    )

    # (compiled pattern, issue type) pairs used to spot hardcoded secrets
    _SECRET_PATTERNS = tuple(
        (re.compile(pattern, re.IGNORECASE), issue_type)
        for pattern, issue_type in (
            (r'password\s*=\s*["\'][^"\']+["\']', "hardcoded_password"),
            (r'api_key\s*=\s*["\'][^"\']+["\']', "hardcoded_api_key"),
            (r'secret\s*=\s*["\'][^"\']+["\']', "hardcoded_secret"),
            (r'token\s*=\s*["\'][^"\']+["\']', "hardcoded_token"),
            (r'private_key\s*=\s*["\'][^"\']+["\']', "hardcoded_private_key"),
            (r'0x[a-fA-F0-9]{40}', "ethereum_address"),
            (r'sk-[a-zA-Z0-9]{48}', "openai_api_key"),
        )
    )

    # Tracked file names that should never be committed
    _SECRET_FILE_RE = re.compile(r"\.(?:env|key|pem)$")

    # (reported name, compiled pattern).  The look-behind keeps identifiers
    # such as "evaluate", "executor", "model.eval(" or "secure_pickle" from
    # being reported as dangerous constructs.
    _DANGEROUS_CONSTRUCTS = (
        ("pickle", re.compile(r"(?<![\w.])pickle\b")),             # Insecure deserialization
        ("eval", re.compile(r"(?<![\w.])eval\s*\(")),              # Code execution
        ("exec", re.compile(r"(?<![\w.])exec\s*\(")),              # Code execution
        ("subprocess.call", re.compile(r"\bsubprocess\.call\b")),  # Command injection
        ("os.system", re.compile(r"\bos\.system\b")),              # Command injection
    )

    _SQL_INJECTION_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r'execute\s*\(\s*["\'][^"\']*\+',
            r'format.*SELECT.*\+',
            r'%s.*SELECT.*\+',
        )
    )

    _URL_PATTERNS = tuple(
        re.compile(pattern)
        for pattern in (
            r'http://localhost:\d+',
            r'http://127\.0\.0\.1:\d+',
            r'https?://[^/\s]+:\d+',
        )
    )

    # Oldest release of each package that is NOT considered vulnerable
    _MIN_SAFE_VERSIONS = {
        "requests": (2, 25, 0),
        "urllib3": (1, 26, 0),
        "cryptography": (3, 4, 0),
        "pyyaml": (5, 4, 0),
    }

    # Matches an uncommented "User=<name>" directive in a systemd unit file
    _SERVICE_USER_RE = re.compile(r"^\s*User\s*=\s*(\S+)", re.MULTILINE)

    def __init__(self, project_root: str = "/opt/aitbc"):
        """Prepare an empty result record for the tree rooted at *project_root*."""
        self.project_root = Path(project_root)
        self.results = {
            # NOTE: naive UTC timestamp, kept for report-format compatibility.
            "timestamp": datetime.utcnow().isoformat(),
            "audit_version": "v0.2.0",
            "findings": [],
            "score": 0,
            "max_score": 100,
            "critical_issues": [],
            "warnings": [],
            "recommendations": []
        }

    # ------------------------------------------------------------------
    # Small private helpers shared by the checks
    # ------------------------------------------------------------------
    def _relative(self, path: Path) -> str:
        """Return *path* as a string relative to the project root."""
        return str(path.relative_to(self.project_root))

    @staticmethod
    def _read_text(path: Path) -> str:
        """Return the UTF-8 text of *path*, or "" when it cannot be read.

        Unreadable, non-UTF-8 and directory entries are skipped on purpose:
        an empty string matches none of the audit patterns.
        """
        try:
            return path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            return ""

    def _source_files(self, *suffixes: str) -> List[Path]:
        """Return every path below the project root ending in one of *suffixes*."""
        files: List[Path] = []
        for suffix in suffixes:
            files.extend(self.project_root.glob(f"**/*{suffix}"))
        return files

    def _python_sources_mention(self, terms) -> bool:
        """Return True when any Python file contains one of *terms* (case-insensitive)."""
        for file_path in self._source_files(".py"):
            content = self._read_text(file_path).lower()
            if any(term in content for term in terms):
                return True
        return False

    def _tracked_secret_files(self) -> List[str]:
        """Return git-tracked files whose names end in .env, .key or .pem.

        Returns an empty list when the project root is not a git work tree.
        """
        result = subprocess.run(
            ["git", "ls-files", "-z"],
            cwd=str(self.project_root), capture_output=True, text=True, timeout=30
        )
        if result.returncode != 0:
            return []
        return [
            name for name in result.stdout.split("\0")
            if name and self._SECRET_FILE_RE.search(name)
        ]

    @staticmethod
    def _is_potentially_vulnerable(content: str, package: str, minimum: Tuple[int, ...]) -> bool:
        """Decide whether *content* declares *package* below *minimum*.

        Returns False when the package is not mentioned or the first version
        number in its requirement is at least *minimum*.  Returns True when
        the declared version is older or when no version can be found (an
        unpinned dependency may resolve to a vulnerable release).
        """
        name = rf"(?<![\w.-]){re.escape(package)}(?![\w.-])"
        if not re.search(name, content, re.IGNORECASE):
            return False
        declared = re.search(name + r"[^\d\n,]*?(\d+(?:\.\d+)*)", content, re.IGNORECASE)
        if declared is None:
            return True
        parts = tuple(int(part) for part in declared.group(1).split("."))
        # Pad so that "2.25" compares equal to (2, 25, 0) instead of below it.
        parts += (0,) * (len(minimum) - len(parts))
        return parts < minimum

    # ------------------------------------------------------------------
    # Audit driver
    # ------------------------------------------------------------------
    def run_full_audit(self) -> Dict[str, Any]:
        """Run every check and return the populated ``results`` dict.

        A check that raises is recorded with a score of 0 and a critical
        ``check_error`` issue instead of aborting the audit.
        """
        logger.info("Starting AITBC Production Security Audit...")
        # (category name, check, weight); each check scores from 0 to its weight
        categories = [
            ("File Permissions", self.check_file_permissions, 15),
            ("Secret Management", self.check_secret_management, 20),
            ("Code Security", self.check_code_security, 15),
            ("Dependencies", self.check_dependencies, 10),
            ("Network Security", self.check_network_security, 10),
            ("Access Control", self.check_access_control, 10),
            ("Data Protection", self.check_data_protection, 10),
            ("Infrastructure", self.check_infrastructure_security, 10)
        ]
        # Start from a clean slate so a repeated run does not accumulate results.
        for key in ("findings", "critical_issues", "warnings"):
            self.results[key] = []

        total_score = 0.0
        total_weight = 0
        for category_name, check_function, weight in categories:
            logger.info(f"Checking {category_name}...")
            try:
                category_score, issues = check_function()
            except Exception as e:
                logger.error(f"Error in {category_name} check: {e}")
                category_score = 0
                issues = [{"type": "check_error", "message": str(e), "severity": "critical"}]
            # Category scores are already expressed on the scale of their
            # weight, so they are summed directly (not multiplied by it).
            category_score = max(0.0, min(float(weight), float(category_score)))
            total_score += category_score
            total_weight += weight
            self.results["findings"].append({
                "category": category_name,
                "score": category_score,
                "weight": weight,
                "issues": issues
            })
            # Categorize issues
            for issue in issues:
                severity = issue.get("severity")
                if severity == "critical":
                    self.results["critical_issues"].append(issue)
                elif severity == "warning":
                    self.results["warnings"].append(issue)

        # Calculate final score as a percentage of the achievable total
        self.results["score"] = (total_score / total_weight) * 100 if total_weight > 0 else 0
        # Generate recommendations
        self.generate_recommendations()
        logger.info(f"Audit completed. Final score: {self.results['score']:.1f}/100")
        return self.results

    # ------------------------------------------------------------------
    # Individual checks
    # ------------------------------------------------------------------
    def check_file_permissions(self) -> Tuple[float, List[Dict]]:
        """Check permissions of sensitive files and look for world-writable paths.

        A file whose mode grants bits beyond the expected mode is reported as
        critical; a mode that is merely different (stricter) is a warning.
        """
        issues = []
        score = 15.0
        # Check sensitive file permissions
        for pattern, expected_mode in self._SENSITIVE_FILE_MODES:
            try:
                for file_path in self.project_root.glob(pattern):
                    if not file_path.is_file():
                        continue
                    current_mode = file_path.stat().st_mode & 0o777
                    if current_mode == expected_mode:
                        continue
                    excess_bits = current_mode & ~expected_mode
                    issues.append({
                        "type": "file_permission",
                        "file": self._relative(file_path),
                        "current": format(current_mode, "03o"),
                        "expected": format(expected_mode, "03o"),
                        "severity": "critical" if excess_bits else "warning"
                    })
                    score -= 1
            except OSError as e:
                logger.warning(f"Could not check {pattern}: {e}")
        # Check for world-writable files (symlinks excluded)
        try:
            result = subprocess.run(
                ["find", str(self.project_root), "-perm", "-o+w", "!", "-type", "l"],
                capture_output=True, text=True, timeout=30
            )
            writable_files = [line for line in result.stdout.splitlines() if line]
            if writable_files:
                issues.append({
                    "type": "world_writable_files",
                    "count": len(writable_files),
                    "files": writable_files[:5],  # Limit output
                    "severity": "critical"
                })
                score -= min(5, len(writable_files))
        except (OSError, subprocess.SubprocessError) as e:
            logger.warning(f"Could not check world-writable files: {e}")
        return max(0, score), issues

    def check_secret_management(self) -> Tuple[float, List[Dict]]:
        """Check for hardcoded secrets, secrets tracked by git and plain-text keystores."""
        issues = []
        score = 20.0
        # Check for hardcoded secrets in Python and JavaScript sources
        for file_path in self._source_files(".py", ".js"):
            content = self._read_text(file_path)
            if not content:
                continue
            for pattern, issue_type in self._SECRET_PATTERNS:
                matches = pattern.findall(content)
                if matches:
                    issues.append({
                        "type": issue_type,
                        "file": self._relative(file_path),
                        "count": len(matches),
                        "severity": "critical"
                    })
                    score -= 2
        # Check for .env/.key/.pem files tracked by git
        try:
            tracked_secrets = self._tracked_secret_files()
            if tracked_secrets:
                issues.append({
                    "type": "secrets_in_git",
                    "files": tracked_secrets,
                    "severity": "critical"
                })
                score -= 5
        except (OSError, subprocess.SubprocessError) as e:
            logger.warning(f"Could not check git for secrets: {e}")
        # Check keystore encryption
        keystore_dir = self.project_root / "keystore"
        if keystore_dir.exists():
            for keystore_file in keystore_dir.glob("*"):
                if not keystore_file.is_file():
                    continue
                try:
                    raw = keystore_file.read_bytes()
                except OSError:
                    continue
                try:
                    text = raw.decode("utf-8")
                except UnicodeDecodeError:
                    # Not valid text: treated as binary/encrypted, which is good
                    continue
                if "private_key" in text.lower():
                    issues.append({
                        "type": "unencrypted_keystore",
                        "file": self._relative(keystore_file),
                        "severity": "critical"
                    })
                    score -= 3
        return max(0, score), issues

    def check_code_security(self) -> Tuple[float, List[Dict]]:
        """Check Python sources for dangerous constructs and SQL-injection patterns.

        The presence of the known input-validation modules earns back one
        point each, capped at the category maximum of 15.
        """
        dangerous_issues = []
        sql_issues = []
        score = 15.0
        for file_path in self._source_files(".py"):
            content = self._read_text(file_path)
            if not content:
                continue
            # Check for dangerous imports/calls
            for name, pattern in self._DANGEROUS_CONSTRUCTS:
                if pattern.search(content):
                    dangerous_issues.append({
                        "type": "dangerous_import",
                        "file": self._relative(file_path),
                        "import": name,
                        "severity": "warning"
                    })
                    score -= 0.5
            # Check for SQL injection patterns
            for pattern in self._SQL_INJECTION_PATTERNS:
                if pattern.search(content):
                    sql_issues.append({
                        "type": "sql_injection_risk",
                        "file": self._relative(file_path),
                        "severity": "warning"
                    })
                    score -= 1
        # Check for input validation
        input_validation_files = [
            "apps/coordinator-api/src/app/services/secure_pickle.py",
            "apps/coordinator-api/src/app/middleware/security.py"
        ]
        for validation_file in input_validation_files:
            if (self.project_root / validation_file).exists():
                # Positive check - security measures in place
                score += 1
        return max(0, min(15, score)), dangerous_issues + sql_issues

    def check_dependencies(self) -> Tuple[float, List[Dict]]:
        """Check declared dependencies for outdated pins and a missing lock file."""
        issues = []
        score = 10.0
        # Check pyproject.toml for known vulnerable packages
        pyproject_file = self.project_root / "pyproject.toml"
        if pyproject_file.exists():
            try:
                content = pyproject_file.read_text(encoding="utf-8")
            except (OSError, UnicodeDecodeError) as e:
                logger.warning(f"Could not analyze dependencies: {e}")
            else:
                for package, minimum in self._MIN_SAFE_VERSIONS.items():
                    if self._is_potentially_vulnerable(content, package, minimum):
                        issues.append({
                            "type": "potentially_vulnerable_dependency",
                            "package": package,
                            "recommended_version": ">=" + ".".join(str(part) for part in minimum),
                            "severity": "warning"
                        })
                        score -= 1
        # Check for poetry.lock or requirements.txt
        lock_files = ["poetry.lock", "requirements.txt"]
        has_lock_file = any((self.project_root / f).exists() for f in lock_files)
        if not has_lock_file:
            issues.append({
                "type": "no_dependency_lock_file",
                "severity": "warning"
            })
            score -= 2
        return max(0, score), issues

    def check_network_security(self) -> Tuple[float, List[Dict]]:
        """Check for hardcoded network endpoints and any SSL/TLS configuration."""
        issues = []
        score = 10.0
        # Collect distinct hardcoded URLs; the dict keeps first-seen order and
        # stops overlapping patterns from counting the same URL twice.
        unique_urls: Dict[str, None] = {}
        for file_path in self._source_files(".py", ".js"):
            content = self._read_text(file_path)
            if not content:
                continue
            for pattern in self._URL_PATTERNS:
                unique_urls.update(dict.fromkeys(pattern.findall(content)))
        hardcoded_urls = list(unique_urls)
        if hardcoded_urls:
            issues.append({
                "type": "hardcoded_network_endpoints",
                "count": len(hardcoded_urls),
                "examples": hardcoded_urls[:3],
                "severity": "warning"
            })
            score -= min(3, len(hardcoded_urls))
        # Check for SSL/TLS usage
        ssl_config_files = [
            "apps/coordinator-api/src/app/config.py",
            "apps/blockchain-node/src/aitbc_chain/config.py"
        ]
        ssl_enabled = False
        for config_file in ssl_config_files:
            content = self._read_text(self.project_root / config_file).lower()
            if "ssl" in content or "tls" in content:
                ssl_enabled = True
                break
        if not ssl_enabled:
            issues.append({
                "type": "no_ssl_configuration",
                "severity": "warning"
            })
            score -= 2
        return max(0, score), issues

    def check_access_control(self) -> Tuple[float, List[Dict]]:
        """Check for authentication modules and role-based access control hints."""
        issues = []
        score = 10.0
        # Check for authentication mechanisms
        auth_files = [
            "apps/coordinator-api/src/app/auth/",
            "apps/coordinator-api/src/app/middleware/auth.py"
        ]
        has_auth = any((self.project_root / f).exists() for f in auth_files)
        if not has_auth:
            issues.append({
                "type": "no_authentication_mechanism",
                "severity": "critical"
            })
            score -= 5
        # Check for role-based access control
        if not self._python_sources_mention(("role", "permission", "authorization")):
            issues.append({
                "type": "no_role_based_access_control",
                "severity": "warning"
            })
            score -= 2
        return max(0, score), issues

    def check_data_protection(self) -> Tuple[float, List[Dict]]:
        """Check Python sources for signs of encryption and data validation."""
        issues = []
        score = 10.0
        # Check for encryption usage
        if not self._python_sources_mention(("encrypt", "decrypt", "cipher", "hash")):
            issues.append({
                "type": "no_encryption_mechanism",
                "severity": "warning"
            })
            score -= 3
        # Check for data validation
        if not self._python_sources_mention(("validate", "sanitize", "clean")):
            issues.append({
                "type": "no_data_validation",
                "severity": "warning"
            })
            score -= 2
        return max(0, score), issues

    def check_infrastructure_security(self) -> Tuple[float, List[Dict]]:
        """Check systemd units, Docker usage and firewall configuration."""
        issues = []
        score = 10.0
        # Check systemd service files for services running as root
        for service_file in self.project_root.glob("**/*.service"):
            try:
                content = service_file.read_text(encoding="utf-8")
            except (OSError, UnicodeDecodeError):
                continue
            users = self._SERVICE_USER_RE.findall(content)
            # A system service without a User= directive runs as root; when
            # the directive is repeated, the last assignment wins.
            if not users or users[-1] in ("root", "0"):
                issues.append({
                    "type": "service_running_as_root",
                    "file": self._relative(service_file),
                    "severity": "warning"
                })
                score -= 1
        # Check for Docker usage (should be none per policy)
        docker_files = list(self.project_root.glob("**/Dockerfile*")) + list(self.project_root.glob("**/docker-compose*"))
        if docker_files:
            issues.append({
                "type": "docker_usage_detected",
                "files": [self._relative(f) for f in docker_files],
                "severity": "warning"
            })
            score -= 2
        # Check for firewall configuration
        firewall_configs = list(self.project_root.glob("**/firewall*")) + list(self.project_root.glob("**/ufw*"))
        if not firewall_configs:
            issues.append({
                "type": "no_firewall_configuration",
                "severity": "warning"
            })
            score -= 1
        return max(0, score), issues

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------
    def generate_recommendations(self):
        """Fill ``results["recommendations"]`` from the collected issues and score."""
        recommendations = []
        # Critical issues recommendations
        critical_types = {issue["type"] for issue in self.results["critical_issues"]}
        if "hardcoded_password" in critical_types:
            recommendations.append({
                "priority": "critical",
                "action": "Remove all hardcoded passwords and use environment variables or secret management",
                "files": [issue["file"] for issue in self.results["critical_issues"] if issue["type"] == "hardcoded_password"]
            })
        if "secrets_in_git" in critical_types:
            recommendations.append({
                "priority": "critical",
                "action": "Remove secrets from git history and configure .gitignore properly",
                "details": "Use git filter-branch to remove sensitive data from history"
            })
        if "unencrypted_keystore" in critical_types:
            recommendations.append({
                "priority": "critical",
                "action": "Encrypt all keystore files using AES-GCM encryption",
                "implementation": "Use the existing keystore.py encryption mechanisms"
            })
        if "world_writable_files" in critical_types:
            recommendations.append({
                "priority": "critical",
                "action": "Fix world-writable file permissions immediately",
                # Targets the audited tree and only drops the other-write bit,
                # so executable bits on scripts and directories are preserved.
                "command": f"find {self.project_root} ! -type l -perm -o+w -exec chmod o-w {{}} \\;"
            })
        # Warning level recommendations
        warning_types = {issue["type"] for issue in self.results["warnings"]}
        if "dangerous_import" in warning_types:
            recommendations.append({
                "priority": "high",
                "action": "Replace dangerous imports with secure alternatives",
                "details": "Use json instead of pickle, subprocess.run with shell=False"
            })
        if "no_ssl_configuration" in warning_types:
            recommendations.append({
                "priority": "high",
                "action": "Implement SSL/TLS configuration for all network services",
                "implementation": "Configure SSL certificates and HTTPS endpoints"
            })
        if "no_authentication_mechanism" in critical_types:
            recommendations.append({
                "priority": "critical",
                "action": "Implement proper authentication and authorization",
                "implementation": "Add JWT-based authentication with role-based access control"
            })
        # General recommendations
        if self.results["score"] < 70:
            recommendations.append({
                "priority": "medium",
                "action": "Conduct regular security audits and implement security testing in CI/CD",
                "implementation": "Add automated security scanning to GitHub Actions"
            })
        self.results["recommendations"] = recommendations

    def save_report(self, output_file: str):
        """Write the audit results to *output_file* as indented JSON.

        Raises:
            OSError: if the file cannot be opened for writing.
        """
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, indent=2)
        logger.info(f"Security audit report saved to: {output_file}")
def main():
    """Run the audit on the default project root, save the report and print a summary.

    The process exit status reflects the final score: 0 when the score is at
    least 70, 1 when it is between 50 and 70, and 2 when it is below 50.
    """
    audit = SecurityAudit()
    # Run full audit
    results = audit.run_full_audit()
    # Save report; a failed write must not hide the summary or the exit code
    report_file = "/opt/aitbc/security_audit_report.json"
    try:
        audit.save_report(report_file)
        report_saved = True
    except OSError as e:
        logger.error(f"Could not save report to {report_file}: {e}")
        report_saved = False
    # Print summary
    print(f"\n{'='*60}")
    print("AITBC PRODUCTION SECURITY AUDIT REPORT")
    print(f"{'='*60}")
    print(f"Overall Score: {results['score']:.1f}/100")
    print(f"Critical Issues: {len(results['critical_issues'])}")
    print(f"Warnings: {len(results['warnings'])}")
    print(f"Recommendations: {len(results['recommendations'])}")
    if results['critical_issues']:
        print("\n🚨 CRITICAL ISSUES:")
        for issue in results['critical_issues'][:5]:
            # Only check errors carry a message; other issues are described
            # by the file(s) they refer to.
            detail = (
                issue.get('message')
                or issue.get('file')
                or ", ".join(issue.get('files', []))
                or 'N/A'
            )
            print(f" - {issue['type']}: {detail}")
    if results['recommendations']:
        print("\n💡 TOP RECOMMENDATIONS:")
        for rec in results['recommendations'][:3]:
            print(f" - [{rec['priority'].upper()}] {rec['action']}")
    if report_saved:
        print(f"\n📄 Full report: {report_file}")
    else:
        print("\n📄 Full report: not saved (see log)")
    # Exit with appropriate code
    if results['score'] < 50:
        sys.exit(2)  # Critical security issues
    elif results['score'] < 70:
        sys.exit(1)  # Security concerns
    else:
        sys.exit(0)  # Acceptable security posture


if __name__ == "__main__":
    main()