Files
aitbc/cli/commands/plugin_security.py
aitbc1 c0952c2525 refactor: flatten CLI directory structure - remove 'box in a box'
BEFORE:
/opt/aitbc/cli/
├── aitbc_cli/                    # Python package (box in a box)
│   ├── commands/
│   ├── main.py
│   └── ...
├── setup.py

AFTER:
/opt/aitbc/cli/                    # Flat structure
├── commands/                      # Direct access
├── main.py                        # Direct access
├── auth/
├── config/
├── core/
├── models/
├── utils/
├── plugins.py
└── setup.py

CHANGES MADE:
- Moved all files from aitbc_cli/ to cli/ root
- Fixed all relative imports (from . to absolute imports)
- Updated setup.py entry point: aitbc_cli.main → main
- Added CLI directory to Python path in entry script
- Simplified deployment.py to remove dependency on deleted core.deployment
- Fixed import paths in all command files
- Recreated virtual environment with new structure

BENEFITS:
- Eliminated 'box in a box' nesting
- Simpler directory structure
- Direct access to all modules
- Cleaner imports
- Easier maintenance and development
- CLI works with both 'python main.py' and 'aitbc' commands
2026-03-26 09:12:02 +01:00

100 lines
3.3 KiB
Python
Executable File

"""
Plugin Security CLI Commands for AITBC
Commands for plugin security scanning and vulnerability detection
"""
import click
import json
import requests
from datetime import datetime
from typing import Dict, Any, List, Optional
@click.group()
def plugin_security():
"""Plugin security management commands"""
pass
@plugin_security.command()
@click.argument('plugin_id')
@click.option('--test-mode', is_flag=True, help='Run in test mode')
def scan(plugin_id, test_mode):
"""Scan a plugin for security vulnerabilities"""
try:
if test_mode:
click.echo(f"🔒 Security scan started (test mode)")
click.echo(f"📦 Plugin ID: {plugin_id}")
click.echo(f"✅ Scan completed - No vulnerabilities found")
return
# Send to security service
config = get_config()
response = requests.post(
f"{config.coordinator_url}/api/v1/security/scan",
json={"plugin_id": plugin_id},
headers={"Authorization": f"Bearer {config.api_key}"},
timeout=30
)
if response.status_code == 200:
result = response.json()
click.echo(f"🔒 Security scan completed")
click.echo(f"📦 Plugin ID: {result['plugin_id']}")
click.echo(f"🛡️ Status: {result['status']}")
click.echo(f"🔍 Vulnerabilities: {result['vulnerabilities_count']}")
else:
click.echo(f"❌ Security scan failed: {response.text}", err=True)
except Exception as e:
click.echo(f"❌ Error scanning plugin: {str(e)}", err=True)
@plugin_security.command()
@click.option('--test-mode', is_flag=True, help='Run in test mode')
def status(test_mode):
"""Get plugin security status"""
try:
if test_mode:
click.echo("🔒 Plugin Security Status (test mode)")
click.echo("📊 Total Scans: 156")
click.echo("✅ Passed: 148")
click.echo("❌ Failed: 3")
click.echo("⏳ Pending: 5")
return
# Get status from security service
config = get_config()
response = requests.get(
f"{config.coordinator_url}/api/v1/security/status",
headers={"Authorization": f"Bearer {config.api_key}"},
timeout=30
)
if response.status_code == 200:
status = response.json()
click.echo("🔒 Plugin Security Status")
click.echo(f"📊 Total Scans: {status.get('total_scans', 0)}")
click.echo(f"✅ Passed: {status.get('passed', 0)}")
click.echo(f"❌ Failed: {status.get('failed', 0)}")
click.echo(f"⏳ Pending: {status.get('pending', 0)}")
else:
click.echo(f"❌ Failed to get status: {response.text}", err=True)
except Exception as e:
click.echo(f"❌ Error getting status: {str(e)}", err=True)
# Helper function to get config
def get_config():
"""Get CLI configuration"""
try:
from config import get_config
return get_config()
except ImportError:
# Fallback for testing
from types import SimpleNamespace
return SimpleNamespace(
coordinator_url="http://localhost:8015",
api_key="test-api-key"
)
if __name__ == "__main__":
plugin_security()