Files
aitbc/cli/core/plugins.py
aitbc c4a2afee37
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Integration Tests / test-service-integration (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Python Tests / test-python (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled
CLI Tests / test-cli (push) Has been cancelled
Add missing plugin CLI commands and REST API endpoint
CLI:
- Added 'aitbc plugin create' - Create new plugin skeleton
- Added 'aitbc plugin package' - Package plugin for distribution
- Added 'aitbc market list-plugin' - List marketplace plugins

Marketplace service:
- Added GET /v1/marketplace/plugins endpoint
- Returns mock plugin data (cli_enhancer, web_dashboard, security_scanner)

Fixes scenario doc references to non-existent plugin commands
2026-05-15 00:24:29 +02:00

303 lines
8.6 KiB
Python
Executable File

"""Plugin system for AITBC CLI custom commands"""
import importlib
import importlib.util
import json
import click
from pathlib import Path
from typing import Optional
PLUGIN_DIR = Path.home() / ".aitbc" / "plugins"
def get_plugin_dir() -> Path:
"""Get and ensure plugin directory exists"""
PLUGIN_DIR.mkdir(parents=True, exist_ok=True)
return PLUGIN_DIR
def load_plugins(cli_group):
"""Load all plugins and register them with the CLI group"""
plugin_dir = get_plugin_dir()
manifest_file = plugin_dir / "plugins.json"
if not manifest_file.exists():
return
with open(manifest_file) as f:
manifest = json.load(f)
for plugin_info in manifest.get("plugins", []):
if not plugin_info.get("enabled", True):
continue
plugin_path = plugin_dir / plugin_info["file"]
if not plugin_path.exists():
continue
try:
spec = importlib.util.spec_from_file_location(
plugin_info["name"], str(plugin_path)
)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Look for a click group or command named 'plugin_command'
if hasattr(module, "plugin_command"):
cli_group.add_command(module.plugin_command)
except Exception:
pass # Skip broken plugins silently
@click.group()
def plugin():
"""Manage CLI plugins"""
pass
@plugin.command(name="list")
@click.pass_context
def list_plugins(ctx):
"""List installed plugins"""
from .utils import output
plugin_dir = get_plugin_dir()
manifest_file = plugin_dir / "plugins.json"
if not manifest_file.exists():
output({"message": "No plugins installed"}, ctx.obj.get('output_format', 'table'))
return
with open(manifest_file) as f:
manifest = json.load(f)
plugins = manifest.get("plugins", [])
if not plugins:
output({"message": "No plugins installed"}, ctx.obj.get('output_format', 'table'))
else:
output(plugins, ctx.obj.get('output_format', 'table'))
@plugin.command()
@click.argument("name")
@click.argument("file_path", type=click.Path(exists=True))
@click.option("--description", default="", help="Plugin description")
@click.pass_context
def install(ctx, name: str, file_path: str, description: str):
"""Install a plugin from a Python file"""
import shutil
from .utils import output, error, success
plugin_dir = get_plugin_dir()
manifest_file = plugin_dir / "plugins.json"
# Copy plugin file
dest = plugin_dir / f"{name}.py"
shutil.copy2(file_path, dest)
# Update manifest
manifest = {"plugins": []}
if manifest_file.exists():
with open(manifest_file) as f:
manifest = json.load(f)
# Remove existing entry with same name
manifest["plugins"] = [p for p in manifest["plugins"] if p["name"] != name]
manifest["plugins"].append({
"name": name,
"file": f"{name}.py",
"description": description,
"enabled": True
})
with open(manifest_file, "w") as f:
json.dump(manifest, f, indent=2)
success(f"Plugin '{name}' installed")
output({"name": name, "file": str(dest), "status": "installed"}, ctx.obj.get('output_format', 'table'))
@plugin.command()
@click.argument("name")
@click.pass_context
def uninstall(ctx, name: str):
"""Uninstall a plugin"""
from .utils import output, error, success
plugin_dir = get_plugin_dir()
manifest_file = plugin_dir / "plugins.json"
if not manifest_file.exists():
error(f"Plugin '{name}' not found")
return
with open(manifest_file) as f:
manifest = json.load(f)
plugin_entry = next((p for p in manifest["plugins"] if p["name"] == name), None)
if not plugin_entry:
error(f"Plugin '{name}' not found")
return
# Remove file
plugin_file = plugin_dir / plugin_entry["file"]
if plugin_file.exists():
plugin_file.unlink()
# Update manifest
manifest["plugins"] = [p for p in manifest["plugins"] if p["name"] != name]
with open(manifest_file, "w") as f:
json.dump(manifest, f, indent=2)
success(f"Plugin '{name}' uninstalled")
output({"name": name, "status": "uninstalled"}, ctx.obj.get('output_format', 'table'))
@plugin.command()
@click.argument("name")
@click.option("--type", default="cli", help="Plugin type (cli, web, blockchain, ai)")
@click.option("--description", default="", help="Plugin description")
@click.option("--author", default="", help="Plugin author")
@click.pass_context
def create(ctx, name: str, type: str, description: str, author: str):
"""Create a new plugin skeleton"""
from .utils import output, success
plugin_dir = get_plugin_dir()
plugin_file = plugin_dir / f"{name}.py"
if plugin_file.exists():
from .utils import error
error(f"Plugin '{name}' already exists")
return
# Create plugin skeleton
template = f'''"""{name} - {description}"""
import click
@click.group()
def plugin_command():
"""{name} plugin commands"""
pass
@plugin_command.command()
def hello():
"""Hello from {name} plugin"""
click.echo("Hello from {name} plugin!")
'''
with open(plugin_file, "w") as f:
f.write(template)
# Update manifest
manifest_file = plugin_dir / "plugins.json"
manifest = {"plugins": []}
if manifest_file.exists():
with open(manifest_file) as f:
manifest = json.load(f)
manifest["plugins"].append({
"name": name,
"file": f"{name}.py",
"description": description,
"author": author,
"type": type,
"enabled": True
})
with open(manifest_file, "w") as f:
json.dump(manifest, f, indent=2)
success(f"Plugin '{name}' created")
output({"name": name, "file": str(plugin_file), "type": type}, ctx.obj.get('output_format', 'table'))
@plugin.command()
@click.argument("name")
@click.option("--output", default=".", help="Output directory")
@click.pass_context
def package(ctx, name: str, output: str):
"""Package a plugin for distribution"""
from .utils import output, success, error
import shutil
from pathlib import Path
import tarfile
plugin_dir = get_plugin_dir()
manifest_file = plugin_dir / "plugins.json"
if not manifest_file.exists():
error(f"Plugin '{name}' not found")
return
with open(manifest_file) as f:
manifest = json.load(f)
plugin_entry = next((p for p in manifest["plugins"] if p["name"] == name), None)
if not plugin_entry:
error(f"Plugin '{name}' not found")
return
plugin_file = plugin_dir / plugin_entry["file"]
if not plugin_file.exists():
error(f"Plugin file '{plugin_entry['file']}' not found")
return
# Create package
output_dir = Path(output)
output_dir.mkdir(parents=True, exist_ok=True)
package_file = output_dir / f"{name}.tar.gz"
with tarfile.open(package_file, "w:gz") as tar:
tar.add(plugin_file, arcname=plugin_file.name)
# Add metadata
metadata = json.dumps({
"name": name,
"description": plugin_entry.get("description", ""),
"author": plugin_entry.get("author", ""),
"type": plugin_entry.get("type", "cli"),
"version": "1.0.0"
})
metadata_file = output_dir / "metadata.json"
with open(metadata_file, "w") as f:
f.write(metadata)
tar.add(metadata_file, arcname="metadata.json")
metadata_file.unlink()
success(f"Plugin '{name}' packaged to {package_file}")
output({"name": name, "package": str(package_file)}, ctx.obj.get('output_format', 'table'))
@plugin.command()
@click.argument("name")
@click.argument("state", type=click.Choice(["enable", "disable"]))
@click.pass_context
def toggle(ctx, name: str, state: str):
"""Enable or disable a plugin"""
from .utils import output, error, success
plugin_dir = get_plugin_dir()
manifest_file = plugin_dir / "plugins.json"
if not manifest_file.exists():
error(f"Plugin '{name}' not found")
return
with open(manifest_file) as f:
manifest = json.load(f)
plugin_entry = next((p for p in manifest["plugins"] if p["name"] == name), None)
if not plugin_entry:
error(f"Plugin '{name}' not found")
return
plugin_entry["enabled"] = (state == "enable")
with open(manifest_file, "w") as f:
json.dump(manifest, f, indent=2)
success(f"Plugin '{name}' {'enabled' if state == 'enable' else 'disabled'}")
output({"name": name, "enabled": plugin_entry["enabled"]}, ctx.obj.get('output_format', 'table'))