Files
aitbc/cli/aitbc_cli/core/plugins.py
aitbc 7ced360c1f
Some checks failed
CLI Tests / test-cli (push) Has been cancelled
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled
refactor: move cli/core/ to cli/aitbc_cli/core/ for proper package structure
- Moved core/ directory to aitbc_cli/core/ to make it a proper subpackage
- Updated aitbc_cli.py to load from new path
- Simplified aitbc_cli/__init__.py to use normal import instead of spec_from_file_location
- Updated all core imports to use aitbc_cli.core prefix
- Copied utils files (wallet_daemon_client, error_handling, crypto_utils, subprocess) to aitbc_cli/utils/
- Fixed wallet list command to work with new structure
- This fixes ModuleNotFoundError for aitbc_cli.core submodules
2026-05-26 12:17:58 +02:00

303 lines
8.6 KiB
Python
Executable File

"""Plugin system for AITBC CLI custom commands"""
import importlib
import importlib.util
import json
import click
from pathlib import Path
from typing import Optional
PLUGIN_DIR = Path.home() / ".aitbc" / "plugins"
def get_plugin_dir() -> Path:
"""Get and ensure plugin directory exists"""
PLUGIN_DIR.mkdir(parents=True, exist_ok=True)
return PLUGIN_DIR
def load_plugins(cli_group):
"""Load all plugins and register them with the CLI group"""
plugin_dir = get_plugin_dir()
manifest_file = plugin_dir / "plugins.json"
if not manifest_file.exists():
return
with open(manifest_file) as f:
manifest = json.load(f)
for plugin_info in manifest.get("plugins", []):
if not plugin_info.get("enabled", True):
continue
plugin_path = plugin_dir / plugin_info["file"]
if not plugin_path.exists():
continue
try:
spec = importlib.util.spec_from_file_location(
plugin_info["name"], str(plugin_path)
)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Look for a click group or command named 'plugin_command'
if hasattr(module, "plugin_command"):
cli_group.add_command(module.plugin_command)
except Exception:
pass # Skip broken plugins silently
@click.group()
def plugin():
"""Manage CLI plugins"""
pass
@plugin.command(name="list")
@click.pass_context
def list_plugins(ctx):
"""List installed plugins"""
from .utils import output
plugin_dir = get_plugin_dir()
manifest_file = plugin_dir / "plugins.json"
if not manifest_file.exists():
output({"message": "No plugins installed"}, ctx.obj.get('output_format', 'table'))
return
with open(manifest_file) as f:
manifest = json.load(f)
plugins = manifest.get("plugins", [])
if not plugins:
output({"message": "No plugins installed"}, ctx.obj.get('output_format', 'table'))
else:
output(plugins, ctx.obj.get('output_format', 'table'))
@plugin.command()
@click.argument("name")
@click.argument("file_path", type=click.Path(exists=True))
@click.option("--description", default="", help="Plugin description")
@click.pass_context
def install(ctx, name: str, file_path: str, description: str):
"""Install a plugin from a Python file"""
import shutil
from .utils import output, error, success
plugin_dir = get_plugin_dir()
manifest_file = plugin_dir / "plugins.json"
# Copy plugin file
dest = plugin_dir / f"{name}.py"
shutil.copy2(file_path, dest)
# Update manifest
manifest = {"plugins": []}
if manifest_file.exists():
with open(manifest_file) as f:
manifest = json.load(f)
# Remove existing entry with same name
manifest["plugins"] = [p for p in manifest["plugins"] if p["name"] != name]
manifest["plugins"].append({
"name": name,
"file": f"{name}.py",
"description": description,
"enabled": True
})
with open(manifest_file, "w") as f:
json.dump(manifest, f, indent=2)
success(f"Plugin '{name}' installed")
output({"name": name, "file": str(dest), "status": "installed"}, ctx.obj.get('output_format', 'table'))
@plugin.command()
@click.argument("name")
@click.pass_context
def uninstall(ctx, name: str):
"""Uninstall a plugin"""
from .utils import output, error, success
plugin_dir = get_plugin_dir()
manifest_file = plugin_dir / "plugins.json"
if not manifest_file.exists():
error(f"Plugin '{name}' not found")
return
with open(manifest_file) as f:
manifest = json.load(f)
plugin_entry = next((p for p in manifest["plugins"] if p["name"] == name), None)
if not plugin_entry:
error(f"Plugin '{name}' not found")
return
# Remove file
plugin_file = plugin_dir / plugin_entry["file"]
if plugin_file.exists():
plugin_file.unlink()
# Update manifest
manifest["plugins"] = [p for p in manifest["plugins"] if p["name"] != name]
with open(manifest_file, "w") as f:
json.dump(manifest, f, indent=2)
success(f"Plugin '{name}' uninstalled")
output({"name": name, "status": "uninstalled"}, ctx.obj.get('output_format', 'table'))
@plugin.command()
@click.argument("name")
@click.option("--type", default="cli", help="Plugin type (cli, web, blockchain, ai)")
@click.option("--description", default="", help="Plugin description")
@click.option("--author", default="", help="Plugin author")
@click.pass_context
def create(ctx, name: str, type: str, description: str, author: str):
"""Create a new plugin skeleton"""
from .utils import output, success
plugin_dir = get_plugin_dir()
plugin_file = plugin_dir / f"{name}.py"
if plugin_file.exists():
from .utils import error
error(f"Plugin '{name}' already exists")
return
# Create plugin skeleton
template = f'''"""{name} - {description}"""
import click
@click.group()
def plugin_command():
"""{name} plugin commands"""
pass
@plugin_command.command()
def hello():
"""Hello from {name} plugin"""
click.echo("Hello from {name} plugin!")
'''
with open(plugin_file, "w") as f:
f.write(template)
# Update manifest
manifest_file = plugin_dir / "plugins.json"
manifest = {"plugins": []}
if manifest_file.exists():
with open(manifest_file) as f:
manifest = json.load(f)
manifest["plugins"].append({
"name": name,
"file": f"{name}.py",
"description": description,
"author": author,
"type": type,
"enabled": True
})
with open(manifest_file, "w") as f:
json.dump(manifest, f, indent=2)
success(f"Plugin '{name}' created")
output({"name": name, "file": str(plugin_file), "type": type}, ctx.obj.get('output_format', 'table'))
@plugin.command()
@click.argument("name")
@click.option("--output", default=".", help="Output directory")
@click.pass_context
def package(ctx, name: str, output: str):
"""Package a plugin for distribution"""
from .utils import output, success, error
import shutil
from pathlib import Path
import tarfile
plugin_dir = get_plugin_dir()
manifest_file = plugin_dir / "plugins.json"
if not manifest_file.exists():
error(f"Plugin '{name}' not found")
return
with open(manifest_file) as f:
manifest = json.load(f)
plugin_entry = next((p for p in manifest["plugins"] if p["name"] == name), None)
if not plugin_entry:
error(f"Plugin '{name}' not found")
return
plugin_file = plugin_dir / plugin_entry["file"]
if not plugin_file.exists():
error(f"Plugin file '{plugin_entry['file']}' not found")
return
# Create package
output_dir = Path(output)
output_dir.mkdir(parents=True, exist_ok=True)
package_file = output_dir / f"{name}.tar.gz"
with tarfile.open(package_file, "w:gz") as tar:
tar.add(plugin_file, arcname=plugin_file.name)
# Add metadata
metadata = json.dumps({
"name": name,
"description": plugin_entry.get("description", ""),
"author": plugin_entry.get("author", ""),
"type": plugin_entry.get("type", "cli"),
"version": "1.0.0"
})
metadata_file = output_dir / "metadata.json"
with open(metadata_file, "w") as f:
f.write(metadata)
tar.add(metadata_file, arcname="metadata.json")
metadata_file.unlink()
success(f"Plugin '{name}' packaged to {package_file}")
output({"name": name, "package": str(package_file)}, ctx.obj.get('output_format', 'table'))
@plugin.command()
@click.argument("name")
@click.argument("state", type=click.Choice(["enable", "disable"]))
@click.pass_context
def toggle(ctx, name: str, state: str):
"""Enable or disable a plugin"""
from .utils import output, error, success
plugin_dir = get_plugin_dir()
manifest_file = plugin_dir / "plugins.json"
if not manifest_file.exists():
error(f"Plugin '{name}' not found")
return
with open(manifest_file) as f:
manifest = json.load(f)
plugin_entry = next((p for p in manifest["plugins"] if p["name"] == name), None)
if not plugin_entry:
error(f"Plugin '{name}' not found")
return
plugin_entry["enabled"] = (state == "enable")
with open(manifest_file, "w") as f:
json.dump(manifest, f, indent=2)
success(f"Plugin '{name}' {'enabled' if state == 'enable' else 'disabled'}")
output({"name": name, "enabled": plugin_entry["enabled"]}, ctx.obj.get('output_format', 'table'))