- Upgrade ollama-gpu-provider skill to v2.0 with complete test workflow documentation - Add comprehensive troubleshooting, monitoring commands, and CI/CD integration examples - Update client.py default coordinator port from 8000 to 18000 - Clear currentissue.md and add usage guidelines for issue tracking - Create dev-utils/ directory and move aitbc-pythonpath.pth from root - Create docs/guides/ and docs
259 lines
9.4 KiB
Python
Executable File
259 lines
9.4 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Ollama GPU Provider Test with Blockchain Verification
|
|
Submits an inference job and verifies the complete flow:
|
|
- Job submission to coordinator
|
|
- Processing by GPU miner
|
|
- Receipt generation
|
|
- Blockchain transaction recording
|
|
"""
|
|
|
|
import argparse
|
|
import sys
|
|
import time
|
|
from typing import Optional
|
|
import json
|
|
|
|
import httpx
|
|
|
|
# Configuration
|
|
DEFAULT_COORDINATOR = "http://127.0.0.1:18000"
|
|
DEFAULT_BLOCKCHAIN = "http://127.0.0.1:19000"
|
|
DEFAULT_API_KEY = "REDACTED_CLIENT_KEY"
|
|
DEFAULT_PROMPT = "What is the capital of France?"
|
|
DEFAULT_MODEL = "llama3.2:latest"
|
|
DEFAULT_TIMEOUT = 180
|
|
POLL_INTERVAL = 3
|
|
|
|
|
|
def submit_job(client: httpx.Client, base_url: str, api_key: str, prompt: str, model: str) -> Optional[str]:
|
|
"""Submit an inference job to the coordinator"""
|
|
payload = {
|
|
"payload": {
|
|
"type": "inference",
|
|
"prompt": prompt,
|
|
"parameters": {
|
|
"prompt": prompt,
|
|
"model": model,
|
|
"stream": False
|
|
},
|
|
},
|
|
"ttl_seconds": 900,
|
|
}
|
|
response = client.post(
|
|
f"{base_url}/v1/jobs",
|
|
headers={"X-Api-Key": api_key, "Content-Type": "application/json"},
|
|
json=payload,
|
|
timeout=10,
|
|
)
|
|
if response.status_code != 201:
|
|
print(f"❌ Job submission failed: {response.status_code} {response.text}")
|
|
return None
|
|
return response.json().get("job_id")
|
|
|
|
|
|
def fetch_status(client: httpx.Client, base_url: str, api_key: str, job_id: str) -> Optional[dict]:
|
|
"""Fetch job status from coordinator"""
|
|
response = client.get(
|
|
f"{base_url}/v1/jobs/{job_id}",
|
|
headers={"X-Api-Key": api_key},
|
|
timeout=10,
|
|
)
|
|
if response.status_code != 200:
|
|
print(f"❌ Status check failed: {response.status_code} {response.text}")
|
|
return None
|
|
return response.json()
|
|
|
|
|
|
def fetch_result(client: httpx.Client, base_url: str, api_key: str, job_id: str) -> Optional[dict]:
|
|
"""Fetch job result from coordinator"""
|
|
response = client.get(
|
|
f"{base_url}/v1/jobs/{job_id}/result",
|
|
headers={"X-Api-Key": api_key},
|
|
timeout=10,
|
|
)
|
|
if response.status_code != 200:
|
|
print(f"❌ Result fetch failed: {response.status_code} {response.text}")
|
|
return None
|
|
return response.json()
|
|
|
|
|
|
def fetch_receipt(client: httpx.Client, base_url: str, api_key: str, job_id: str) -> Optional[dict]:
|
|
"""Fetch job receipt from coordinator"""
|
|
response = client.get(
|
|
f"{base_url}/v1/jobs/{job_id}/receipt",
|
|
headers={"X-Api-Key": api_key},
|
|
timeout=10,
|
|
)
|
|
if response.status_code != 200:
|
|
print(f"❌ Receipt fetch failed: {response.status_code} {response.text}")
|
|
return None
|
|
return response.json()
|
|
|
|
|
|
def check_blockchain_transaction(client: httpx.Client, blockchain_url: str, receipt_id: str) -> Optional[dict]:
|
|
"""Check if receipt is recorded on blockchain"""
|
|
# Search for transaction by receipt ID
|
|
response = client.get(
|
|
f"{blockchain_url}/rpc/transactions/search",
|
|
params={"receipt_id": receipt_id},
|
|
timeout=10,
|
|
)
|
|
if response.status_code != 200:
|
|
print(f"⚠️ Blockchain search failed: {response.status_code}")
|
|
return None
|
|
|
|
transactions = response.json().get("transactions", [])
|
|
if transactions:
|
|
return transactions[0] # Return the first matching transaction
|
|
return None
|
|
|
|
|
|
def get_miner_info(client: httpx.Client, base_url: str, api_key: str) -> Optional[dict]:
|
|
"""Get registered miner information"""
|
|
response = client.get(
|
|
f"{base_url}/v1/admin/miners",
|
|
headers={"X-Api-Key": api_key},
|
|
timeout=10,
|
|
)
|
|
if response.status_code != 200:
|
|
print(f"⚠️ Could not fetch miner info: {response.status_code}")
|
|
return None
|
|
|
|
data = response.json()
|
|
# Handle different response formats
|
|
if isinstance(data, list):
|
|
return data[0] if data else None
|
|
elif isinstance(data, dict):
|
|
if 'miners' in data:
|
|
miners = data['miners']
|
|
return miners[0] if miners else None
|
|
elif 'items' in data:
|
|
items = data['items']
|
|
return items[0] if items else None
|
|
return None
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(description="Ollama GPU provider with blockchain verification")
|
|
parser.add_argument("--coordinator-url", default=DEFAULT_COORDINATOR, help="Coordinator base URL")
|
|
parser.add_argument("--blockchain-url", default=DEFAULT_BLOCKCHAIN, help="Blockchain node URL")
|
|
parser.add_argument("--api-key", default=DEFAULT_API_KEY, help="Client API key")
|
|
parser.add_argument("--prompt", default=DEFAULT_PROMPT, help="Prompt to send")
|
|
parser.add_argument("--model", default=DEFAULT_MODEL, help="Model to use")
|
|
parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help="Timeout in seconds")
|
|
args = parser.parse_args()
|
|
|
|
print("🚀 Starting Ollama GPU Provider Test with Blockchain Verification")
|
|
print("=" * 60)
|
|
|
|
# Check miner registration
|
|
print("\n📋 Checking miner registration...")
|
|
with httpx.Client() as client:
|
|
miner_info = get_miner_info(client, args.coordinator_url, "REDACTED_ADMIN_KEY")
|
|
if miner_info:
|
|
print(f"✅ Found registered miner: {miner_info.get('miner_id')}")
|
|
print(f" Status: {miner_info.get('status')}")
|
|
print(f" Last seen: {miner_info.get('last_seen')}")
|
|
else:
|
|
print("⚠️ No miners registered. Job may not be processed.")
|
|
|
|
# Submit job
|
|
print(f"\n📤 Submitting inference job...")
|
|
print(f" Prompt: {args.prompt}")
|
|
print(f" Model: {args.model}")
|
|
|
|
with httpx.Client() as client:
|
|
job_id = submit_job(client, args.coordinator_url, args.api_key, args.prompt, args.model)
|
|
if not job_id:
|
|
return 1
|
|
print(f"✅ Job submitted successfully: {job_id}")
|
|
|
|
# Monitor job progress
|
|
print(f"\n⏳ Monitoring job progress...")
|
|
deadline = time.time() + args.timeout
|
|
status = None
|
|
|
|
while time.time() < deadline:
|
|
status = fetch_status(client, args.coordinator_url, args.api_key, job_id)
|
|
if not status:
|
|
return 1
|
|
|
|
state = status.get("state")
|
|
assigned_miner = status.get("assigned_miner_id", "None")
|
|
|
|
print(f" State: {state} | Miner: {assigned_miner}")
|
|
|
|
if state == "COMPLETED":
|
|
break
|
|
if state in {"FAILED", "CANCELED", "EXPIRED"}:
|
|
print(f"❌ Job ended in state: {state}")
|
|
if status.get("error"):
|
|
print(f" Error: {status['error']}")
|
|
return 1
|
|
time.sleep(POLL_INTERVAL)
|
|
|
|
if not status or status.get("state") != "COMPLETED":
|
|
print("❌ Job did not complete within timeout")
|
|
return 1
|
|
|
|
# Fetch result and receipt
|
|
print(f"\n📊 Fetching job results...")
|
|
result = fetch_result(client, args.coordinator_url, args.api_key, job_id)
|
|
if result is None:
|
|
return 1
|
|
|
|
receipt = fetch_receipt(client, args.coordinator_url, args.api_key, job_id)
|
|
if receipt is None:
|
|
print("⚠️ No receipt found (payment may not be processed)")
|
|
receipt = {}
|
|
|
|
# Display results
|
|
payload = result.get("result") or {}
|
|
output = payload.get("output", "No output")
|
|
|
|
print(f"\n✅ Job completed successfully!")
|
|
print(f"📝 Output: {output[:200]}{'...' if len(output) > 200 else ''}")
|
|
|
|
if receipt:
|
|
print(f"\n🧾 Receipt Information:")
|
|
print(f" Receipt ID: {receipt.get('receipt_id')}")
|
|
print(f" Provider: {receipt.get('provider')}")
|
|
print(f" Units: {receipt.get('units')} {receipt.get('unit_type', 'seconds')}")
|
|
print(f" Unit Price: {receipt.get('unit_price')} AITBC")
|
|
print(f" Total Price: {receipt.get('price')} AITBC")
|
|
print(f" Status: {receipt.get('status')}")
|
|
|
|
# Check blockchain
|
|
print(f"\n⛓️ Checking blockchain recording...")
|
|
receipt_id = receipt.get('receipt_id')
|
|
|
|
with httpx.Client() as bc_client:
|
|
tx = check_blockchain_transaction(bc_client, args.blockchain_url, receipt_id)
|
|
|
|
if tx:
|
|
print(f"✅ Transaction found on blockchain!")
|
|
print(f" TX Hash: {tx.get('tx_hash')}")
|
|
print(f" Block: {tx.get('block_height')}")
|
|
print(f" From: {tx.get('sender')}")
|
|
print(f" To: {tx.get('recipient')}")
|
|
print(f" Amount: {tx.get('amount')} AITBC")
|
|
|
|
# Show transaction payload
|
|
payload = tx.get('payload', {})
|
|
if 'receipt_id' in payload:
|
|
print(f" Payload Receipt: {payload['receipt_id']}")
|
|
else:
|
|
print(f"⚠️ Transaction not yet found on blockchain")
|
|
print(f" This may take a few moments to be mined...")
|
|
print(f" Receipt ID: {receipt_id}")
|
|
else:
|
|
print(f"\n❌ No receipt generated - payment not processed")
|
|
|
|
print(f"\n🎉 Test completed!")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|