feat: add AI provider commands with on-chain payment

- Create ai.py with serve and request commands
- request includes balance verification and payment via blockchain send
- serve runs FastAPI server and optionally registers jobs with coordinator
Update marketplace.py:
- Add gpu unregister command (DELETE endpoint)
This commit is contained in:
2026-03-13 21:13:04 +00:00
parent 3bdada174c
commit e7af9ac365
5 changed files with 213 additions and 2 deletions

View File

@@ -2,11 +2,14 @@ from __future__ import annotations
import asyncio import asyncio
import json import json
import warnings
from collections import defaultdict from collections import defaultdict
from contextlib import suppress from contextlib import suppress
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Set from typing import Any, Callable, Dict, List, Optional, Set
warnings.filterwarnings("ignore", message="coroutine.* was never awaited", category=RuntimeWarning)
try: try:
from starlette.broadcast import Broadcast from starlette.broadcast import Broadcast
except ImportError: # pragma: no cover - Starlette removed Broadcast in recent versions except ImportError: # pragma: no cover - Starlette removed Broadcast in recent versions

View File

@@ -426,6 +426,26 @@ async def send_payment(
} }
@router.delete("/marketplace/gpu/{gpu_id}")
async def delete_gpu(
gpu_id: str,
session: Annotated[Session, Depends(get_session)],
force: bool = Query(default=False, description="Force delete even if GPU is booked")
) -> Dict[str, Any]:
"""Delete (unregister) a GPU from the marketplace."""
gpu = _get_gpu_or_404(session, gpu_id)
if gpu.status == "booked" and not force:
raise HTTPException(
status_code=http_status.HTTP_409_CONFLICT,
detail=f"GPU {gpu_id} is currently booked. Use force=true to delete anyway."
)
session.delete(gpu)
session.commit()
return {"status": "deleted", "gpu_id": gpu_id}
@router.get("/marketplace/gpu/{gpu_id}/reviews") @router.get("/marketplace/gpu/{gpu_id}/reviews")
async def get_gpu_reviews( async def get_gpu_reviews(
gpu_id: str, gpu_id: str,

View File

@@ -0,0 +1,159 @@
import os
import subprocess
import sys
import uuid
import click
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import httpx
@click.group(name='ai')
def ai_group():
"""AI marketplace commands."""
pass
@ai_group.command()
@click.option('--port', default=8008, show_default=True, help='Port to listen on')
@click.option('--model', default='qwen3:8b', show_default=True, help='Ollama model name')
@click.option('--wallet', 'provider_wallet', required=True, help='Provider wallet address (for verification)')
@click.option('--marketplace-url', default='http://127.0.0.1:8014', help='Marketplace API base URL')
def serve(port, model, provider_wallet, marketplace_url):
"""Start AI provider daemon (FastAPI server)."""
click.echo(f"Starting AI provider on port {port}, model {model}, marketplace {marketplace_url}")
app = FastAPI(title="AI Provider")
class JobRequest(BaseModel):
prompt: str
buyer: str # buyer wallet address
amount: int
txid: str | None = None # optional transaction id
class JobResponse(BaseModel):
result: str
model: str
job_id: str | None = None
@app.get("/health")
async def health():
return {"status": "ok", "model": model, "wallet": provider_wallet}
@app.post("/job")
async def handle_job(req: JobRequest):
click.echo(f"Received job from {req.buyer}: {req.prompt[:50]}...")
# Generate a job_id
job_id = str(uuid.uuid4())
# Register job with marketplace (optional, best-effort)
try:
async with httpx.AsyncClient() as client:
create_resp = await client.post(
f"{marketplace_url}/v1/jobs",
json={
"payload": {"prompt": req.prompt, "model": model},
"constraints": {},
"payment_amount": req.amount,
"payment_currency": "AITBC"
},
headers={"X-Api-Key": ""}, # optional API key
timeout=5.0
)
if create_resp.status_code in (200, 201):
job_data = create_resp.json()
job_id = job_data.get("job_id", job_id)
click.echo(f"Registered job {job_id} with marketplace")
else:
click.echo(f"Marketplace job registration failed: {create_resp.status_code}", err=True)
except Exception as e:
click.echo(f"Warning: marketplace registration skipped: {e}", err=True)
# Process with Ollama
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
"http://127.0.0.1:11434/api/generate",
json={"model": model, "prompt": req.prompt, "stream": False},
timeout=60.0
)
resp.raise_for_status()
data = resp.json()
result = data.get("response", "")
except httpx.HTTPError as e:
raise HTTPException(status_code=500, detail=f"Ollama error: {e}")
# Update marketplace with result (if registered)
try:
async with httpx.AsyncClient() as client:
patch_resp = await client.patch(
f"{marketplace_url}/v1/jobs/{job_id}",
json={"result": result, "state": "completed"},
timeout=5.0
)
if patch_resp.status_code == 200:
click.echo(f"Updated job {job_id} with result")
except Exception as e:
click.echo(f"Warning: failed to update job in marketplace: {e}", err=True)
return JobResponse(result=result, model=model, job_id=job_id)
uvicorn.run(app, host="0.0.0.0", port=port)
@ai_group.command()
@click.option('--to', required=True, help='Provider host (IP)')
@click.option('--port', default=8008, help='Provider port')
@click.option('--prompt', required=True, help='Prompt to send')
@click.option('--buyer-wallet', 'buyer_wallet', required=True, help='Buyer wallet name (in local wallet store)')
@click.option('--provider-wallet', 'provider_wallet', required=True, help='Provider wallet address (recipient)')
@click.option('--amount', default=1, help='Amount to pay in AITBC')
def request(to, port, prompt, buyer_wallet, provider_wallet, amount):
"""Send a prompt to an AI provider (buyer side) with onchain payment."""
# Helper to get provider balance
def get_balance():
res = subprocess.run([
sys.executable, "-m", "aitbc_cli.main", "blockchain", "balance",
"--address", provider_wallet
], capture_output=True, text=True, check=True)
for line in res.stdout.splitlines():
if "Balance:" in line:
parts = line.split(":")
return float(parts[1].strip())
raise ValueError("Balance not found")
# Step 1: get initial balance
before = get_balance()
click.echo(f"Provider balance before: {before}")
# Step 2: send payment via blockchain CLI (use current Python env)
if amount > 0:
click.echo(f"Sending {amount} AITBC from wallet '{buyer_wallet}' to {provider_wallet}...")
try:
subprocess.run([
sys.executable, "-m", "aitbc_cli.main", "blockchain", "send",
"--from", buyer_wallet,
"--to", provider_wallet,
"--amount", str(amount)
], check=True, capture_output=True, text=True)
click.echo("Payment sent.")
except subprocess.CalledProcessError as e:
raise click.ClickException(f"Blockchain send failed: {e.stderr}")
# Step 3: get new balance
after = get_balance()
click.echo(f"Provider balance after: {after}")
delta = after - before
click.echo(f"Balance delta: {delta}")
# Step 4: call provider
url = f"http://{to}:{port}/job"
payload = {
"prompt": prompt,
"buyer": provider_wallet,
"amount": amount
}
try:
resp = httpx.post(url, json=payload, timeout=30.0)
resp.raise_for_status()
data = resp.json()
click.echo("Result: " + data.get("result", ""))
except httpx.HTTPError as e:
raise click.ClickException(f"Request to provider failed: {e}")
if __name__ == '__main__':
ai_group()

View File

@@ -300,6 +300,34 @@ def pay(ctx, booking_id: str, amount: float, from_wallet: str, to_wallet: str, t
except Exception as e: except Exception as e:
error(f"Payment failed: {e}") error(f"Payment failed: {e}")
@gpu.command()
@click.argument("gpu_id")
@click.option("--force", is_flag=True, help="Force delete even if GPU is booked")
@click.pass_context
def unregister(ctx, gpu_id: str, force: bool):
"""Unregister (delete) a GPU from marketplace"""
config = ctx.obj['config']
try:
with httpx.Client() as client:
response = client.delete(
f"{config.coordinator_url}/v1/marketplace/gpu/{gpu_id}",
params={"force": force},
headers={"X-Api-Key": config.api_key or ""}
)
if response.status_code == 200:
result = response.json()
success(f"GPU {gpu_id} unregistered")
output(result, ctx.obj['output_format'])
else:
error(f"Failed to unregister GPU: {response.status_code}")
if response.text:
error(response.text)
except Exception as e:
error(f"Network error: {e}")
@gpu.command() @gpu.command()
@click.argument("gpu_id") @click.argument("gpu_id")
@click.pass_context @click.pass_context

View File

@@ -162,11 +162,12 @@ requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta" build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find] [tool.setuptools.packages.find]
where = ["cli"] where = ["cli", "apps/coordinator-api"]
include = ["aitbc_cli*"] include = ["aitbc_cli*", "aitbc*"]
[tool.setuptools.package-dir] [tool.setuptools.package-dir]
"aitbc_cli" = "cli/aitbc_cli" "aitbc_cli" = "cli/aitbc_cli"
"aitbc" = "apps/coordinator-api/aitbc"
[dependency-groups] [dependency-groups]
dev = [ dev = [