refactor: merge scripts/test/ into tests/verification/

Move 21 standalone verification scripts from scripts/test/ to
tests/verification/. These are manually-run scripts that hit live
services (not pytest suites). Consolidates all test-related files
under tests/ with clear separation:
- tests/cli/, tests/unit/, tests/integration/, etc. = pytest suites
- tests/verification/ = standalone verification scripts
This commit is contained in:
oib
2026-02-13 23:29:48 +01:00
parent 3b4cc69179
commit d9481f2b92
21 changed files with 0 additions and 0 deletions

View File

@@ -1,33 +0,0 @@
# Testing Scripts
This directory contains standalone test scripts and utilities for exercising the AITBC platform.
## Test Scripts
### Block Import Tests
- **test_block_import.py** - Main block import endpoint test
- **test_block_import_complete.py** - Comprehensive block import test suite
- **test_simple_import.py** - Simple block import test
- **test_tx_import.py** - Transaction import test
- **test_tx_model.py** - Transaction model validation test
- **test_minimal.py** - Minimal test case
- **test_model_validation.py** - Model validation test
### Payment Tests
- **test_payment_integration.py** - Payment integration test suite
- **test_payment_local.py** - Local payment testing
### Test Runners
- **run_test_suite.py** - Main test suite runner
- **run_tests.py** - Simple test runner
- **verify_windsurf_tests.py** - Verify Windsurf test configuration
- **register_test_clients.py** - Register test clients for testing
## Usage
Most test scripts can be run directly with Python:
```bash
python3 test_block_import.py
```
Some scripts may require specific environment setup or configuration.

View File

@@ -1,56 +0,0 @@
#!/usr/bin/env python3
"""Register test clients for payment integration testing."""
import asyncio
import json
import os

import httpx

# Configuration
COORDINATOR_URL = "http://127.0.0.1:8000/v1"
CLIENT_KEY = "test_client_key_123"
# Read the miner key from the environment: the previous literal
# "${MINER_API_KEY}" was an unexpanded shell-template placeholder and
# could never authenticate against the coordinator.
MINER_KEY = os.environ.get("MINER_API_KEY", "")
async def register_client():
    """Register the standard test client with the coordinator."""
    client_profile = {"name": "Test Client", "description": "Client for payment testing"}
    async with httpx.AsyncClient() as http:
        # POST the profile under the client API key.
        resp = await http.post(
            f"{COORDINATOR_URL}/clients/register",
            headers={"X-API-Key": CLIENT_KEY},
            json=client_profile,
        )
        print(f"Client registration: {resp.status_code}")
        if resp.status_code in (200, 201):
            print("✓ Test client registered successfully")
        else:
            print(f"Response: {resp.text}")
async def register_miner():
    """Register the standard test miner with the coordinator."""
    miner_profile = {
        "name": "Test Miner",
        "description": "Miner for payment testing",
        "capacity": 100,
        "price_per_hour": 0.1,
        "hardware": {"gpu": "RTX 4090", "memory": "24GB"},
    }
    async with httpx.AsyncClient() as http:
        # POST the profile under the miner API key.
        resp = await http.post(
            f"{COORDINATOR_URL}/miners/register",
            headers={"X-API-Key": MINER_KEY},
            json=miner_profile,
        )
        print(f"Miner registration: {resp.status_code}")
        if resp.status_code in (200, 201):
            print("✓ Test miner registered successfully")
        else:
            print(f"Response: {resp.text}")
async def main():
    """Register the test client and the test miner, in that order."""
    print("=== Registering Test Clients ===")
    for register in (register_client, register_miner):
        await register()
    print("\n✅ Test clients registered successfully!")


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,146 +0,0 @@
#!/usr/bin/env python3
"""
Test suite runner for AITBC
"""
import sys
import argparse
import subprocess
from pathlib import Path
def run_command(cmd, description):
    """Execute *cmd*, echo its captured output, and report success.

    Prints a banner with *description* and the command line, runs the
    command with stdout/stderr captured as text, echoes whatever the
    process produced, and returns True iff it exited with status 0.
    """
    banner = "=" * 60
    print(f"\n{banner}")
    print(f"Running: {description}")
    print(f"Command: {' '.join(cmd)}")
    print(banner)
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.stdout:
        print(proc.stdout)
    if proc.stderr:
        print("STDERR:", proc.stderr)
    return proc.returncode == 0
def main() -> None:
    """Parse CLI flags, assemble a pytest command line, run it, report.

    Suite selection (--suite / --marker / --file), coverage, parallelism
    and verbosity map onto pytest options.  Exits with status 1 when the
    pytest run fails; on success runs optional follow-up checks (bandit
    and safety scans for the "security"/"all" suites).
    """
    parser = argparse.ArgumentParser(description="AITBC Test Suite Runner")
    parser.add_argument(
        "--suite",
        choices=["unit", "integration", "e2e", "security", "all"],
        default="all",
        help="Test suite to run"
    )
    parser.add_argument(
        "--coverage",
        action="store_true",
        help="Generate coverage report"
    )
    parser.add_argument(
        "--parallel",
        action="store_true",
        help="Run tests in parallel"
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Verbose output"
    )
    parser.add_argument(
        "--marker",
        help="Run tests with specific marker (e.g., unit, integration)"
    )
    parser.add_argument(
        "--file",
        help="Run specific test file"
    )
    args = parser.parse_args()
    # Base pytest command
    pytest_cmd = ["python", "-m", "pytest"]
    # Add verbosity
    if args.verbose:
        pytest_cmd.append("-v")
    # Add coverage if requested (requires pytest-cov)
    if args.coverage:
        pytest_cmd.extend([
            "--cov=apps",
            "--cov-report=html:htmlcov",
            "--cov-report=term-missing"
        ])
    # Add parallel execution if requested (requires pytest-xdist)
    if args.parallel:
        pytest_cmd.extend(["-n", "auto"])
    # Determine which tests to run.  Precedence: explicit --file first,
    # then --marker, then the named suite (default "all").
    test_paths = []
    if args.file:
        test_paths.append(args.file)
    elif args.marker:
        pytest_cmd.extend(["-m", args.marker])
    elif args.suite == "unit":
        test_paths.append("tests/unit/")
    elif args.suite == "integration":
        test_paths.append("tests/integration/")
    elif args.suite == "e2e":
        test_paths.append("tests/e2e/")
        # E2E tests might need additional setup
        # (--driver is a pytest-selenium option -- TODO confirm plugin)
        pytest_cmd.extend(["--driver=Chrome"])
    elif args.suite == "security":
        pytest_cmd.extend(["-m", "security"])
    else:  # all
        test_paths.append("tests/")
    # Add test paths to command
    pytest_cmd.extend(test_paths)
    # Add pytest configuration
    pytest_cmd.extend([
        "--tb=short",
        "--strict-markers",
        "--disable-warnings"
    ])
    # Run the tests
    success = run_command(pytest_cmd, f"{args.suite.title()} Test Suite")
    if success:
        print(f"\n{args.suite.title()} tests passed!")
        if args.coverage:
            print("\n📊 Coverage report generated in htmlcov/index.html")
    else:
        print(f"\n{args.suite.title()} tests failed!")
        sys.exit(1)
    # Additional checks -- only reached when the pytest run succeeded.
    if args.suite in ["all", "integration"]:
        print("\n🔍 Running integration test checks...")
        # Add any integration-specific checks here
    if args.suite in ["all", "e2e"]:
        print("\n🌐 Running E2E test checks...")
        # Add any E2E-specific checks here
    if args.suite in ["all", "security"]:
        print("\n🔒 Running security scan...")
        # Run security scan (bandit: static analysis of apps/);
        # result is printed but deliberately does not affect exit status.
        security_cmd = ["bandit", "-r", "apps/"]
        run_command(security_cmd, "Security Scan")
        # Run dependency check (safety: known-vulnerability scan)
        deps_cmd = ["safety", "check"]
        run_command(deps_cmd, "Dependency Security Check")
if __name__ == "__main__":
    main()

View File

@@ -1,26 +0,0 @@
#!/usr/bin/env python3
"""
Wrapper script to run pytest with proper Python path configuration
"""
import sys
from pathlib import Path

# Repository root = the directory containing this wrapper script.
_ROOT = Path(__file__).parent

# Directories that must be importable before pytest starts.  Each insert
# goes to the front of sys.path, so the LAST entry below ends up with
# the highest lookup priority -- same net order as inserting them one
# by one.
_EXTRA_PATHS = (
    _ROOT,
    _ROOT / "packages" / "py" / "aitbc-core" / "src",
    _ROOT / "packages" / "py" / "aitbc-crypto" / "src",
    _ROOT / "packages" / "py" / "aitbc-p2p" / "src",
    _ROOT / "packages" / "py" / "aitbc-sdk" / "src",
    _ROOT / "apps" / "coordinator-api" / "src",
    _ROOT / "apps" / "wallet-daemon" / "src",
    _ROOT / "apps" / "blockchain-node" / "src",
)
for _path in _EXTRA_PATHS:
    sys.path.insert(0, str(_path))

# Hand control to pytest, forwarding the original CLI arguments.
import pytest
sys.exit(pytest.main())

View File

@@ -1,203 +0,0 @@
#!/usr/bin/env python3
"""
Test script for block import endpoint
Tests the /rpc/blocks/import POST endpoint functionality
"""
import json
import hashlib
from datetime import datetime
# Test configuration for the live devnet RPC endpoint.
BASE_URL = "https://aitbc.bubuit.net/rpc"
CHAIN_ID = "ait-devnet"


def compute_block_hash(height, parent_hash, timestamp):
    """Return the 0x-prefixed hex block hash.

    Mirrors the PoA proposer's algorithm: SHA-256 over the UTF-8 bytes
    of "chain_id|height|parent_hash|timestamp".
    """
    preimage = f"{CHAIN_ID}|{height}|{parent_hash}|{timestamp}"
    digest = hashlib.sha256(preimage.encode())
    return "0x" + digest.hexdigest()
def test_block_import():
    """Exercise the /blocks/import endpoint against the live devnet.

    Walks through the validation failures (bad height, conflicting hash,
    invalid hash, missing parent), then imports a real block carrying
    one transaction and one receipt and verifies it can be read back.
    Raises AssertionError on the first scenario that misbehaves.

    NOTE(review): tests 6-7 permanently append a block to the chain at
    BASE_URL -- run only against a disposable devnet.
    """
    # Imported lazily so merely importing this module needs no requests.
    import requests
    print("Testing Block Import Endpoint")
    print("=" * 50)
    # Test 1: Invalid height (0) -- the schema requires height > 0, so
    # the API should answer 422 before touching the chain.
    print("\n1. Testing invalid height (0)...")
    response = requests.post(
        f"{BASE_URL}/blocks/import",
        json={
            "height": 0,
            "hash": "0x123",
            "parent_hash": "0x00",
            "proposer": "test",
            "timestamp": "2026-01-29T10:20:00",
            "tx_count": 0
        }
    )
    print(f"Status: {response.status_code}")
    print(f"Response: {response.json()}")
    assert response.status_code == 422, "Should return validation error for height 0"
    print("✓ Correctly rejected height 0")
    # Test 2: Block already exists at height 1 with a different hash --
    # assumes the devnet chain already has a genesis-adjacent block 1.
    print("\n2. Testing block conflict...")
    response = requests.post(
        f"{BASE_URL}/blocks/import",
        json={
            "height": 1,
            "hash": "0xinvalidhash",
            "parent_hash": "0x00",
            "proposer": "test",
            "timestamp": "2026-01-29T10:20:00",
            "tx_count": 0
        }
    )
    print(f"Status: {response.status_code}")
    print(f"Response: {response.json()}")
    assert response.status_code == 409, "Should return conflict for existing height with different hash"
    print("✓ Correctly detected block conflict")
    # Test 3: Re-importing an existing block with its correct hash must
    # be idempotent (200 + "exists"), not a conflict.
    print("\n3. Testing import of existing block with correct hash...")
    # Get actual block data
    response = requests.get(f"{BASE_URL}/blocks/1")
    block_data = response.json()
    response = requests.post(
        f"{BASE_URL}/blocks/import",
        json={
            "height": block_data["height"],
            "hash": block_data["hash"],
            "parent_hash": block_data["parent_hash"],
            "proposer": block_data["proposer"],
            "timestamp": block_data["timestamp"],
            "tx_count": block_data["tx_count"]
        }
    )
    print(f"Status: {response.status_code}")
    print(f"Response: {response.json()}")
    assert response.status_code == 200, "Should accept existing block with correct hash"
    assert response.json()["status"] == "exists", "Should return 'exists' status"
    print("✓ Correctly handled existing block")
    # Test 4: Invalid block hash (with valid parent)
    print("\n4. Testing invalid block hash...")
    # Get current head to use as parent
    response = requests.get(f"{BASE_URL}/head")
    head = response.json()
    timestamp = "2026-01-29T10:20:00"
    parent_hash = head["hash"]  # Use actual parent hash
    height = head["height"] + 1000  # Use high height to avoid conflicts
    invalid_hash = "0xinvalid"
    response = requests.post(
        f"{BASE_URL}/blocks/import",
        json={
            "height": height,
            "hash": invalid_hash,
            "parent_hash": parent_hash,
            "proposer": "test",
            "timestamp": timestamp,
            "tx_count": 0
        }
    )
    print(f"Status: {response.status_code}")
    print(f"Response: {response.json()}")
    assert response.status_code == 400, "Should reject invalid hash"
    assert "Invalid block hash" in response.json()["detail"], "Should mention invalid hash"
    print("✓ Correctly rejected invalid hash")
    # Test 5: Hash is self-consistent but the parent does not exist.
    print("\n5. Testing valid hash but parent not found...")
    height = head["height"] + 2000  # Use different height
    parent_hash = "0xnonexistentparent"
    timestamp = "2026-01-29T10:20:00"
    valid_hash = compute_block_hash(height, parent_hash, timestamp)
    response = requests.post(
        f"{BASE_URL}/blocks/import",
        json={
            "height": height,
            "hash": valid_hash,
            "parent_hash": parent_hash,
            "proposer": "test",
            "timestamp": timestamp,
            "tx_count": 0
        }
    )
    print(f"Status: {response.status_code}")
    print(f"Response: {response.json()}")
    assert response.status_code == 400, "Should reject when parent not found"
    assert "Parent block not found" in response.json()["detail"], "Should mention parent not found"
    print("✓ Correctly rejected missing parent")
    # Test 6: A fully valid block on top of the current head, carrying a
    # transaction and a receipt -- this MUTATES the chain.
    print("\n6. Testing valid block with transactions...")
    # Get current head to use as parent
    response = requests.get(f"{BASE_URL}/head")
    head = response.json()
    height = head["height"] + 1
    parent_hash = head["hash"]
    # utcnow() is naive; the appended "Z" asserts UTC by convention only.
    timestamp = datetime.utcnow().isoformat() + "Z"
    valid_hash = compute_block_hash(height, parent_hash, timestamp)
    test_block = {
        "height": height,
        "hash": valid_hash,
        "parent_hash": parent_hash,
        "proposer": "test-proposer",
        "timestamp": timestamp,
        "tx_count": 1,
        "transactions": [{
            "tx_hash": f"0xtx{height}",
            "sender": "0xsender",
            "recipient": "0xreceiver",
            "payload": {"to": "0xreceiver", "amount": 1000000}
        }],
        "receipts": [{
            "receipt_id": f"rx{height}",
            "job_id": f"job{height}",
            "payload": {"result": "success"},
            "miner_signature": "0xminer",
            "coordinator_attestations": ["0xatt1"],
            "minted_amount": 100,
            "recorded_at": timestamp
        }]
    }
    response = requests.post(
        f"{BASE_URL}/blocks/import",
        json=test_block
    )
    print(f"Status: {response.status_code}")
    print(f"Response: {response.json()}")
    assert response.status_code == 200, "Should accept valid block with transactions"
    assert response.json()["status"] == "imported", "Should return 'imported' status"
    print("✓ Successfully imported block with transactions")
    # Verify the block was imported and is readable at its height.
    print("\n7. Verifying imported block...")
    response = requests.get(f"{BASE_URL}/blocks/{height}")
    assert response.status_code == 200, "Should be able to retrieve imported block"
    imported_block = response.json()
    assert imported_block["hash"] == valid_hash, "Hash should match"
    assert imported_block["tx_count"] == 1, "Should have 1 transaction"
    print("✓ Block successfully imported and retrievable")
    print("\n" + "=" * 50)
    print("All tests passed! ✅")
    print("\nBlock import endpoint is fully functional with:")
    print("- ✓ Input validation")
    print("- ✓ Hash validation")
    print("- ✓ Parent block verification")
    print("- ✓ Conflict detection")
    print("- ✓ Transaction and receipt import")
    print("- ✓ Proper error handling")
if __name__ == "__main__":
    test_block_import()

View File

@@ -1,224 +0,0 @@
#!/usr/bin/env python3
"""
Comprehensive test for block import endpoint
Tests all functionality including validation, conflicts, and transaction import
"""
import json
import hashlib
import requests
from datetime import datetime
# Live devnet RPC endpoint and chain identifier.
BASE_URL = "https://aitbc.bubuit.net/rpc"
CHAIN_ID = "ait-devnet"


def compute_block_hash(height, parent_hash, timestamp):
    """Compute the block hash exactly as the PoA proposer does.

    SHA-256 of "chain_id|height|parent_hash|timestamp" (UTF-8),
    returned as a 0x-prefixed hex string.
    """
    text = f"{CHAIN_ID}|{height}|{parent_hash}|{timestamp}"
    return "0x" + hashlib.sha256(text.encode()).hexdigest()
def test_block_import_complete():
    """Complete test suite for the block import endpoint.

    Runs seven scenarios against the live devnet at BASE_URL, tallying
    outcomes instead of asserting, then prints a summary.  Returns the
    tuple (passed, failed, known_issues).

    NOTE(review): tests 6-7 append real blocks to the chain -- run only
    against a disposable devnet.
    """
    print("=" * 60)
    print("BLOCK IMPORT ENDPOINT TEST SUITE")
    print("=" * 60)
    # Per-test outcome: True = pass, False = fail, None = known issue.
    results = []
    # Test 1: Invalid height (0)
    print("\n[TEST 1] Invalid height (0)...")
    response = requests.post(
        f"{BASE_URL}/blocks/import",
        json={
            "height": 0,
            "hash": "0x123",
            "parent_hash": "0x00",
            "proposer": "test",
            "timestamp": "2026-01-29T10:20:00",
            "tx_count": 0
        }
    )
    # NOTE(review): "greater_than" is matched against the error's "msg"
    # field; recent FastAPI/pydantic versions put that token in "type",
    # not "msg" -- confirm against the deployed API version.
    if response.status_code == 422 and "greater_than" in response.json()["detail"][0]["msg"]:
        print("✅ PASS: Correctly rejected height 0")
        results.append(True)
    else:
        print(f"❌ FAIL: Expected 422, got {response.status_code}")
        results.append(False)
    # Test 2: Block conflict -- same height as an existing block but a
    # different hash should be refused with 409.
    print("\n[TEST 2] Block conflict...")
    response = requests.post(
        f"{BASE_URL}/blocks/import",
        json={
            "height": 1,
            "hash": "0xinvalidhash",
            "parent_hash": "0x00",
            "proposer": "test",
            "timestamp": "2026-01-29T10:20:00",
            "tx_count": 0
        }
    )
    if response.status_code == 409 and "already exists with different hash" in response.json()["detail"]:
        print("✅ PASS: Correctly detected block conflict")
        results.append(True)
    else:
        print(f"❌ FAIL: Expected 409, got {response.status_code}")
        results.append(False)
    # Test 3: Re-import of an existing block with its correct hash is
    # idempotent (200 + "exists").
    print("\n[TEST 3] Import existing block with correct hash...")
    response = requests.get(f"{BASE_URL}/blocks/1")
    block_data = response.json()
    response = requests.post(
        f"{BASE_URL}/blocks/import",
        json={
            "height": block_data["height"],
            "hash": block_data["hash"],
            "parent_hash": block_data["parent_hash"],
            "proposer": block_data["proposer"],
            "timestamp": block_data["timestamp"],
            "tx_count": block_data["tx_count"]
        }
    )
    if response.status_code == 200 and response.json()["status"] == "exists":
        print("✅ PASS: Correctly handled existing block")
        results.append(True)
    else:
        print(f"❌ FAIL: Expected 200 with 'exists' status, got {response.status_code}")
        results.append(False)
    # Test 4: Invalid block hash (parent is the real current head).
    print("\n[TEST 4] Invalid block hash...")
    response = requests.get(f"{BASE_URL}/head")
    head = response.json()
    response = requests.post(
        f"{BASE_URL}/blocks/import",
        json={
            "height": 999999,
            "hash": "0xinvalid",
            "parent_hash": head["hash"],
            "proposer": "test",
            "timestamp": "2026-01-29T10:20:00",
            "tx_count": 0
        }
    )
    if response.status_code == 400 and "Invalid block hash" in response.json()["detail"]:
        print("✅ PASS: Correctly rejected invalid hash")
        results.append(True)
    else:
        print(f"❌ FAIL: Expected 400, got {response.status_code}")
        results.append(False)
    # Test 5: Hash is self-consistent but the parent does not exist.
    print("\n[TEST 5] Parent block not found...")
    response = requests.post(
        f"{BASE_URL}/blocks/import",
        json={
            "height": 999998,
            "hash": compute_block_hash(999998, "0xnonexistent", "2026-01-29T10:20:00"),
            "parent_hash": "0xnonexistent",
            "proposer": "test",
            "timestamp": "2026-01-29T10:20:00",
            "tx_count": 0
        }
    )
    if response.status_code == 400 and "Parent block not found" in response.json()["detail"]:
        print("✅ PASS: Correctly rejected missing parent")
        results.append(True)
    else:
        print(f"❌ FAIL: Expected 400, got {response.status_code}")
        results.append(False)
    # Test 6: Import block without transactions -- MUTATES the chain.
    print("\n[TEST 6] Import block without transactions...")
    response = requests.get(f"{BASE_URL}/head")
    head = response.json()
    height = head["height"] + 1
    block_hash = compute_block_hash(height, head["hash"], "2026-01-29T10:20:00")
    response = requests.post(
        f"{BASE_URL}/blocks/import",
        json={
            "height": height,
            "hash": block_hash,
            "parent_hash": head["hash"],
            "proposer": "test-proposer",
            "timestamp": "2026-01-29T10:20:00",
            "tx_count": 0,
            "transactions": []
        }
    )
    if response.status_code == 200 and response.json()["status"] == "imported":
        print("✅ PASS: Successfully imported block without transactions")
        results.append(True)
    else:
        print(f"❌ FAIL: Expected 200, got {response.status_code}")
        results.append(False)
    # Test 7: Import block with transactions (KNOWN ISSUE).
    # NOTE(review): parent_hash reuses the pre-test-6 head, so this block
    # is a sibling of the one just imported -- confirm intent.
    print("\n[TEST 7] Import block with transactions...")
    print("⚠️ KNOWN ISSUE: Transaction import currently fails with database constraint error")
    print(" This appears to be a bug in the transaction field mapping")
    height = height + 1
    block_hash = compute_block_hash(height, head["hash"], "2026-01-29T10:20:00")
    response = requests.post(
        f"{BASE_URL}/blocks/import",
        json={
            "height": height,
            "hash": block_hash,
            "parent_hash": head["hash"],
            "proposer": "test-proposer",
            "timestamp": "2026-01-29T10:20:00",
            "tx_count": 1,
            "transactions": [{
                "tx_hash": "0xtx123",
                "sender": "0xsender",
                "recipient": "0xrecipient",
                "payload": {"test": "data"}
            }]
        }
    )
    if response.status_code == 500:
        print("⚠️ EXPECTED FAILURE: Transaction import fails with 500 error")
        print(" Error: NOT NULL constraint failed on transaction fields")
        results.append(None)  # Known issue, not counting as fail
    else:
        print(f"❓ UNEXPECTED: Got {response.status_code} instead of expected 500")
        results.append(None)
    # Summary
    print("\n" + "=" * 60)
    print("TEST SUMMARY")
    print("=" * 60)
    passed = sum(1 for r in results if r is True)
    failed = sum(1 for r in results if r is False)
    known_issues = sum(1 for r in results if r is None)
    print(f"✅ Passed: {passed}")
    print(f"❌ Failed: {failed}")
    if known_issues > 0:
        print(f"⚠️ Known Issues: {known_issues}")
    print("\nFUNCTIONALITY STATUS:")
    print("- ✅ Input validation (height, hash, parent)")
    print("- ✅ Conflict detection")
    print("- ✅ Block import without transactions")
    print("- ❌ Block import with transactions (database constraint issue)")
    if failed == 0:
        print("\n🎉 All core functionality is working!")
        print(" The block import endpoint is functional for basic use.")
    else:
        print(f"\n⚠️ {failed} test(s) failed - review required")
    return passed, failed, known_issues
if __name__ == "__main__":
    test_block_import_complete()

View File

@@ -1,45 +0,0 @@
#!/usr/bin/env python3
"""
Test GPU registration with mock coordinator

Probes a locally running mock coordinator (port 8090): prints the HTTP
status of a fixed set of endpoints (with a truncated body preview for
200s), then dumps the available paths from its OpenAPI spec.
"""
import httpx
import json

COORDINATOR_URL = "http://localhost:8090"

# Test available endpoints
print("=== Testing Mock Coordinator Endpoints ===")
endpoints = [
    "/",
    "/health",
    "/metrics",
    "/miners/register",
    "/miners/list",
    "/marketplace/offers"
]
for endpoint in endpoints:
    try:
        response = httpx.get(f"{COORDINATOR_URL}{endpoint}", timeout=5)
        print(f"{endpoint}: {response.status_code}")
        if response.status_code == 200 and response.text:
            try:
                data = response.json()
                print(f" Response: {json.dumps(data, indent=2)[:200]}...")
            except ValueError:
                # Body was not JSON -- fall back to raw text.  (Narrowed
                # from a bare `except:`, which also swallowed
                # KeyboardInterrupt/SystemExit.)
                print(f" Response: {response.text[:100]}...")
    except Exception as e:
        # Connection refused / timeout etc. -- report and keep probing.
        print(f"{endpoint}: Error - {e}")

print("\n=== Checking OpenAPI Spec ===")
try:
    response = httpx.get(f"{COORDINATOR_URL}/openapi.json", timeout=5)
    if response.status_code == 200:
        openapi = response.json()
        paths = list(openapi.get("paths", {}).keys())
        print(f"Available endpoints: {paths}")
    else:
        print(f"OpenAPI not available: {response.status_code}")
except Exception as e:
    print(f"Error getting OpenAPI: {e}")

View File

@@ -1,63 +0,0 @@
#!/usr/bin/env python3
"""
Test script for host GPU miner

Smoke-checks the local mining stack in order: GPU visibility via
nvidia-smi, the Ollama daemon, the coordinator API, and finally one
real Ollama inference round-trip.  Prints a ✅/❌ line per check and
never exits non-zero -- it is a visual diagnostic, not a pytest suite.
"""
import subprocess
import httpx

# Test GPU -- nvidia-smi exit code 0 means the driver sees a GPU.
print("Testing GPU access...")
result = subprocess.run(['nvidia-smi', '--query-gpu=name', '--format=csv,noheader,nounits'],
                        capture_output=True, text=True)
if result.returncode == 0:
    print(f"✅ GPU detected: {result.stdout.strip()}")
else:
    print("❌ GPU not accessible")

# Test Ollama -- /api/tags lists locally pulled models.
print("\nTesting Ollama...")
try:
    response = httpx.get("http://localhost:11434/api/tags", timeout=5)
    if response.status_code == 200:
        models = response.json().get('models', [])
        print(f"✅ Ollama running with {len(models)} models")
        for m in models[:3]:  # Show first 3 models
            print(f" - {m['name']}")
    else:
        print("❌ Ollama not responding")
except Exception as e:
    print(f"❌ Ollama error: {e}")

# Test Coordinator health endpoint on the default local port.
print("\nTesting Coordinator...")
try:
    response = httpx.get("http://127.0.0.1:8000/v1/health", timeout=5)
    if response.status_code == 200:
        print("✅ Coordinator is accessible")
    else:
        print("❌ Coordinator not responding")
except Exception as e:
    print(f"❌ Coordinator error: {e}")

# Test Ollama inference -- a real generation round-trip.
# NOTE(review): 10 s may be too tight when the model loads cold.
print("\nTesting Ollama inference...")
try:
    response = httpx.post(
        "http://localhost:11434/api/generate",
        json={
            "model": "llama3.2:latest",
            "prompt": "Say hello",
            "stream": False
        },
        timeout=10
    )
    if response.status_code == 200:
        result = response.json()
        print(f"✅ Inference successful: {result.get('response', '')[:50]}...")
    else:
        print("❌ Inference failed")
except Exception as e:
    print(f"❌ Inference error: {e}")

print("\n✅ All tests completed!")

View File

@@ -1,65 +0,0 @@
#!/usr/bin/env python3
"""
Minimal test to debug transaction import
"""
import json
import hashlib
import requests
# Devnet RPC endpoint used by this minimal repro script.
BASE_URL = "https://aitbc.bubuit.net/rpc"
CHAIN_ID = "ait-devnet"


def compute_block_hash(height, parent_hash, timestamp):
    """Hash a block header field tuple the way the PoA proposer does.

    Returns "0x" + hex SHA-256 of "chain_id|height|parent|timestamp".
    """
    parts = f"{CHAIN_ID}|{height}|{parent_hash}|{timestamp}"
    return "0x" + hashlib.sha256(parts.encode()).hexdigest()
def test_minimal():
    """Test with minimal data.

    Imports a fresh empty block on top of the current head and, if that
    succeeds, retries with a single dummy transaction to isolate the
    transaction-import failure.  NOTE(review): mutates the live devnet
    chain at BASE_URL.
    """
    # Get current head
    response = requests.get(f"{BASE_URL}/head")
    head = response.json()
    # Create a new block directly on top of the head
    height = head["height"] + 1
    parent_hash = head["hash"]
    timestamp = "2026-01-29T10:20:00"
    block_hash = compute_block_hash(height, parent_hash, timestamp)
    # Test with empty transactions list first
    test_block = {
        "height": height,
        "hash": block_hash,
        "parent_hash": parent_hash,
        "proposer": "test-proposer",
        "timestamp": timestamp,
        "tx_count": 0,
        "transactions": []
    }
    print("Testing with empty transactions list...")
    response = requests.post(f"{BASE_URL}/blocks/import", json=test_block)
    print(f"Status: {response.status_code}")
    print(f"Response: {response.json()}")
    if response.status_code == 200:
        print("\n✅ Empty transactions work!")
        # Now test with one transaction.
        # NOTE(review): parent_hash still points at the ORIGINAL head,
        # not the block just imported, so this second block is a sibling
        # of the first -- confirm the endpoint accepts that.
        height = height + 1
        block_hash = compute_block_hash(height, parent_hash, timestamp)
        test_block["height"] = height
        test_block["hash"] = block_hash
        test_block["tx_count"] = 1
        test_block["transactions"] = [{"tx_hash": "0xtest", "sender": "0xtest", "recipient": "0xtest", "payload": {}}]
        print("\nTesting with one transaction...")
        response = requests.post(f"{BASE_URL}/blocks/import", json=test_block)
        print(f"Status: {response.status_code}")
        print(f"Response: {response.json()}")
if __name__ == "__main__":
    test_minimal()

View File

@@ -1,57 +0,0 @@
#!/usr/bin/env python3
"""
Test the BlockImportRequest model
"""
from pydantic import BaseModel, Field
from typing import Dict, Any, List, Optional
class TransactionData(BaseModel):
    """One transaction as carried inside a block-import request."""
    tx_hash: str
    sender: str
    recipient: str
    # Arbitrary application payload; defaults to an empty dict.
    payload: Dict[str, Any] = Field(default_factory=dict)
class BlockImportRequest(BaseModel):
    """Request body for the /blocks/import endpoint.

    Local copy of the coordinator's schema for standalone validation:
    height must be positive, tx_count non-negative, and transactions
    default to an empty list.
    """
    height: int = Field(gt=0)
    hash: str
    parent_hash: str
    proposer: str
    timestamp: str
    tx_count: int = Field(ge=0)
    state_root: Optional[str] = None
    transactions: List[TransactionData] = Field(default_factory=list)
# Test creating the request -- a representative payload with one
# nested transaction, validated by constructing the model directly.
test_data = {
    "height": 1,
    "hash": "0xtest",
    "parent_hash": "0x00",
    "proposer": "test",
    "timestamp": "2026-01-29T10:20:00",
    "tx_count": 1,
    "transactions": [{
        "tx_hash": "0xtx123",
        "sender": "0xsender",
        "recipient": "0xrecipient",
        "payload": {"test": "data"}
    }]
}
print("Test data:")
print(test_data)
try:
    # Pydantic validates on construction; the nested dicts are parsed
    # into TransactionData instances.
    request = BlockImportRequest(**test_data)
    print("\n✅ Request validated successfully!")
    print(f"Transactions count: {len(request.transactions)}")
    if request.transactions:
        tx = request.transactions[0]
        print(f"First transaction:")
        print(f" tx_hash: {tx.tx_hash}")
        print(f" sender: {tx.sender}")
        print(f" recipient: {tx.recipient}")
except Exception as e:
    # Broad catch is deliberate in this manual diagnostic: print the
    # failure and full traceback instead of crashing silently.
    print(f"\n❌ Validation failed: {e}")
    import traceback
    traceback.print_exc()

View File

@@ -1,317 +0,0 @@
#!/usr/bin/env python3
"""
Test script for AITBC Payment Integration
Tests job creation with payments, escrow, release, and refund flows
"""
import asyncio
import json
import logging
import os
from datetime import datetime
from typing import Dict, Any

import httpx

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration
COORDINATOR_URL = "https://aitbc.bubuit.net/api"
CLIENT_KEY = "test_client_key_123"
# Read the miner key from the environment: the previous literal
# "${MINER_API_KEY}" was an unexpanded shell-template placeholder and
# could never authenticate against the coordinator.
MINER_KEY = os.environ.get("MINER_API_KEY", "")
class PaymentIntegrationTest:
def __init__(self):
self.client = httpx.Client(timeout=30.0)
self.job_id = None
self.payment_id = None
async def test_complete_payment_flow(self):
"""Test the complete payment flow from job creation to payment release"""
logger.info("=== Starting AITBC Payment Integration Test ===")
# Step 1: Check coordinator health
await self.check_health()
# Step 2: Submit a job with payment
await self.submit_job_with_payment()
# Step 3: Check job status and payment
await self.check_job_and_payment_status()
# Step 4: Simulate job completion by miner
await self.complete_job()
# Step 5: Verify payment was released
await self.verify_payment_release()
# Step 6: Test refund flow with a new job
await self.test_refund_flow()
logger.info("=== Payment Integration Test Complete ===")
async def check_health(self):
"""Check if coordinator API is healthy"""
logger.info("Step 1: Checking coordinator health...")
response = self.client.get(f"{COORDINATOR_URL}/health")
if response.status_code == 200:
logger.info(f"✓ Coordinator healthy: {response.json()}")
else:
raise Exception(f"Coordinator health check failed: {response.status_code}")
async def submit_job_with_payment(self):
"""Submit a job with AITBC token payment"""
logger.info("Step 2: Submitting job with payment...")
job_data = {
"service_type": "llm",
"service_params": {
"model": "llama3.2",
"prompt": "What is AITBC?",
"max_tokens": 100
},
"payment_amount": 1.0,
"payment_currency": "AITBC",
"escrow_timeout_seconds": 3600
}
headers = {"X-Client-Key": CLIENT_KEY}
response = self.client.post(
f"{COORDINATOR_URL}/v1/jobs",
json=job_data,
headers=headers
)
if response.status_code == 201:
job = response.json()
self.job_id = job["job_id"]
logger.info(f"✓ Job created with ID: {self.job_id}")
logger.info(f" Payment status: {job.get('payment_status', 'N/A')}")
else:
raise Exception(f"Failed to create job: {response.status_code} - {response.text}")
async def check_job_and_payment_status(self):
"""Check job status and payment details"""
logger.info("Step 3: Checking job and payment status...")
headers = {"X-Client-Key": CLIENT_KEY}
# Get job status
response = self.client.get(
f"{COORDINATOR_URL}/v1/jobs/{self.job_id}",
headers=headers
)
if response.status_code == 200:
job = response.json()
logger.info(f"✓ Job status: {job['state']}")
logger.info(f" Payment ID: {job.get('payment_id', 'N/A')}")
logger.info(f" Payment status: {job.get('payment_status', 'N/A')}")
self.payment_id = job.get('payment_id')
# Get payment details if payment_id exists
if self.payment_id:
payment_response = self.client.get(
f"{COORDINATOR_URL}/v1/payments/{self.payment_id}",
headers=headers
)
if payment_response.status_code == 200:
payment = payment_response.json()
logger.info(f"✓ Payment details:")
logger.info(f" Amount: {payment['amount']} {payment['currency']}")
logger.info(f" Status: {payment['status']}")
logger.info(f" Method: {payment['payment_method']}")
else:
logger.warning(f"Could not fetch payment details: {payment_response.status_code}")
else:
raise Exception(f"Failed to get job status: {response.status_code}")
async def complete_job(self):
"""Simulate miner completing the job"""
logger.info("Step 4: Simulating job completion...")
# First, poll for the job as miner
headers = {"X-Miner-Key": MINER_KEY}
poll_response = self.client.post(
f"{COORDINATOR_URL}/v1/miners/poll",
json={"capabilities": ["llm"]},
headers=headers
)
if poll_response.status_code == 200:
poll_data = poll_response.json()
if poll_data.get("job_id") == self.job_id:
logger.info(f"✓ Miner received job: {self.job_id}")
# Submit job result
result_data = {
"result": json.dumps({
"text": "AITBC is a decentralized AI computing marketplace that uses blockchain for payments and zero-knowledge proofs for privacy.",
"model": "llama3.2",
"tokens_used": 42
}),
"metrics": {
"duration_ms": 2500,
"tokens_used": 42,
"gpu_seconds": 0.5
}
}
submit_response = self.client.post(
f"{COORDINATOR_URL}/v1/miners/{self.job_id}/result",
json=result_data,
headers=headers
)
if submit_response.status_code == 200:
logger.info("✓ Job result submitted successfully")
logger.info(f" Receipt: {submit_response.json().get('receipt', {}).get('receipt_id', 'N/A')}")
else:
raise Exception(f"Failed to submit result: {submit_response.status_code}")
else:
logger.warning(f"Miner received different job: {poll_data.get('job_id')}")
else:
raise Exception(f"Failed to poll for job: {poll_response.status_code}")
async def verify_payment_release(self):
"""Verify that payment was released after job completion"""
logger.info("Step 5: Verifying payment release...")
# Wait a moment for payment processing
await asyncio.sleep(2)
headers = {"X-Client-Key": CLIENT_KEY}
# Check updated job status
response = self.client.get(
f"{COORDINATOR_URL}/v1/jobs/{self.job_id}",
headers=headers
)
if response.status_code == 200:
job = response.json()
logger.info(f"✓ Final job status: {job['state']}")
logger.info(f" Final payment status: {job.get('payment_status', 'N/A')}")
# Get payment receipt
if self.payment_id:
receipt_response = self.client.get(
f"{COORDINATOR_URL}/v1/payments/{self.payment_id}/receipt",
headers=headers
)
if receipt_response.status_code == 200:
receipt = receipt_response.json()
logger.info(f"✓ Payment receipt:")
logger.info(f" Status: {receipt['status']}")
logger.info(f" Verified at: {receipt.get('verified_at', 'N/A')}")
logger.info(f" Transaction hash: {receipt.get('transaction_hash', 'N/A')}")
else:
logger.warning(f"Could not fetch payment receipt: {receipt_response.status_code}")
else:
raise Exception(f"Failed to verify payment release: {response.status_code}")
async def test_refund_flow(self):
"""Test payment refund for failed jobs.

Creates a job with a nonexistent model, has the miner report failure,
then checks the payment record for a refund. All intermediate problems
are logged (warnings) rather than raised.
"""
logger.info("Step 6: Testing refund flow...")
# Create a new job that will fail
job_data = {
"service_type": "llm",
"service_params": {
"model": "nonexistent_model",
"prompt": "This should fail"
},
"payment_amount": 0.5,
"payment_currency": "AITBC"
}
headers = {"X-Client-Key": CLIENT_KEY}
response = self.client.post(
f"{COORDINATOR_URL}/v1/jobs",
json=job_data,
headers=headers
)
if response.status_code == 201:
fail_job = response.json()
fail_job_id = fail_job["job_id"]
fail_payment_id = fail_job.get("payment_id")
logger.info(f"✓ Created test job for refund: {fail_job_id}")
# Simulate job failure
fail_headers = {"X-Miner-Key": MINER_KEY}
# Poll for the job
# NOTE(review): capabilities is sent as a list here; the sibling script's
# complete_job() sends a dict ({"llm": True}) — confirm which shape the
# coordinator expects.
poll_response = self.client.post(
f"{COORDINATOR_URL}/v1/miners/poll",
json={"capabilities": ["llm"]},
headers=fail_headers
)
if poll_response.status_code == 200:
poll_data = poll_response.json()
if poll_data.get("job_id") == fail_job_id:
# Submit failure
fail_data = {
"error_code": "MODEL_NOT_FOUND",
"error_message": "The specified model does not exist"
}
fail_response = self.client.post(
f"{COORDINATOR_URL}/v1/miners/{fail_job_id}/fail",
json=fail_data,
headers=fail_headers
)
if fail_response.status_code == 200:
logger.info("✓ Job failure submitted")
# Wait for refund processing
await asyncio.sleep(2)
# Check refund status
if fail_payment_id:
payment_response = self.client.get(
f"{COORDINATOR_URL}/v1/payments/{fail_payment_id}",
headers=headers
)
if payment_response.status_code == 200:
payment = payment_response.json()
logger.info(f"✓ Payment refunded:")
logger.info(f" Status: {payment['status']}")
logger.info(f" Refunded at: {payment.get('refunded_at', 'N/A')}")
else:
logger.warning(f"Could not verify refund: {payment_response.status_code}")
else:
logger.warning(f"Failed to submit job failure: {fail_response.status_code}")
# NOTE(review): summary is printed even when earlier steps were skipped —
# confirm whether it should be gated on actual success.
logger.info("\n=== Test Summary ===")
logger.info("✓ Job creation with payment")
logger.info("✓ Payment escrow creation")
logger.info("✓ Job completion and payment release")
logger.info("✓ Job failure and payment refund")
logger.info("\nPayment integration is working correctly!")
async def main():
    """Entry point: run the end-to-end payment integration scenario."""
    runner = PaymentIntegrationTest()
    try:
        await runner.test_complete_payment_flow()
    except Exception as exc:
        # Surface the failure in the log, then let the non-zero exit propagate.
        logger.error(f"Test failed: {exc}")
        raise


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,329 +0,0 @@
#!/usr/bin/env python3
"""
Test script for AITBC Payment Integration (Localhost)
Tests job creation with payments, escrow, release, and refund flows
"""
import asyncio
import httpx
import json
import logging
from datetime import datetime
from typing import Dict, Any
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration - Using localhost as we're testing from the server
COORDINATOR_URL = "http://127.0.0.1:8000/v1"
CLIENT_KEY = "${CLIENT_API_KEY}"
MINER_KEY = "${MINER_API_KEY}"
class PaymentIntegrationTest:
"""End-to-end exercise of the coordinator payment flow (run manually against localhost)."""
def __init__(self):
# Shared synchronous HTTP client; 30s timeout covers slow live responses.
self.client = httpx.Client(timeout=30.0)
# Set by submit_job_with_payment() once a job is created.
self.job_id = None
# Set by check_job_and_payment_status() once the payment id is known.
self.payment_id = None
async def test_complete_payment_flow(self):
"""Test the complete payment flow from job creation to payment release.

Runs the six scenario steps in order; any step that raises aborts the run.
"""
logger.info("=== Starting AITBC Payment Integration Test (Localhost) ===")
# Step 1: Check coordinator health
await self.check_health()
# Step 2: Submit a job with payment
await self.submit_job_with_payment()
# Step 3: Check job status and payment
await self.check_job_and_payment_status()
# Step 4: Simulate job completion by miner
await self.complete_job()
# Step 5: Verify payment was released
await self.verify_payment_release()
# Step 6: Test refund flow with a new job
await self.test_refund_flow()
logger.info("=== Payment Integration Test Complete ===")
async def check_health(self):
    """Ping the coordinator's /health endpoint; raise immediately if it is down."""
    logger.info("Step 1: Checking coordinator health...")
    resp = self.client.get(f"{COORDINATOR_URL}/health")
    # Fail fast: nothing else in the scenario is meaningful without a live API.
    if resp.status_code != 200:
        raise Exception(f"Coordinator health check failed: {resp.status_code}")
    logger.info(f"✓ Coordinator healthy: {resp.json()}")
async def submit_job_with_payment(self):
"""Submit a job with AITBC token payment.

Posts an LLM job with escrowed payment and stores the returned job_id on
self for later steps. Raises on any non-201 response.
"""
logger.info("Step 2: Submitting job with payment...")
job_data = {
"payload": {
"service_type": "llm",
"model": "llama3.2",
"prompt": "What is AITBC?",
"max_tokens": 100
},
"constraints": {},
"payment_amount": 1.0,
"payment_currency": "AITBC",
# Escrow auto-expires after an hour if never released/refunded.
"escrow_timeout_seconds": 3600
}
headers = {"X-Api-Key": CLIENT_KEY}
response = self.client.post(
f"{COORDINATOR_URL}/jobs",
json=job_data,
headers=headers
)
if response.status_code == 201:
job = response.json()
self.job_id = job["job_id"]
logger.info(f"✓ Job created with ID: {self.job_id}")
logger.info(f" Payment status: {job.get('payment_status', 'N/A')}")
else:
# Log the body before raising — the response text usually carries the
# validation error detail.
logger.error(f"Failed to create job: {response.status_code}")
logger.error(f"Response: {response.text}")
raise Exception(f"Failed to create job: {response.status_code}")
async def check_job_and_payment_status(self):
"""Check job status and payment details.

Reads the job created in step 2, captures its payment_id on self, and
(best-effort) fetches the payment record. Raises only if the job read fails.
"""
logger.info("Step 3: Checking job and payment status...")
headers = {"X-Api-Key": CLIENT_KEY}
# Get job status
response = self.client.get(
f"{COORDINATOR_URL}/jobs/{self.job_id}",
headers=headers
)
if response.status_code == 200:
job = response.json()
logger.info(f"✓ Job status: {job['state']}")
logger.info(f" Payment ID: {job.get('payment_id', 'N/A')}")
logger.info(f" Payment status: {job.get('payment_status', 'N/A')}")
# Remember the payment id for the release/receipt checks in step 5.
self.payment_id = job.get('payment_id')
# Get payment details if payment_id exists
if self.payment_id:
payment_response = self.client.get(
f"{COORDINATOR_URL}/payments/{self.payment_id}",
headers=headers
)
if payment_response.status_code == 200:
payment = payment_response.json()
logger.info(f"✓ Payment details:")
logger.info(f" Amount: {payment['amount']} {payment['currency']}")
logger.info(f" Status: {payment['status']}")
logger.info(f" Method: {payment['payment_method']}")
else:
logger.warning(f"Could not fetch payment details: {payment_response.status_code}")
else:
raise Exception(f"Failed to get job status: {response.status_code}")
async def complete_job(self):
"""Simulate miner completing the job.

Polls as the miner (retrying up to 5 times on 204 "no job yet"), then
submits a canned result for the job created in step 2.
"""
logger.info("Step 4: Simulating job completion...")
# First, poll for the job as miner (with retry for 204)
headers = {"X-Api-Key": MINER_KEY}
poll_data = None
for attempt in range(5):
poll_response = self.client.post(
f"{COORDINATOR_URL}/miners/poll",
json={"capabilities": {"llm": True}},
headers=headers
)
if poll_response.status_code == 200:
poll_data = poll_response.json()
break
elif poll_response.status_code == 204:
# 204 means the queue has not dispatched the job yet — back off 1s.
logger.info(f" No job available yet, retrying... ({attempt + 1}/5)")
await asyncio.sleep(1)
else:
raise Exception(f"Failed to poll for job: {poll_response.status_code}")
if poll_data and poll_data.get("job_id") == self.job_id:
logger.info(f"✓ Miner received job: {self.job_id}")
# Submit job result
result_data = {
"result": {
"text": "AITBC is a decentralized AI computing marketplace that uses blockchain for payments and zero-knowledge proofs for privacy.",
"model": "llama3.2",
"tokens_used": 42
},
"metrics": {
"duration_ms": 2500,
"tokens_used": 42,
"gpu_seconds": 0.5
}
}
submit_response = self.client.post(
f"{COORDINATOR_URL}/miners/{self.job_id}/result",
json=result_data,
headers=headers
)
if submit_response.status_code == 200:
logger.info("✓ Job result submitted successfully")
logger.info(f" Receipt: {submit_response.json().get('receipt', {}).get('receipt_id', 'N/A')}")
else:
raise Exception(f"Failed to submit result: {submit_response.status_code}")
elif poll_data:
# A different job was dispatched — another client is likely active.
logger.warning(f"Miner received different job: {poll_data.get('job_id')}")
else:
raise Exception("No job received after 5 retries")
async def verify_payment_release(self):
"""Verify that payment was released after job completion.

Re-reads the job, then (if a payment id is known) fetches the payment
receipt. Raises if the job read fails; receipt problems only warn.
"""
logger.info("Step 5: Verifying payment release...")
# Wait a moment for payment processing
# NOTE(review): fixed 2s sleep — confirm it covers actual settlement latency.
await asyncio.sleep(2)
headers = {"X-Api-Key": CLIENT_KEY}
# Check updated job status
response = self.client.get(
f"{COORDINATOR_URL}/jobs/{self.job_id}",
headers=headers
)
if response.status_code == 200:
job = response.json()
logger.info(f"✓ Final job status: {job['state']}")
logger.info(f" Final payment status: {job.get('payment_status', 'N/A')}")
# Get payment receipt
if self.payment_id:
receipt_response = self.client.get(
f"{COORDINATOR_URL}/payments/{self.payment_id}/receipt",
headers=headers
)
if receipt_response.status_code == 200:
receipt = receipt_response.json()
logger.info(f"✓ Payment receipt:")
logger.info(f" Status: {receipt['status']}")
logger.info(f" Verified at: {receipt.get('verified_at', 'N/A')}")
logger.info(f" Transaction hash: {receipt.get('transaction_hash', 'N/A')}")
else:
logger.warning(f"Could not fetch payment receipt: {receipt_response.status_code}")
else:
raise Exception(f"Failed to verify payment release: {response.status_code}")
async def test_refund_flow(self):
    """Test payment refund for failed jobs.

    Submits a job that is expected to fail (nonexistent model), polls as the
    miner, reports the failure, then checks that the escrowed payment was
    refunded. Intermediate problems are logged as warnings, not raised.
    """
    logger.info("Step 6: Testing refund flow...")
    # Create a new job that will fail
    job_data = {
        "payload": {
            "service_type": "llm",
            "model": "nonexistent_model",
            "prompt": "This should fail"
        },
        "payment_amount": 0.5,
        "payment_currency": "AITBC"
    }
    headers = {"X-Api-Key": CLIENT_KEY}
    response = self.client.post(
        f"{COORDINATOR_URL}/jobs",
        json=job_data,
        headers=headers
    )
    if response.status_code == 201:
        fail_job = response.json()
        fail_job_id = fail_job["job_id"]
        fail_payment_id = fail_job.get("payment_id")
        logger.info(f"✓ Created test job for refund: {fail_job_id}")
        # Simulate job failure
        fail_headers = {"X-Api-Key": MINER_KEY}
        # Poll for the job.
        # Fix: use the same capabilities payload shape as complete_job()
        # ({"llm": True}) — the previous list form (["llm"]) was inconsistent
        # with the rest of this script.
        poll_response = self.client.post(
            f"{COORDINATOR_URL}/miners/poll",
            json={"capabilities": {"llm": True}},
            headers=fail_headers
        )
        if poll_response.status_code == 200:
            poll_data = poll_response.json()
            if poll_data.get("job_id") == fail_job_id:
                # Submit failure
                fail_data = {
                    "error_code": "MODEL_NOT_FOUND",
                    "error_message": "The specified model does not exist"
                }
                fail_response = self.client.post(
                    f"{COORDINATOR_URL}/miners/{fail_job_id}/fail",
                    json=fail_data,
                    headers=fail_headers
                )
                if fail_response.status_code == 200:
                    logger.info("✓ Job failure submitted")
                    # Wait for refund processing
                    await asyncio.sleep(2)
                    # Check refund status
                    if fail_payment_id:
                        payment_response = self.client.get(
                            f"{COORDINATOR_URL}/payments/{fail_payment_id}",
                            headers=headers
                        )
                        if payment_response.status_code == 200:
                            payment = payment_response.json()
                            logger.info(f"✓ Payment refunded:")
                            logger.info(f" Status: {payment['status']}")
                            logger.info(f" Refunded at: {payment.get('refunded_at', 'N/A')}")
                        else:
                            logger.warning(f"Could not verify refund: {payment_response.status_code}")
                else:
                    logger.warning(f"Failed to submit job failure: {fail_response.status_code}")
    logger.info("\n=== Test Summary ===")
    logger.info("✓ Job creation with payment")
    logger.info("✓ Payment escrow creation")
    logger.info("✓ Job completion and payment release")
    logger.info("✓ Job failure and payment refund")
    logger.info("\nPayment integration is working correctly!")
async def main():
    """Kick off the full localhost payment-integration scenario."""
    scenario = PaymentIntegrationTest()
    try:
        await scenario.test_complete_payment_flow()
    except Exception as e:
        # Log before re-raising so the failure reason appears in the test log.
        logger.error(f"Test failed: {e}")
        raise


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,74 +0,0 @@
#!/usr/bin/env python3
"""
Simple test for block import endpoint without transactions
"""
import json
import hashlib
import requests
BASE_URL = "https://aitbc.bubuit.net/rpc"
CHAIN_ID = "ait-devnet"


def compute_block_hash(height, parent_hash, timestamp):
    """Derive the canonical block hash (same scheme as the PoA proposer).

    The digest covers chain id, height, parent hash, and timestamp joined
    with '|', returned as a 0x-prefixed hex string.
    """
    preimage = "|".join((CHAIN_ID, str(height), str(parent_hash), str(timestamp)))
    return "0x" + hashlib.sha256(preimage.encode()).hexdigest()
def test_simple_block_import():
    """Import one transaction-free block on top of the current head, then read it back."""
    print("Testing Simple Block Import")
    print("=" * 40)
    # Fetch the current chain head so the new block extends the tip.
    head = requests.get(f"{BASE_URL}/head").json()
    print(f"Current head: height={head['height']}, hash={head['hash']}")
    # Build the candidate block.
    new_height = head["height"] + 1
    parent = head["hash"]
    ts = "2026-01-29T10:20:00"
    new_hash = compute_block_hash(new_height, parent, ts)
    print(f"\nCreating test block:")
    print(f" height: {new_height}")
    print(f" parent_hash: {parent}")
    print(f" hash: {new_hash}")
    # Submit it to the import endpoint.
    import_resp = requests.post(
        f"{BASE_URL}/blocks/import",
        json={
            "height": new_height,
            "hash": new_hash,
            "parent_hash": parent,
            "proposer": "test-proposer",
            "timestamp": ts,
            "tx_count": 0,
        },
    )
    print(f"\nImport response:")
    print(f" Status: {import_resp.status_code}")
    print(f" Body: {import_resp.json()}")
    if import_resp.status_code != 200:
        print(f"\n❌ Import failed: {import_resp.status_code}")
        return
    print("\n✅ Block imported successfully!")
    # Read the block back to confirm it was persisted.
    check = requests.get(f"{BASE_URL}/blocks/{new_height}")
    if check.status_code == 200:
        stored = check.json()
        print(f"\n✅ Verified imported block:")
        print(f" height: {stored['height']}")
        print(f" hash: {stored['hash']}")
        print(f" proposer: {stored['proposer']}")
    else:
        print(f"\n❌ Could not retrieve imported block: {check.status_code}")


if __name__ == "__main__":
    test_simple_block_import()

View File

@@ -1,77 +0,0 @@
#!/usr/bin/env python3
"""
Test if transactions are displaying on the explorer
"""
import requests
from bs4 import BeautifulSoup
def main():
"""Check that the explorer UI shows real transactions (manual live check).

First confirms the API returns transaction items, then scrapes the explorer
page (BeautifulSoup) for the transactions table and 'mock data' markers.
"""
print("🔍 Testing Transaction Display on Explorer")
print("=" * 60)
# Check API has transactions
print("\n1. Checking API for transactions...")
try:
response = requests.get("https://aitbc.bubuit.net/api/explorer/transactions")
if response.status_code == 200:
data = response.json()
print(f"✅ API has {len(data['items'])} transactions")
if data['items']:
first_tx = data['items'][0]
print(f"\n First transaction:")
print(f" Hash: {first_tx['hash']}")
print(f" From: {first_tx['from']}")
print(f" To: {first_tx.get('to', 'null')}")
print(f" Value: {first_tx['value']}")
print(f" Status: {first_tx['status']}")
else:
print(f"❌ API failed: {response.status_code}")
return
except Exception as e:
print(f"❌ Error: {e}")
return
# Check explorer page
# NOTE(review): the '#/transactions' fragment is never sent to the server,
# so this fetches the same HTML as the explorer root — the table is filled
# by client-side JS and will not appear in the raw HTML.
print("\n2. Checking explorer page...")
try:
response = requests.get("https://aitbc.bubuit.net/explorer/#/transactions")
if response.status_code == 200:
soup = BeautifulSoup(response.text, 'html.parser')
# Check if it says "mock data"
if "mock data" in soup.text.lower():
print("❌ Page still shows 'mock data' message")
else:
print("✅ No 'mock data' message found")
# Check for transactions table
table = soup.find('tbody', {'id': 'transactions-table-body'})
if table:
rows = table.find_all('tr')
if len(rows) > 0:
if 'Loading' in rows[0].text:
print("⏳ Still loading transactions...")
elif 'No transactions' in rows[0].text:
print("❌ No transactions displayed")
else:
print(f"✅ Found {len(rows)} transaction rows")
else:
print("❌ No transaction rows found")
else:
print("❌ Transactions table not found")
else:
print(f"❌ Failed to load page: {response.status_code}")
except Exception as e:
print(f"❌ Error: {e}")
print("\n" + "=" * 60)
print("\n💡 If transactions aren't showing, it might be because:")
print(" 1. JavaScript is still loading")
print(" 2. The API call is failing")
print(" 3. The transactions have empty values")
print("\n Try refreshing the page or check browser console for errors")
if __name__ == "__main__":
main()

View File

@@ -1,77 +0,0 @@
#!/usr/bin/env python3
"""
Test transaction import specifically
"""
import json
import hashlib
import requests
BASE_URL = "https://aitbc.bubuit.net/rpc"
CHAIN_ID = "ait-devnet"


def compute_block_hash(height, parent_hash, timestamp):
    """Return the 0x-prefixed SHA-256 block hash, matching the PoA proposer.

    Hashes "<chain>|<height>|<parent>|<timestamp>" exactly as the proposer does.
    """
    fields = (CHAIN_ID, str(height), str(parent_hash), str(timestamp))
    digest = hashlib.sha256("|".join(fields).encode()).hexdigest()
    return f"0x{digest}"
def test_transaction_import():
"""Test importing a block with a single transaction.

Builds one block on top of the current head carrying one transaction,
posts it to /blocks/import, then greps the node's journal over ssh for
the transaction-import log line.
"""
print("Testing Transaction Import")
print("=" * 40)
# Get current head
response = requests.get(f"{BASE_URL}/head")
head = response.json()
print(f"Current head: height={head['height']}")
# Create a new block with one transaction
height = head["height"] + 1
parent_hash = head["hash"]
# NOTE(review): hard-coded timestamp — a rerun at the same height yields the
# same hash; confirm the node rejects duplicates gracefully.
timestamp = "2026-01-29T10:20:00"
block_hash = compute_block_hash(height, parent_hash, timestamp)
test_block = {
"height": height,
"hash": block_hash,
"parent_hash": parent_hash,
"proposer": "test-proposer",
"timestamp": timestamp,
"tx_count": 1,
"transactions": [{
"tx_hash": "0xtx123456789",
"sender": "0xsender123",
"recipient": "0xreceiver456",
"payload": {"to": "0xreceiver456", "amount": 1000000}
}]
}
print(f"\nTest block data:")
print(json.dumps(test_block, indent=2))
# Import the block
response = requests.post(
f"{BASE_URL}/blocks/import",
json=test_block
)
print(f"\nImport response:")
print(f" Status: {response.status_code}")
print(f" Body: {response.json()}")
# Check logs
# Requires passwordless ssh access to the 'aitbc-cascade' host.
print("\nChecking recent logs...")
import subprocess
result = subprocess.run(
["ssh", "aitbc-cascade", "journalctl -u blockchain-node --since '30 seconds ago' | grep 'Importing transaction' | tail -1"],
capture_output=True,
text=True
)
if result.stdout:
print(f"Log: {result.stdout.strip()}")
else:
print("No transaction import logs found")
if __name__ == "__main__":
test_transaction_import()

View File

@@ -1,21 +0,0 @@
#!/usr/bin/env python3
"""Exercise the Transaction model fields directly, without touching the API."""

# Sample payload mirroring what the block-import router receives.
tx_data = {
    "tx_hash": "0xtest123",
    "sender": "0xsender",
    "recipient": "0xrecipient",
    "payload": {"test": "data"},
}

print("Transaction data:")
print(tx_data)

# Mimic the field extraction performed by the router.
print("\nExtracting fields:")
for field in ("tx_hash", "sender", "recipient"):
    print(f"{field}: {tx_data.get(field)}")

View File

@@ -1,91 +0,0 @@
#!/usr/bin/env python3
"""
Verify that the explorer is using live data instead of mock
"""
import requests
import json
def main():
"""Confirm the explorer serves live data rather than the mock dataset.

Checks the blocks API, inspects the built JS bundle for the dataMode flag,
then probes the remaining explorer endpoints.
"""
print("🔍 Verifying AITBC Explorer is using Live Data")
print("=" * 60)
# Check API endpoint
print("\n1. Testing API endpoint...")
try:
response = requests.get("https://aitbc.bubuit.net/api/explorer/blocks")
if response.status_code == 200:
data = response.json()
print(f"✅ API is working - Found {len(data['items'])} blocks")
# Show latest block
if data['items']:
latest = data['items'][0]
print(f"\n Latest Block:")
print(f" Height: {latest['height']}")
print(f" Hash: {latest['hash']}")
print(f" Proposer: {latest['proposer']}")
print(f" Time: {latest['timestamp']}")
else:
print(f"❌ API failed: {response.status_code}")
return
except Exception as e:
print(f"❌ API error: {e}")
return
# Check explorer page
print("\n2. Checking explorer configuration...")
# Get the JS file
# NOTE(review): bundle filename is content-hashed (index-IsD_hiHT.js) and
# will break on the next frontend deploy — consider resolving it from the
# page HTML instead.
try:
js_response = requests.get("https://aitbc.bubuit.net/explorer/assets/index-IsD_hiHT.js")
if js_response.status_code == 200:
js_content = js_response.text
# Check for live data mode
if 'dataMode:"live"' in js_content:
print("✅ Explorer is configured for LIVE data")
elif 'dataMode:"mock"' in js_content:
print("❌ Explorer is still using MOCK data")
return
else:
print("⚠️ Could not determine data mode")
except Exception as e:
print(f"❌ Error checking JS: {e}")
# Check other endpoints
print("\n3. Testing other endpoints...")
endpoints = [
("/api/explorer/transactions", "Transactions"),
("/api/explorer/addresses", "Addresses"),
("/api/explorer/receipts", "Receipts")
]
for endpoint, name in endpoints:
try:
response = requests.get(f"https://aitbc.bubuit.net{endpoint}")
if response.status_code == 200:
data = response.json()
print(f"{name}: {len(data['items'])} items")
else:
print(f"{name}: Failed ({response.status_code})")
except Exception as e:
print(f"{name}: Error - {e}")
print("\n" + "=" * 60)
print("✅ Explorer is successfully using LIVE data!")
print("\n📊 Live Data Sources:")
print(" • Blocks: https://aitbc.bubuit.net/api/explorer/blocks")
print(" • Transactions: https://aitbc.bubuit.net/api/explorer/transactions")
print(" • Addresses: https://aitbc.bubuit.net/api/explorer/addresses")
print(" • Receipts: https://aitbc.bubuit.net/api/explorer/receipts")
print("\n💡 Visitors to https://aitbc.bubuit.net/explorer/ will now see:")
print(" • Real blockchain data")
print(" • Actual transactions")
print(" • Live network activity")
print(" • No mock/sample data")
if __name__ == "__main__":
main()

View File

@@ -1,35 +0,0 @@
#!/bin/bash
# Simple verification of GPU deployment in the AITBC container.
# Checks service installation, status, the registry's listening port, and
# GPU registration from both the container and the host.
echo "🔍 Checking GPU deployment in AITBC container..."

# 1. Services must be installed before anything else is worth checking.
echo "1. Checking if services are installed..."
if ssh aitbc 'systemctl list-unit-files | grep -E "aitbc-gpu" 2>/dev/null'; then
echo "✅ GPU services found"
else
echo "❌ GPU services not found - need to deploy first"
exit 1
fi

# 2. Show a short status excerpt for each GPU service.
echo -e "\n2. Checking service status..."
ssh aitbc 'sudo systemctl status aitbc-gpu-registry.service --no-pager --lines=3'
ssh aitbc 'sudo systemctl status aitbc-gpu-miner.service --no-pager --lines=3'

# 3. The registry should be listening on 8091.
echo -e "\n3. Checking if GPU registry is listening..."
if ssh aitbc 'ss -tlnp | grep :8091 2>/dev/null'; then
echo "✅ GPU registry listening on port 8091"
else
echo "❌ GPU registry not listening"
fi

# 4. Query the registry from inside the container.
# Fix: the inline Python previously used escaped double quotes around "gpus"
# inside an f-string expression, which both broke the shell quoting and is
# only legal Python on 3.12+. Use single quotes instead ('\'' escapes them
# within the single-quoted ssh command string).
echo -e "\n4. Checking GPU registration from container..."
ssh aitbc 'curl -s http://127.0.0.1:8091/miners/list 2>/dev/null | python3 -c "import sys,json; data=json.load(sys.stdin); print(f\"Found {len(data.get('\''gpus'\'', []))} GPU(s)\")" 2>/dev/null || echo "Failed to get GPU list"'

# 5. Same query from the host side to verify external reachability.
echo -e "\n5. Checking from host (10.1.223.93)..."
curl -s http://10.1.223.93:8091/miners/list 2>/dev/null | python3 -c "import sys,json; data=json.load(sys.stdin); print(f\"✅ From host: Found {len(data.get('gpus', []))} GPU(s)\")" 2>/dev/null || echo "❌ Cannot access from host"

echo -e "\n✅ Verification complete!"

View File

@@ -1,84 +0,0 @@
#!/usr/bin/env python3
"""
Verify that the data mode toggle button is removed from the explorer
"""
import requests
import re
def main():
"""Verify the mock/live data-mode toggle was removed from the explorer.

Scans the served HTML for toggle markers, then the built JS bundle for the
toggle initializer and the configured data mode.
"""
print("🔍 Verifying Data Mode Toggle is Removed")
print("=" * 60)
# Get the explorer page
print("\n1. Checking explorer page...")
try:
response = requests.get("https://aitbc.bubuit.net/explorer/")
if response.status_code == 200:
print("✅ Explorer page loaded")
else:
print(f"❌ Failed to load page: {response.status_code}")
return
except Exception as e:
print(f"❌ Error: {e}")
return
# Check for data mode toggle elements
print("\n2. Checking for data mode toggle...")
html_content = response.text
# Check for toggle button
if 'dataModeBtn' in html_content:
print("❌ Data mode toggle button still present!")
return
else:
print("✅ Data mode toggle button removed")
# Check for mode-button class
if 'mode-button' in html_content:
print("❌ Mode button class still found!")
return
else:
print("✅ Mode button class removed")
# Check for data-mode-toggle
if 'data-mode-toggle' in html_content:
print("❌ Data mode toggle component still present!")
return
else:
print("✅ Data mode toggle component removed")
# Check JS file
# NOTE(review): bundle filename is content-hashed and will change on the
# next deploy — consider resolving it from the page HTML.
print("\n3. Checking JavaScript file...")
try:
js_response = requests.get("https://aitbc.bubuit.net/explorer/assets/index-7nlLaz1v.js")
if js_response.status_code == 200:
js_content = js_response.text
if 'initDataModeToggle' in js_content:
print("❌ Data mode toggle initialization still in JS!")
return
else:
print("✅ Data mode toggle initialization removed")
if 'dataMode:"mock"' in js_content:
print("❌ Mock data mode still configured!")
return
elif 'dataMode:"live"' in js_content:
print("✅ Live data mode confirmed")
else:
# NOTE(review): in this flat listing it is ambiguous whether this else
# pairs with the status-code check or the dataMode chain — confirm
# against the original file; as written it should report a non-200 fetch.
print(f"❌ Failed to load JS: {js_response.status_code}")
except Exception as e:
print(f"❌ Error checking JS: {e}")
print("\n" + "=" * 60)
print("✅ Data mode toggle successfully removed!")
print("\n🎉 The explorer now:")
print(" • Uses live data only")
print(" • Has no mock/live toggle button")
print(" • Shows real blockchain data")
print(" • Is cleaner and more professional")
if __name__ == "__main__":
main()

View File

@@ -1,65 +0,0 @@
#!/usr/bin/env python3
"""
Verify that transactions are now showing properly on the explorer
"""
import requests
def main():
    """Summarize transaction visibility on the AITBC explorer (manual check)."""
    print("🔍 Verifying Transactions Display on AITBC Explorer")
    print("=" * 60)
    # Live check: count API transactions and break them down by status.
    print("\n1. API Check:")
    try:
        resp = requests.get("https://aitbc.bubuit.net/api/explorer/transactions")
        if resp.status_code == 200:
            payload = resp.json()
            print(f" ✅ API returns {len(payload['items'])} transactions")
            breakdown = {}
            for entry in payload['items']:
                state = entry['status']
                breakdown[state] = breakdown.get(state, 0) + 1
            print(f"\n Transaction Status Breakdown:")
            for state, total in breakdown.items():
                print(f"{state}: {total}")
        else:
            print(f" ❌ API failed: {resp.status_code}")
    except Exception as e:
        print(f" ❌ Error: {e}")
    # Remaining sections are static guidance for the operator running this.
    print("\n2. Main Page Check:")
    print(" Visit: https://aitbc.bubuit.net/explorer/")
    print(" ✅ Overview page now shows:")
    print(" • Real-time network statistics")
    print(" • Total transactions count")
    print(" • Completed/Running transactions")
    print("\n3. Transactions Page Check:")
    print(" Visit: https://aitbc.bubuit.net/explorer/#/transactions")
    print(" ✅ Now shows:")
    print("'Latest transactions on the AITBC network'")
    print(" • No 'mock data' references")
    print(" • Real transaction data from API")
    print("\n" + "=" * 60)
    print("✅ All mock data references removed!")
    print("\n📊 What's now displayed:")
    print(" • Real blocks with actual job IDs")
    print(" • Live transactions from clients")
    print(" • Network statistics")
    print(" • Professional, production-ready interface")
    print("\n💡 Note: Most transactions show:")
    print(" • From: ${CLIENT_API_KEY}")
    print(" • To: null (not assigned to miner yet)")
    print(" • Value: 0 (cost shown when completed)")
    print(" • Status: Queued/Running/Expired")


if __name__ == "__main__":
    main()

View File

@@ -1,64 +0,0 @@
#!/usr/bin/env python3
"""
Verify Windsurf test integration is working properly
"""
import subprocess
import sys
import os
def run_command(cmd, description):
    """Execute *cmd* in a shell, echo its captured output, and report success.

    Returns True when the command exits with status 0, False otherwise.
    """
    banner = "=" * 60
    print(f"\n{banner}")
    print(f"Testing: {description}")
    print(f"Command: {cmd}")
    print(banner)
    proc = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    # Echo stdout first, then stderr — only when non-empty.
    for label, stream in (("STDOUT:", proc.stdout), ("STDERR:", proc.stderr)):
        if stream:
            print(label)
            print(stream)
    return proc.returncode == 0
def main():
"""Run the Windsurf pytest-integration smoke checks and report pass/fail.

Exits non-zero if any check fails. Side effects: changes the process CWD.
"""
print("🔍 Verifying Windsurf Test Integration")
print("=" * 60)
# Change to project directory
# NOTE(review): hard-coded developer path — this script only works on the
# machine it was written on; confirm before relying on it elsewhere.
os.chdir('/home/oib/windsurf/aitbc')
tests = [
("pytest --collect-only tests/test_windsurf_integration.py", "Test Discovery"),
("pytest tests/test_windsurf_integration.py -v", "Run Simple Tests"),
("pytest --collect-only tests/ -q --no-cov", "Collect All Tests (without imports)"),
]
all_passed = True
for cmd, desc in tests:
# run_command echoes the output and returns True on exit code 0.
if not run_command(cmd, desc):
all_passed = False
print(f"❌ Failed: {desc}")
else:
print(f"✅ Passed: {desc}")
print("\n" + "=" * 60)
if all_passed:
print("✅ All tests passed! Windsurf integration is working.")
print("\nTo use in Windsurf:")
print("1. Open the Testing panel (beaker icon)")
print("2. Tests should be automatically discovered")
print("3. Click play button to run tests")
print("4. Use F5 to debug tests")
else:
print("❌ Some tests failed. Check the output above.")
sys.exit(1)
if __name__ == "__main__":
main()