diff --git a/apps/coordinator-api/src/app/routers/marketplace_gpu.py b/apps/coordinator-api/src/app/routers/marketplace_gpu.py index 76d23608..0952b56a 100755 --- a/apps/coordinator-api/src/app/routers/marketplace_gpu.py +++ b/apps/coordinator-api/src/app/routers/marketplace_gpu.py @@ -4,6 +4,7 @@ GPU marketplace endpoints backed by persistent SQLModel tables. from typing import Any, Dict, List, Optional from datetime import datetime, timedelta +from uuid import uuid4 import statistics from fastapi import APIRouter, HTTPException, Query, Depends @@ -67,6 +68,25 @@ class GPUBookRequest(BaseModel): job_id: Optional[str] = None +class GPUConfirmRequest(BaseModel): + client_id: Optional[str] = None + + +class OllamaTaskRequest(BaseModel): + gpu_id: str + model: str = "llama2" + prompt: str + parameters: Dict[str, Any] = {} + + +class PaymentRequest(BaseModel): + from_wallet: str + to_wallet: str + amount: float + booking_id: Optional[str] = None + task_id: Optional[str] = None + + class GPUReviewRequest(BaseModel): rating: int = Field(ge=1, le=5) comment: str @@ -338,6 +358,105 @@ async def release_gpu(gpu_id: str, session: SessionDep) -> Dict[str, Any]: } +# --------------------------------------------------------------------------- +# New endpoints: confirm booking, submit Ollama task, payment hook +# --------------------------------------------------------------------------- + + +@router.post("/marketplace/gpu/{gpu_id}/confirm") +async def confirm_gpu_booking( + gpu_id: str, + request: GPUConfirmRequest, + session: SessionDep, +) -> Dict[str, Any]: + """Confirm a booking (client ACK).""" + gpu = _get_gpu_or_404(session, gpu_id) + + if gpu.status != "booked": + raise HTTPException( + status_code=http_status.HTTP_409_CONFLICT, + detail=f"GPU {gpu_id} is not booked", + ) + + booking = session.execute( + select(GPUBooking) + .where(GPUBooking.gpu_id == gpu_id, GPUBooking.status == "active") + .limit(1) + ).scalar_one_or_none() + + if not booking: + raise HTTPException( + status_code=http_status.HTTP_404_NOT_FOUND, + detail=f"Active booking for {gpu_id} not found", + ) + + if request.client_id: + booking.client_id = request.client_id + session.add(booking) + session.commit() + session.refresh(booking) + + return { + "status": "confirmed", + "gpu_id": gpu_id, + "booking_id": booking.id, + "client_id": booking.client_id, + "message": f"Booking confirmed for GPU {gpu_id}", + } + + +@router.post("/tasks/ollama") +async def submit_ollama_task( + request: OllamaTaskRequest, + session: SessionDep, +) -> Dict[str, Any]: + """Stub Ollama task submission endpoint.""" + # Ensure GPU exists and is booked + gpu = _get_gpu_or_404(session, request.gpu_id) + if gpu.status != "booked": + raise HTTPException( + status_code=http_status.HTTP_409_CONFLICT, + detail=f"GPU {request.gpu_id} is not booked", + ) + + task_id = f"task_{uuid4().hex[:10]}" + submitted_at = datetime.utcnow().isoformat() + "Z" + + return { + "task_id": task_id, + "status": "submitted", + "submitted_at": submitted_at, + "gpu_id": request.gpu_id, + "model": request.model, + "prompt": request.prompt, + "parameters": request.parameters, + } + + +@router.post("/payments/send") +async def send_payment(request: PaymentRequest) -> Dict[str, Any]: + """Stub payment endpoint (hook for blockchain processor).""" + if request.amount <= 0: + raise HTTPException( + status_code=http_status.HTTP_400_BAD_REQUEST, + detail="Amount must be greater than zero", + ) + + tx_id = f"tx_{uuid4().hex[:10]}" + processed_at = datetime.utcnow().isoformat() + "Z" + + return { + "tx_id": tx_id, + "status": "processed", + "processed_at": processed_at, + "from": request.from_wallet, + "to": request.to_wallet, + "amount": request.amount, + "booking_id": request.booking_id, + "task_id": request.task_id, + } + + @router.get("/marketplace/gpu/{gpu_id}/reviews") async def get_gpu_reviews( gpu_id: str, diff --git a/cli/aitbc_cli/commands/marketplace.py b/cli/aitbc_cli/commands/marketplace.py index fd23c805..89a7d289 100755 --- a/cli/aitbc_cli/commands/marketplace.py +++ b/cli/aitbc_cli/commands/marketplace.py @@ -207,6 +207,96 @@ def book(ctx, gpu_id: str, hours: float, job_id: Optional[str]): error(f"Network error: {e}") +@gpu.command() +@click.argument("gpu_id") +@click.pass_context +def confirm(ctx, gpu_id: str): + """Confirm booking (client ACK).""" + config = ctx.obj["config"] + try: + with httpx.Client() as client: + response = client.post( + f"{config.coordinator_url}/marketplace/gpu/{gpu_id}/confirm", + headers={"Content-Type": "application/json", "X-Api-Key": config.api_key or ""}, + json={"client_id": config.api_key or "client"}, + ) + if response.status_code in (200, 201): + result = response.json() + success(f"Booking confirmed for GPU {gpu_id}") + output(result, ctx.obj["output_format"]) + else: + error(f"Failed to confirm booking: {response.status_code} {response.text}") + except Exception as e: + error(f"Confirmation failed: {e}") + + +@gpu.command(name="ollama-task") +@click.argument("gpu_id") +@click.option("--model", default="llama2", help="Model name for Ollama task") +@click.option("--prompt", required=True, help="Prompt to execute") +@click.option("--temperature", type=float, default=0.7, show_default=True) +@click.option("--max-tokens", type=int, default=128, show_default=True) +@click.pass_context +def ollama_task(ctx, gpu_id: str, model: str, prompt: str, temperature: float, max_tokens: int): + """Submit Ollama task via coordinator API.""" + config = ctx.obj["config"] + try: + payload = { + "gpu_id": gpu_id, + "model": model, + "prompt": prompt, + "parameters": {"temperature": temperature, "max_tokens": max_tokens}, + } + with httpx.Client() as client: + response = client.post( + f"{config.coordinator_url}/tasks/ollama", + headers={"Content-Type": "application/json", "X-Api-Key": config.api_key or ""}, + json=payload, + ) + if response.status_code in (200, 201): + result = response.json() + success(f"Ollama task submitted: {result.get('task_id')}") + output(result, ctx.obj["output_format"]) + else: + error(f"Failed to submit Ollama task: {response.status_code} {response.text}") + except Exception as e: + error(f"Ollama task submission failed: {e}") + + +@gpu.command(name="pay") +@click.argument("booking_id") +@click.argument("amount", type=float) +@click.option("--from-wallet", required=True, help="Sender wallet address") +@click.option("--to-wallet", required=True, help="Recipient wallet address") +@click.option("--task-id", help="Optional task id to link payment") +@click.pass_context +def pay(ctx, booking_id: str, amount: float, from_wallet: str, to_wallet: str, task_id: Optional[str]): + """Send payment via coordinator payment hook (for real blockchain processor).""" + config = ctx.obj["config"] + try: + payload = { + "booking_id": booking_id, + "amount": amount, + "from_wallet": from_wallet, + "to_wallet": to_wallet, + } + if task_id: + payload["task_id"] = task_id + with httpx.Client() as client: + response = client.post( + f"{config.coordinator_url}/payments/send", + headers={"Content-Type": "application/json", "X-Api-Key": config.api_key or ""}, + json=payload, + ) + if response.status_code in (200, 201): + result = response.json() + success(f"Payment sent: {result.get('tx_id')}") + output(result, ctx.obj["output_format"]) + else: + error(f"Failed to send payment: {response.status_code} {response.text}") + except Exception as e: + error(f"Payment failed: {e}") + @gpu.command() @click.argument("gpu_id") @click.pass_context diff --git a/scripts/debug_performance_test.py b/scripts/debug_performance_test.py new file mode 100644 index 00000000..4a638d73 --- /dev/null +++ b/scripts/debug_performance_test.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python3 +""" +Simple Performance Test with Debugging and Timeout +""" + +import time +import requests +import signal +import sys +from typing import Dict, List + +class TimeoutError(Exception): + pass + +def timeout_handler(signum, frame): + raise TimeoutError("Operation timed out") + +def test_endpoint_with_timeout(url: str, method: str = "GET", data: Dict = None, timeout: int = 5) -> Dict: + """Test single endpoint with timeout and debugging""" + print(f"šŸ” Testing {method} {url}") + + # Set timeout + signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(timeout) + + try: + start_time = time.time() + + if method == "GET": + response = requests.get(url, timeout=timeout) + elif method == "POST": + response = requests.post(url, json=data, timeout=timeout) + + end_time = time.time() + signal.alarm(0) # Cancel timeout + + response_time_ms = (end_time - start_time) * 1000 + + result = { + "url": url, + "method": method, + "status_code": response.status_code, + "response_time_ms": response_time_ms, + "success": True, + "error": None + } + + print(f"āœ… Status: {response.status_code}") + print(f"ā±ļø Response Time: {response_time_ms:.2f}ms") + print(f"šŸ“„ Response Size: {len(response.content)} bytes") + + return result + + except TimeoutError as e: + signal.alarm(0) + print(f"āŒ Timeout: {e}") + return { + "url": url, + "method": method, + "status_code": None, + "response_time_ms": timeout * 1000, + "success": False, + "error": str(e) + } + except Exception as e: + signal.alarm(0) + print(f"āŒ Error: {e}") + return { + "url": url, + "method": method, + "status_code": None, + "response_time_ms": 0, + "success": False, + "error": str(e) + } + +def run_performance_tests(): + """Run performance tests with debugging""" + print("šŸŽÆ AITBC GPU Marketplace Performance Test") + print("=" * 50) + + base_url = "http://localhost:8000" + results = [] + + # Test 1: Health endpoint + print("\n1ļøāƒ£ Health Endpoint Test") + result = test_endpoint_with_timeout(f"{base_url}/health", timeout=3) + results.append(result) + + # Test 2: GPU List endpoint + print("\n2ļøāƒ£ GPU List Endpoint Test") + result = test_endpoint_with_timeout(f"{base_url}/v1/marketplace/gpu/list", timeout=5) + results.append(result) + + # Test 3: GPU Booking endpoint + print("\n3ļøāƒ£ GPU Booking Endpoint Test") + booking_data = {"duration_hours": 1} + result = test_endpoint_with_timeout( + f"{base_url}/v1/marketplace/gpu/gpu_c5be877c/book", + "POST", + booking_data, + timeout=10 + ) + results.append(result) + + # Test 4: GPU Release endpoint + print("\n4ļøāƒ£ GPU Release Endpoint Test") + result = test_endpoint_with_timeout( + f"{base_url}/v1/marketplace/gpu/gpu_c5be877c/release", + "POST", + timeout=10 + ) + results.append(result) + + # Summary + print("\nšŸ“Š PERFORMANCE SUMMARY") + print("=" * 50) + + successful_tests = sum(1 for r in results if r["success"]) + total_tests = len(results) + + print(f"āœ… Successful Tests: {successful_tests}/{total_tests} ({successful_tests/total_tests*100:.1f}%)") + + print(f"\nšŸ“ˆ Response Times:") + for result in results: + if result["success"]: + status = "🟢" if result["response_time_ms"] < 100 else "🟔" if result["response_time_ms"] < 200 else "šŸ”“" + endpoint = result['url'].split('/')[-1] if '/' in result['url'] else result['url'] + print(f" {status} {result['method']} {endpoint}: {result['response_time_ms']:.2f}ms") + else: + endpoint = result['url'].split('/')[-1] if '/' in result['url'] else result['url'] + print(f" āŒ {result['method']} {endpoint}: {result['error']}") + + # Performance grade + successful_times = [r["response_time_ms"] for r in results if r["success"]] + if successful_times: + avg_response_time = sum(successful_times) / len(successful_times) + if avg_response_time < 50: + grade = "🟢 EXCELLENT" + elif avg_response_time < 100: + grade = "🟔 GOOD" + elif avg_response_time < 200: + grade = "🟠 FAIR" + else: + grade = "šŸ”“ POOR" + + print(f"\nšŸŽÆ Overall Performance: {grade}") + print(f"šŸ“Š Average Response Time: {avg_response_time:.2f}ms") + + print(f"\nāœ… Performance testing complete!") + +if __name__ == "__main__": + try: + run_performance_tests() + except KeyboardInterrupt: + print("\nāš ļø Test interrupted by user") + sys.exit(1) + except Exception as e: + print(f"\nāŒ Unexpected error: {e}") + sys.exit(1) diff --git a/scripts/end_to_end_workflow.py b/scripts/end_to_end_workflow.py new file mode 100644 index 00000000..9774f6d6 --- /dev/null +++ b/scripts/end_to_end_workflow.py @@ -0,0 +1,263 @@ +#!/usr/bin/env python3 +""" +End-to-End GPU Marketplace Workflow +User (aitbc server) → GPU Bidding → Ollama Task → Blockchain Payment +""" + +import requests +import json +import time +import sys +from typing import Dict, List + +class MarketplaceWorkflow: + def __init__(self, coordinator_url: str = "http://localhost:8000"): + self.coordinator_url = coordinator_url + self.workflow_steps = [] + + def log_step(self, step: str, status: str, details: str = ""): + """Log workflow step""" + timestamp = time.strftime("%H:%M:%S") + self.workflow_steps.append({ + "timestamp": timestamp, + "step": step, + "status": status, + "details": details + }) + status_icon = "āœ…" if status == "success" else "āŒ" if status == "error" else "šŸ”„" + print(f"{timestamp} {status_icon} {step}") + if details: + print(f" {details}") + + def get_available_gpus(self) -> List[Dict]: + """Get list of available GPUs""" + try: + print(f"šŸ” DEBUG: Requesting GPU list from {self.coordinator_url}/v1/marketplace/gpu/list") + response = requests.get(f"{self.coordinator_url}/v1/marketplace/gpu/list") + print(f"šŸ” DEBUG: Response status: {response.status_code}") + response.raise_for_status() + gpus = response.json() + print(f"šŸ” DEBUG: Total GPUs found: {len(gpus)}") + available_gpus = [gpu for gpu in gpus if gpu["status"] == "available"] + print(f"šŸ” DEBUG: Available GPUs: {len(available_gpus)}") + return available_gpus + except Exception as e: + print(f"šŸ” DEBUG: Error in get_available_gpus: {str(e)}") + self.log_step("Get Available GPUs", "error", str(e)) + return [] + + def book_gpu(self, gpu_id: str, duration_hours: int = 2) -> Dict: + """Book a GPU for computation""" + try: + print(f"šŸ” DEBUG: Attempting to book GPU {gpu_id} for {duration_hours} hours") + booking_data = {"duration_hours": duration_hours} + print(f"šŸ” DEBUG: Booking data: {booking_data}") + response = requests.post( + f"{self.coordinator_url}/v1/marketplace/gpu/{gpu_id}/book", + json=booking_data + ) + print(f"šŸ” DEBUG: Booking response status: {response.status_code}") + print(f"šŸ” DEBUG: Booking response: {response.text}") + response.raise_for_status() + booking = response.json() + print(f"šŸ” DEBUG: Booking successful: {booking}") + self.log_step("Book GPU", "success", f"GPU {gpu_id} booked for {duration_hours} hours") + return booking + except Exception as e: + print(f"šŸ” DEBUG: Error in book_gpu: {str(e)}") + self.log_step("Book GPU", "error", str(e)) + return {} + + def submit_ollama_task(self, gpu_id: str, task_data: Dict) -> Dict: + """Submit Ollama task to the booked GPU""" + try: + print(f"šŸ” DEBUG: Submitting Ollama task to GPU {gpu_id}") + print(f"šŸ” DEBUG: Task data: {task_data}") + # Simulate Ollama task submission + task_payload = { + "gpu_id": gpu_id, + "model": task_data.get("model", "llama2"), + "prompt": task_data.get("prompt", "Hello, world!"), + "parameters": task_data.get("parameters", {}) + } + print(f"šŸ” DEBUG: Task payload: {task_payload}") + + # This would integrate with actual Ollama service + # For now, simulate task submission + task_id = f"task_{int(time.time())}" + print(f"šŸ” DEBUG: Generated task ID: {task_id}") + + self.log_step("Submit Ollama Task", "success", f"Task {task_id} submitted to GPU {gpu_id}") + + return { + "task_id": task_id, + "gpu_id": gpu_id, + "status": "submitted", + "model": task_payload["model"] + } + except Exception as e: + print(f"šŸ” DEBUG: Error in submit_ollama_task: {str(e)}") + self.log_step("Submit Ollama Task", "error", str(e)) + return {} + + def process_blockchain_payment(self, booking: Dict, task_result: Dict) -> Dict: + """Process payment via blockchain""" + try: + print(f"šŸ” DEBUG: Processing blockchain payment") + print(f"šŸ” DEBUG: Booking data: {booking}") + print(f"šŸ” DEBUG: Task result: {task_result}") + # Calculate payment amount + payment_amount = booking.get("total_cost", 0.0) + print(f"šŸ” DEBUG: Payment amount: {payment_amount} AITBC") + + # Simulate blockchain payment processing + payment_data = { + "from": "aitbc_server_user", + "to": "gpu_provider", + "amount": payment_amount, + "currency": "AITBC", + "booking_id": booking.get("booking_id"), + "task_id": task_result.get("task_id"), + "gpu_id": booking.get("gpu_id") + } + print(f"šŸ” DEBUG: Payment data: {payment_data}") + + # This would integrate with actual blockchain service + # For now, simulate payment + transaction_id = f"tx_{int(time.time())}" + print(f"šŸ” DEBUG: Generated transaction ID: {transaction_id}") + + self.log_step("Process Blockchain Payment", "success", + f"Payment {payment_amount} AITBC processed (TX: {transaction_id})") + + return { + "transaction_id": transaction_id, + "amount": payment_amount, + "status": "confirmed", + "payment_data": payment_data + } + except Exception as e: + print(f"šŸ” DEBUG: Error in process_blockchain_payment: {str(e)}") + self.log_step("Process Blockchain Payment", "error", str(e)) + return {} + + def release_gpu(self, gpu_id: str) -> Dict: + """Release the GPU after task completion""" + try: + print(f"šŸ” DEBUG: Releasing GPU {gpu_id}") + response = requests.post(f"{self.coordinator_url}/v1/marketplace/gpu/{gpu_id}/release") + print(f"šŸ” DEBUG: Release response status: {response.status_code}") + print(f"šŸ” DEBUG: Release response: {response.text}") + response.raise_for_status() + release_result = response.json() + print(f"šŸ” DEBUG: GPU release successful: {release_result}") + self.log_step("Release GPU", "success", f"GPU {gpu_id} released") + return release_result + except Exception as e: + print(f"šŸ” DEBUG: Error in release_gpu: {str(e)}") + self.log_step("Release GPU", "error", str(e)) + return {} + + def run_complete_workflow(self, task_data: Dict = None) -> bool: + """Run the complete end-to-end workflow""" + print("šŸš€ Starting End-to-End GPU Marketplace Workflow") + print("=" * 60) + + # Default task data if not provided + if not task_data: + task_data = { + "model": "llama2", + "prompt": "Analyze this data and provide insights", + "parameters": {"temperature": 0.7, "max_tokens": 100} + } + + # Step 1: Get available GPUs + self.log_step("Initialize Workflow", "info", "Starting GPU marketplace workflow") + available_gpus = self.get_available_gpus() + + if not available_gpus: + self.log_step("Workflow Failed", "error", "No available GPUs in marketplace") + return False + + # Select best GPU (lowest price) + selected_gpu = min(available_gpus, key=lambda x: x["price_per_hour"]) + gpu_id = selected_gpu["id"] + + self.log_step("Select GPU", "success", + f"Selected {selected_gpu['model']} @ ${selected_gpu['price_per_hour']}/hour") + + # Step 2: Book GPU + booking = self.book_gpu(gpu_id, duration_hours=2) + if not booking: + return False + + # Step 3: Submit Ollama Task + task_result = self.submit_ollama_task(gpu_id, task_data) + if not task_result: + return False + + # Simulate task processing time + self.log_step("Process Task", "info", "Simulating Ollama task execution...") + time.sleep(2) # Simulate processing + + # Step 4: Process Blockchain Payment + payment = self.process_blockchain_payment(booking, task_result) + if not payment: + return False + + # Step 5: Release GPU + release_result = self.release_gpu(gpu_id) + if not release_result: + return False + + # Workflow Summary + self.print_workflow_summary() + return True + + def print_workflow_summary(self): + """Print workflow execution summary""" + print("\nšŸ“Š WORKFLOW EXECUTION SUMMARY") + print("=" * 60) + + successful_steps = sum(1 for step in self.workflow_steps if step["status"] == "success") + total_steps = len(self.workflow_steps) + + print(f"āœ… Successful Steps: {successful_steps}/{total_steps}") + print(f"šŸ“ˆ Success Rate: {successful_steps/total_steps*100:.1f}%") + + print(f"\nšŸ“‹ Step-by-Step Details:") + for step in self.workflow_steps: + status_icon = "āœ…" if step["status"] == "success" else "āŒ" if step["status"] == "error" else "šŸ”„" + print(f" {step['timestamp']} {status_icon} {step['step']}") + if step["details"]: + print(f" {step['details']}") + + print(f"\nšŸŽ‰ Workflow Status: {'āœ… COMPLETED' if successful_steps == total_steps else 'āŒ FAILED'}") + +def main(): + """Main execution function""" + workflow = MarketplaceWorkflow() + + # Example task data + task_data = { + "model": "llama2", + "prompt": "Analyze the following GPU marketplace data and provide investment insights", + "parameters": { + "temperature": 0.7, + "max_tokens": 150, + "top_p": 0.9 + } + } + + # Run the complete workflow + success = workflow.run_complete_workflow(task_data) + + if success: + print("\nšŸŽŠ End-to-End GPU Marketplace Workflow completed successfully!") + print("āœ… User bid on GPU → Ollama task executed → Blockchain payment processed") + else: + print("\nāŒ Workflow failed. Check the logs above for details.") + sys.exit(1) + +if __name__ == "__main__": + main()