refactor: move cli/core/ to cli/aitbc_cli/core/ for proper package structure
Some checks failed
CLI Tests / test-cli (push) Has been cancelled
Cross-Node Transaction Testing / transaction-test (push) Has been cancelled
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled

- Moved core/ directory to aitbc_cli/core/ to make it a proper subpackage
- Updated aitbc_cli.py to load from new path
- Simplified aitbc_cli/__init__.py to use normal import instead of spec_from_file_location
- Updated all core imports to use aitbc_cli.core prefix
- Copied utils files (wallet_daemon_client, error_handling, crypto_utils, subprocess) to aitbc_cli/utils/
- Fixed wallet list command to work with new structure
- This fixes ModuleNotFoundError for aitbc_cli.core submodules
This commit is contained in:
aitbc
2026-05-26 12:17:58 +02:00
parent f8150ae1aa
commit 7ced360c1f
21 changed files with 1031 additions and 45 deletions

View File

@@ -29,7 +29,7 @@ def _load_cli_module() -> ModuleType:
if _CLI_MODULE is not None: if _CLI_MODULE is not None:
return _CLI_MODULE return _CLI_MODULE
cli_path = Path(__file__).with_name("core") / "main.py" cli_path = Path(__file__).parent / "aitbc_cli" / "core" / "main.py"
spec = importlib.util.spec_from_file_location("aitbc_cli_core_main", cli_path) spec = importlib.util.spec_from_file_location("aitbc_cli_core_main", cli_path)
if spec is None or spec.loader is None: if spec is None or spec.loader is None:
raise ImportError(f"Unable to load modular CLI entrypoint from {cli_path}") raise ImportError(f"Unable to load modular CLI entrypoint from {cli_path}")

View File

@@ -7,7 +7,7 @@ import sys
__version__ = "0.1.0" __version__ = "0.1.0"
__author__ = "AITBC Team" __author__ = "AITBC Team"
__email__ = "team@aitbc.net" __email__ = "andreas.fleckl@bubuit.net"
# Provide compatibility aliases for source-tree imports used by modular commands. # Provide compatibility aliases for source-tree imports used by modular commands.
# Note: core and models are sibling directories, not subpackages of aitbc_cli # Note: core and models are sibling directories, not subpackages of aitbc_cli
@@ -26,32 +26,9 @@ def __getattr__(name: str):
if name not in __all__: if name not in __all__:
raise AttributeError(f"module {__name__!r} has no attribute {name!r}") raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
# Set up sys.modules for proper package resolution # core/ is now a proper subpackage of aitbc_cli/, so we can import normally
# core/ is a sibling of aitbc_cli/, but imports reference it as aitbc_cli.core from aitbc_cli.core.main import main, cli
# We need to register these in sys.modules before loading core/main.py
import importlib.util
from pathlib import Path
cli_dir = Path(__file__).parent value = cli if name == "cli" else main
core_dir = cli_dir.parent / "core"
# Register aitbc_cli.core as a module pointing to the core/ directory
if "aitbc_cli.core" not in sys.modules:
core_spec = importlib.util.spec_from_file_location("aitbc_cli.core", core_dir / "__init__.py")
if core_spec and core_spec.loader:
core_module = importlib.util.module_from_spec(core_spec)
sys.modules["aitbc_cli.core"] = core_module
# Load core/main.py and register it as aitbc_cli.core.main
cli_path = core_dir / "main.py"
spec = importlib.util.spec_from_file_location("aitbc_cli.core.main", cli_path)
if spec is None or spec.loader is None:
raise ImportError(f"Unable to load modular CLI entrypoint from {cli_path}")
module = importlib.util.module_from_spec(spec)
sys.modules["aitbc_cli.core.main"] = module
spec.loader.exec_module(module)
value = module.cli if name == "cli" else module.main
globals()[name] = value globals()[name] = value
return value return value

View File

@@ -19,8 +19,8 @@ try:
from ..core.node_client import NodeClient from ..core.node_client import NodeClient
except ImportError: except ImportError:
from utils import output, error, success, warning from utils import output, error, success, warning
from core.config import MultiChainConfig, load_multichain_config, get_default_node_config, add_node_config, remove_node_config from aitbc_cli.core.config import MultiChainConfig, load_multichain_config, get_default_node_config, add_node_config, remove_node_config
from core.node_client import NodeClient from aitbc_cli.core.node_client import NodeClient
def info(message): def info(message):
click.echo(message) click.echo(message)

View File

@@ -13,8 +13,9 @@ from enum import Enum
import uuid import uuid
from collections import defaultdict from collections import defaultdict
from core.config import MultiChainConfig from aitbc_cli.core.config import MultiChainConfig
from core.node_client import NodeClient from aitbc_cli.core.node_client import NodeClient
from aitbc_cli.models.chain import ChainInfo, ChainType, ChainStatus
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@@ -11,9 +11,9 @@ from dataclasses import dataclass, asdict
from collections import defaultdict, deque from collections import defaultdict, deque
import statistics import statistics
from core.config import MultiChainConfig from aitbc_cli.core.config import MultiChainConfig
from core.node_client import NodeClient from aitbc_cli.core.node_client import NodeClient
from models.chain import ChainInfo, ChainType, ChainStatus from aitbc_cli.models.chain import ChainInfo, ChainType, ChainStatus
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@@ -8,8 +8,8 @@ import yaml
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Dict, Any, Optional from typing import Dict, Any, Optional
from core.config import MultiChainConfig from aitbc_cli.core.config import MultiChainConfig
from models.chain import GenesisBlock, GenesisConfig, ChainType, ConsensusAlgorithm from aitbc_cli.models.chain import GenesisBlock, GenesisConfig, ChainType, ConsensusAlgorithm
class GenesisValidationError(Exception): class GenesisValidationError(Exception):
"""Genesis validation error""" """Genesis validation error"""

View File

@@ -7,7 +7,7 @@ import sys
from pathlib import Path from pathlib import Path
# Ensure parent directory is on path for aitbc_cli imports # Ensure parent directory is on path for aitbc_cli imports
CLI_DIR = Path(__file__).parent.parent CLI_DIR = Path(__file__).parent.parent.parent
if str(CLI_DIR) not in sys.path: if str(CLI_DIR) not in sys.path:
sys.path.insert(0, str(CLI_DIR)) sys.path.insert(0, str(CLI_DIR))

View File

@@ -14,8 +14,9 @@ import uuid
from decimal import Decimal from decimal import Decimal
from collections import defaultdict from collections import defaultdict
from core.config import MultiChainConfig from aitbc_cli.core.config import MultiChainConfig
from core.node_client import NodeClient from aitbc_cli.core.node_client import NodeClient
from aitbc_cli.models.chain import ChainInfo, ChainType, ChainStatus
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@@ -8,8 +8,8 @@ import json
import os import os
import logging import logging
from typing import Dict, List, Optional, Any from typing import Dict, List, Optional, Any
from core.config import NodeConfig from aitbc_cli.core.config import NodeConfig
from models.chain import ChainInfo, ChainType, ChainStatus, ConsensusAlgorithm from aitbc_cli.models.chain import ChainInfo, ChainType, ChainStatus, ConsensusAlgorithm
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@@ -0,0 +1,237 @@
"""
Cryptographic Utilities for CLI Security
Provides real signature verification for multisig operations
"""
import hashlib
import secrets
from typing import Dict, Optional, Tuple
from eth_account import Account
from eth_utils import to_checksum_address, keccak
import json
def create_signature_challenge(tx_data: Dict, nonce: str) -> str:
"""
Create a cryptographic challenge for transaction signing
Args:
tx_data: Transaction data to sign
nonce: Unique nonce to prevent replay attacks
Returns:
Challenge string to be signed
"""
# Create deterministic challenge from transaction data
challenge_data = {
"tx_id": tx_data.get("tx_id"),
"to": tx_data.get("to"),
"amount": tx_data.get("amount"),
"nonce": nonce,
"timestamp": tx_data.get("timestamp")
}
# Sort keys for deterministic ordering
challenge_str = json.dumps(challenge_data, sort_keys=True, separators=(',', ':'))
challenge_hash = keccak(challenge_str.encode())
return f"AITBC_MULTISIG_CHALLENGE:{challenge_hash.hex()}"
def verify_signature(
challenge: str,
signature: str,
signer_address: str
) -> bool:
"""
Verify that a signature was created by the specified signer
Args:
challenge: Challenge string that was signed
signature: Hex signature string
signer_address: Expected signer address
Returns:
True if signature is valid
"""
try:
# Remove 0x prefix if present
if signature.startswith("0x"):
signature = signature[2:]
# Convert to bytes
signature_bytes = bytes.fromhex(signature)
# Recover address from signature
message_hash = keccak(challenge.encode())
recovered_address = Account.recover_message(
signable_hash=message_hash,
signature=signature_bytes
)
# Compare with expected signer
return to_checksum_address(recovered_address) == to_checksum_address(signer_address)
except Exception:
return False
def sign_challenge(challenge: str, private_key: str) -> str:
"""
Sign a challenge with a private key
Args:
challenge: Challenge string to sign
private_key: Private key in hex format
Returns:
Signature as hex string
"""
try:
# Remove 0x prefix if present
if private_key.startswith("0x"):
private_key = private_key[2:]
account = Account.from_key("0x" + private_key)
message_hash = keccak(challenge.encode())
signature = account.sign_message(message_hash)
return "0x" + signature.signature.hex()
except Exception as e:
raise ValueError(f"Failed to sign challenge: {e}")
def generate_nonce() -> str:
"""Generate a secure nonce for transaction challenges"""
return secrets.token_hex(16)
def validate_multisig_transaction(tx_data: Dict) -> Tuple[bool, str]:
"""
Validate multisig transaction structure
Args:
tx_data: Transaction data to validate
Returns:
Tuple of (is_valid, error_message)
"""
required_fields = ["tx_id", "to", "amount", "timestamp", "nonce"]
for field in required_fields:
if field not in tx_data:
return False, f"Missing required field: {field}"
# Validate address format (AITBC addresses start with 'ait')
to_address = tx_data["to"]
if not to_address.startswith("ait"):
return False, "Invalid recipient address format: must start with 'ait'"
if len(to_address) < 50 or len(to_address) > 70:
return False, "Invalid recipient address format: invalid length"
# Check that the rest is hex-like (after 'ait' prefix)
if not all(c.lower() in '0123456789abcdef' for c in to_address[3:]):
return False, "Invalid recipient address format: invalid characters"
# Validate amount
try:
amount = float(tx_data["amount"])
if amount <= 0:
return False, "Amount must be positive"
except Exception:
return False, "Invalid amount format"
return True, ""
class MultisigSecurityManager:
"""Security manager for multisig operations"""
def __init__(self):
self.pending_challenges: Dict[str, Dict] = {}
def create_signing_request(
self,
tx_data: Dict,
multisig_wallet: str
) -> Dict[str, str]:
"""
Create a signing request with cryptographic challenge
Args:
tx_data: Transaction data
multisig_wallet: Multisig wallet identifier
Returns:
Signing request with challenge
"""
# Validate transaction
is_valid, error = validate_multisig_transaction(tx_data)
if not is_valid:
raise ValueError(f"Invalid transaction: {error}")
# Generate nonce and challenge
nonce = generate_nonce()
challenge = create_signature_challenge(tx_data, nonce)
# Store challenge for verification
self.pending_challenges[tx_data["tx_id"]] = {
"challenge": challenge,
"tx_data": tx_data,
"multisig_wallet": multisig_wallet,
"nonce": nonce,
"created_at": secrets.token_hex(8)
}
return {
"tx_id": tx_data["tx_id"],
"challenge": challenge,
"nonce": nonce,
"signers_required": len(tx_data.get("required_signers", [])),
"message": f"Please sign this challenge to authorize transaction {tx_data['tx_id']}"
}
def verify_and_add_signature(
self,
tx_id: str,
signature: str,
signer_address: str
) -> Tuple[bool, str]:
"""
Verify signature and add to transaction
Args:
tx_id: Transaction ID
signature: Signature to verify
signer_address: Address of signer
Returns:
Tuple of (success, message)
"""
if tx_id not in self.pending_challenges:
return False, "Transaction not found or expired"
challenge_data = self.pending_challenges[tx_id]
challenge = challenge_data["challenge"]
# Verify signature
if not verify_signature(challenge, signature, signer_address):
return False, f"Invalid signature for signer {signer_address}"
# Check if signer is authorized
tx_data = challenge_data["tx_data"]
authorized_signers = tx_data.get("required_signers", [])
if signer_address not in authorized_signers:
return False, f"Signer {signer_address} is not authorized"
return True, "Signature verified successfully"
def cleanup_challenge(self, tx_id: str):
"""Clean up challenge after transaction completion"""
if tx_id in self.pending_challenges:
del self.pending_challenges[tx_id]
# Global security manager instance
multisig_security = MultisigSecurityManager()

View File

@@ -12,9 +12,9 @@ from typing import Dict, Any, Optional, List, Union
from datetime import datetime from datetime import datetime
sys.path.insert(0, "/opt/aitbc/cli") sys.path.insert(0, "/opt/aitbc/cli")
from utils.wallet_daemon_client import WalletDaemonClient, WalletInfo, WalletBalance, ChainInfo, WalletMigrationResult from aitbc_cli.utils.wallet_daemon_client import WalletDaemonClient, WalletInfo, WalletBalance, ChainInfo, WalletMigrationResult
from aitbc_cli.config import CLIConfig as Config from aitbc_cli.config import CLIConfig as Config
from utils import error, success, output from aitbc_cli.utils import error, success, output
class DualModeWalletAdapter: class DualModeWalletAdapter:

View File

@@ -0,0 +1,181 @@
"""
Common error handling utilities for AITBC CLI
Provides standardized error handling patterns and utilities for CLI commands
"""
import sys
from typing import Optional, Callable, Any
from functools import wraps
from . import error, warning, info
class CLIError(Exception):
"""Base exception for CLI errors"""
def __init__(self, message: str, exit_code: int = 1):
self.message = message
self.exit_code = exit_code
super().__init__(self.message)
class NetworkError(CLIError):
"""Network-related errors"""
def __init__(self, message: str):
super().__init__(f"Network error: {message}", exit_code=2)
class ConfigurationError(CLIError):
"""Configuration-related errors"""
def __init__(self, message: str):
super().__init__(f"Configuration error: {message}", exit_code=3)
class ValidationError(CLIError):
"""Validation errors for user input"""
def __init__(self, message: str):
super().__init__(f"Validation error: {message}", exit_code=4)
class APIError(CLIError):
"""API-related errors"""
def __init__(self, message: str, status_code: Optional[int] = None):
msg = f"API error: {message}"
if status_code:
msg += f" (HTTP {status_code})"
super().__init__(msg, exit_code=5)
def handle_cli_error(func: Callable) -> Callable:
"""
Decorator to standardize error handling in CLI commands.
Catches common exceptions and displays user-friendly error messages.
Args:
func: Function to wrap with error handling
Returns:
Wrapped function with standardized error handling
"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except CLIError as e:
error(e.message)
sys.exit(e.exit_code)
except KeyboardInterrupt:
warning("\nOperation cancelled by user")
sys.exit(130)
except Exception as e:
error(f"Unexpected error: {e}")
sys.exit(1)
return wrapper
def handle_async_cli_error(func: Callable) -> Callable:
"""
Decorator to standardize error handling in async CLI commands.
Args:
func: Async function to wrap with error handling
Returns:
Wrapped async function with standardized error handling
"""
@wraps(func)
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except CLIError as e:
error(e.message)
sys.exit(e.exit_code)
except KeyboardInterrupt:
warning("\nOperation cancelled by user")
sys.exit(130)
except Exception as e:
error(f"Unexpected error: {e}")
sys.exit(1)
return wrapper
def safe_execute(
operation: Callable,
error_message: str = "Operation failed",
default_return: Any = None,
raise_on_error: bool = False
) -> Any:
"""
Safely execute an operation with standardized error handling.
Args:
operation: Function to execute
error_message: Custom error message prefix
default_return: Value to return on error (if not raising)
raise_on_error: Whether to raise exception on error
Returns:
Operation result or default_return on error
Raises:
Exception: If raise_on_error is True and operation fails
"""
try:
return operation()
except Exception as e:
if raise_on_error:
raise
error(f"{error_message}: {e}")
return default_return
def validate_required_fields(data: dict, required_fields: list) -> None:
"""
Validate that required fields are present in data dictionary.
Args:
data: Dictionary to validate
required_fields: List of required field names
Raises:
ValidationError: If any required field is missing
"""
missing_fields = [field for field in required_fields if field not in data or data[field] is None]
if missing_fields:
raise ValidationError(f"Missing required fields: {', '.join(missing_fields)}")
def validate_url(url: str) -> bool:
"""
Validate URL format.
Args:
url: URL string to validate
Returns:
True if valid, False otherwise
"""
import re
url_pattern = re.compile(
r'^https?://' # http:// or https://
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain...
r'localhost|' # localhost...
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
r'(?::\d+)?' # optional port
r'(?:/?|[/?]\S+)$', re.IGNORECASE)
return bool(url_pattern.match(url))
def validate_address(address: str) -> bool:
"""
Validate Ethereum address format.
Args:
address: Ethereum address string
Returns:
True if valid, False otherwise
"""
import re
# Basic Ethereum address validation (0x followed by 40 hex characters)
address_pattern = re.compile(r'^0x[a-fA-F0-9]{40}$')
return bool(address_pattern.match(address))

View File

@@ -0,0 +1,30 @@
import subprocess
import sys
from typing import List, Optional, Union, Any
from . import error, output
import logging
logger = logging.getLogger(__name__)
def run_subprocess(cmd: List[str], check: bool = True, capture_output: bool = True, shell: bool = False, **kwargs: Any) -> Optional[Union[str, subprocess.CompletedProcess]]:
"""Run a subprocess command safely with logging"""
try:
# Always use shell=False for security
result = subprocess.run(cmd, check=check, capture_output=capture_output, text=True, shell=False, **kwargs)
if capture_output:
return result.stdout.strip()
return result
except subprocess.CalledProcessError as e:
error(f"Command failed with exit code {e.returncode}")
if capture_output and getattr(e, 'stderr', None):
logger.info(e.stderr, file=sys.stderr)
if check:
sys.exit(e.returncode)
return getattr(e, 'stdout', None) if capture_output else None
except Exception as e:
error(f"Failed to execute command: {e}")
if check:
sys.exit(1)
return None

View File

@@ -0,0 +1,559 @@
"""Wallet Daemon Client for AITBC CLI
This module provides a client for interacting with the AITBC wallet daemon.
"""
import sys
import json
import base64
from typing import TYPE_CHECKING, Dict, Any, Optional, List
from pathlib import Path
from dataclasses import dataclass
from aitbc import AITBCHTTPClient, NetworkError
sys.path.insert(0, "/opt/aitbc/cli")
from utils import error, success
if TYPE_CHECKING:
from aitbc_cli.core.config import Config
@dataclass
class ChainInfo:
"""Chain information from daemon"""
chain_id: str
name: str
status: str
coordinator_url: str
created_at: str
updated_at: str
wallet_count: int
recent_activity: int
@dataclass
class WalletInfo:
"""Wallet information from daemon"""
wallet_id: str
chain_id: str
public_key: str
address: Optional[str] = None
created_at: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
@dataclass
class WalletBalance:
"""Wallet balance information"""
wallet_id: str
chain_id: str
balance: float
address: Optional[str] = None
last_updated: Optional[str] = None
@dataclass
class WalletMigrationResult:
"""Result of wallet migration between chains"""
success: bool
source_wallet: WalletInfo
target_wallet: WalletInfo
migration_timestamp: str
class WalletDaemonClient:
"""Client for interacting with AITBC wallet daemon"""
def __init__(self, config: "Config"):
self.config = config
self.base_url = config.wallet_url.rstrip('/')
self.timeout = getattr(config, 'timeout', 30)
def _get_http_client(self) -> AITBCHTTPClient:
"""Create HTTP client with appropriate settings"""
return AITBCHTTPClient(
base_url=self.base_url,
timeout=self.timeout,
headers={"Content-Type": "application/json"}
)
def is_available(self) -> bool:
"""Check if wallet daemon is available and responsive"""
try:
client = self._get_http_client()
client.get("/health")
return True
except NetworkError:
return False
except Exception:
return False
def get_status(self) -> Dict[str, Any]:
"""Get wallet daemon status information"""
try:
client = self._get_http_client()
return client.get("/health")
except NetworkError as e:
return {"status": "unavailable", "error": str(e)}
except Exception as e:
return {"status": "error", "error": str(e)}
def create_wallet(self, wallet_id: str, password: str, metadata: Optional[Dict[str, Any]] = None) -> WalletInfo:
"""Create a new wallet in the daemon"""
try:
client = self._get_http_client()
payload = {
"wallet_id": wallet_id,
"password": password,
"metadata": metadata or {}
}
data = client.post("/v1/wallets", json=payload)
return WalletInfo(
wallet_id=data["wallet_id"],
chain_id=data.get("chain_id", "default"),
public_key=data["public_key"],
address=data.get("address"),
created_at=data.get("created_at"),
metadata=data.get("metadata")
)
except NetworkError as e:
error(f"Error creating wallet: {e}")
raise
except Exception as e:
error(f"Error creating wallet: {str(e)}")
raise
def list_wallets(self) -> List[WalletInfo]:
"""List all wallets in the daemon"""
try:
client = self._get_http_client()
data = client.get("/v1/wallets")
wallets = []
# Handle both "wallets" and "items" keys for compatibility
wallet_list = data.get("wallets", data.get("items", []))
for wallet_data in wallet_list:
wallets.append(WalletInfo(
wallet_id=wallet_data.get("wallet_id", wallet_data.get("wallet_name", "")),
chain_id=wallet_data.get("chain_id", "default"),
public_key=wallet_data.get("public_key", ""),
address=wallet_data.get("address", ""),
created_at=wallet_data.get("created_at", ""),
metadata=wallet_data.get("metadata", {})
))
return wallets
except NetworkError as e:
error(f"Failed to list daemon wallets: {str(e)}")
raise
except Exception as e:
error(f"Error listing wallets: {str(e)}")
raise
def get_wallet_info(self, wallet_id: str) -> Optional[WalletInfo]:
"""Get information about a specific wallet"""
try:
client = self._get_http_client()
data = client.get(f"/v1/wallets/{wallet_id}")
return WalletInfo(
wallet_id=data["wallet_id"],
chain_id=data.get("chain_id", "default"),
public_key=data["public_key"],
address=data.get("address"),
created_at=data.get("created_at"),
metadata=data.get("metadata")
)
except NetworkError as e:
error(f"Failed to get wallet info: {e}")
return None
except Exception as e:
error(f"Error getting wallet info: {str(e)}")
return None
def get_wallet_balance(self, wallet_id: str) -> Optional[WalletBalance]:
"""Get wallet balance from daemon"""
try:
client = self._get_http_client()
data = client.get(f"/v1/wallets/{wallet_id}/balance")
return WalletBalance(
wallet_id=wallet_id,
chain_id=data.get("chain_id", "default"),
balance=data["balance"],
address=data.get("address"),
last_updated=data.get("last_updated")
)
except NetworkError as e:
error(f"Failed to get wallet balance: {e}")
return None
except Exception as e:
error(f"Error getting wallet balance: {str(e)}")
return None
def sign_message(self, wallet_id: str, password: str, message: bytes) -> str:
"""Sign a message with wallet private key"""
try:
with self._get_http_client() as client:
# Encode message as base64 for transmission
message_b64 = base64.b64encode(message).decode()
payload = {
"password": password,
"message": message_b64
}
response = client.post(f"/v1/wallets/{wallet_id}/sign", json=payload)
if response.status_code == 200:
data = response.json()
return data["signature_base64"]
else:
error(f"Failed to sign message: {response.text}")
raise Exception(f"HTTP {response.status_code}: {response.text}")
except Exception as e:
error(f"Error signing message: {str(e)}")
raise
def send_transaction(self, wallet_id: str, password: str, to_address: str, amount: float,
description: Optional[str] = None) -> Dict[str, Any]:
"""Send a transaction via the daemon"""
try:
with self._get_http_client() as client:
payload = {
"password": password,
"to_address": to_address,
"amount": amount,
"description": description or ""
}
response = client.post(f"/v1/wallets/{wallet_id}/send", json=payload)
if response.status_code == 201:
return response.json()
else:
error(f"Failed to send transaction: {response.text}")
raise Exception(f"HTTP {response.status_code}: {response.text}")
except Exception as e:
error(f"Error sending transaction: {str(e)}")
raise
def unlock_wallet(self, wallet_id: str, password: str) -> bool:
"""Unlock a wallet for operations"""
try:
with self._get_http_client() as client:
payload = {"password": password}
response = client.post(f"/v1/wallets/{wallet_id}/unlock", json=payload)
return response.status_code == 200
except Exception:
return False
def lock_wallet(self, wallet_id: str) -> bool:
"""Lock a wallet"""
try:
with self._get_http_client() as client:
response = client.post(f"/v1/wallets/{wallet_id}/lock")
return response.status_code == 200
except Exception:
return False
def delete_wallet(self, wallet_id: str, password: str) -> bool:
"""Delete a wallet from daemon"""
try:
with self._get_http_client() as client:
payload = {"password": password}
response = client.delete(f"/v1/wallets/{wallet_id}", json=payload)
return response.status_code == 200
except Exception:
return False
def jsonrpc_call(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Make a JSON-RPC call to the daemon"""
try:
with self._get_http_client() as client:
payload = {
"jsonrpc": "2.0",
"method": method,
"params": params or {},
"id": 1
}
response = client.post("/rpc", json=payload)
if response.status_code == 200:
return response.json()
else:
error(f"JSON-RPC call failed: {response.text}")
raise Exception(f"HTTP {response.status_code}: {response.text}")
except Exception as e:
error(f"Error making JSON-RPC call: {str(e)}")
raise
# Multi-Chain Methods
def list_chains(self) -> List[ChainInfo]:
"""List all blockchain chains"""
try:
with self._get_http_client() as client:
response = client.get("/v1/chains")
if response.status_code == 200:
data = response.json()
chains = []
for chain_data in data.get("chains", []):
chains.append(ChainInfo(
chain_id=chain_data["chain_id"],
name=chain_data["name"],
status=chain_data["status"],
coordinator_url=chain_data["coordinator_url"],
created_at=chain_data["created_at"],
updated_at=chain_data["updated_at"],
wallet_count=chain_data["wallet_count"],
recent_activity=chain_data["recent_activity"]
))
return chains
else:
error(f"Failed to list chains: {response.text}")
raise Exception(f"HTTP {response.status_code}: {response.text}")
except Exception as e:
error(f"Error listing chains: {str(e)}")
raise
def create_chain(self, chain_id: str, name: str, coordinator_url: str,
coordinator_api_key: str, metadata: Optional[Dict[str, Any]] = None) -> ChainInfo:
"""Create a new blockchain chain"""
try:
with self._get_http_client() as client:
payload = {
"chain_id": chain_id,
"name": name,
"coordinator_url": coordinator_url,
"coordinator_api_key": coordinator_api_key,
"metadata": metadata or {}
}
response = client.post("/v1/chains", json=payload)
if response.status_code == 201:
data = response.json()
chain_data = data["chain"]
return ChainInfo(
chain_id=chain_data["chain_id"],
name=chain_data["name"],
status=chain_data["status"],
coordinator_url=chain_data["coordinator_url"],
created_at=chain_data["created_at"],
updated_at=chain_data["updated_at"],
wallet_count=chain_data["wallet_count"],
recent_activity=chain_data["recent_activity"]
)
else:
error(f"Failed to create chain: {response.text}")
raise Exception(f"HTTP {response.status_code}: {response.text}")
except Exception as e:
error(f"Error creating chain: {str(e)}")
raise
def create_wallet(self, wallet_id: str, password: str, metadata: Optional[Dict[str, Any]] = None) -> WalletInfo:
"""Create a new wallet in the daemon"""
try:
client = self._get_http_client()
payload = {
"wallet_id": wallet_id,
"password": password,
"metadata": metadata or {}
}
data = client.post("/v1/wallets", json=payload)
return WalletInfo(
wallet_id=data["wallet_id"],
public_key=data["public_key"],
address=data.get("address"),
created_at=data.get("created_at"),
metadata=data.get("metadata")
)
except NetworkError as e:
error(f"Failed to create wallet: {e}")
raise
except Exception as e:
error(f"Error creating wallet: {str(e)}")
raise
def create_wallet_in_chain(self, chain_id: str, wallet_id: str, password: str,
metadata: Optional[Dict[str, Any]] = None) -> WalletInfo:
"""Create a wallet in a specific chain"""
try:
with self._get_http_client() as client:
payload = {
"chain_id": chain_id,
"wallet_id": wallet_id,
"password": password,
"metadata": metadata or {}
}
response = client.post(f"/v1/chains/{chain_id}/wallets", json=payload)
if response.status_code == 201:
data = response.json()
wallet_data = data["wallet"]
return WalletInfo(
wallet_id=wallet_data["wallet_id"],
chain_id=wallet_data["chain_id"],
public_key=wallet_data["public_key"],
address=wallet_data.get("address"),
created_at=wallet_data.get("created_at"),
metadata=wallet_data.get("metadata")
)
else:
error(f"Failed to create wallet in chain {chain_id}: {response.text}")
raise Exception(f"HTTP {response.status_code}: {response.text}")
except Exception as e:
error(f"Error creating wallet in chain {chain_id}: {str(e)}")
raise
def list_wallets_in_chain(self, chain_id: str) -> List[WalletInfo]:
"""List wallets in a specific chain"""
try:
with self._get_http_client() as client:
response = client.get(f"/v1/chains/{chain_id}/wallets")
if response.status_code == 200:
data = response.json()
wallets = []
for wallet_data in data.get("items", []):
wallets.append(WalletInfo(
wallet_id=wallet_data["wallet_id"],
chain_id=wallet_data["chain_id"],
public_key=wallet_data["public_key"],
address=wallet_data.get("address"),
created_at=wallet_data.get("created_at"),
metadata=wallet_data.get("metadata")
))
return wallets
else:
error(f"Failed to list wallets in chain {chain_id}: {response.text}")
raise Exception(f"HTTP {response.status_code}: {response.text}")
except Exception as e:
error(f"Error listing wallets in chain {chain_id}: {str(e)}")
raise
def get_wallet_info_in_chain(self, chain_id: str, wallet_id: str) -> Optional[WalletInfo]:
"""Get wallet information from a specific chain"""
try:
wallets = self.list_wallets_in_chain(chain_id)
for wallet in wallets:
if wallet.wallet_id == wallet_id:
return wallet
return None
except Exception as e:
error(f"Error getting wallet info from chain {chain_id}: {str(e)}")
return None
def unlock_wallet_in_chain(self, chain_id: str, wallet_id: str, password: str) -> bool:
"""Unlock a wallet in a specific chain"""
try:
with self._get_http_client() as client:
payload = {"password": password}
response = client.post(f"/v1/chains/{chain_id}/wallets/{wallet_id}/unlock", json=payload)
return response.status_code == 200
except Exception:
return False
def sign_message_in_chain(self, chain_id: str, wallet_id: str, password: str, message: bytes) -> Optional[str]:
"""Sign a message with a wallet in a specific chain"""
try:
with self._get_http_client() as client:
payload = {
"password": password,
"message_base64": base64.b64encode(message).decode()
}
response = client.post(f"/v1/chains/{chain_id}/wallets/{wallet_id}/sign", json=payload)
if response.status_code == 200:
data = response.json()
return data.get("signature_base64")
else:
return None
except Exception:
return None
def get_wallet_balance_in_chain(self, chain_id: str, wallet_id: str) -> Optional[WalletBalance]:
"""Get wallet balance in a specific chain"""
try:
# For now, return a placeholder balance
# In a real implementation, this would call the chain-specific balance endpoint
wallet_info = self.get_wallet_info_in_chain(chain_id, wallet_id)
if wallet_info:
return WalletBalance(
wallet_id=wallet_id,
chain_id=chain_id,
balance=0.0, # Placeholder
address=wallet_info.address
)
return None
except Exception as e:
error(f"Error getting wallet balance in chain {chain_id}: {str(e)}")
return None
def migrate_wallet(self, source_chain_id: str, target_chain_id: str, wallet_id: str,
password: str, new_password: Optional[str] = None) -> Optional[WalletMigrationResult]:
"""Migrate a wallet from one chain to another"""
try:
with self._get_http_client() as client:
payload = {
"source_chain_id": source_chain_id,
"target_chain_id": target_chain_id,
"wallet_id": wallet_id,
"password": password
}
if new_password:
payload["new_password"] = new_password
response = client.post("/v1/wallets/migrate", json=payload)
if response.status_code == 200:
data = response.json()
source_wallet = WalletInfo(
wallet_id=data["source_wallet"]["wallet_id"],
chain_id=data["source_wallet"]["chain_id"],
public_key=data["source_wallet"]["public_key"],
address=data["source_wallet"].get("address"),
metadata=data["source_wallet"].get("metadata")
)
target_wallet = WalletInfo(
wallet_id=data["target_wallet"]["wallet_id"],
chain_id=data["target_wallet"]["chain_id"],
public_key=data["target_wallet"]["public_key"],
address=data["target_wallet"].get("address"),
metadata=data["target_wallet"].get("metadata")
)
return WalletMigrationResult(
success=data["success"],
source_wallet=source_wallet,
target_wallet=target_wallet,
migration_timestamp=data["migration_timestamp"]
)
else:
error(f"Failed to migrate wallet: {response.text}")
return None
except Exception as e:
error(f"Error migrating wallet: {str(e)}")
return None
def get_chain_status(self) -> Dict[str, Any]:
"""Get overall chain status and statistics"""
try:
chains = self.list_chains()
active_chains = [c for c in chains if c.status == "active"]
return {
"total_chains": len(chains),
"active_chains": len(active_chains),
"total_wallets": sum(c.wallet_count for c in chains),
"chains": [
{
"chain_id": chain.chain_id,
"name": chain.name,
"status": chain.status,
"wallet_count": chain.wallet_count,
"recent_activity": chain.recent_activity
}
for chain in chains
]
}
except Exception as e:
error(f"Error getting chain status: {str(e)}")
return {"error": str(e)}

View File

@@ -16,7 +16,7 @@ sys.path.insert(0, "/opt/aitbc/cli")
from utils import error, success from utils import error, success
if TYPE_CHECKING: if TYPE_CHECKING:
from core.config import Config from aitbc_cli.core.config import Config
@dataclass @dataclass