feat: add mine-ollama, client result, offers create commands for GPU rental flow
This commit is contained in:
@@ -163,6 +163,53 @@ def cancel(ctx, job_id: str):
|
||||
ctx.exit(1)
|
||||
|
||||
|
||||
@client.command()
|
||||
@click.argument("job_id")
|
||||
@click.option("--wait", is_flag=True, help="Wait for job to complete before showing result")
|
||||
@click.option("--timeout", type=int, default=120, help="Max wait time in seconds")
|
||||
@click.pass_context
|
||||
def result(ctx, job_id: str, wait: bool, timeout: int):
|
||||
"""Retrieve the result of a completed job"""
|
||||
config = ctx.obj['config']
|
||||
|
||||
start = time.time()
|
||||
while True:
|
||||
try:
|
||||
with httpx.Client() as client:
|
||||
response = client.get(
|
||||
f"{config.coordinator_url}/v1/jobs/{job_id}",
|
||||
headers={"X-Api-Key": config.api_key or ""}
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
job_data = response.json()
|
||||
state = job_data.get("state", "UNKNOWN")
|
||||
|
||||
if state == "COMPLETED":
|
||||
result_data = job_data.get("result")
|
||||
if result_data:
|
||||
success(f"Job {job_id} completed")
|
||||
output(result_data, ctx.obj['output_format'])
|
||||
else:
|
||||
output({"job_id": job_id, "state": state, "result": None}, ctx.obj['output_format'])
|
||||
return
|
||||
elif state in ("FAILED", "EXPIRED"):
|
||||
error(f"Job {job_id} {state}: {job_data.get('error', 'no details')}")
|
||||
return
|
||||
elif wait and (time.time() - start) < timeout:
|
||||
time.sleep(3)
|
||||
continue
|
||||
else:
|
||||
output({"job_id": job_id, "state": state, "message": "Job not yet completed"}, ctx.obj['output_format'])
|
||||
return
|
||||
else:
|
||||
error(f"Failed to get job: {response.status_code}")
|
||||
return
|
||||
except Exception as e:
|
||||
error(f"Network error: {e}")
|
||||
return
|
||||
|
||||
|
||||
@client.command()
|
||||
@click.option("--limit", default=10, help="Number of receipts to show")
|
||||
@click.option("--job-id", help="Filter by job ID")
|
||||
|
||||
@@ -426,6 +426,50 @@ def offers():
|
||||
pass
|
||||
|
||||
|
||||
@offers.command()
|
||||
@click.option("--gpu-id", required=True, help="GPU ID to create offer for")
|
||||
@click.option("--price-per-hour", type=float, required=True, help="Price per hour in AITBC")
|
||||
@click.option("--min-hours", type=float, default=1, help="Minimum rental hours")
|
||||
@click.option("--max-hours", type=float, default=24, help="Maximum rental hours")
|
||||
@click.option("--models", help="Supported models (comma-separated, e.g. gemma3:1b,qwen2.5)")
|
||||
@click.pass_context
|
||||
def create(ctx, gpu_id: str, price_per_hour: float, min_hours: float,
|
||||
max_hours: float, models: Optional[str]):
|
||||
"""Create a marketplace offer for a registered GPU"""
|
||||
config = ctx.obj['config']
|
||||
|
||||
offer_data = {
|
||||
"gpu_id": gpu_id,
|
||||
"price_per_hour": price_per_hour,
|
||||
"min_hours": min_hours,
|
||||
"max_hours": max_hours,
|
||||
"supported_models": models.split(",") if models else [],
|
||||
"status": "open"
|
||||
}
|
||||
|
||||
try:
|
||||
with httpx.Client() as client:
|
||||
response = client.post(
|
||||
f"{config.coordinator_url}/v1/marketplace/offers",
|
||||
headers={
|
||||
"Content-Type": "application/json",
|
||||
"X-Api-Key": config.api_key or ""
|
||||
},
|
||||
json=offer_data
|
||||
)
|
||||
|
||||
if response.status_code in (200, 201, 202):
|
||||
result = response.json()
|
||||
success(f"Offer created: {result.get('id', 'ok')}")
|
||||
output(result, ctx.obj['output_format'])
|
||||
else:
|
||||
error(f"Failed to create offer: {response.status_code}")
|
||||
if response.text:
|
||||
error(response.text)
|
||||
except Exception as e:
|
||||
error(f"Network error: {e}")
|
||||
|
||||
|
||||
@offers.command()
|
||||
@click.option("--status", help="Filter by offer status (open, reserved, closed)")
|
||||
@click.option("--gpu-model", help="Filter by GPU model")
|
||||
|
||||
@@ -195,7 +195,7 @@ def heartbeat(ctx, miner_id: str):
|
||||
headers={
|
||||
"X-Api-Key": config.api_key or ""
|
||||
},
|
||||
json={"capabilities": capabilities}
|
||||
json={}
|
||||
)
|
||||
|
||||
if response.status_code in (200, 204):
|
||||
@@ -390,6 +390,7 @@ def _process_single_job(config, miner_id: str, worker_id: int) -> Dict[str, Any]
|
||||
with httpx.Client() as http_client:
|
||||
response = http_client.post(
|
||||
f"{config.coordinator_url}/v1/miners/poll",
|
||||
json={"max_wait_seconds": 5},
|
||||
headers={
|
||||
"X-Api-Key": config.api_key or "",
|
||||
"X-Miner-ID": miner_id
|
||||
@@ -397,7 +398,9 @@ def _process_single_job(config, miner_id: str, worker_id: int) -> Dict[str, Any]
|
||||
timeout=30
|
||||
)
|
||||
|
||||
if response.status_code in (200, 204):
|
||||
if response.status_code == 204:
|
||||
return {"worker": worker_id, "status": "no_job"}
|
||||
if response.status_code == 200:
|
||||
job = response.json()
|
||||
if job:
|
||||
job_id = job.get('job_id')
|
||||
@@ -410,7 +413,7 @@ def _process_single_job(config, miner_id: str, worker_id: int) -> Dict[str, Any]
|
||||
"X-Api-Key": config.api_key or "",
|
||||
"X-Miner-ID": miner_id
|
||||
},
|
||||
json={"result": f"Processed by worker {worker_id}", "success": True}
|
||||
json={"result": {"output": f"Processed by worker {worker_id}"}, "metrics": {}}
|
||||
)
|
||||
|
||||
return {
|
||||
@@ -423,6 +426,160 @@ def _process_single_job(config, miner_id: str, worker_id: int) -> Dict[str, Any]
|
||||
return {"worker": worker_id, "status": "error", "error": str(e)}
|
||||
|
||||
|
||||
def _run_ollama_inference(ollama_url: str, model: str, prompt: str) -> Dict[str, Any]:
|
||||
"""Run inference through local Ollama instance"""
|
||||
try:
|
||||
with httpx.Client(timeout=120) as client:
|
||||
response = client.post(
|
||||
f"{ollama_url}/api/generate",
|
||||
json={
|
||||
"model": model,
|
||||
"prompt": prompt,
|
||||
"stream": False
|
||||
}
|
||||
)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
return {
|
||||
"response": data.get("response", ""),
|
||||
"model": data.get("model", model),
|
||||
"total_duration": data.get("total_duration", 0),
|
||||
"eval_count": data.get("eval_count", 0),
|
||||
"eval_duration": data.get("eval_duration", 0),
|
||||
}
|
||||
else:
|
||||
return {"error": f"Ollama returned {response.status_code}"}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
|
||||
@miner.command(name="mine-ollama")
|
||||
@click.option("--jobs", type=int, default=1, help="Number of jobs to process")
|
||||
@click.option("--miner-id", default="cli-miner", help="Miner ID")
|
||||
@click.option("--ollama-url", default="http://localhost:11434", help="Ollama API URL")
|
||||
@click.option("--model", default="gemma3:1b", help="Ollama model to use")
|
||||
@click.pass_context
|
||||
def mine_ollama(ctx, jobs: int, miner_id: str, ollama_url: str, model: str):
|
||||
"""Mine jobs using local Ollama for GPU inference"""
|
||||
config = ctx.obj['config']
|
||||
|
||||
# Verify Ollama is reachable
|
||||
try:
|
||||
with httpx.Client(timeout=5) as client:
|
||||
resp = client.get(f"{ollama_url}/api/tags")
|
||||
if resp.status_code != 200:
|
||||
error(f"Cannot reach Ollama at {ollama_url}")
|
||||
return
|
||||
models = [m["name"] for m in resp.json().get("models", [])]
|
||||
if model not in models:
|
||||
error(f"Model '{model}' not found. Available: {', '.join(models)}")
|
||||
return
|
||||
success(f"Ollama connected: {ollama_url} | model: {model}")
|
||||
except Exception as e:
|
||||
error(f"Cannot connect to Ollama: {e}")
|
||||
return
|
||||
|
||||
processed = 0
|
||||
while processed < jobs:
|
||||
try:
|
||||
with httpx.Client() as client:
|
||||
response = client.post(
|
||||
f"{config.coordinator_url}/v1/miners/poll",
|
||||
json={"max_wait_seconds": 10},
|
||||
headers={
|
||||
"X-Api-Key": config.api_key or "",
|
||||
"X-Miner-ID": miner_id
|
||||
},
|
||||
timeout=30
|
||||
)
|
||||
|
||||
if response.status_code == 204:
|
||||
time.sleep(5)
|
||||
continue
|
||||
|
||||
if response.status_code != 200:
|
||||
error(f"Failed to poll: {response.status_code}")
|
||||
break
|
||||
|
||||
job = response.json()
|
||||
if not job:
|
||||
time.sleep(5)
|
||||
continue
|
||||
|
||||
job_id = job.get('job_id')
|
||||
payload = job.get('payload', {})
|
||||
prompt = payload.get('prompt', '')
|
||||
job_model = payload.get('model', model)
|
||||
|
||||
output({
|
||||
"job_id": job_id,
|
||||
"status": "processing",
|
||||
"prompt": prompt[:80] + ("..." if len(prompt) > 80 else ""),
|
||||
"model": job_model,
|
||||
"job_number": processed + 1
|
||||
}, ctx.obj['output_format'])
|
||||
|
||||
# Run inference through Ollama
|
||||
start_time = time.time()
|
||||
ollama_result = _run_ollama_inference(ollama_url, job_model, prompt)
|
||||
duration_ms = int((time.time() - start_time) * 1000)
|
||||
|
||||
if "error" in ollama_result:
|
||||
error(f"Ollama inference failed: {ollama_result['error']}")
|
||||
# Submit failure
|
||||
client.post(
|
||||
f"{config.coordinator_url}/v1/miners/{job_id}/fail",
|
||||
headers={
|
||||
"Content-Type": "application/json",
|
||||
"X-Api-Key": config.api_key or "",
|
||||
"X-Miner-ID": miner_id
|
||||
},
|
||||
json={"error_code": "INFERENCE_FAILED", "error_message": ollama_result['error'], "metrics": {}}
|
||||
)
|
||||
continue
|
||||
|
||||
# Submit successful result
|
||||
result_response = client.post(
|
||||
f"{config.coordinator_url}/v1/miners/{job_id}/result",
|
||||
headers={
|
||||
"Content-Type": "application/json",
|
||||
"X-Api-Key": config.api_key or "",
|
||||
"X-Miner-ID": miner_id
|
||||
},
|
||||
json={
|
||||
"result": {
|
||||
"response": ollama_result.get("response", ""),
|
||||
"model": ollama_result.get("model", job_model),
|
||||
"provider": "ollama",
|
||||
"eval_count": ollama_result.get("eval_count", 0),
|
||||
},
|
||||
"metrics": {
|
||||
"duration_ms": duration_ms,
|
||||
"eval_count": ollama_result.get("eval_count", 0),
|
||||
"eval_duration": ollama_result.get("eval_duration", 0),
|
||||
"total_duration": ollama_result.get("total_duration", 0),
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
if result_response.status_code == 200:
|
||||
success(f"Job {job_id} completed via Ollama ({duration_ms}ms)")
|
||||
processed += 1
|
||||
else:
|
||||
error(f"Failed to submit result: {result_response.status_code}")
|
||||
|
||||
except Exception as e:
|
||||
error(f"Error: {e}")
|
||||
break
|
||||
|
||||
output({
|
||||
"total_processed": processed,
|
||||
"miner_id": miner_id,
|
||||
"model": model,
|
||||
"provider": "ollama"
|
||||
}, ctx.obj['output_format'])
|
||||
|
||||
|
||||
@miner.command(name="concurrent-mine")
|
||||
@click.option("--workers", type=int, default=2, help="Number of concurrent workers")
|
||||
@click.option("--jobs", "total_jobs", type=int, default=5, help="Total jobs to process")
|
||||
|
||||
Reference in New Issue
Block a user