"""Utility functions for AITBC CLI""" import time import logging import sys import os from pathlib import Path from typing import Any, Optional, Callable, Iterator from contextlib import contextmanager from rich.console import Console from rich.logging import RichHandler from rich.table import Table from rich.panel import Panel from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn import json import yaml from tabulate import tabulate console = Console() @contextmanager def progress_bar(description: str = "Working...", total: Optional[int] = None): """Context manager for progress bar display""" with Progress( SpinnerColumn(), TextColumn("[bold blue]{task.description}"), BarColumn(), TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), TimeElapsedColumn(), console=console, ) as progress: task = progress.add_task(description, total=total) yield progress, task def progress_spinner(description: str = "Working..."): """Simple spinner for indeterminate operations""" return console.status(f"[bold blue]{description}") class AuditLogger: """Audit logging for CLI operations""" def __init__(self, log_dir: Optional[Path] = None): self.log_dir = log_dir or Path.home() / ".aitbc" / "audit" self.log_dir.mkdir(parents=True, exist_ok=True) self.log_file = self.log_dir / "audit.jsonl" def log(self, action: str, details: dict = None, user: str = None): """Log an audit event""" import datetime entry = { "timestamp": datetime.datetime.now().isoformat(), "action": action, "user": user or os.environ.get("USER", "unknown"), "details": details or {} } with open(self.log_file, "a") as f: f.write(json.dumps(entry) + "\n") def get_logs(self, limit: int = 50, action_filter: str = None) -> list: """Read audit log entries""" if not self.log_file.exists(): return [] entries = [] with open(self.log_file) as f: for line in f: line = line.strip() if line: entry = json.loads(line) if action_filter and entry.get("action") != action_filter: continue entries.append(entry) return entries[-limit:] def _get_fernet_key(key: str = None) -> bytes: """Derive a Fernet key from a password or use default""" from cryptography.fernet import Fernet import base64 import hashlib if key is None: # Use a default key (should be overridden in production) key = "aitbc_config_key_2026_default" # Derive a 32-byte key suitable for Fernet return base64.urlsafe_b64encode(hashlib.sha256(key.encode()).digest()) def encrypt_value(value: str, key: str = None) -> str: """Encrypt a value using Fernet symmetric encryption""" from cryptography.fernet import Fernet import base64 fernet_key = _get_fernet_key(key) f = Fernet(fernet_key) encrypted = f.encrypt(value.encode()) return base64.b64encode(encrypted).decode() def decrypt_value(encrypted: str, key: str = None) -> str: """Decrypt a Fernet-encrypted value""" from cryptography.fernet import Fernet import base64 fernet_key = _get_fernet_key(key) f = Fernet(fernet_key) data = base64.b64decode(encrypted) return f.decrypt(data).decode() def setup_logging(verbosity: int, debug: bool = False) -> str: """Setup logging with Rich""" log_level = "WARNING" if verbosity >= 3 or debug: log_level = "DEBUG" elif verbosity == 2: log_level = "INFO" elif verbosity == 1: log_level = "WARNING" logging.basicConfig( level=log_level, format="%(message)s", datefmt="[%X]", handlers=[RichHandler(console=console, rich_tracebacks=True)] ) return log_level def output(data: Any, format_type: str = "table", title: str = None): """Format and output data""" if format_type == "json": console.print(json.dumps(data, indent=2, default=str)) elif format_type == "yaml": console.print(yaml.dump(data, default_flow_style=False, sort_keys=False)) elif format_type == "table": if isinstance(data, dict) and not isinstance(data, list): # Simple key-value table table = Table(show_header=False, box=None, title=title) table.add_column("Key", style="cyan") table.add_column("Value", style="green") for key, value in data.items(): if isinstance(value, (dict, list)): value = json.dumps(value, default=str) table.add_row(str(key), str(value)) console.print(table) elif isinstance(data, list) and data: if all(isinstance(item, dict) for item in data): # Table from list of dicts headers = list(data[0].keys()) table = Table() for header in headers: table.add_column(header, style="cyan") for item in data: row = [str(item.get(h, "")) for h in headers] table.add_row(*row) console.print(table) else: # Simple list for item in data: console.print(f"• {item}") else: console.print(data) else: console.print(data) def error(message: str): """Print error message""" console.print(Panel(f"[red]Error: {message}[/red]", title="❌")) def success(message: str): """Print success message""" console.print(Panel(f"[green]{message}[/green]", title="✅")) def warning(message: str): """Print warning message""" console.print(Panel(f"[yellow]{message}[/yellow]", title="⚠️")) def retry_with_backoff( func, max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 60.0, backoff_factor: float = 2.0, exceptions: tuple = (Exception,) ): """ Retry function with exponential backoff Args: func: Function to retry max_retries: Maximum number of retries base_delay: Initial delay in seconds max_delay: Maximum delay in seconds backoff_factor: Multiplier for delay after each retry exceptions: Tuple of exceptions to catch and retry on Returns: Result of function call """ last_exception = None for attempt in range(max_retries + 1): try: return func() except exceptions as e: last_exception = e if attempt == max_retries: error(f"Max retries ({max_retries}) exceeded. Last error: {e}") raise # Calculate delay with exponential backoff delay = min(base_delay * (backoff_factor ** attempt), max_delay) warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s...") time.sleep(delay) raise last_exception def create_http_client_with_retry( max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 60.0, timeout: float = 30.0 ): """ Create an HTTP client with retry capabilities Args: max_retries: Maximum number of retries base_delay: Initial delay in seconds max_delay: Maximum delay in seconds timeout: Request timeout in seconds Returns: httpx.Client with retry transport """ import httpx class RetryTransport(httpx.Transport): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.max_retries = max_retries self.base_delay = base_delay self.max_delay = max_delay self.backoff_factor = 2.0 def handle_request(self, request): last_exception = None for attempt in range(self.max_retries + 1): try: return super().handle_request(request) except (httpx.NetworkError, httpx.TimeoutException) as e: last_exception = e if attempt == self.max_retries: break delay = min( self.base_delay * (self.backoff_factor ** attempt), self.max_delay ) time.sleep(delay) raise last_exception return httpx.Client( transport=RetryTransport(), timeout=timeout )