diff --git a/apps/miner/production_miner.py b/apps/miner/production_miner.py index 43f08bfc..430d5899 100755 --- a/apps/miner/production_miner.py +++ b/apps/miner/production_miner.py @@ -5,14 +5,14 @@ Real GPU Miner Client for AITBC - runs on host with actual GPU import json import time -import httpx -import logging import sys import subprocess import os from datetime import datetime from typing import Dict, Optional +from aitbc import get_logger, AITBCHTTPClient, NetworkError + # Configuration COORDINATOR_URL = os.environ.get("COORDINATOR_URL", "http://127.0.0.1:8001") MINER_ID = os.environ.get("MINER_API_KEY", "miner_test") @@ -21,6 +21,13 @@ HEARTBEAT_INTERVAL = 15 MAX_RETRIES = 10 RETRY_DELAY = 30 +# Initialize HTTP client +coordinator_client = AITBCHTTPClient( + base_url=COORDINATOR_URL, + headers={"X-Api-Key": AUTH_TOKEN, "Content-Type": "application/json"}, + timeout=30 +) + # Setup logging with explicit configuration LOG_PATH = "/var/log/aitbc/production_miner.log" os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True) @@ -38,7 +45,7 @@ logging.basicConfig( logging.FileHandler(LOG_PATH) ] ) -logger = logging.getLogger(__name__) +logger = get_logger(__name__) # Force stdout to be unbuffered sys.stdout.reconfigure(line_buffering=True) @@ -109,10 +116,11 @@ def build_gpu_capabilities() -> Dict: def measure_coordinator_latency() -> float: start = time.time() try: - resp = httpx.get(f"{COORDINATOR_URL}/health", timeout=3) - if resp.status_code == 200: + client = AITBCHTTPClient(base_url=COORDINATOR_URL, timeout=3) + resp = client.get("/health") + if resp: return (time.time() - start) * 1000 - except Exception: + except NetworkError: pass return -1.0 @@ -138,16 +146,17 @@ def get_gpu_info(): def check_ollama(): """Check if Ollama is running and has models""" try: - response = httpx.get("http://localhost:11434/api/tags", timeout=5) - if response.status_code == 200: - models = response.json().get('models', []) + client = AITBCHTTPClient(base_url="http://localhost:11434", timeout=5) + response = client.get("/api/tags") + if response: + models = response.get('models', []) model_names = [m['name'] for m in models] logger.info(f"Ollama running with models: {model_names}") return True, model_names else: logger.error("Ollama not responding") return False, [] - except Exception as e: + except NetworkError as e: logger.error(f"Ollama check failed: {e}") return False, [] @@ -155,16 +164,17 @@ def wait_for_coordinator(): """Wait for coordinator to be available""" for i in range(MAX_RETRIES): try: - response = httpx.get(f"{COORDINATOR_URL}/health", timeout=5) - if response.status_code == 200: + client = AITBCHTTPClient(base_url=COORDINATOR_URL, timeout=5) + response = client.get("/health") + if response: logger.info("Coordinator is available!") return True - except: + except NetworkError: pass - + logger.info(f"Waiting for coordinator... ({i+1}/{MAX_RETRIES})") time.sleep(RETRY_DELAY) - + logger.error("Coordinator not available after max retries") return False @@ -175,29 +185,24 @@ def register_miner(): "concurrency": 1, "region": "localhost" } - + headers = { "X-Api-Key": AUTH_TOKEN, "Content-Type": "application/json" } - + try: - response = httpx.post( - f"{COORDINATOR_URL}/v1/miners/register?miner_id={MINER_ID}", - json=register_data, - headers=headers, - timeout=10 - ) - - if response.status_code == 200: - data = response.json() - logger.info(f"Successfully registered miner: {data}") - return data.get("session_token", "demo-token") + client = AITBCHTTPClient(base_url=COORDINATOR_URL, headers=headers, timeout=10) + response = client.post(f"/v1/miners/register?miner_id={MINER_ID}", json=register_data) + + if response: + logger.info(f"Successfully registered miner: {response}") + return response.get("session_token", "demo-token") else: - logger.error(f"Registration failed: {response.status_code} - {response.text}") + logger.error("Registration failed") return None - - except Exception as e: + + except NetworkError as e: logger.error(f"Registration error: {e}") return None @@ -206,7 +211,7 @@ def send_heartbeat(): gpu_info = get_gpu_info() arch = classify_architecture(gpu_info["name"]) if gpu_info else "unknown" latency_ms = measure_coordinator_latency() - + if gpu_info: heartbeat_data = { "status": "active", @@ -231,26 +236,22 @@ def send_heartbeat(): "edge_optimized": False, "network_latency_ms": latency_ms, } - + headers = { "X-Api-Key": AUTH_TOKEN, "Content-Type": "application/json" } - + try: - response = httpx.post( - f"{COORDINATOR_URL}/v1/miners/heartbeat?miner_id={MINER_ID}", - json=heartbeat_data, - headers=headers, - timeout=5 - ) - - if response.status_code == 200: + client = AITBCHTTPClient(base_url=COORDINATOR_URL, headers=headers, timeout=5) + response = client.post(f"/v1/miners/heartbeat?miner_id={MINER_ID}", json=heartbeat_data) + + if response: logger.info(f"Heartbeat sent (GPU: {gpu_info['utilization'] if gpu_info else 'N/A'}%)") else: - logger.error(f"Heartbeat failed: {response.status_code} - {response.text}") - - except Exception as e: + logger.error("Heartbeat failed") + + except NetworkError as e: logger.error(f"Heartbeat error: {e}") def execute_job(job, available_models): @@ -278,19 +279,19 @@ def execute_job(job, available_models): # Call Ollama API for real GPU inference logger.info(f"Running inference on GPU with model: {model}") start_time = time.time() - - ollama_response = httpx.post( - "http://localhost:11434/api/generate", + + ollama_client = AITBCHTTPClient(base_url="http://localhost:11434", timeout=60) + ollama_response = ollama_client.post( + "/api/generate", json={ "model": model, "prompt": prompt, "stream": False - }, - timeout=60 + } ) - - if ollama_response.status_code == 200: - result = ollama_response.json() + + if ollama_response: + result = ollama_response output = result.get('response', '') execution_time = time.time() - start_time @@ -317,11 +318,11 @@ def execute_job(job, available_models): logger.info(f"Job {job_id} completed in {execution_time:.2f}s") return True else: - logger.error(f"Ollama error: {ollama_response.status_code}") + logger.error("Ollama error") submit_result(job_id, { "result": { "status": "failed", - "error": f"Ollama error: {ollama_response.text}" + "error": "Ollama error" } }) return False @@ -352,21 +353,17 @@ def submit_result(job_id, result): "X-Api-Key": AUTH_TOKEN, "Content-Type": "application/json" } - + try: - response = httpx.post( - f"{COORDINATOR_URL}/v1/miners/{job_id}/result", - json=result, - headers=headers, - timeout=10 - ) - - if response.status_code == 200: + client = AITBCHTTPClient(base_url=COORDINATOR_URL, headers=headers, timeout=10) + response = client.post(f"/v1/miners/{job_id}/result", json=result) + + if response: logger.info(f"Result submitted for job {job_id}") else: - logger.error(f"Result submission failed: {response.status_code} - {response.text}") - - except Exception as e: + logger.error("Result submission failed") + + except NetworkError as e: logger.error(f"Result submission error: {e}") def poll_for_jobs(): @@ -374,31 +371,24 @@ def poll_for_jobs(): poll_data = { "max_wait_seconds": 5 } - + headers = { "X-Api-Key": AUTH_TOKEN, "Content-Type": "application/json" } - + try: - response = httpx.post( - f"{COORDINATOR_URL}/v1/miners/poll", - json=poll_data, - headers=headers, - timeout=10 - ) - - if response.status_code == 200: - job = response.json() + client = AITBCHTTPClient(base_url=COORDINATOR_URL, headers=headers, timeout=10) + response = client.post("/v1/miners/poll", json=poll_data) + + if response: + job = response logger.info(f"Received job: {job}") return job - elif response.status_code == 204: - return None else: - logger.error(f"Poll failed: {response.status_code} - {response.text}") return None - - except Exception as e: + + except NetworkError as e: logger.error(f"Error polling for jobs: {e}") return None diff --git a/apps/miner/production_miner_fixed.py b/apps/miner/production_miner_fixed.py index e3baf4cd..4d55db8c 100755 --- a/apps/miner/production_miner_fixed.py +++ b/apps/miner/production_miner_fixed.py @@ -5,14 +5,14 @@ Real GPU Miner Client for AITBC - runs on host with actual GPU import json import time -import httpx -import logging import sys import subprocess import os from datetime import datetime from typing import Dict, Optional +from aitbc import get_logger, AITBCHTTPClient, NetworkError, LOG_DIR + # Configuration COORDINATOR_URL = os.environ.get("COORDINATOR_URL", "http://127.0.0.1:8001") MINER_ID = os.environ.get("MINER_API_KEY", "miner_test") @@ -22,7 +22,7 @@ MAX_RETRIES = 10 RETRY_DELAY = 30 # Setup logging with explicit configuration -LOG_PATH = "/var/log/aitbc/host_gpu_miner.log" +LOG_PATH = str(LOG_DIR / "host_gpu_miner.log") os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True) class FlushHandler(logging.StreamHandler): @@ -38,7 +38,7 @@ logging.basicConfig( logging.FileHandler(LOG_PATH) ] ) -logger = logging.getLogger(__name__) +logger = get_logger(__name__) # Force stdout to be unbuffered sys.stdout.reconfigure(line_buffering=True) @@ -109,10 +109,11 @@ def build_gpu_capabilities() -> Dict: def measure_coordinator_latency() -> float: start = time.time() try: - resp = httpx.get(f"{COORDINATOR_URL}/v1/health", timeout=3) - if resp.status_code == 200: + client = AITBCHTTPClient(base_url=COORDINATOR_URL, timeout=3) + resp = client.get("/v1/health") + if resp: return (time.time() - start) * 1000 - except Exception: + except NetworkError: pass return -1.0 @@ -138,16 +139,17 @@ def get_gpu_info(): def check_ollama(): """Check if Ollama is running and has models""" try: - response = httpx.get("http://localhost:11434/api/tags", timeout=5) - if response.status_code == 200: - models = response.json().get('models', []) + client = AITBCHTTPClient(base_url="http://localhost:11434", timeout=5) + response = client.get("/api/tags") + if response: + models = response.get('models', []) model_names = [m['name'] for m in models] logger.info(f"Ollama running with models: {model_names}") return True, model_names else: logger.error("Ollama not responding") return False, [] - except Exception as e: + except NetworkError as e: logger.error(f"Ollama check failed: {e}") return False, [] @@ -155,16 +157,17 @@ def wait_for_coordinator(): """Wait for coordinator to be available""" for i in range(MAX_RETRIES): try: - response = httpx.get(f"{COORDINATOR_URL}/v1/health", timeout=5) - if response.status_code == 200: + client = AITBCHTTPClient(base_url=COORDINATOR_URL, timeout=5) + response = client.get("/v1/health") + if response: logger.info("Coordinator is available!") return True - except: + except NetworkError: pass - + logger.info(f"Waiting for coordinator... ({i+1}/{MAX_RETRIES})") time.sleep(RETRY_DELAY) - + logger.error("Coordinator not available after max retries") return False @@ -175,29 +178,24 @@ def register_miner(): "concurrency": 1, "region": "localhost" } - + headers = { "X-Api-Key": AUTH_TOKEN, "Content-Type": "application/json" } - + try: - response = httpx.post( - f"{COORDINATOR_URL}/v1/miners/register?miner_id={MINER_ID}", - json=register_data, - headers=headers, - timeout=10 - ) - - if response.status_code == 200: - data = response.json() - logger.info(f"Successfully registered miner: {data}") - return data.get("session_token", "demo-token") + client = AITBCHTTPClient(base_url=COORDINATOR_URL, headers=headers, timeout=10) + response = client.post(f"/v1/miners/register?miner_id={MINER_ID}", json=register_data) + + if response: + logger.info(f"Successfully registered miner: {response}") + return response.get("session_token", "demo-token") else: - logger.error(f"Registration failed: {response.status_code} - {response.text}") + logger.error("Registration failed") return None - - except Exception as e: + + except NetworkError as e: logger.error(f"Registration error: {e}") return None @@ -206,7 +204,7 @@ def send_heartbeat(): gpu_info = get_gpu_info() arch = classify_architecture(gpu_info["name"]) if gpu_info else "unknown" latency_ms = measure_coordinator_latency() - + if gpu_info: heartbeat_data = { "status": "active", @@ -231,26 +229,22 @@ def send_heartbeat(): "edge_optimized": False, "network_latency_ms": latency_ms, } - + headers = { "X-Api-Key": AUTH_TOKEN, "Content-Type": "application/json" } - + try: - response = httpx.post( - f"{COORDINATOR_URL}/v1/miners/heartbeat?miner_id={MINER_ID}", - json=heartbeat_data, - headers=headers, - timeout=5 - ) - - if response.status_code == 200: + client = AITBCHTTPClient(base_url=COORDINATOR_URL, headers=headers, timeout=5) + response = client.post(f"/v1/miners/heartbeat?miner_id={MINER_ID}", json=heartbeat_data) + + if response: logger.info(f"Heartbeat sent (GPU: {gpu_info['utilization'] if gpu_info else 'N/A'}%)") else: - logger.error(f"Heartbeat failed: {response.status_code} - {response.text}") - - except Exception as e: + logger.error("Heartbeat failed") + + except NetworkError as e: logger.error(f"Heartbeat error: {e}") def execute_job(job, available_models): @@ -278,19 +272,19 @@ def execute_job(job, available_models): # Call Ollama API for real GPU inference logger.info(f"Running inference on GPU with model: {model}") start_time = time.time() - - ollama_response = httpx.post( - "http://localhost:11434/api/generate", + + ollama_client = AITBCHTTPClient(base_url="http://localhost:11434", timeout=60) + ollama_response = ollama_client.post( + "/api/generate", json={ "model": model, "prompt": prompt, "stream": False - }, - timeout=60 + } ) - - if ollama_response.status_code == 200: - result = ollama_response.json() + + if ollama_response: + result = ollama_response output = result.get('response', '') execution_time = time.time() - start_time @@ -317,11 +311,11 @@ def execute_job(job, available_models): logger.info(f"Job {job_id} completed in {execution_time:.2f}s") return True else: - logger.error(f"Ollama error: {ollama_response.status_code}") + logger.error("Ollama error") submit_result(job_id, { "result": { "status": "failed", - "error": f"Ollama error: {ollama_response.text}" + "error": "Ollama error" } }) return False @@ -352,21 +346,17 @@ def submit_result(job_id, result): "X-Api-Key": AUTH_TOKEN, "Content-Type": "application/json" } - + try: - response = httpx.post( - f"{COORDINATOR_URL}/v1/miners/{job_id}/result", - json=result, - headers=headers, - timeout=10 - ) - - if response.status_code == 200: + client = AITBCHTTPClient(base_url=COORDINATOR_URL, headers=headers, timeout=10) + response = client.post(f"/v1/miners/{job_id}/result", json=result) + + if response: logger.info(f"Result submitted for job {job_id}") else: - logger.error(f"Result submission failed: {response.status_code} - {response.text}") - - except Exception as e: + logger.error("Result submission failed") + + except NetworkError as e: logger.error(f"Result submission error: {e}") def poll_for_jobs(): @@ -374,31 +364,24 @@ def poll_for_jobs(): poll_data = { "max_wait_seconds": 5 } - + headers = { "X-Api-Key": AUTH_TOKEN, "Content-Type": "application/json" } - + try: - response = httpx.post( - f"{COORDINATOR_URL}/v1/miners/poll", - json=poll_data, - headers=headers, - timeout=10 - ) - - if response.status_code == 200: - job = response.json() + client = AITBCHTTPClient(base_url=COORDINATOR_URL, headers=headers, timeout=10) + response = client.post("/v1/miners/poll", json=poll_data) + + if response: + job = response logger.info(f"Received job: {job}") return job - elif response.status_code == 204: - return None else: - logger.error(f"Poll failed: {response.status_code} - {response.text}") return None - - except Exception as e: + + except NetworkError as e: logger.error(f"Error polling for jobs: {e}") return None