Files
aitbc/tests/verification/test_payment_integration.py
aitbc 0081b9ee4d
Some checks failed
Python Tests / test-python (push) Failing after 10s
P2P Network Verification / p2p-verification (push) Successful in 7s
Multi-Node Blockchain Health Monitoring / health-check (push) Successful in 7s
feat: migrate tests to use centralized aitbc package utilities
- Migrate HTTP client usage from httpx/requests to aitbc.AITBCHTTPClient
- Update test_payment_integration.py to use AITBCHTTPClient and get_logger
- Fix typo in test_cross_node_blockchain.py (NetworkErroration -> NetworkError)
- Add aitbc validators to test_model_validation.py (validate_address, validate_hash)
- conftest.py already uses aitbc path utilities (get_data_path, get_log_path)
- Other test files already migrated (test_tx_import, test_simple_import, test_minimal, test_block_import_complete, verify_transactions_fixed)
2026-04-24 21:50:35 +02:00

316 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Test script for AITBC Payment Integration
Tests job creation with payments, escrow, release, and refund flows
"""
import asyncio
import json
from datetime import datetime
from typing import Dict, Any
from aitbc import get_logger, AITBCHTTPClient
# Configure logging
logger = get_logger(__name__)
# Configuration
COORDINATOR_URL = "https://aitbc.bubuit.net/api"
CLIENT_KEY = "test_client_key_123"
MINER_KEY = "${MINER_API_KEY}"
class PaymentIntegrationTest:
def __init__(self):
self.client = AITBCHTTPClient(timeout=30.0)
self.job_id = None
self.payment_id = None
async def test_complete_payment_flow(self):
"""Test the complete payment flow from job creation to payment release"""
logger.info("=== Starting AITBC Payment Integration Test ===")
# Step 1: Check coordinator health
await self.check_health()
# Step 2: Submit a job with payment
await self.submit_job_with_payment()
# Step 3: Check job status and payment
await self.check_job_and_payment_status()
# Step 4: Simulate job completion by miner
await self.complete_job()
# Step 5: Verify payment was released
await self.verify_payment_release()
# Step 6: Test refund flow with a new job
await self.test_refund_flow()
logger.info("=== Payment Integration Test Complete ===")
async def check_health(self):
"""Check if coordinator API is healthy"""
logger.info("Step 1: Checking coordinator health...")
response = self.client.get(f"{COORDINATOR_URL}/health")
if response.status_code == 200:
logger.info(f"✓ Coordinator healthy: {response.json()}")
else:
raise Exception(f"Coordinator health check failed: {response.status_code}")
async def submit_job_with_payment(self):
"""Submit a job with AITBC token payment"""
logger.info("Step 2: Submitting job with payment...")
job_data = {
"service_type": "llm",
"service_params": {
"model": "llama3.2",
"prompt": "What is AITBC?",
"max_tokens": 100
},
"payment_amount": 1.0,
"payment_currency": "AITBC",
"escrow_timeout_seconds": 3600
}
headers = {"X-Client-Key": CLIENT_KEY}
response = self.client.post(
f"{COORDINATOR_URL}/v1/jobs",
json=job_data,
headers=headers
)
if response.status_code == 201:
job = response.json()
self.job_id = job["job_id"]
logger.info(f"✓ Job created with ID: {self.job_id}")
logger.info(f" Payment status: {job.get('payment_status', 'N/A')}")
else:
raise Exception(f"Failed to create job: {response.status_code} - {response.text}")
async def check_job_and_payment_status(self):
"""Check job status and payment details"""
logger.info("Step 3: Checking job and payment status...")
headers = {"X-Client-Key": CLIENT_KEY}
# Get job status
response = self.client.get(
f"{COORDINATOR_URL}/v1/jobs/{self.job_id}",
headers=headers
)
if response.status_code == 200:
job = response.json()
logger.info(f"✓ Job status: {job['state']}")
logger.info(f" Payment ID: {job.get('payment_id', 'N/A')}")
logger.info(f" Payment status: {job.get('payment_status', 'N/A')}")
self.payment_id = job.get('payment_id')
# Get payment details if payment_id exists
if self.payment_id:
payment_response = self.client.get(
f"{COORDINATOR_URL}/v1/payments/{self.payment_id}",
headers=headers
)
if payment_response.status_code == 200:
payment = payment_response.json()
logger.info(f"✓ Payment details:")
logger.info(f" Amount: {payment['amount']} {payment['currency']}")
logger.info(f" Status: {payment['status']}")
logger.info(f" Method: {payment['payment_method']}")
else:
logger.warning(f"Could not fetch payment details: {payment_response.status_code}")
else:
raise Exception(f"Failed to get job status: {response.status_code}")
async def complete_job(self):
"""Simulate miner completing the job"""
logger.info("Step 4: Simulating job completion...")
# First, poll for the job as miner
headers = {"X-Miner-Key": MINER_KEY}
poll_response = self.client.post(
f"{COORDINATOR_URL}/v1/miners/poll",
json={"capabilities": ["llm"]},
headers=headers
)
if poll_response.status_code == 200:
poll_data = poll_response.json()
if poll_data.get("job_id") == self.job_id:
logger.info(f"✓ Miner received job: {self.job_id}")
# Submit job result
result_data = {
"result": json.dumps({
"text": "AITBC is a decentralized AI computing marketplace that uses blockchain for payments and zero-knowledge proofs for privacy.",
"model": "llama3.2",
"tokens_used": 42
}),
"metrics": {
"duration_ms": 2500,
"tokens_used": 42,
"gpu_seconds": 0.5
}
}
submit_response = self.client.post(
f"{COORDINATOR_URL}/v1/miners/{self.job_id}/result",
json=result_data,
headers=headers
)
if submit_response.status_code == 200:
logger.info("✓ Job result submitted successfully")
logger.info(f" Receipt: {submit_response.json().get('receipt', {}).get('receipt_id', 'N/A')}")
else:
raise Exception(f"Failed to submit result: {submit_response.status_code}")
else:
logger.warning(f"Miner received different job: {poll_data.get('job_id')}")
else:
raise Exception(f"Failed to poll for job: {poll_response.status_code}")
async def verify_payment_release(self):
"""Verify that payment was released after job completion"""
logger.info("Step 5: Verifying payment release...")
# Wait a moment for payment processing
await asyncio.sleep(2)
headers = {"X-Client-Key": CLIENT_KEY}
# Check updated job status
response = self.client.get(
f"{COORDINATOR_URL}/v1/jobs/{self.job_id}",
headers=headers
)
if response.status_code == 200:
job = response.json()
logger.info(f"✓ Final job status: {job['state']}")
logger.info(f" Final payment status: {job.get('payment_status', 'N/A')}")
# Get payment receipt
if self.payment_id:
receipt_response = self.client.get(
f"{COORDINATOR_URL}/v1/payments/{self.payment_id}/receipt",
headers=headers
)
if receipt_response.status_code == 200:
receipt = receipt_response.json()
logger.info(f"✓ Payment receipt:")
logger.info(f" Status: {receipt['status']}")
logger.info(f" Verified at: {receipt.get('verified_at', 'N/A')}")
logger.info(f" Transaction hash: {receipt.get('transaction_hash', 'N/A')}")
else:
logger.warning(f"Could not fetch payment receipt: {receipt_response.status_code}")
else:
raise Exception(f"Failed to verify payment release: {response.status_code}")
async def test_refund_flow(self):
"""Test payment refund for failed jobs"""
logger.info("Step 6: Testing refund flow...")
# Create a new job that will fail
job_data = {
"service_type": "llm",
"service_params": {
"model": "nonexistent_model",
"prompt": "This should fail"
},
"payment_amount": 0.5,
"payment_currency": "AITBC"
}
headers = {"X-Client-Key": CLIENT_KEY}
response = self.client.post(
f"{COORDINATOR_URL}/v1/jobs",
json=job_data,
headers=headers
)
if response.status_code == 201:
fail_job = response.json()
fail_job_id = fail_job["job_id"]
fail_payment_id = fail_job.get("payment_id")
logger.info(f"✓ Created test job for refund: {fail_job_id}")
# Simulate job failure
fail_headers = {"X-Miner-Key": MINER_KEY}
# Poll for the job
poll_response = self.client.post(
f"{COORDINATOR_URL}/v1/miners/poll",
json={"capabilities": ["llm"]},
headers=fail_headers
)
if poll_response.status_code == 200:
poll_data = poll_response.json()
if poll_data.get("job_id") == fail_job_id:
# Submit failure
fail_data = {
"error_code": "MODEL_NOT_FOUND",
"error_message": "The specified model does not exist"
}
fail_response = self.client.post(
f"{COORDINATOR_URL}/v1/miners/{fail_job_id}/fail",
json=fail_data,
headers=fail_headers
)
if fail_response.status_code == 200:
logger.info("✓ Job failure submitted")
# Wait for refund processing
await asyncio.sleep(2)
# Check refund status
if fail_payment_id:
payment_response = self.client.get(
f"{COORDINATOR_URL}/v1/payments/{fail_payment_id}",
headers=headers
)
if payment_response.status_code == 200:
payment = payment_response.json()
logger.info(f"✓ Payment refunded:")
logger.info(f" Status: {payment['status']}")
logger.info(f" Refunded at: {payment.get('refunded_at', 'N/A')}")
else:
logger.warning(f"Could not verify refund: {payment_response.status_code}")
else:
logger.warning(f"Failed to submit job failure: {fail_response.status_code}")
logger.info("\n=== Test Summary ===")
logger.info("✓ Job creation with payment")
logger.info("✓ Payment escrow creation")
logger.info("✓ Job completion and payment release")
logger.info("✓ Job failure and payment refund")
logger.info("\nPayment integration is working correctly!")
async def main():
"""Run the payment integration test"""
test = PaymentIntegrationTest()
try:
await test.test_complete_payment_flow()
except Exception as e:
logger.error(f"Test failed: {e}")
raise
if __name__ == "__main__":
asyncio.run(main())