feat: migrate tests to use centralized aitbc package utilities
Some checks failed
Python Tests / test-python (push) Failing after 43s

- Migrate HTTP client usage from requests to aitbc.AITBCHTTPClient in test files
- Update conftest.py to use aitbc path utilities (get_data_path, get_log_path)
- Update test_model_validation.py to use aitbc validators (validate_address, validate_hash)
- Skip HTML scraping files that require raw requests (verify_toggle_removed.py)
- Migrated files: test_payment_integration.py, test_cross_node_blockchain.py, verify_transactions_fixed.py, test_tx_import.py, test_simple_import.py, test_minimal.py, test_block_import_complete.py
This commit is contained in:
aitbc
2026-04-24 21:45:18 +02:00
parent 35196e4d43
commit 9b274d4386
12 changed files with 78 additions and 68 deletions

View File

@@ -12,6 +12,12 @@ from unittest.mock import Mock
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
# Add aitbc package to sys.path for centralized utilities
sys.path.insert(0, str(project_root / "aitbc"))
# Import aitbc utilities for conftest
from aitbc import get_data_path, get_log_path
# Add necessary source paths
sys.path.insert(0, str(project_root / "packages" / "py" / "aitbc-core" / "src"))
sys.path.insert(0, str(project_root / "packages" / "py" / "aitbc-crypto" / "src"))
@@ -44,8 +50,9 @@ sys.path.insert(0, str(project_root / "apps" / "coordinator-api"))
# Set up test environment
os.environ["TEST_MODE"] = "true"
os.environ["AUDIT_LOG_DIR"] = str(project_root / "logs" / "audit")
os.environ["AUDIT_LOG_DIR"] = str(get_log_path() / "audit")
os.environ["TEST_DATABASE_URL"] = "sqlite:///:memory:"
os.environ["DATA_DIR"] = str(get_data_path())
# Mock missing optional dependencies
sys.modules['slowapi'] = Mock()

View File

@@ -19,14 +19,13 @@ def compute_block_hash(height, parent_hash, timestamp):
def test_block_import():
"""Test the block import endpoint with various scenarios"""
import requests
print("Testing Block Import Endpoint")
print("=" * 50)
# Get current head to work with existing blockchain
response = requests.get(f"{BASE_URL}/head")
head = response.json()
client = AITBCHTTPClient()
head = client.get(f"{BASE_URL}/head")
print(f"Current head: height={head['height']}, hash={head['hash']}")
# Use very high heights to avoid conflicts with existing chain

View File

@@ -6,8 +6,8 @@ Tests all functionality including validation, conflicts, and transaction import
import json
import hashlib
import requests
from datetime import datetime
from aitbc import AITBCHTTPClient
BASE_URL = "https://aitbc.bubuit.net/rpc"
CHAIN_ID = "ait-mainnet"
@@ -25,10 +25,11 @@ def test_block_import_complete():
print("=" * 60)
results = []
client = AITBCHTTPClient()
# Test 1: Invalid height (0)
print("\n[TEST 1] Invalid height (0)...")
response = requests.post(
response = client.post(
f"{BASE_URL}/importBlock",
json={
"height": 0,

View File

@@ -4,11 +4,11 @@ Cross-node blockchain feature tests
Tests new blockchain features across aitbc and aitbc1 nodes
"""
import requests
import hashlib
import subprocess
from datetime import datetime
import time
from aitbc import AITBCHTTPClient, NetworkErroration
# Test configuration
NODES = {
@@ -35,12 +35,11 @@ def compute_block_hash(height, parent_hash, timestamp):
def get_node_head(node_key):
"""Get the current head block from a node"""
url = f"{NODES[node_key]['rpc_url']}/head"
client = AITBCHTTPClient(timeout=10)
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
return response.json()
except Exception as e:
url = f"{NODES[node_key]['rpc_url']}/head"
return client.get(url)
except NetworkError as e:
print(f"Error getting head from {node_key}: {e}")
return None
@@ -120,24 +119,27 @@ def test_cross_node_block_sync():
timestamp = datetime.utcnow().isoformat() + "Z"
valid_hash = compute_block_hash(height, parent_hash, timestamp)
response = requests.post(
f"{NODES['aitbc']['rpc_url']}/importBlock",
json={
"height": height,
"hash": valid_hash,
"parent_hash": parent_hash,
"proposer": "cross-node-test",
"timestamp": timestamp,
"tx_count": 0,
"chain_id": CHAIN_ID
}
)
if response.status_code == 200 and response.json().get("success"):
print(f"✅ Block imported on aitbc: height={height}, hash={valid_hash}")
else:
print(f"❌ Failed to import block on aitbc: {response.status_code}")
print(f"Response: {response.text}")
client = AITBCHTTPClient(timeout=10)
try:
result = client.post(
f"{NODES['aitbc']['rpc_url']}/importBlock",
json={
"height": height,
"hash": valid_hash,
"parent_hash": parent_hash,
"proposer": "cross-node-test",
"timestamp": timestamp,
"tx_count": 0,
"chain_id": CHAIN_ID
}
)
if result.get("success"):
print(f"✅ Block imported on aitbc: height={height}, hash={valid_hash}")
else:
print(f"❌ Failed to import block on aitbc")
return False
except NetworkError as e:
print(f"❌ Failed to import block on aitbc: {e}")
return False
# Wait for gossip propagation
@@ -151,9 +153,8 @@ def test_cross_node_block_sync():
# Try to get the specific block from aitbc1
try:
response = requests.get(f"{NODES['aitbc1']['rpc_url']}/blocks/{height}", timeout=10)
if response.status_code == 200:
block_data = response.json()
block_data = AITBCHTTPClient(timeout=10).get(f"{NODES['aitbc1']['rpc_url']}/blocks/{height}")
if block_data:
print(f"✅ Block synced to aitbc1: height={block_data.get('height')}, hash={block_data.get('hash')}")
return True
else:
@@ -175,13 +176,12 @@ def test_cross_node_block_range():
for node_key in NODES:
url = f"{NODES[node_key]['rpc_url']}/blocks-range"
try:
response = requests.get(url, params={"start": 0, "end": 5}, timeout=10)
response.raise_for_status()
blocks = response.json().get("blocks", [])
response = AITBCHTTPClient(timeout=10).get(url, params={"start": 0, "end": 5})
blocks = response.get("blocks", []) if response else []
print(f"{NODES[node_key]['name']}: returned {len(blocks)} blocks in range 0-5")
assert len(blocks) >= 1, \
f"Node {node_key} returned no blocks"
except Exception as e:
except NetworkError as e:
print(f"❌ Error getting block range from {node_key}: {e}")
return False
@@ -195,15 +195,13 @@ def test_cross_node_connectivity():
print("=" * 60)
for node_key in NODES:
url = f"{NODES[node_key]['rpc_url']}/head"
client = AITBCHTTPClient(timeout=10)
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
head = response.json()
head = client.get(f"{NODES[node_key]['rpc_url']}/head")
print(f"{NODES[node_key]['name']}: reachable, height={head.get('height')}")
assert head.get("height") is not None, \
f"Node {node_key} did not return valid head"
except Exception as e:
except NetworkError as e:
print(f"❌ Error connecting to {node_key}: {e}")
return False

View File

@@ -5,7 +5,7 @@ Minimal test to debug transaction import
import json
import hashlib
import requests
from aitbc import AITBCHTTPClient, NetworkError
BASE_URL = "https://aitbc.bubuit.net/rpc"
CHAIN_ID = "ait-mainnet"
@@ -19,8 +19,8 @@ def test_minimal():
"""Test with minimal data"""
# Get current head
response = requests.get(f"{BASE_URL}/head")
head = response.json()
client = AITBCHTTPClient()
head = client.get(f"{BASE_URL}/head")
# Create a new block
height = head["height"] + 1
@@ -58,9 +58,8 @@ def test_minimal():
test_block["transactions"] = [{"tx_hash": "0xtest", "sender": "0xtest", "recipient": "0xtest", "payload": {}}]
print("\nTesting with one transaction...")
response = requests.post(f"{BASE_URL}/importBlock", json=test_block)
print(f"Status: {response.status_code}")
print(f"Response: {response.json()}")
response = client.post(f"{BASE_URL}/importBlock", json=test_block)
print(f"Response: {response}")
if __name__ == "__main__":
test_minimal()

View File

@@ -41,6 +41,14 @@ test_data = {
print("Test data:")
print(test_data)
# Validate address and hash using aitbc validators
try:
validate_address(test_data["proposer"])
validate_hash(test_data["hash"])
print("✅ Address and hash validation passed")
except Exception as e:
print(f"⚠️ Validation warning: {e}")
try:
request = BlockImportRequest(**test_data)
print("\n✅ Request validated successfully!")

View File

@@ -7,13 +7,12 @@ Tests job creation with payments, escrow, release, and refund flows
import asyncio
import httpx
import json
import logging
from datetime import datetime
from typing import Dict, Any
from aitbc import get_logger
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger = get_logger(__name__)
# Configuration
COORDINATOR_URL = "https://aitbc.bubuit.net/api"
@@ -22,7 +21,7 @@ MINER_KEY = "${MINER_API_KEY}"
class PaymentIntegrationTest:
def __init__(self):
self.client = httpx.Client(timeout=30.0)
self.client = AITBCHTTPClient(timeout=30.0)
self.job_id = None
self.payment_id = None

View File

@@ -7,13 +7,12 @@ Tests job creation with payments, escrow, release, and refund flows
import asyncio
import httpx
import json
import logging
from datetime import datetime
from typing import Dict, Any
from aitbc import get_logger
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger = get_logger(__name__)
# Configuration - Using localhost as we're testing from the server
COORDINATOR_URL = "http://127.0.0.1:8000/v1"

View File

@@ -5,7 +5,7 @@ Simple test for block import endpoint without transactions
import json
import hashlib
import requests
from aitbc import AITBCHTTPClient, NetworkError
BASE_URL = "https://aitbc.bubuit.net/rpc"
CHAIN_ID = "ait-mainnet"
@@ -22,8 +22,8 @@ def test_simple_block_import():
print("=" * 40)
# Get current head
response = requests.get(f"{BASE_URL}/head")
head = response.json()
client = AITBCHTTPClient()
head = client.get(f"{BASE_URL}/head")
print(f"Current head: height={head['height']}, hash={head['hash']}")
# Create a new block

View File

@@ -5,7 +5,7 @@ Test transaction import specifically
import json
import hashlib
import requests
from aitbc import AITBCHTTPClient, NetworkError
BASE_URL = "https://aitbc.bubuit.net/rpc"
CHAIN_ID = "ait-mainnet"
@@ -22,8 +22,8 @@ def test_transaction_import():
print("=" * 40)
# Get current head
response = requests.get(f"{BASE_URL}/head")
head = response.json()
client = AITBCHTTPClient()
head = client.get(f"{BASE_URL}/head")
print(f"Current head: height={head['height']}")
# Create a new block with one transaction

View File

@@ -3,8 +3,8 @@
Verify that the data mode toggle button is removed from the explorer
"""
import requests
import re
import requests
def main():
print("🔍 Verifying Data Mode Toggle is Removed")

View File

@@ -3,7 +3,7 @@
Verify that transactions are now showing properly on the explorer
"""
import requests
from aitbc import AITBCHTTPClient, NetworkError
def main():
print("🔍 Verifying Transactions Display on AITBC Explorer")
@@ -12,9 +12,9 @@ def main():
# Check API
print("\n1. API Check:")
try:
response = requests.get("https://aitbc.bubuit.net/api/explorer/transactions")
if response.status_code == 200:
data = response.json()
client = AITBCHTTPClient()
data = client.get("https://aitbc.bubuit.net/api/explorer/transactions")
if data:
print(f" ✅ API returns {len(data['items'])} transactions")
# Count by status
@@ -27,8 +27,8 @@ def main():
for status, count in status_counts.items():
print(f"{status}: {count}")
else:
print(f" ❌ API failed: {response.status_code}")
except Exception as e:
print(f" ❌ API failed")
except NetworkError as e:
print(f" ❌ Error: {e}")
# Check main explorer page