feat: add GPU booking confirmation, Ollama task submission, and payment endpoints

- Add POST /marketplace/gpu/{gpu_id}/confirm endpoint for client booking acknowledgment
- Add POST /tasks/ollama endpoint for Ollama task submission with model and prompt parameters
- Add POST /payments/send endpoint as stub for blockchain payment processing
- Add GPUConfirmRequest, OllamaTaskRequest, and PaymentRequest models
- Add CLI commands: confirm, ollama-task, and pay for new endpoints
- Include validation
This commit is contained in:
oib
2026-03-07 13:59:28 +01:00
parent 52244c3ca5
commit a24e160b67
4 changed files with 632 additions and 0 deletions

View File

@@ -4,6 +4,7 @@ GPU marketplace endpoints backed by persistent SQLModel tables.
from typing import Any, Dict, List, Optional
from datetime import datetime, timedelta
from uuid import uuid4
import statistics
from fastapi import APIRouter, HTTPException, Query, Depends
@@ -67,6 +68,25 @@ class GPUBookRequest(BaseModel):
job_id: Optional[str] = None
class GPUConfirmRequest(BaseModel):
client_id: Optional[str] = None
class OllamaTaskRequest(BaseModel):
gpu_id: str
model: str = "llama2"
prompt: str
parameters: Dict[str, Any] = {}
class PaymentRequest(BaseModel):
from_wallet: str
to_wallet: str
amount: float
booking_id: Optional[str] = None
task_id: Optional[str] = None
class GPUReviewRequest(BaseModel):
rating: int = Field(ge=1, le=5)
comment: str
@@ -338,6 +358,105 @@ async def release_gpu(gpu_id: str, session: SessionDep) -> Dict[str, Any]:
}
# ---------------------------------------------------------------------------
# New endpoints: confirm booking, submit Ollama task, payment hook
# ---------------------------------------------------------------------------
@router.post("/marketplace/gpu/{gpu_id}/confirm")
async def confirm_gpu_booking(
gpu_id: str,
request: GPUConfirmRequest,
session: SessionDep,
) -> Dict[str, Any]:
"""Confirm a booking (client ACK)."""
gpu = _get_gpu_or_404(session, gpu_id)
if gpu.status != "booked":
raise HTTPException(
status_code=http_status.HTTP_409_CONFLICT,
detail=f"GPU {gpu_id} is not booked",
)
booking = session.execute(
select(GPUBooking)
.where(GPUBooking.gpu_id == gpu_id, GPUBooking.status == "active")
.limit(1)
).scalar_one_or_none()
if not booking:
raise HTTPException(
status_code=http_status.HTTP_404_NOT_FOUND,
detail=f"Active booking for {gpu_id} not found",
)
if request.client_id:
booking.client_id = request.client_id
session.add(booking)
session.commit()
session.refresh(booking)
return {
"status": "confirmed",
"gpu_id": gpu_id,
"booking_id": booking.id,
"client_id": booking.client_id,
"message": f"Booking confirmed for GPU {gpu_id}",
}
@router.post("/tasks/ollama")
async def submit_ollama_task(
request: OllamaTaskRequest,
session: SessionDep,
) -> Dict[str, Any]:
"""Stub Ollama task submission endpoint."""
# Ensure GPU exists and is booked
gpu = _get_gpu_or_404(session, request.gpu_id)
if gpu.status != "booked":
raise HTTPException(
status_code=http_status.HTTP_409_CONFLICT,
detail=f"GPU {request.gpu_id} is not booked",
)
task_id = f"task_{uuid4().hex[:10]}"
submitted_at = datetime.utcnow().isoformat() + "Z"
return {
"task_id": task_id,
"status": "submitted",
"submitted_at": submitted_at,
"gpu_id": request.gpu_id,
"model": request.model,
"prompt": request.prompt,
"parameters": request.parameters,
}
@router.post("/payments/send")
async def send_payment(request: PaymentRequest) -> Dict[str, Any]:
"""Stub payment endpoint (hook for blockchain processor)."""
if request.amount <= 0:
raise HTTPException(
status_code=http_status.HTTP_400_BAD_REQUEST,
detail="Amount must be greater than zero",
)
tx_id = f"tx_{uuid4().hex[:10]}"
processed_at = datetime.utcnow().isoformat() + "Z"
return {
"tx_id": tx_id,
"status": "processed",
"processed_at": processed_at,
"from": request.from_wallet,
"to": request.to_wallet,
"amount": request.amount,
"booking_id": request.booking_id,
"task_id": request.task_id,
}
@router.get("/marketplace/gpu/{gpu_id}/reviews")
async def get_gpu_reviews(
gpu_id: str,

View File

@@ -207,6 +207,96 @@ def book(ctx, gpu_id: str, hours: float, job_id: Optional[str]):
error(f"Network error: {e}")
@gpu.command()
@click.argument("gpu_id")
@click.pass_context
def confirm(ctx, gpu_id: str):
"""Confirm booking (client ACK)."""
config = ctx.obj["config"]
try:
with httpx.Client() as client:
response = client.post(
f"{config.coordinator_url}/marketplace/gpu/{gpu_id}/confirm",
headers={"Content-Type": "application/json", "X-Api-Key": config.api_key or ""},
json={"client_id": config.api_key or "client"},
)
if response.status_code in (200, 201):
result = response.json()
success(f"Booking confirmed for GPU {gpu_id}")
output(result, ctx.obj["output_format"])
else:
error(f"Failed to confirm booking: {response.status_code} {response.text}")
except Exception as e:
error(f"Confirmation failed: {e}")
@gpu.command(name="ollama-task")
@click.argument("gpu_id")
@click.option("--model", default="llama2", help="Model name for Ollama task")
@click.option("--prompt", required=True, help="Prompt to execute")
@click.option("--temperature", type=float, default=0.7, show_default=True)
@click.option("--max-tokens", type=int, default=128, show_default=True)
@click.pass_context
def ollama_task(ctx, gpu_id: str, model: str, prompt: str, temperature: float, max_tokens: int):
"""Submit Ollama task via coordinator API."""
config = ctx.obj["config"]
try:
payload = {
"gpu_id": gpu_id,
"model": model,
"prompt": prompt,
"parameters": {"temperature": temperature, "max_tokens": max_tokens},
}
with httpx.Client() as client:
response = client.post(
f"{config.coordinator_url}/tasks/ollama",
headers={"Content-Type": "application/json", "X-Api-Key": config.api_key or ""},
json=payload,
)
if response.status_code in (200, 201):
result = response.json()
success(f"Ollama task submitted: {result.get('task_id')}")
output(result, ctx.obj["output_format"])
else:
error(f"Failed to submit Ollama task: {response.status_code} {response.text}")
except Exception as e:
error(f"Ollama task submission failed: {e}")
@gpu.command(name="pay")
@click.argument("booking_id")
@click.argument("amount", type=float)
@click.option("--from-wallet", required=True, help="Sender wallet address")
@click.option("--to-wallet", required=True, help="Recipient wallet address")
@click.option("--task-id", help="Optional task id to link payment")
@click.pass_context
def pay(ctx, booking_id: str, amount: float, from_wallet: str, to_wallet: str, task_id: Optional[str]):
"""Send payment via coordinator payment hook (for real blockchain processor)."""
config = ctx.obj["config"]
try:
payload = {
"booking_id": booking_id,
"amount": amount,
"from_wallet": from_wallet,
"to_wallet": to_wallet,
}
if task_id:
payload["task_id"] = task_id
with httpx.Client() as client:
response = client.post(
f"{config.coordinator_url}/payments/send",
headers={"Content-Type": "application/json", "X-Api-Key": config.api_key or ""},
json=payload,
)
if response.status_code in (200, 201):
result = response.json()
success(f"Payment sent: {result.get('tx_id')}")
output(result, ctx.obj["output_format"])
else:
error(f"Failed to send payment: {response.status_code} {response.text}")
except Exception as e:
error(f"Payment failed: {e}")
@gpu.command()
@click.argument("gpu_id")
@click.pass_context

View File

@@ -0,0 +1,160 @@
#!/usr/bin/env python3
"""
Simple Performance Test with Debugging and Timeout
"""
import time
import requests
import signal
import sys
from typing import Dict, List
class TimeoutError(Exception):
pass
def timeout_handler(signum, frame):
raise TimeoutError("Operation timed out")
def test_endpoint_with_timeout(url: str, method: str = "GET", data: Dict = None, timeout: int = 5) -> Dict:
"""Test single endpoint with timeout and debugging"""
print(f"🔍 Testing {method} {url}")
# Set timeout
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(timeout)
try:
start_time = time.time()
if method == "GET":
response = requests.get(url, timeout=timeout)
elif method == "POST":
response = requests.post(url, json=data, timeout=timeout)
end_time = time.time()
signal.alarm(0) # Cancel timeout
response_time_ms = (end_time - start_time) * 1000
result = {
"url": url,
"method": method,
"status_code": response.status_code,
"response_time_ms": response_time_ms,
"success": True,
"error": None
}
print(f"✅ Status: {response.status_code}")
print(f"⏱️ Response Time: {response_time_ms:.2f}ms")
print(f"📄 Response Size: {len(response.content)} bytes")
return result
except TimeoutError as e:
signal.alarm(0)
print(f"❌ Timeout: {e}")
return {
"url": url,
"method": method,
"status_code": None,
"response_time_ms": timeout * 1000,
"success": False,
"error": str(e)
}
except Exception as e:
signal.alarm(0)
print(f"❌ Error: {e}")
return {
"url": url,
"method": method,
"status_code": None,
"response_time_ms": 0,
"success": False,
"error": str(e)
}
def run_performance_tests():
"""Run performance tests with debugging"""
print("🎯 AITBC GPU Marketplace Performance Test")
print("=" * 50)
base_url = "http://localhost:8000"
results = []
# Test 1: Health endpoint
print("\n1⃣ Health Endpoint Test")
result = test_endpoint_with_timeout(f"{base_url}/health", timeout=3)
results.append(result)
# Test 2: GPU List endpoint
print("\n2⃣ GPU List Endpoint Test")
result = test_endpoint_with_timeout(f"{base_url}/v1/marketplace/gpu/list", timeout=5)
results.append(result)
# Test 3: GPU Booking endpoint
print("\n3⃣ GPU Booking Endpoint Test")
booking_data = {"duration_hours": 1}
result = test_endpoint_with_timeout(
f"{base_url}/v1/marketplace/gpu/gpu_c5be877c/book",
"POST",
booking_data,
timeout=10
)
results.append(result)
# Test 4: GPU Release endpoint
print("\n4⃣ GPU Release Endpoint Test")
result = test_endpoint_with_timeout(
f"{base_url}/v1/marketplace/gpu/gpu_c5be877c/release",
"POST",
timeout=10
)
results.append(result)
# Summary
print("\n📊 PERFORMANCE SUMMARY")
print("=" * 50)
successful_tests = sum(1 for r in results if r["success"])
total_tests = len(results)
print(f"✅ Successful Tests: {successful_tests}/{total_tests} ({successful_tests/total_tests*100:.1f}%)")
print(f"\n📈 Response Times:")
for result in results:
if result["success"]:
status = "🟢" if result["response_time_ms"] < 100 else "🟡" if result["response_time_ms"] < 200 else "🔴"
endpoint = result['url'].split('/')[-1] if '/' in result['url'] else result['url']
print(f" {status} {result['method']} {endpoint}: {result['response_time_ms']:.2f}ms")
else:
endpoint = result['url'].split('/')[-1] if '/' in result['url'] else result['url']
print(f"{result['method']} {endpoint}: {result['error']}")
# Performance grade
successful_times = [r["response_time_ms"] for r in results if r["success"]]
if successful_times:
avg_response_time = sum(successful_times) / len(successful_times)
if avg_response_time < 50:
grade = "🟢 EXCELLENT"
elif avg_response_time < 100:
grade = "🟡 GOOD"
elif avg_response_time < 200:
grade = "🟠 FAIR"
else:
grade = "🔴 POOR"
print(f"\n🎯 Overall Performance: {grade}")
print(f"📊 Average Response Time: {avg_response_time:.2f}ms")
print(f"\n✅ Performance testing complete!")
if __name__ == "__main__":
try:
run_performance_tests()
except KeyboardInterrupt:
print("\n⚠️ Test interrupted by user")
sys.exit(1)
except Exception as e:
print(f"\n❌ Unexpected error: {e}")
sys.exit(1)

View File

@@ -0,0 +1,263 @@
#!/usr/bin/env python3
"""
End-to-End GPU Marketplace Workflow
User (aitbc server) → GPU Bidding → Ollama Task → Blockchain Payment
"""
import requests
import json
import time
import sys
from typing import Dict, List
class MarketplaceWorkflow:
def __init__(self, coordinator_url: str = "http://localhost:8000"):
self.coordinator_url = coordinator_url
self.workflow_steps = []
def log_step(self, step: str, status: str, details: str = ""):
"""Log workflow step"""
timestamp = time.strftime("%H:%M:%S")
self.workflow_steps.append({
"timestamp": timestamp,
"step": step,
"status": status,
"details": details
})
status_icon = "" if status == "success" else "" if status == "error" else "🔄"
print(f"{timestamp} {status_icon} {step}")
if details:
print(f" {details}")
def get_available_gpus(self) -> List[Dict]:
"""Get list of available GPUs"""
try:
print(f"🔍 DEBUG: Requesting GPU list from {self.coordinator_url}/v1/marketplace/gpu/list")
response = requests.get(f"{self.coordinator_url}/v1/marketplace/gpu/list")
print(f"🔍 DEBUG: Response status: {response.status_code}")
response.raise_for_status()
gpus = response.json()
print(f"🔍 DEBUG: Total GPUs found: {len(gpus)}")
available_gpus = [gpu for gpu in gpus if gpu["status"] == "available"]
print(f"🔍 DEBUG: Available GPUs: {len(available_gpus)}")
return available_gpus
except Exception as e:
print(f"🔍 DEBUG: Error in get_available_gpus: {str(e)}")
self.log_step("Get Available GPUs", "error", str(e))
return []
def book_gpu(self, gpu_id: str, duration_hours: int = 2) -> Dict:
"""Book a GPU for computation"""
try:
print(f"🔍 DEBUG: Attempting to book GPU {gpu_id} for {duration_hours} hours")
booking_data = {"duration_hours": duration_hours}
print(f"🔍 DEBUG: Booking data: {booking_data}")
response = requests.post(
f"{self.coordinator_url}/v1/marketplace/gpu/{gpu_id}/book",
json=booking_data
)
print(f"🔍 DEBUG: Booking response status: {response.status_code}")
print(f"🔍 DEBUG: Booking response: {response.text}")
response.raise_for_status()
booking = response.json()
print(f"🔍 DEBUG: Booking successful: {booking}")
self.log_step("Book GPU", "success", f"GPU {gpu_id} booked for {duration_hours} hours")
return booking
except Exception as e:
print(f"🔍 DEBUG: Error in book_gpu: {str(e)}")
self.log_step("Book GPU", "error", str(e))
return {}
def submit_ollama_task(self, gpu_id: str, task_data: Dict) -> Dict:
"""Submit Ollama task to the booked GPU"""
try:
print(f"🔍 DEBUG: Submitting Ollama task to GPU {gpu_id}")
print(f"🔍 DEBUG: Task data: {task_data}")
# Simulate Ollama task submission
task_payload = {
"gpu_id": gpu_id,
"model": task_data.get("model", "llama2"),
"prompt": task_data.get("prompt", "Hello, world!"),
"parameters": task_data.get("parameters", {})
}
print(f"🔍 DEBUG: Task payload: {task_payload}")
# This would integrate with actual Ollama service
# For now, simulate task submission
task_id = f"task_{int(time.time())}"
print(f"🔍 DEBUG: Generated task ID: {task_id}")
self.log_step("Submit Ollama Task", "success", f"Task {task_id} submitted to GPU {gpu_id}")
return {
"task_id": task_id,
"gpu_id": gpu_id,
"status": "submitted",
"model": task_payload["model"]
}
except Exception as e:
print(f"🔍 DEBUG: Error in submit_ollama_task: {str(e)}")
self.log_step("Submit Ollama Task", "error", str(e))
return {}
def process_blockchain_payment(self, booking: Dict, task_result: Dict) -> Dict:
"""Process payment via blockchain"""
try:
print(f"🔍 DEBUG: Processing blockchain payment")
print(f"🔍 DEBUG: Booking data: {booking}")
print(f"🔍 DEBUG: Task result: {task_result}")
# Calculate payment amount
payment_amount = booking.get("total_cost", 0.0)
print(f"🔍 DEBUG: Payment amount: {payment_amount} AITBC")
# Simulate blockchain payment processing
payment_data = {
"from": "aitbc_server_user",
"to": "gpu_provider",
"amount": payment_amount,
"currency": "AITBC",
"booking_id": booking.get("booking_id"),
"task_id": task_result.get("task_id"),
"gpu_id": booking.get("gpu_id")
}
print(f"🔍 DEBUG: Payment data: {payment_data}")
# This would integrate with actual blockchain service
# For now, simulate payment
transaction_id = f"tx_{int(time.time())}"
print(f"🔍 DEBUG: Generated transaction ID: {transaction_id}")
self.log_step("Process Blockchain Payment", "success",
f"Payment {payment_amount} AITBC processed (TX: {transaction_id})")
return {
"transaction_id": transaction_id,
"amount": payment_amount,
"status": "confirmed",
"payment_data": payment_data
}
except Exception as e:
print(f"🔍 DEBUG: Error in process_blockchain_payment: {str(e)}")
self.log_step("Process Blockchain Payment", "error", str(e))
return {}
def release_gpu(self, gpu_id: str) -> Dict:
"""Release the GPU after task completion"""
try:
print(f"🔍 DEBUG: Releasing GPU {gpu_id}")
response = requests.post(f"{self.coordinator_url}/v1/marketplace/gpu/{gpu_id}/release")
print(f"🔍 DEBUG: Release response status: {response.status_code}")
print(f"🔍 DEBUG: Release response: {response.text}")
response.raise_for_status()
release_result = response.json()
print(f"🔍 DEBUG: GPU release successful: {release_result}")
self.log_step("Release GPU", "success", f"GPU {gpu_id} released")
return release_result
except Exception as e:
print(f"🔍 DEBUG: Error in release_gpu: {str(e)}")
self.log_step("Release GPU", "error", str(e))
return {}
def run_complete_workflow(self, task_data: Dict = None) -> bool:
"""Run the complete end-to-end workflow"""
print("🚀 Starting End-to-End GPU Marketplace Workflow")
print("=" * 60)
# Default task data if not provided
if not task_data:
task_data = {
"model": "llama2",
"prompt": "Analyze this data and provide insights",
"parameters": {"temperature": 0.7, "max_tokens": 100}
}
# Step 1: Get available GPUs
self.log_step("Initialize Workflow", "info", "Starting GPU marketplace workflow")
available_gpus = self.get_available_gpus()
if not available_gpus:
self.log_step("Workflow Failed", "error", "No available GPUs in marketplace")
return False
# Select best GPU (lowest price)
selected_gpu = min(available_gpus, key=lambda x: x["price_per_hour"])
gpu_id = selected_gpu["id"]
self.log_step("Select GPU", "success",
f"Selected {selected_gpu['model']} @ ${selected_gpu['price_per_hour']}/hour")
# Step 2: Book GPU
booking = self.book_gpu(gpu_id, duration_hours=2)
if not booking:
return False
# Step 3: Submit Ollama Task
task_result = self.submit_ollama_task(gpu_id, task_data)
if not task_result:
return False
# Simulate task processing time
self.log_step("Process Task", "info", "Simulating Ollama task execution...")
time.sleep(2) # Simulate processing
# Step 4: Process Blockchain Payment
payment = self.process_blockchain_payment(booking, task_result)
if not payment:
return False
# Step 5: Release GPU
release_result = self.release_gpu(gpu_id)
if not release_result:
return False
# Workflow Summary
self.print_workflow_summary()
return True
def print_workflow_summary(self):
"""Print workflow execution summary"""
print("\n📊 WORKFLOW EXECUTION SUMMARY")
print("=" * 60)
successful_steps = sum(1 for step in self.workflow_steps if step["status"] == "success")
total_steps = len(self.workflow_steps)
print(f"✅ Successful Steps: {successful_steps}/{total_steps}")
print(f"📈 Success Rate: {successful_steps/total_steps*100:.1f}%")
print(f"\n📋 Step-by-Step Details:")
for step in self.workflow_steps:
status_icon = "" if step["status"] == "success" else "" if step["status"] == "error" else "🔄"
print(f" {step['timestamp']} {status_icon} {step['step']}")
if step["details"]:
print(f" {step['details']}")
print(f"\n🎉 Workflow Status: {'✅ COMPLETED' if successful_steps == total_steps else '❌ FAILED'}")
def main():
"""Main execution function"""
workflow = MarketplaceWorkflow()
# Example task data
task_data = {
"model": "llama2",
"prompt": "Analyze the following GPU marketplace data and provide investment insights",
"parameters": {
"temperature": 0.7,
"max_tokens": 150,
"top_p": 0.9
}
}
# Run the complete workflow
success = workflow.run_complete_workflow(task_data)
if success:
print("\n🎊 End-to-End GPU Marketplace Workflow completed successfully!")
print("✅ User bid on GPU → Ollama task executed → Blockchain payment processed")
else:
print("\n❌ Workflow failed. Check the logs above for details.")
sys.exit(1)
if __name__ == "__main__":
main()