Migrate miner app to centralized aitbc package utilities

- Replace direct httpx calls and logging.getLogger in production_miner.py and production_miner_fixed.py with aitbc's AITBCHTTPClient, NetworkError and get_logger
- Migrate hardcoded path in production_miner_fixed.py to use LOG_DIR constant
This commit is contained in:
aitbc
2026-04-25 07:22:13 +02:00
parent afd466de80
commit 2d61a7bfd2
2 changed files with 140 additions and 167 deletions

View File

@@ -5,14 +5,14 @@ Real GPU Miner Client for AITBC - runs on host with actual GPU
import json import json
import time import time
import httpx
import logging
import sys import sys
import subprocess import subprocess
import os import os
from datetime import datetime from datetime import datetime
from typing import Dict, Optional from typing import Dict, Optional
from aitbc import get_logger, AITBCHTTPClient, NetworkError
# Configuration # Configuration
COORDINATOR_URL = os.environ.get("COORDINATOR_URL", "http://127.0.0.1:8001") COORDINATOR_URL = os.environ.get("COORDINATOR_URL", "http://127.0.0.1:8001")
MINER_ID = os.environ.get("MINER_API_KEY", "miner_test") MINER_ID = os.environ.get("MINER_API_KEY", "miner_test")
@@ -21,6 +21,13 @@ HEARTBEAT_INTERVAL = 15
MAX_RETRIES = 10 MAX_RETRIES = 10
RETRY_DELAY = 30 RETRY_DELAY = 30
# Initialize HTTP client
coordinator_client = AITBCHTTPClient(
base_url=COORDINATOR_URL,
headers={"X-Api-Key": AUTH_TOKEN, "Content-Type": "application/json"},
timeout=30
)
# Setup logging with explicit configuration # Setup logging with explicit configuration
LOG_PATH = "/var/log/aitbc/production_miner.log" LOG_PATH = "/var/log/aitbc/production_miner.log"
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True) os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
@@ -38,7 +45,7 @@ logging.basicConfig(
logging.FileHandler(LOG_PATH) logging.FileHandler(LOG_PATH)
] ]
) )
logger = logging.getLogger(__name__) logger = get_logger(__name__)
# Force stdout to be unbuffered # Force stdout to be unbuffered
sys.stdout.reconfigure(line_buffering=True) sys.stdout.reconfigure(line_buffering=True)
@@ -109,10 +116,11 @@ def build_gpu_capabilities() -> Dict:
def measure_coordinator_latency() -> float: def measure_coordinator_latency() -> float:
start = time.time() start = time.time()
try: try:
resp = httpx.get(f"{COORDINATOR_URL}/health", timeout=3) client = AITBCHTTPClient(base_url=COORDINATOR_URL, timeout=3)
if resp.status_code == 200: resp = client.get("/health")
if resp:
return (time.time() - start) * 1000 return (time.time() - start) * 1000
except Exception: except NetworkError:
pass pass
return -1.0 return -1.0
@@ -138,16 +146,17 @@ def get_gpu_info():
def check_ollama(): def check_ollama():
"""Check if Ollama is running and has models""" """Check if Ollama is running and has models"""
try: try:
response = httpx.get("http://localhost:11434/api/tags", timeout=5) client = AITBCHTTPClient(base_url="http://localhost:11434", timeout=5)
if response.status_code == 200: response = client.get("/api/tags")
models = response.json().get('models', []) if response:
models = response.get('models', [])
model_names = [m['name'] for m in models] model_names = [m['name'] for m in models]
logger.info(f"Ollama running with models: {model_names}") logger.info(f"Ollama running with models: {model_names}")
return True, model_names return True, model_names
else: else:
logger.error("Ollama not responding") logger.error("Ollama not responding")
return False, [] return False, []
except Exception as e: except NetworkError as e:
logger.error(f"Ollama check failed: {e}") logger.error(f"Ollama check failed: {e}")
return False, [] return False, []
@@ -155,16 +164,17 @@ def wait_for_coordinator():
"""Wait for coordinator to be available""" """Wait for coordinator to be available"""
for i in range(MAX_RETRIES): for i in range(MAX_RETRIES):
try: try:
response = httpx.get(f"{COORDINATOR_URL}/health", timeout=5) client = AITBCHTTPClient(base_url=COORDINATOR_URL, timeout=5)
if response.status_code == 200: response = client.get("/health")
if response:
logger.info("Coordinator is available!") logger.info("Coordinator is available!")
return True return True
except: except NetworkError:
pass pass
logger.info(f"Waiting for coordinator... ({i+1}/{MAX_RETRIES})") logger.info(f"Waiting for coordinator... ({i+1}/{MAX_RETRIES})")
time.sleep(RETRY_DELAY) time.sleep(RETRY_DELAY)
logger.error("Coordinator not available after max retries") logger.error("Coordinator not available after max retries")
return False return False
@@ -175,29 +185,24 @@ def register_miner():
"concurrency": 1, "concurrency": 1,
"region": "localhost" "region": "localhost"
} }
headers = { headers = {
"X-Api-Key": AUTH_TOKEN, "X-Api-Key": AUTH_TOKEN,
"Content-Type": "application/json" "Content-Type": "application/json"
} }
try: try:
response = httpx.post( client = AITBCHTTPClient(base_url=COORDINATOR_URL, headers=headers, timeout=10)
f"{COORDINATOR_URL}/v1/miners/register?miner_id={MINER_ID}", response = client.post(f"/v1/miners/register?miner_id={MINER_ID}", json=register_data)
json=register_data,
headers=headers, if response:
timeout=10 logger.info(f"Successfully registered miner: {response}")
) return response.get("session_token", "demo-token")
if response.status_code == 200:
data = response.json()
logger.info(f"Successfully registered miner: {data}")
return data.get("session_token", "demo-token")
else: else:
logger.error(f"Registration failed: {response.status_code} - {response.text}") logger.error("Registration failed")
return None return None
except Exception as e: except NetworkError as e:
logger.error(f"Registration error: {e}") logger.error(f"Registration error: {e}")
return None return None
@@ -206,7 +211,7 @@ def send_heartbeat():
gpu_info = get_gpu_info() gpu_info = get_gpu_info()
arch = classify_architecture(gpu_info["name"]) if gpu_info else "unknown" arch = classify_architecture(gpu_info["name"]) if gpu_info else "unknown"
latency_ms = measure_coordinator_latency() latency_ms = measure_coordinator_latency()
if gpu_info: if gpu_info:
heartbeat_data = { heartbeat_data = {
"status": "active", "status": "active",
@@ -231,26 +236,22 @@ def send_heartbeat():
"edge_optimized": False, "edge_optimized": False,
"network_latency_ms": latency_ms, "network_latency_ms": latency_ms,
} }
headers = { headers = {
"X-Api-Key": AUTH_TOKEN, "X-Api-Key": AUTH_TOKEN,
"Content-Type": "application/json" "Content-Type": "application/json"
} }
try: try:
response = httpx.post( client = AITBCHTTPClient(base_url=COORDINATOR_URL, headers=headers, timeout=5)
f"{COORDINATOR_URL}/v1/miners/heartbeat?miner_id={MINER_ID}", response = client.post(f"/v1/miners/heartbeat?miner_id={MINER_ID}", json=heartbeat_data)
json=heartbeat_data,
headers=headers, if response:
timeout=5
)
if response.status_code == 200:
logger.info(f"Heartbeat sent (GPU: {gpu_info['utilization'] if gpu_info else 'N/A'}%)") logger.info(f"Heartbeat sent (GPU: {gpu_info['utilization'] if gpu_info else 'N/A'}%)")
else: else:
logger.error(f"Heartbeat failed: {response.status_code} - {response.text}") logger.error("Heartbeat failed")
except Exception as e: except NetworkError as e:
logger.error(f"Heartbeat error: {e}") logger.error(f"Heartbeat error: {e}")
def execute_job(job, available_models): def execute_job(job, available_models):
@@ -278,19 +279,19 @@ def execute_job(job, available_models):
# Call Ollama API for real GPU inference # Call Ollama API for real GPU inference
logger.info(f"Running inference on GPU with model: {model}") logger.info(f"Running inference on GPU with model: {model}")
start_time = time.time() start_time = time.time()
ollama_response = httpx.post( ollama_client = AITBCHTTPClient(base_url="http://localhost:11434", timeout=60)
"http://localhost:11434/api/generate", ollama_response = ollama_client.post(
"/api/generate",
json={ json={
"model": model, "model": model,
"prompt": prompt, "prompt": prompt,
"stream": False "stream": False
}, }
timeout=60
) )
if ollama_response.status_code == 200: if ollama_response:
result = ollama_response.json() result = ollama_response
output = result.get('response', '') output = result.get('response', '')
execution_time = time.time() - start_time execution_time = time.time() - start_time
@@ -317,11 +318,11 @@ def execute_job(job, available_models):
logger.info(f"Job {job_id} completed in {execution_time:.2f}s") logger.info(f"Job {job_id} completed in {execution_time:.2f}s")
return True return True
else: else:
logger.error(f"Ollama error: {ollama_response.status_code}") logger.error("Ollama error")
submit_result(job_id, { submit_result(job_id, {
"result": { "result": {
"status": "failed", "status": "failed",
"error": f"Ollama error: {ollama_response.text}" "error": "Ollama error"
} }
}) })
return False return False
@@ -352,21 +353,17 @@ def submit_result(job_id, result):
"X-Api-Key": AUTH_TOKEN, "X-Api-Key": AUTH_TOKEN,
"Content-Type": "application/json" "Content-Type": "application/json"
} }
try: try:
response = httpx.post( client = AITBCHTTPClient(base_url=COORDINATOR_URL, headers=headers, timeout=10)
f"{COORDINATOR_URL}/v1/miners/{job_id}/result", response = client.post(f"/v1/miners/{job_id}/result", json=result)
json=result,
headers=headers, if response:
timeout=10
)
if response.status_code == 200:
logger.info(f"Result submitted for job {job_id}") logger.info(f"Result submitted for job {job_id}")
else: else:
logger.error(f"Result submission failed: {response.status_code} - {response.text}") logger.error("Result submission failed")
except Exception as e: except NetworkError as e:
logger.error(f"Result submission error: {e}") logger.error(f"Result submission error: {e}")
def poll_for_jobs(): def poll_for_jobs():
@@ -374,31 +371,24 @@ def poll_for_jobs():
poll_data = { poll_data = {
"max_wait_seconds": 5 "max_wait_seconds": 5
} }
headers = { headers = {
"X-Api-Key": AUTH_TOKEN, "X-Api-Key": AUTH_TOKEN,
"Content-Type": "application/json" "Content-Type": "application/json"
} }
try: try:
response = httpx.post( client = AITBCHTTPClient(base_url=COORDINATOR_URL, headers=headers, timeout=10)
f"{COORDINATOR_URL}/v1/miners/poll", response = client.post("/v1/miners/poll", json=poll_data)
json=poll_data,
headers=headers, if response:
timeout=10 job = response
)
if response.status_code == 200:
job = response.json()
logger.info(f"Received job: {job}") logger.info(f"Received job: {job}")
return job return job
elif response.status_code == 204:
return None
else: else:
logger.error(f"Poll failed: {response.status_code} - {response.text}")
return None return None
except Exception as e: except NetworkError as e:
logger.error(f"Error polling for jobs: {e}") logger.error(f"Error polling for jobs: {e}")
return None return None

View File

@@ -5,14 +5,14 @@ Real GPU Miner Client for AITBC - runs on host with actual GPU
import json import json
import time import time
import httpx
import logging
import sys import sys
import subprocess import subprocess
import os import os
from datetime import datetime from datetime import datetime
from typing import Dict, Optional from typing import Dict, Optional
from aitbc import get_logger, AITBCHTTPClient, NetworkError, LOG_DIR
# Configuration # Configuration
COORDINATOR_URL = os.environ.get("COORDINATOR_URL", "http://127.0.0.1:8001") COORDINATOR_URL = os.environ.get("COORDINATOR_URL", "http://127.0.0.1:8001")
MINER_ID = os.environ.get("MINER_API_KEY", "miner_test") MINER_ID = os.environ.get("MINER_API_KEY", "miner_test")
@@ -22,7 +22,7 @@ MAX_RETRIES = 10
RETRY_DELAY = 30 RETRY_DELAY = 30
# Setup logging with explicit configuration # Setup logging with explicit configuration
LOG_PATH = "/var/log/aitbc/host_gpu_miner.log" LOG_PATH = str(LOG_DIR / "host_gpu_miner.log")
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True) os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
class FlushHandler(logging.StreamHandler): class FlushHandler(logging.StreamHandler):
@@ -38,7 +38,7 @@ logging.basicConfig(
logging.FileHandler(LOG_PATH) logging.FileHandler(LOG_PATH)
] ]
) )
logger = logging.getLogger(__name__) logger = get_logger(__name__)
# Force stdout to be unbuffered # Force stdout to be unbuffered
sys.stdout.reconfigure(line_buffering=True) sys.stdout.reconfigure(line_buffering=True)
@@ -109,10 +109,11 @@ def build_gpu_capabilities() -> Dict:
def measure_coordinator_latency() -> float: def measure_coordinator_latency() -> float:
start = time.time() start = time.time()
try: try:
resp = httpx.get(f"{COORDINATOR_URL}/v1/health", timeout=3) client = AITBCHTTPClient(base_url=COORDINATOR_URL, timeout=3)
if resp.status_code == 200: resp = client.get("/v1/health")
if resp:
return (time.time() - start) * 1000 return (time.time() - start) * 1000
except Exception: except NetworkError:
pass pass
return -1.0 return -1.0
@@ -138,16 +139,17 @@ def get_gpu_info():
def check_ollama(): def check_ollama():
"""Check if Ollama is running and has models""" """Check if Ollama is running and has models"""
try: try:
response = httpx.get("http://localhost:11434/api/tags", timeout=5) client = AITBCHTTPClient(base_url="http://localhost:11434", timeout=5)
if response.status_code == 200: response = client.get("/api/tags")
models = response.json().get('models', []) if response:
models = response.get('models', [])
model_names = [m['name'] for m in models] model_names = [m['name'] for m in models]
logger.info(f"Ollama running with models: {model_names}") logger.info(f"Ollama running with models: {model_names}")
return True, model_names return True, model_names
else: else:
logger.error("Ollama not responding") logger.error("Ollama not responding")
return False, [] return False, []
except Exception as e: except NetworkError as e:
logger.error(f"Ollama check failed: {e}") logger.error(f"Ollama check failed: {e}")
return False, [] return False, []
@@ -155,16 +157,17 @@ def wait_for_coordinator():
"""Wait for coordinator to be available""" """Wait for coordinator to be available"""
for i in range(MAX_RETRIES): for i in range(MAX_RETRIES):
try: try:
response = httpx.get(f"{COORDINATOR_URL}/v1/health", timeout=5) client = AITBCHTTPClient(base_url=COORDINATOR_URL, timeout=5)
if response.status_code == 200: response = client.get("/v1/health")
if response:
logger.info("Coordinator is available!") logger.info("Coordinator is available!")
return True return True
except: except NetworkError:
pass pass
logger.info(f"Waiting for coordinator... ({i+1}/{MAX_RETRIES})") logger.info(f"Waiting for coordinator... ({i+1}/{MAX_RETRIES})")
time.sleep(RETRY_DELAY) time.sleep(RETRY_DELAY)
logger.error("Coordinator not available after max retries") logger.error("Coordinator not available after max retries")
return False return False
@@ -175,29 +178,24 @@ def register_miner():
"concurrency": 1, "concurrency": 1,
"region": "localhost" "region": "localhost"
} }
headers = { headers = {
"X-Api-Key": AUTH_TOKEN, "X-Api-Key": AUTH_TOKEN,
"Content-Type": "application/json" "Content-Type": "application/json"
} }
try: try:
response = httpx.post( client = AITBCHTTPClient(base_url=COORDINATOR_URL, headers=headers, timeout=10)
f"{COORDINATOR_URL}/v1/miners/register?miner_id={MINER_ID}", response = client.post(f"/v1/miners/register?miner_id={MINER_ID}", json=register_data)
json=register_data,
headers=headers, if response:
timeout=10 logger.info(f"Successfully registered miner: {response}")
) return response.get("session_token", "demo-token")
if response.status_code == 200:
data = response.json()
logger.info(f"Successfully registered miner: {data}")
return data.get("session_token", "demo-token")
else: else:
logger.error(f"Registration failed: {response.status_code} - {response.text}") logger.error("Registration failed")
return None return None
except Exception as e: except NetworkError as e:
logger.error(f"Registration error: {e}") logger.error(f"Registration error: {e}")
return None return None
@@ -206,7 +204,7 @@ def send_heartbeat():
gpu_info = get_gpu_info() gpu_info = get_gpu_info()
arch = classify_architecture(gpu_info["name"]) if gpu_info else "unknown" arch = classify_architecture(gpu_info["name"]) if gpu_info else "unknown"
latency_ms = measure_coordinator_latency() latency_ms = measure_coordinator_latency()
if gpu_info: if gpu_info:
heartbeat_data = { heartbeat_data = {
"status": "active", "status": "active",
@@ -231,26 +229,22 @@ def send_heartbeat():
"edge_optimized": False, "edge_optimized": False,
"network_latency_ms": latency_ms, "network_latency_ms": latency_ms,
} }
headers = { headers = {
"X-Api-Key": AUTH_TOKEN, "X-Api-Key": AUTH_TOKEN,
"Content-Type": "application/json" "Content-Type": "application/json"
} }
try: try:
response = httpx.post( client = AITBCHTTPClient(base_url=COORDINATOR_URL, headers=headers, timeout=5)
f"{COORDINATOR_URL}/v1/miners/heartbeat?miner_id={MINER_ID}", response = client.post(f"/v1/miners/heartbeat?miner_id={MINER_ID}", json=heartbeat_data)
json=heartbeat_data,
headers=headers, if response:
timeout=5
)
if response.status_code == 200:
logger.info(f"Heartbeat sent (GPU: {gpu_info['utilization'] if gpu_info else 'N/A'}%)") logger.info(f"Heartbeat sent (GPU: {gpu_info['utilization'] if gpu_info else 'N/A'}%)")
else: else:
logger.error(f"Heartbeat failed: {response.status_code} - {response.text}") logger.error("Heartbeat failed")
except Exception as e: except NetworkError as e:
logger.error(f"Heartbeat error: {e}") logger.error(f"Heartbeat error: {e}")
def execute_job(job, available_models): def execute_job(job, available_models):
@@ -278,19 +272,19 @@ def execute_job(job, available_models):
# Call Ollama API for real GPU inference # Call Ollama API for real GPU inference
logger.info(f"Running inference on GPU with model: {model}") logger.info(f"Running inference on GPU with model: {model}")
start_time = time.time() start_time = time.time()
ollama_response = httpx.post( ollama_client = AITBCHTTPClient(base_url="http://localhost:11434", timeout=60)
"http://localhost:11434/api/generate", ollama_response = ollama_client.post(
"/api/generate",
json={ json={
"model": model, "model": model,
"prompt": prompt, "prompt": prompt,
"stream": False "stream": False
}, }
timeout=60
) )
if ollama_response.status_code == 200: if ollama_response:
result = ollama_response.json() result = ollama_response
output = result.get('response', '') output = result.get('response', '')
execution_time = time.time() - start_time execution_time = time.time() - start_time
@@ -317,11 +311,11 @@ def execute_job(job, available_models):
logger.info(f"Job {job_id} completed in {execution_time:.2f}s") logger.info(f"Job {job_id} completed in {execution_time:.2f}s")
return True return True
else: else:
logger.error(f"Ollama error: {ollama_response.status_code}") logger.error("Ollama error")
submit_result(job_id, { submit_result(job_id, {
"result": { "result": {
"status": "failed", "status": "failed",
"error": f"Ollama error: {ollama_response.text}" "error": "Ollama error"
} }
}) })
return False return False
@@ -352,21 +346,17 @@ def submit_result(job_id, result):
"X-Api-Key": AUTH_TOKEN, "X-Api-Key": AUTH_TOKEN,
"Content-Type": "application/json" "Content-Type": "application/json"
} }
try: try:
response = httpx.post( client = AITBCHTTPClient(base_url=COORDINATOR_URL, headers=headers, timeout=10)
f"{COORDINATOR_URL}/v1/miners/{job_id}/result", response = client.post(f"/v1/miners/{job_id}/result", json=result)
json=result,
headers=headers, if response:
timeout=10
)
if response.status_code == 200:
logger.info(f"Result submitted for job {job_id}") logger.info(f"Result submitted for job {job_id}")
else: else:
logger.error(f"Result submission failed: {response.status_code} - {response.text}") logger.error("Result submission failed")
except Exception as e: except NetworkError as e:
logger.error(f"Result submission error: {e}") logger.error(f"Result submission error: {e}")
def poll_for_jobs(): def poll_for_jobs():
@@ -374,31 +364,24 @@ def poll_for_jobs():
poll_data = { poll_data = {
"max_wait_seconds": 5 "max_wait_seconds": 5
} }
headers = { headers = {
"X-Api-Key": AUTH_TOKEN, "X-Api-Key": AUTH_TOKEN,
"Content-Type": "application/json" "Content-Type": "application/json"
} }
try: try:
response = httpx.post( client = AITBCHTTPClient(base_url=COORDINATOR_URL, headers=headers, timeout=10)
f"{COORDINATOR_URL}/v1/miners/poll", response = client.post("/v1/miners/poll", json=poll_data)
json=poll_data,
headers=headers, if response:
timeout=10 job = response
)
if response.status_code == 200:
job = response.json()
logger.info(f"Received job: {job}") logger.info(f"Received job: {job}")
return job return job
elif response.status_code == 204:
return None
else: else:
logger.error(f"Poll failed: {response.status_code} - {response.text}")
return None return None
except Exception as e: except NetworkError as e:
logger.error(f"Error polling for jobs: {e}") logger.error(f"Error polling for jobs: {e}")
return None return None