Files
aitbc/cli/src/aitbc_cli/commands/workflow.py
aitbc a7b6e39cdf fix: replace all print() with click.echo() or logger in CLI production code
- 55 CLI files: handlers/, aitbc_cli/commands/, cli/core/, cli/utils/, top-level scripts
- Click-based files: print() -> click.echo()
- Library modules: print() -> logger.info/error/warning
- Fixed pre-existing indentation bugs in monitor.py dashboard function
- Fixed bare print() -> logger.info('') in chain_manager.py
- 0 remaining print() in production CLI code
- All files compile cleanly
2026-05-25 13:53:49 +02:00

74 lines
2.1 KiB
Python

"""
Workflow commands for AITBC CLI
"""
import json
import time
from typing import Optional
import click
from ..utils import error, success
@click.group()
def workflow():
"""Workflow management commands"""
pass
@workflow.command()
@click.argument('workflow_name')
@click.option('--config', help='Workflow configuration file')
@click.option('--dry-run', is_flag=True, help='Dry run without executing')
def run(workflow_name: str, config: Optional[str], dry_run: bool):
"""Run a workflow"""
if dry_run:
success(f"Dry run for workflow {workflow_name}")
click.echo("Would execute workflow without making changes")
return
success(f"Run workflow {workflow_name}")
if config:
click.echo(f"Using config: {config}")
# TODO: Implement actual workflow execution logic
click.echo(f"Execution ID: wf_exec_{int(time.time())}")
click.echo("Status: Running")
@workflow.command()
@click.option('--format', type=click.Choice(['table', 'json']), default='table', help='Output format')
def list(format: str):
"""List available workflows"""
success("Available workflows:")
workflows = [
{"name": "gpu-marketplace", "status": "active", "steps": 5},
{"name": "ai-job-processing", "status": "active", "steps": 3},
{"name": "mining-optimization", "status": "inactive", "steps": 4}
]
if format == 'json':
click.echo(json.dumps(workflows, indent=2))
else:
for wf in workflows:
click.echo(f" - {wf['name']}: {wf['status']} ({wf['steps']} steps)")
@workflow.command()
@click.argument('workflow_name')
def status(workflow_name: str):
"""Get workflow status"""
success(f"Get status for workflow {workflow_name}")
# TODO: Implement actual status check from workflow engine
click.echo("Status: Not running")
click.echo("Last execution: Never")
@workflow.command()
@click.argument('workflow_name')
def stop(workflow_name: str):
"""Stop a running workflow"""
success(f"Stop workflow {workflow_name}")
# TODO: Implement actual stop command via workflow engine