Files
aitbc/cli/miner_management.py
aitbc f646bd7ed4
Some checks failed
API Endpoint Tests / test-api-endpoints (push) Successful in 37s
Integration Tests / test-service-integration (push) Successful in 57s
Python Tests / test-python (push) Failing after 4m15s
CLI Tests / test-cli (push) Failing after 6m48s
Security Scanning / security-scan (push) Successful in 2m16s
fix: add fixed marketplace offers endpoint to avoid AttributeError
Marketplace Offers Router Enhancement:
 NEW ENDPOINT: GET /offers for listing all marketplace offers
- Added fixed version to avoid AttributeError from GlobalMarketplaceService
- Uses direct database query with SQLModel select
- Safely extracts offer attributes with fallback defaults
- Returns structured offer data with GPU specs and metadata

 ENDPOINT FEATURES:
🔧 Direct Query: Bypasses service layer to avoid attribute
2026-03-30 22:34:05 +02:00

506 lines
16 KiB
Python

#!/usr/bin/env python3
"""
AITBC Miner Management Module
Complete command-line interface for AI compute miner operations including:
- Miner Registration
- Status Management
- Job Polling & Execution
- Marketplace Integration
- Payment Management
"""
import json
import time
import requests
from typing import Optional, Dict, Any
# Default configuration
DEFAULT_COORDINATOR_URL = "http://localhost:8000"
DEFAULT_API_KEY = "miner_prod_key_use_real_value"
def register_miner(
miner_id: str,
wallet: str,
api_key: str = DEFAULT_API_KEY,
coordinator_url: str = DEFAULT_COORDINATOR_URL,
capabilities: Optional[str] = None,
gpu_memory: Optional[int] = None,
models: Optional[list] = None,
pricing: Optional[float] = None,
concurrency: int = 1,
region: Optional[str] = None
) -> Optional[Dict]:
"""Register miner as AI compute provider"""
try:
headers = {
"X-Api-Key": api_key,
"X-Miner-ID": miner_id,
"Content-Type": "application/json"
}
# Build capabilities from arguments
caps = {}
if gpu_memory:
caps["gpu_memory"] = gpu_memory
caps["gpu_memory_gb"] = gpu_memory
if models:
caps["models"] = models
caps["supported_models"] = models
if pricing:
caps["pricing_per_hour"] = pricing
caps["price_per_hour"] = pricing
caps["gpu"] = "AI-GPU"
caps["gpu_count"] = 1
caps["cuda_version"] = "12.0"
# Override with capabilities JSON if provided
if capabilities:
caps.update(json.loads(capabilities))
payload = {
"wallet_address": wallet,
"capabilities": caps,
"concurrency": concurrency,
"region": region
}
response = requests.post(
f"{coordinator_url}/v1/miners/register",
headers=headers,
json=payload
)
if response.status_code == 200:
result = response.json()
return {
"action": "register",
"miner_id": miner_id,
"status": "✅ Registered successfully",
"session_token": result.get("session_token"),
"coordinator_url": coordinator_url,
"capabilities": caps
}
else:
return {
"action": "register",
"status": "❌ Registration failed",
"error": response.text,
"status_code": response.status_code
}
except Exception as e:
return {"action": "register", "status": f"❌ Error: {str(e)}"}
def get_miner_status(
miner_id: str,
api_key: str = DEFAULT_API_KEY,
coordinator_url: str = DEFAULT_COORDINATOR_URL
) -> Optional[Dict]:
"""Get miner status and statistics"""
try:
# Use admin API key to get miner status
admin_api_key = api_key.replace("miner_", "admin_")
headers = {"X-Api-Key": admin_api_key}
response = requests.get(
f"{coordinator_url}/v1/admin/miners",
headers=headers
)
if response.status_code == 200:
miners = response.json().get("items", [])
miner_info = next((m for m in miners if m["miner_id"] == miner_id), None)
if miner_info:
return {
"action": "status",
"miner_id": miner_id,
"status": f"{miner_info['status']}",
"inflight": miner_info["inflight"],
"concurrency": miner_info["concurrency"],
"region": miner_info["region"],
"last_heartbeat": miner_info["last_heartbeat"],
"jobs_completed": miner_info["jobs_completed"],
"jobs_failed": miner_info["jobs_failed"],
"average_job_duration_ms": miner_info["average_job_duration_ms"],
"success_rate": (
miner_info["jobs_completed"] /
max(1, miner_info["jobs_completed"] + miner_info["jobs_failed"]) * 100
)
}
else:
return {
"action": "status",
"miner_id": miner_id,
"status": "❌ Miner not found"
}
else:
return {"action": "status", "status": "❌ Failed to get status", "error": response.text}
except Exception as e:
return {"action": "status", "status": f"❌ Error: {str(e)}"}
def send_heartbeat(
miner_id: str,
api_key: str = DEFAULT_API_KEY,
coordinator_url: str = DEFAULT_COORDINATOR_URL,
inflight: int = 0,
status: str = "ONLINE"
) -> Optional[Dict]:
"""Send miner heartbeat"""
try:
headers = {
"X-Api-Key": api_key,
"X-Miner-ID": miner_id,
"Content-Type": "application/json"
}
payload = {
"inflight": inflight,
"status": status,
"metadata": {
"timestamp": time.time(),
"version": "1.0.0",
"system_info": "AI Compute Miner"
}
}
response = requests.post(
f"{coordinator_url}/v1/miners/heartbeat",
headers=headers,
json=payload
)
if response.status_code == 200:
return {
"action": "heartbeat",
"miner_id": miner_id,
"status": "✅ Heartbeat sent successfully",
"inflight": inflight,
"miner_status": status
}
else:
return {"action": "heartbeat", "status": "❌ Heartbeat failed", "error": response.text}
except Exception as e:
return {"action": "heartbeat", "status": f"❌ Error: {str(e)}"}
def poll_jobs(
miner_id: str,
api_key: str = DEFAULT_API_KEY,
coordinator_url: str = DEFAULT_COORDINATOR_URL,
max_wait: int = 30,
auto_execute: bool = False
) -> Optional[Dict]:
"""Poll for available jobs"""
try:
headers = {
"X-Api-Key": api_key,
"X-Miner-ID": miner_id,
"Content-Type": "application/json"
}
payload = {"max_wait_seconds": max_wait}
response = requests.post(
f"{coordinator_url}/v1/miners/poll",
headers=headers,
json=payload
)
if response.status_code == 200 and response.content:
job = response.json()
result = {
"action": "poll",
"miner_id": miner_id,
"status": "✅ Job assigned",
"job_id": job.get("job_id"),
"payload": job.get("payload"),
"constraints": job.get("constraints"),
"assigned_at": time.strftime("%Y-%m-%d %H:%M:%S")
}
if auto_execute:
result["auto_execution"] = "🤖 Job execution would start here"
result["execution_status"] = "Ready to execute"
return result
elif response.status_code == 204:
return {
"action": "poll",
"miner_id": miner_id,
"status": "⏸️ No jobs available",
"message": "No jobs in queue"
}
else:
return {"action": "poll", "status": "❌ Poll failed", "error": response.text}
except Exception as e:
return {"action": "poll", "status": f"❌ Error: {str(e)}"}
def submit_job_result(
job_id: str,
miner_id: str,
result: str,
api_key: str = DEFAULT_API_KEY,
coordinator_url: str = DEFAULT_COORDINATOR_URL,
success: bool = True,
duration: Optional[int] = None,
result_file: Optional[str] = None
) -> Optional[Dict]:
"""Submit job result"""
try:
headers = {
"X-Api-Key": api_key,
"X-Miner-ID": miner_id,
"Content-Type": "application/json"
}
# Load result from file if specified
if result_file:
with open(result_file, 'r') as f:
result = f.read()
payload = {
"result": result,
"success": success,
"metrics": {
"duration_ms": duration,
"completed_at": time.time()
}
}
response = requests.post(
f"{coordinator_url}/v1/miners/{job_id}/result",
headers=headers,
json=payload
)
if response.status_code == 200:
return {
"action": "result",
"job_id": job_id,
"miner_id": miner_id,
"status": "✅ Result submitted successfully",
"success": success,
"duration_ms": duration
}
else:
return {"action": "result", "status": "❌ Result submission failed", "error": response.text}
except Exception as e:
return {"action": "result", "status": f"❌ Error: {str(e)}"}
def update_capabilities(
miner_id: str,
api_key: str = DEFAULT_API_KEY,
coordinator_url: str = DEFAULT_COORDINATOR_URL,
capabilities: Optional[str] = None,
gpu_memory: Optional[int] = None,
models: Optional[list] = None,
pricing: Optional[float] = None,
concurrency: Optional[int] = None,
region: Optional[str] = None,
wallet: Optional[str] = None
) -> Optional[Dict]:
"""Update miner capabilities"""
try:
headers = {
"X-Api-Key": api_key,
"X-Miner-ID": miner_id,
"Content-Type": "application/json"
}
# Build capabilities from arguments
caps = {}
if gpu_memory:
caps["gpu_memory"] = gpu_memory
caps["gpu_memory_gb"] = gpu_memory
if models:
caps["models"] = models
caps["supported_models"] = models
if pricing:
caps["pricing_per_hour"] = pricing
caps["price_per_hour"] = pricing
# Override with capabilities JSON if provided
if capabilities:
caps.update(json.loads(capabilities))
payload = {
"capabilities": caps,
"concurrency": concurrency,
"region": region
}
if wallet:
payload["wallet_address"] = wallet
response = requests.put(
f"{coordinator_url}/v1/miners/{miner_id}/capabilities",
headers=headers,
json=payload
)
if response.status_code == 200:
return {
"action": "update",
"miner_id": miner_id,
"status": "✅ Capabilities updated successfully",
"updated_capabilities": caps
}
else:
return {"action": "update", "status": "❌ Update failed", "error": response.text}
except Exception as e:
return {"action": "update", "status": f"❌ Error: {str(e)}"}
def check_earnings(
miner_id: str,
api_key: str = DEFAULT_API_KEY,
coordinator_url: str = DEFAULT_COORDINATOR_URL,
period: str = "all"
) -> Optional[Dict]:
"""Check miner earnings (placeholder for payment integration)"""
try:
# This would integrate with payment system when implemented
return {
"action": "earnings",
"miner_id": miner_id,
"period": period,
"status": "📊 Earnings calculation",
"total_earnings": 0.0,
"jobs_completed": 0,
"average_payment": 0.0,
"note": "Payment integration coming soon"
}
except Exception as e:
return {"action": "earnings", "status": f"❌ Error: {str(e)}"}
def list_marketplace_offers(
miner_id: Optional[str] = None,
region: Optional[str] = None,
api_key: str = DEFAULT_API_KEY,
coordinator_url: str = DEFAULT_COORDINATOR_URL
) -> Optional[Dict]:
"""List marketplace offers"""
try:
admin_headers = {"X-Api-Key": api_key.replace("miner_", "admin_")}
params = {}
if region:
params["region"] = region
response = requests.get(
f"{coordinator_url}/v1/marketplace/miner-offers",
headers=admin_headers,
params=params
)
if response.status_code == 200:
offers = response.json()
# Filter by miner if specified
if miner_id:
offers = [o for o in offers if miner_id in str(o).lower()]
return {
"action": "marketplace_list",
"status": "✅ Offers retrieved",
"offers": offers,
"count": len(offers),
"region_filter": region,
"miner_filter": miner_id
}
else:
return {"action": "marketplace_list", "status": "❌ Failed to get offers", "error": response.text}
except Exception as e:
return {"action": "marketplace_list", "status": f"❌ Error: {str(e)}"}
def create_marketplace_offer(
miner_id: str,
price: float,
api_key: str = DEFAULT_API_KEY,
coordinator_url: str = DEFAULT_COORDINATOR_URL,
capacity: int = 1,
region: Optional[str] = None
) -> Optional[Dict]:
"""Create marketplace offer"""
try:
admin_headers = {"X-Api-Key": api_key.replace("miner_", "admin_")}
payload = {
"miner_id": miner_id,
"price": price,
"capacity": capacity,
"region": region
}
response = requests.post(
f"{coordinator_url}/v1/marketplace/offers",
headers=admin_headers,
json=payload
)
if response.status_code == 200:
return {
"action": "marketplace_create",
"miner_id": miner_id,
"status": "✅ Offer created successfully",
"price": price,
"capacity": capacity,
"region": region
}
else:
return {"action": "marketplace_create", "status": "❌ Offer creation failed", "error": response.text}
except Exception as e:
return {"action": "marketplace_create", "status": f"❌ Error: {str(e)}"}
# Main function for CLI integration
def miner_cli_dispatcher(action: str, **kwargs) -> Optional[Dict]:
"""Main dispatcher for miner management CLI commands"""
actions = {
"register": register_miner,
"status": get_miner_status,
"heartbeat": send_heartbeat,
"poll": poll_jobs,
"result": submit_job_result,
"update": update_capabilities,
"earnings": check_earnings,
"marketplace_list": list_marketplace_offers,
"marketplace_create": create_marketplace_offer
}
if action in actions:
return actions[action](**kwargs)
else:
return {
"action": action,
"status": f"❌ Unknown action. Available: {', '.join(actions.keys())}"
}
if __name__ == "__main__":
# Test the module
print("🚀 AITBC Miner Management Module")
print("Available functions:")
for func in [register_miner, get_miner_status, send_heartbeat, poll_jobs,
submit_job_result, update_capabilities, check_earnings,
list_marketplace_offers, create_marketplace_offer]:
print(f" - {func.__name__}")