Files
aitbc/cli/aitbc_cli/commands/plugin_security.py
oib 15427c96c0 chore: update file permissions to executable across repository
- Change file mode from 644 to 755 for all project files
- Add chain_id parameter to get_balance RPC endpoint with default "ait-devnet"
- Rename Miner.extra_meta_data to extra_metadata for consistency
2026-03-06 22:17:54 +01:00

100 lines
3.3 KiB
Python

"""
Plugin Security CLI Commands for AITBC
Commands for plugin security scanning and vulnerability detection
"""
import click
import json
import requests
from datetime import datetime
from typing import Dict, Any, List, Optional
@click.group()
def plugin_security():
"""Plugin security management commands"""
pass
@plugin_security.command()
@click.argument('plugin_id')
@click.option('--test-mode', is_flag=True, help='Run in test mode')
def scan(plugin_id, test_mode):
"""Scan a plugin for security vulnerabilities"""
try:
if test_mode:
click.echo(f"🔒 Security scan started (test mode)")
click.echo(f"📦 Plugin ID: {plugin_id}")
click.echo(f"✅ Scan completed - No vulnerabilities found")
return
# Send to security service
config = get_config()
response = requests.post(
f"{config.coordinator_url}/api/v1/security/scan",
json={"plugin_id": plugin_id},
headers={"Authorization": f"Bearer {config.api_key}"},
timeout=30
)
if response.status_code == 200:
result = response.json()
click.echo(f"🔒 Security scan completed")
click.echo(f"📦 Plugin ID: {result['plugin_id']}")
click.echo(f"🛡️ Status: {result['status']}")
click.echo(f"🔍 Vulnerabilities: {result['vulnerabilities_count']}")
else:
click.echo(f"❌ Security scan failed: {response.text}", err=True)
except Exception as e:
click.echo(f"❌ Error scanning plugin: {str(e)}", err=True)
@plugin_security.command()
@click.option('--test-mode', is_flag=True, help='Run in test mode')
def status(test_mode):
"""Get plugin security status"""
try:
if test_mode:
click.echo("🔒 Plugin Security Status (test mode)")
click.echo("📊 Total Scans: 156")
click.echo("✅ Passed: 148")
click.echo("❌ Failed: 3")
click.echo("⏳ Pending: 5")
return
# Get status from security service
config = get_config()
response = requests.get(
f"{config.coordinator_url}/api/v1/security/status",
headers={"Authorization": f"Bearer {config.api_key}"},
timeout=30
)
if response.status_code == 200:
status = response.json()
click.echo("🔒 Plugin Security Status")
click.echo(f"📊 Total Scans: {status.get('total_scans', 0)}")
click.echo(f"✅ Passed: {status.get('passed', 0)}")
click.echo(f"❌ Failed: {status.get('failed', 0)}")
click.echo(f"⏳ Pending: {status.get('pending', 0)}")
else:
click.echo(f"❌ Failed to get status: {response.text}", err=True)
except Exception as e:
click.echo(f"❌ Error getting status: {str(e)}", err=True)
# Helper function to get config
def get_config():
"""Get CLI configuration"""
try:
from .config import get_config
return get_config()
except ImportError:
# Fallback for testing
from types import SimpleNamespace
return SimpleNamespace(
coordinator_url="http://localhost:8015",
api_key="test-api-key"
)
if __name__ == "__main__":
plugin_security()