From c3e5faa62a79d4c4777af5dd1ee2b846658c21bf Mon Sep 17 00:00:00 2001 From: oib Date: Thu, 5 Mar 2026 07:05:09 +0100 Subject: [PATCH] feat: add mine-ollama, client result, offers create commands for GPU rental flow --- cli/aitbc_cli/commands/client.py | 47 ++++++++ cli/aitbc_cli/commands/marketplace.py | 44 +++++++ cli/aitbc_cli/commands/miner.py | 163 +++++++++++++++++++++++++- 3 files changed, 251 insertions(+), 3 deletions(-) diff --git a/cli/aitbc_cli/commands/client.py b/cli/aitbc_cli/commands/client.py index a0bb67d6..002c98c3 100644 --- a/cli/aitbc_cli/commands/client.py +++ b/cli/aitbc_cli/commands/client.py @@ -163,6 +163,53 @@ def cancel(ctx, job_id: str): ctx.exit(1) +@client.command() +@click.argument("job_id") +@click.option("--wait", is_flag=True, help="Wait for job to complete before showing result") +@click.option("--timeout", type=int, default=120, help="Max wait time in seconds") +@click.pass_context +def result(ctx, job_id: str, wait: bool, timeout: int): + """Retrieve the result of a completed job""" + config = ctx.obj['config'] + + start = time.time() + while True: + try: + with httpx.Client() as client: + response = client.get( + f"{config.coordinator_url}/v1/jobs/{job_id}", + headers={"X-Api-Key": config.api_key or ""} + ) + + if response.status_code == 200: + job_data = response.json() + state = job_data.get("state", "UNKNOWN") + + if state == "COMPLETED": + result_data = job_data.get("result") + if result_data: + success(f"Job {job_id} completed") + output(result_data, ctx.obj['output_format']) + else: + output({"job_id": job_id, "state": state, "result": None}, ctx.obj['output_format']) + return + elif state in ("FAILED", "EXPIRED"): + error(f"Job {job_id} {state}: {job_data.get('error', 'no details')}") + return + elif wait and (time.time() - start) < timeout: + time.sleep(3) + continue + else: + output({"job_id": job_id, "state": state, "message": "Job not yet completed"}, ctx.obj['output_format']) + return + else: + error(f"Failed to get job: {response.status_code}") + return + except Exception as e: + error(f"Network error: {e}") + return + + @client.command() @click.option("--limit", default=10, help="Number of receipts to show") @click.option("--job-id", help="Filter by job ID") diff --git a/cli/aitbc_cli/commands/marketplace.py b/cli/aitbc_cli/commands/marketplace.py index 56ab348e..49a943e3 100644 --- a/cli/aitbc_cli/commands/marketplace.py +++ b/cli/aitbc_cli/commands/marketplace.py @@ -426,6 +426,50 @@ def offers(): pass +@offers.command() +@click.option("--gpu-id", required=True, help="GPU ID to create offer for") +@click.option("--price-per-hour", type=float, required=True, help="Price per hour in AITBC") +@click.option("--min-hours", type=float, default=1, help="Minimum rental hours") +@click.option("--max-hours", type=float, default=24, help="Maximum rental hours") +@click.option("--models", help="Supported models (comma-separated, e.g. gemma3:1b,qwen2.5)") +@click.pass_context +def create(ctx, gpu_id: str, price_per_hour: float, min_hours: float, + max_hours: float, models: Optional[str]): + """Create a marketplace offer for a registered GPU""" + config = ctx.obj['config'] + + offer_data = { + "gpu_id": gpu_id, + "price_per_hour": price_per_hour, + "min_hours": min_hours, + "max_hours": max_hours, + "supported_models": models.split(",") if models else [], + "status": "open" + } + + try: + with httpx.Client() as client: + response = client.post( + f"{config.coordinator_url}/v1/marketplace/offers", + headers={ + "Content-Type": "application/json", + "X-Api-Key": config.api_key or "" + }, + json=offer_data + ) + + if response.status_code in (200, 201, 202): + result = response.json() + success(f"Offer created: {result.get('id', 'ok')}") + output(result, ctx.obj['output_format']) + else: + error(f"Failed to create offer: {response.status_code}") + if response.text: + error(response.text) + except Exception as e: + error(f"Network error: {e}") + + @offers.command() @click.option("--status", help="Filter by offer status (open, reserved, closed)") @click.option("--gpu-model", help="Filter by GPU model") diff --git a/cli/aitbc_cli/commands/miner.py b/cli/aitbc_cli/commands/miner.py index 85439554..3e227bd9 100644 --- a/cli/aitbc_cli/commands/miner.py +++ b/cli/aitbc_cli/commands/miner.py @@ -195,7 +195,7 @@ def heartbeat(ctx, miner_id: str): headers={ "X-Api-Key": config.api_key or "" }, - json={"capabilities": capabilities} + json={} ) if response.status_code in (200, 204): @@ -390,6 +390,7 @@ def _process_single_job(config, miner_id: str, worker_id: int) -> Dict[str, Any] with httpx.Client() as http_client: response = http_client.post( f"{config.coordinator_url}/v1/miners/poll", + json={"max_wait_seconds": 5}, headers={ "X-Api-Key": config.api_key or "", "X-Miner-ID": miner_id @@ -397,7 +398,9 @@ def _process_single_job(config, miner_id: str, worker_id: int) -> Dict[str, Any] timeout=30 ) - if response.status_code in (200, 204): + if response.status_code == 204: + return {"worker": worker_id, "status": "no_job"} + if response.status_code == 200: job = response.json() if job: job_id = job.get('job_id') @@ -410,7 +413,7 @@ def _process_single_job(config, miner_id: str, worker_id: int) -> Dict[str, Any] "X-Api-Key": config.api_key or "", "X-Miner-ID": miner_id }, - json={"result": f"Processed by worker {worker_id}", "success": True} + json={"result": {"output": f"Processed by worker {worker_id}"}, "metrics": {}} ) return { @@ -423,6 +426,160 @@ def _process_single_job(config, miner_id: str, worker_id: int) -> Dict[str, Any] return {"worker": worker_id, "status": "error", "error": str(e)} +def _run_ollama_inference(ollama_url: str, model: str, prompt: str) -> Dict[str, Any]: + """Run inference through local Ollama instance""" + try: + with httpx.Client(timeout=120) as client: + response = client.post( + f"{ollama_url}/api/generate", + json={ + "model": model, + "prompt": prompt, + "stream": False + } + ) + if response.status_code == 200: + data = response.json() + return { + "response": data.get("response", ""), + "model": data.get("model", model), + "total_duration": data.get("total_duration", 0), + "eval_count": data.get("eval_count", 0), + "eval_duration": data.get("eval_duration", 0), + } + else: + return {"error": f"Ollama returned {response.status_code}"} + except Exception as e: + return {"error": str(e)} + + +@miner.command(name="mine-ollama") +@click.option("--jobs", type=int, default=1, help="Number of jobs to process") +@click.option("--miner-id", default="cli-miner", help="Miner ID") +@click.option("--ollama-url", default="http://localhost:11434", help="Ollama API URL") +@click.option("--model", default="gemma3:1b", help="Ollama model to use") +@click.pass_context +def mine_ollama(ctx, jobs: int, miner_id: str, ollama_url: str, model: str): + """Mine jobs using local Ollama for GPU inference""" + config = ctx.obj['config'] + + # Verify Ollama is reachable + try: + with httpx.Client(timeout=5) as client: + resp = client.get(f"{ollama_url}/api/tags") + if resp.status_code != 200: + error(f"Cannot reach Ollama at {ollama_url}") + return + models = [m["name"] for m in resp.json().get("models", [])] + if model not in models: + error(f"Model '{model}' not found. Available: {', '.join(models)}") + return + success(f"Ollama connected: {ollama_url} | model: {model}") + except Exception as e: + error(f"Cannot connect to Ollama: {e}") + return + + processed = 0 + while processed < jobs: + try: + with httpx.Client() as client: + response = client.post( + f"{config.coordinator_url}/v1/miners/poll", + json={"max_wait_seconds": 10}, + headers={ + "X-Api-Key": config.api_key or "", + "X-Miner-ID": miner_id + }, + timeout=30 + ) + + if response.status_code == 204: + time.sleep(5) + continue + + if response.status_code != 200: + error(f"Failed to poll: {response.status_code}") + break + + job = response.json() + if not job: + time.sleep(5) + continue + + job_id = job.get('job_id') + payload = job.get('payload', {}) + prompt = payload.get('prompt', '') + job_model = payload.get('model', model) + + output({ + "job_id": job_id, + "status": "processing", + "prompt": prompt[:80] + ("..." if len(prompt) > 80 else ""), + "model": job_model, + "job_number": processed + 1 + }, ctx.obj['output_format']) + + # Run inference through Ollama + start_time = time.time() + ollama_result = _run_ollama_inference(ollama_url, job_model, prompt) + duration_ms = int((time.time() - start_time) * 1000) + + if "error" in ollama_result: + error(f"Ollama inference failed: {ollama_result['error']}") + # Submit failure + client.post( + f"{config.coordinator_url}/v1/miners/{job_id}/fail", + headers={ + "Content-Type": "application/json", + "X-Api-Key": config.api_key or "", + "X-Miner-ID": miner_id + }, + json={"error_code": "INFERENCE_FAILED", "error_message": ollama_result['error'], "metrics": {}} + ) + continue + + # Submit successful result + result_response = client.post( + f"{config.coordinator_url}/v1/miners/{job_id}/result", + headers={ + "Content-Type": "application/json", + "X-Api-Key": config.api_key or "", + "X-Miner-ID": miner_id + }, + json={ + "result": { + "response": ollama_result.get("response", ""), + "model": ollama_result.get("model", job_model), + "provider": "ollama", + "eval_count": ollama_result.get("eval_count", 0), + }, + "metrics": { + "duration_ms": duration_ms, + "eval_count": ollama_result.get("eval_count", 0), + "eval_duration": ollama_result.get("eval_duration", 0), + "total_duration": ollama_result.get("total_duration", 0), + } + } + ) + + if result_response.status_code == 200: + success(f"Job {job_id} completed via Ollama ({duration_ms}ms)") + processed += 1 + else: + error(f"Failed to submit result: {result_response.status_code}") + + except Exception as e: + error(f"Error: {e}") + break + + output({ + "total_processed": processed, + "miner_id": miner_id, + "model": model, + "provider": "ollama" + }, ctx.obj['output_format']) + + @miner.command(name="concurrent-mine") @click.option("--workers", type=int, default=2, help="Number of concurrent workers") @click.option("--jobs", "total_jobs", type=int, default=5, help="Total jobs to process")