BEFORE: /opt/aitbc/cli/ ├── aitbc_cli/ # Python package (box in a box) │ ├── commands/ │ ├── main.py │ └── ... ├── setup.py AFTER: /opt/aitbc/cli/ # Flat structure ├── commands/ # Direct access ├── main.py # Direct access ├── auth/ ├── config/ ├── core/ ├── models/ ├── utils/ ├── plugins.py └── setup.py CHANGES MADE: - Moved all files from aitbc_cli/ to cli/ root - Fixed all relative imports (from . to absolute imports) - Updated setup.py entry point: aitbc_cli.main → main - Added CLI directory to Python path in entry script - Simplified deployment.py to remove dependency on deleted core.deployment - Fixed import paths in all command files - Recreated virtual environment with new structure BENEFITS: - Eliminated 'box in a box' nesting - Simpler directory structure - Direct access to all modules - Cleaner imports - Easier maintenance and development - CLI works with both 'python main.py' and 'aitbc' commands
100 lines
3.3 KiB
Python
Executable File
100 lines
3.3 KiB
Python
Executable File
"""
|
|
Plugin Security CLI Commands for AITBC
|
|
Commands for plugin security scanning and vulnerability detection
|
|
"""
|
|
|
|
import click
|
|
import json
|
|
import requests
|
|
from datetime import datetime
|
|
from typing import Dict, Any, List, Optional
|
|
|
|
@click.group()
|
|
def plugin_security():
|
|
"""Plugin security management commands"""
|
|
pass
|
|
|
|
@plugin_security.command()
|
|
@click.argument('plugin_id')
|
|
@click.option('--test-mode', is_flag=True, help='Run in test mode')
|
|
def scan(plugin_id, test_mode):
|
|
"""Scan a plugin for security vulnerabilities"""
|
|
try:
|
|
if test_mode:
|
|
click.echo(f"🔒 Security scan started (test mode)")
|
|
click.echo(f"📦 Plugin ID: {plugin_id}")
|
|
click.echo(f"✅ Scan completed - No vulnerabilities found")
|
|
return
|
|
|
|
# Send to security service
|
|
config = get_config()
|
|
response = requests.post(
|
|
f"{config.coordinator_url}/api/v1/security/scan",
|
|
json={"plugin_id": plugin_id},
|
|
headers={"Authorization": f"Bearer {config.api_key}"},
|
|
timeout=30
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
result = response.json()
|
|
click.echo(f"🔒 Security scan completed")
|
|
click.echo(f"📦 Plugin ID: {result['plugin_id']}")
|
|
click.echo(f"🛡️ Status: {result['status']}")
|
|
click.echo(f"🔍 Vulnerabilities: {result['vulnerabilities_count']}")
|
|
else:
|
|
click.echo(f"❌ Security scan failed: {response.text}", err=True)
|
|
|
|
except Exception as e:
|
|
click.echo(f"❌ Error scanning plugin: {str(e)}", err=True)
|
|
|
|
@plugin_security.command()
|
|
@click.option('--test-mode', is_flag=True, help='Run in test mode')
|
|
def status(test_mode):
|
|
"""Get plugin security status"""
|
|
try:
|
|
if test_mode:
|
|
click.echo("🔒 Plugin Security Status (test mode)")
|
|
click.echo("📊 Total Scans: 156")
|
|
click.echo("✅ Passed: 148")
|
|
click.echo("❌ Failed: 3")
|
|
click.echo("⏳ Pending: 5")
|
|
return
|
|
|
|
# Get status from security service
|
|
config = get_config()
|
|
response = requests.get(
|
|
f"{config.coordinator_url}/api/v1/security/status",
|
|
headers={"Authorization": f"Bearer {config.api_key}"},
|
|
timeout=30
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
status = response.json()
|
|
click.echo("🔒 Plugin Security Status")
|
|
click.echo(f"📊 Total Scans: {status.get('total_scans', 0)}")
|
|
click.echo(f"✅ Passed: {status.get('passed', 0)}")
|
|
click.echo(f"❌ Failed: {status.get('failed', 0)}")
|
|
click.echo(f"⏳ Pending: {status.get('pending', 0)}")
|
|
else:
|
|
click.echo(f"❌ Failed to get status: {response.text}", err=True)
|
|
|
|
except Exception as e:
|
|
click.echo(f"❌ Error getting status: {str(e)}", err=True)
|
|
|
|
# Helper function to get config
|
|
def get_config():
|
|
"""Get CLI configuration"""
|
|
try:
|
|
from config import get_config
|
|
return get_config()
|
|
except ImportError:
|
|
# Fallback for testing
|
|
from types import SimpleNamespace
|
|
return SimpleNamespace(
|
|
coordinator_url="http://localhost:8015",
|
|
api_key="test-api-key"
|
|
)
|
|
|
|
if __name__ == "__main__":
|
|
plugin_security()
|