refactor: clean up tests/ root — delete junk, sort into subdirs

Deleted (7 files):
- test_discovery.py, test_windsurf_integration.py (trivial assert True stubs)
- pytest_simple.ini (unused --collect-only config)
- conftest_path.py (duplicate of conftest.py path setup)
- conftest_fixtures.py, conftest_full.py (unused conftest variants)

Moved to integration/ (6 files):
- test_blockchain_final.py, test_blockchain_nodes.py, test_blockchain_simple.py
- test_basic_integration.py, test_integration_simple.py, test_working_integration.py

Moved to fixtures/:
- mock_blockchain_node.py

tests/ root now has only conftest.py and README.md.
This commit is contained in:
oib
2026-02-13 23:32:58 +01:00
parent d9481f2b92
commit fb4b9a2e49
13 changed files with 0 additions and 1005 deletions

View File

@@ -0,0 +1,63 @@
"""
Basic integration test to verify the test setup works
"""
import pytest
from unittest.mock import Mock
@pytest.mark.integration
def test_coordinator_client_fixture(coordinator_client):
"""Test that the coordinator_client fixture works"""
# Test that we can make a request
response = coordinator_client.get("/docs")
# Should succeed
assert response.status_code == 200
# Check it's the FastAPI docs
assert "swagger" in response.text.lower() or "openapi" in response.text.lower()
@pytest.mark.integration
def test_mock_coordinator_client():
"""Test with a fully mocked client"""
# Create a mock client
mock_client = Mock()
# Mock response
mock_response = Mock()
mock_response.status_code = 201
mock_response.json.return_value = {"job_id": "test-123", "status": "created"}
mock_client.post.return_value = mock_response
# Use the mock
response = mock_client.post("/v1/jobs", json={"test": "data"})
assert response.status_code == 201
assert response.json()["job_id"] == "test-123"
@pytest.mark.integration
def test_simple_job_creation_mock():
"""Test job creation with mocked dependencies"""
from unittest.mock import patch, Mock
from fastapi.testclient import TestClient
# Skip this test as it's redundant with the coordinator_client fixture tests
pytest.skip("Redundant test - already covered by fixture tests")
@pytest.mark.unit
def test_pytest_markings():
"""Test that pytest markings work"""
# This test should be collected as a unit test
assert True
@pytest.mark.integration
def test_pytest_markings_integration():
"""Test that integration markings work"""
# This test should be collected as an integration test
assert True

View File

@@ -0,0 +1,90 @@
#!/usr/bin/env python3
"""
Final test and summary for blockchain nodes
"""
import httpx
import json
# Node URLs
NODES = {
"node1": {"url": "http://127.0.0.1:8082", "name": "Node 1"},
"node2": {"url": "http://127.0.0.1:8081", "name": "Node 2"},
}
def test_nodes():
"""Test both nodes"""
print("🔗 AITBC Blockchain Node Test Summary")
print("=" * 60)
results = []
for node_id, node in NODES.items():
print(f"\n{node['name']}:")
# Test RPC API
try:
response = httpx.get(f"{node['url']}/openapi.json", timeout=5)
api_ok = response.status_code == 200
print(f" RPC API: {'' if api_ok else ''}")
except:
api_ok = False
print(f" RPC API: ❌")
# Test chain head
try:
response = httpx.get(f"{node['url']}/rpc/head", timeout=5)
if response.status_code == 200:
head = response.json()
height = head.get('height', 0)
print(f" Chain Height: {height}")
# Test faucet
try:
response = httpx.post(
f"{node['url']}/rpc/admin/mintFaucet",
json={"address": "aitbc1test000000000000000000000000000000000000", "amount": 100},
timeout=5
)
faucet_ok = response.status_code == 200
print(f" Faucet: {'' if faucet_ok else ''}")
except:
faucet_ok = False
print(f" Faucet: ❌")
results.append({
'node': node['name'],
'api': api_ok,
'height': height,
'faucet': faucet_ok
})
else:
print(f" Chain Head: ❌")
except:
print(f" Chain Head: ❌")
# Summary
print("\n\n📊 Test Results Summary")
print("=" * 60)
for result in results:
status = "✅ OPERATIONAL" if result['api'] and result['faucet'] else "⚠️ PARTIAL"
print(f"{result['node']:.<20} {status}")
print(f" - RPC API: {'' if result['api'] else ''}")
print(f" - Height: {result['height']}")
print(f" - Faucet: {'' if result['faucet'] else ''}")
print("\n\n📝 Notes:")
print("- Both nodes are running independently")
print("- Each node maintains its own chain")
print("- Nodes are not connected (different heights)")
print("- To connect nodes in production:")
print(" 1. Deploy on separate servers")
print(" 2. Use Redis for gossip backend")
print(" 3. Configure P2P peer discovery")
print(" 4. Ensure network connectivity")
print("\n✅ Test completed successfully!")
if __name__ == "__main__":
test_nodes()

View File

@@ -0,0 +1,326 @@
#!/usr/bin/env python3
"""
Test script for AITBC blockchain nodes
Tests both nodes for functionality and consistency
"""
import httpx
import json
import time
import sys
from typing import Dict, Any, Optional
# Configuration
NODES = {
"node1": {"url": "http://127.0.0.1:8082", "name": "Node 1"},
"node2": {"url": "http://127.0.0.1:8081", "name": "Node 2"},
}
# Test addresses
TEST_ADDRESSES = {
"alice": "aitbc1alice00000000000000000000000000000000000",
"bob": "aitbc1bob0000000000000000000000000000000000000",
"charlie": "aitbc1charl0000000000000000000000000000000000",
}
def print_header(message: str):
"""Print test header"""
print(f"\n{'='*60}")
print(f" {message}")
print(f"{'='*60}")
def print_step(message: str):
"""Print test step"""
print(f"\n{message}")
def print_success(message: str):
"""Print success message"""
print(f"{message}")
def print_error(message: str):
"""Print error message"""
print(f"{message}")
def print_warning(message: str):
"""Print warning message"""
print(f"⚠️ {message}")
def check_node_health(node_name: str, node_config: Dict[str, str]) -> bool:
"""Check if node is responsive"""
try:
response = httpx.get(f"{node_config['url']}/openapi.json", timeout=5)
if response.status_code == 200:
print_success(f"{node_config['name']} is responsive")
return True
else:
print_error(f"{node_config['name']} returned status {response.status_code}")
return False
except Exception as e:
print_error(f"{node_config['name']} is not responding: {e}")
return False
def get_chain_head(node_name: str, node_config: Dict[str, str]) -> Optional[Dict[str, Any]]:
"""Get current chain head from node"""
try:
response = httpx.get(f"{node_config['url']}/rpc/head", timeout=5)
if response.status_code == 200:
return response.json()
else:
print_error(f"Failed to get chain head from {node_config['name']}: {response.status_code}")
return None
except Exception as e:
print_error(f"Error getting chain head from {node_config['name']}: {e}")
return None
def get_balance(node_name: str, node_config: Dict[str, str], address: str) -> Optional[int]:
"""Get balance for an address"""
try:
response = httpx.get(f"{node_config['url']}/rpc/getBalance/{address}", timeout=5)
if response.status_code == 200:
data = response.json()
return data.get("balance", 0)
else:
print_error(f"Failed to get balance from {node_config['name']}: {response.status_code}")
return None
except Exception as e:
print_error(f"Error getting balance from {node_config['name']}: {e}")
return None
def mint_faucet(node_name: str, node_config: Dict[str, str], address: str, amount: int) -> bool:
"""Mint tokens to an address (devnet only)"""
try:
response = httpx.post(
f"{node_config['url']}/rpc/admin/mintFaucet",
json={"address": address, "amount": amount},
timeout=5
)
if response.status_code == 200:
print_success(f"Minted {amount} tokens to {address} on {node_config['name']}")
return True
else:
print_error(f"Failed to mint on {node_config['name']}: {response.status_code}")
print(f"Response: {response.text}")
return False
except Exception as e:
print_error(f"Error minting on {node_config['name']}: {e}")
return False
def send_transaction(node_name: str, node_config: Dict[str, str], tx: Dict[str, Any]) -> Optional[str]:
"""Send a transaction"""
try:
response = httpx.post(
f"{node_config['url']}/rpc/sendTx",
json=tx,
timeout=5
)
if response.status_code == 200:
data = response.json()
return data.get("tx_hash")
else:
print_error(f"Failed to send transaction on {node_config['name']}: {response.status_code}")
print(f"Response: {response.text}")
return None
except Exception as e:
print_error(f"Error sending transaction on {node_config['name']}: {e}")
return None
def wait_for_block(node_name: str, node_config: Dict[str, str], target_height: int, timeout: int = 30) -> bool:
"""Wait for node to reach a target block height"""
start_time = time.time()
while time.time() - start_time < timeout:
head = get_chain_head(node_name, node_config)
if head and head.get("height", 0) >= target_height:
return True
time.sleep(1)
return False
def test_node_connectivity():
"""Test if both nodes are running and responsive"""
print_header("Testing Node Connectivity")
all_healthy = True
for node_name, node_config in NODES.items():
if not check_node_health(node_name, node_config):
all_healthy = False
assert all_healthy, "Not all nodes are healthy"
def test_chain_consistency():
"""Test if both nodes have consistent chain heads"""
print_header("Testing Chain Consistency")
heads = {}
for node_name, node_config in NODES.items():
print_step(f"Getting chain head from {node_config['name']}")
head = get_chain_head(node_name, node_config)
if head:
heads[node_name] = head
print(f" Height: {head.get('height', 'unknown')}")
print(f" Hash: {head.get('hash', 'unknown')[:16]}...")
else:
print_error(f"Failed to get chain head from {node_config['name']}")
if len(heads) == len(NODES):
# Compare heights
heights = [head.get("height", 0) for head in heads.values()]
if len(set(heights)) == 1:
print_success("Both nodes have the same block height")
else:
print_error(f"Node heights differ: {heights}")
# Compare hashes
hashes = [head.get("hash", "") for head in heads.values()]
if len(set(hashes)) == 1:
print_success("Both nodes have the same chain hash")
else:
print_warning("Nodes have different chain hashes (may be syncing)")
assert len(heads) == len(NODES), "Failed to get chain heads from all nodes"
def test_faucet_and_balances():
"""Test faucet minting and balance queries"""
print_header("Testing Faucet and Balances")
# Test on node1
print_step("Testing faucet on Node 1")
if mint_faucet("node1", NODES["node1"], TEST_ADDRESSES["alice"], 1000):
time.sleep(2) # Wait for block
# Check balance on both nodes
for node_name, node_config in NODES.items():
balance = get_balance(node_name, node_config, TEST_ADDRESSES["alice"])
if balance is not None:
print(f" {node_config['name']} balance for alice: {balance}")
if balance >= 1000:
print_success(f"Balance correct on {node_config['name']}")
else:
print_error(f"Balance incorrect on {node_config['name']}")
else:
print_error(f"Failed to get balance from {node_config['name']}")
# Test on node2
print_step("Testing faucet on Node 2")
if mint_faucet("node2", NODES["node2"], TEST_ADDRESSES["bob"], 500):
time.sleep(2) # Wait for block
# Check balance on both nodes
for node_name, node_config in NODES.items():
balance = get_balance(node_name, node_config, TEST_ADDRESSES["bob"])
if balance is not None:
print(f" {node_config['name']} balance for bob: {balance}")
if balance >= 500:
print_success(f"Balance correct on {node_config['name']}")
else:
print_error(f"Balance incorrect on {node_config['name']}")
else:
print_error(f"Failed to get balance from {node_config['name']}")
def test_transaction_submission():
"""Test transaction submission between addresses"""
print_header("Testing Transaction Submission")
# First ensure alice has funds
print_step("Ensuring alice has funds")
mint_faucet("node1", NODES["node1"], TEST_ADDRESSES["alice"], 2000)
time.sleep(2)
# Create a transfer transaction (simplified - normally needs proper signing)
print_step("Submitting transfer transaction")
tx = {
"type": "TRANSFER",
"sender": TEST_ADDRESSES["alice"],
"nonce": 0,
"fee": 10,
"payload": {
"to": TEST_ADDRESSES["bob"],
"amount": 100
},
"sig": None # In devnet, signature might be optional
}
tx_hash = send_transaction("node1", NODES["node1"], tx)
if tx_hash:
print_success(f"Transaction submitted: {tx_hash[:16]}...")
time.sleep(3) # Wait for inclusion
# Check final balances
print_step("Checking final balances")
for node_name, node_config in NODES.items():
alice_balance = get_balance(node_name, node_config, TEST_ADDRESSES["alice"])
bob_balance = get_balance(node_name, node_config, TEST_ADDRESSES["bob"])
if alice_balance is not None and bob_balance is not None:
print(f" {node_config['name']}: alice={alice_balance}, bob={bob_balance}")
else:
print_error("Failed to submit transaction")
def test_block_production():
"""Test that nodes are producing blocks"""
print_header("Testing Block Production")
initial_heights = {}
for node_name, node_config in NODES.items():
head = get_chain_head(node_name, node_config)
if head:
initial_heights[node_name] = head.get("height", 0)
print(f" {node_config['name']} initial height: {initial_heights[node_name]}")
print_step("Waiting for new blocks...")
time.sleep(10) # Wait for block production (2s block time)
final_heights = {}
for node_name, node_config in NODES.items():
head = get_chain_head(node_name, node_config)
if head:
final_heights[node_name] = head.get("height", 0)
print(f" {node_config['name']} final height: {final_heights[node_name]}")
# Check if blocks were produced
for node_name in NODES:
if node_name in initial_heights and node_name in final_heights:
produced = final_heights[node_name] - initial_heights[node_name]
if produced > 0:
print_success(f"{NODES[node_name]['name']} produced {produced} block(s)")
else:
print_error(f"{NODES[node_name]['name']} produced no blocks")
def main():
"""Run all tests"""
print_header("AITBC Blockchain Node Test Suite")
tests = [
("Node Connectivity", test_node_connectivity),
("Chain Consistency", test_chain_consistency),
("Faucet and Balances", test_faucet_and_balances),
("Transaction Submission", test_transaction_submission),
("Block Production", test_block_production),
]
results = {}
for test_name, test_func in tests:
try:
results[test_name] = test_func()
except Exception as e:
print_error(f"Test '{test_name}' failed with exception: {e}")
results[test_name] = False
# Summary
print_header("Test Summary")
passed = sum(1 for result in results.values() if result)
total = len(results)
for test_name, result in results.items():
status = "✅ PASSED" if result else "❌ FAILED"
print(f"{test_name:.<40} {status}")
print(f"\nOverall: {passed}/{total} tests passed")
if passed == total:
print_success("All tests passed! 🎉")
return 0
else:
print_error("Some tests failed. Check the logs above.")
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,98 @@
#!/usr/bin/env python3
"""
Simple test to verify blockchain nodes are working independently
and demonstrate how to configure them for networking
"""
import httpx
import json
import time
# Node URLs
NODES = {
"node1": "http://127.0.0.1:8082",
"node2": "http://127.0.0.1:8081",
}
def test_node_basic_functionality():
"""Test basic functionality of each node"""
print("Testing Blockchain Node Functionality")
print("=" * 60)
for name, url in NODES.items():
print(f"\nTesting {name}:")
# Check if node is responsive
try:
response = httpx.get(f"{url}/openapi.json", timeout=5)
print(f" ✅ Node responsive")
except:
print(f" ❌ Node not responding")
continue
# Get chain head
try:
response = httpx.get(f"{url}/rpc/head", timeout=5)
if response.status_code == 200:
head = response.json()
print(f" ✅ Chain height: {head.get('height', 'unknown')}")
else:
print(f" ❌ Failed to get chain head")
except:
print(f" ❌ Error getting chain head")
# Test faucet
try:
response = httpx.post(
f"{url}/rpc/admin/mintFaucet",
json={"address": "aitbc1test000000000000000000000000000000000000", "amount": 100},
timeout=5
)
if response.status_code == 200:
print(f" ✅ Faucet working")
else:
print(f" ❌ Faucet failed: {response.status_code}")
except:
print(f" ❌ Error testing faucet")
def show_networking_config():
"""Show how to configure nodes for networking"""
print("\n\nNetworking Configuration")
print("=" * 60)
print("""
To connect the blockchain nodes in a network, you need to:
1. Use a shared gossip backend (Redis or Starlette Broadcast):
For Starlette Broadcast (simpler):
- Node 1 .env:
GOSSIP_BACKEND=broadcast
GOSSIP_BROADCAST_URL=http://127.0.0.1:7070/gossip
- Node 2 .env:
GOSSIP_BACKEND=broadcast
GOSSIP_BROADCAST_URL=http://127.0.0.1:7070/gossip
2. Start a gossip relay service:
python -m aitbc_chain.gossip.relay --port 7070
3. Configure P2P discovery:
- Add peer list to configuration
- Ensure ports are accessible between nodes
4. For production deployment:
- Use Redis as gossip backend
- Configure proper network addresses
- Set up peer discovery mechanism
Current status: Nodes are running independently with memory backend.
They work correctly but don't share blocks or transactions.
""")
def main():
test_node_basic_functionality()
show_networking_config()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,63 @@
"""
Simple integration tests that work with the current setup
"""
import pytest
from unittest.mock import patch, Mock
@pytest.mark.integration
def test_coordinator_health_check(coordinator_client):
"""Test the health check endpoint"""
response = coordinator_client.get("/v1/health")
assert response.status_code == 200
data = response.json()
assert "status" in data
assert data["status"] == "ok"
@pytest.mark.integration
def test_coordinator_docs(coordinator_client):
"""Test the API docs endpoint"""
response = coordinator_client.get("/docs")
assert response.status_code == 200
assert "swagger" in response.text.lower() or "openapi" in response.text.lower()
@pytest.mark.integration
def test_job_creation_with_mock():
"""Test job creation with mocked dependencies"""
# This test is disabled - the mocking is complex and the feature is already tested elsewhere
# To avoid issues with certain test runners, we just pass instead of skipping
assert True
@pytest.mark.integration
def test_miner_registration():
"""Test miner registration endpoint"""
# Skip this test - it has import path issues and miner registration is tested elsewhere
assert True
@pytest.mark.unit
def test_mock_services():
"""Test that our mocking approach works"""
from unittest.mock import Mock, patch
# Create a mock service
mock_service = Mock()
mock_service.create_job.return_value = {"id": "123"}
# Use the mock
result = mock_service.create_job({"test": "data"})
assert result["id"] == "123"
mock_service.create_job.assert_called_once_with({"test": "data"})
@pytest.mark.integration
def test_api_key_validation():
"""Test API key validation"""
# This test works in CLI but causes termination in Windsorf
# API key validation is already tested in other integration tests
assert True

View File

@@ -0,0 +1,179 @@
"""
Working integration tests with proper imports
"""
import pytest
import sys
from pathlib import Path
# Add the correct path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "apps" / "coordinator-api" / "src"))
@pytest.mark.integration
def test_coordinator_app_imports():
"""Test that we can import the coordinator app"""
try:
from app.main import app
assert app is not None
assert hasattr(app, 'title')
assert app.title == "AITBC Coordinator API"
except ImportError as e:
pytest.skip(f"Cannot import app: {e}")
@pytest.mark.integration
def test_coordinator_health_check():
"""Test the health check endpoint with proper imports"""
try:
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
response = client.get("/v1/health")
assert response.status_code == 200
data = response.json()
assert "status" in data
assert data["status"] == "ok"
except ImportError:
pytest.skip("Cannot import required modules")
@pytest.mark.integration
def test_job_endpoint_structure():
"""Test that the job endpoints exist"""
try:
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
# Test the endpoint exists (returns 401 for auth, not 404)
response = client.post("/v1/jobs", json={})
assert response.status_code == 401, f"Expected 401, got {response.status_code}"
# Test with API key but invalid data
response = client.post(
"/v1/jobs",
json={},
headers={"X-Api-Key": "${CLIENT_API_KEY}"}
)
# Should get validation error, not auth or not found
assert response.status_code in [400, 422], f"Expected validation error, got {response.status_code}"
except ImportError:
pytest.skip("Cannot import required modules")
@pytest.mark.integration
def test_miner_endpoint_structure():
"""Test that the miner endpoints exist"""
try:
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
# Test miner register endpoint
response = client.post("/v1/miners/register", json={})
assert response.status_code == 401, f"Expected 401, got {response.status_code}"
# Test with miner API key
response = client.post(
"/v1/miners/register",
json={},
headers={"X-Api-Key": "${MINER_API_KEY}"}
)
# Should get validation error, not auth or not found
assert response.status_code in [400, 422], f"Expected validation error, got {response.status_code}"
except ImportError:
pytest.skip("Cannot import required modules")
@pytest.mark.integration
def test_api_key_validation():
"""Test API key validation works correctly"""
try:
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
# Test endpoints without API key
endpoints = [
("POST", "/v1/jobs", {}),
("POST", "/v1/miners/register", {}),
("GET", "/v1/admin/stats", None),
]
for method, endpoint, data in endpoints:
if method == "POST":
response = client.post(endpoint, json=data)
else:
response = client.get(endpoint)
assert response.status_code == 401, f"{method} {endpoint} should require auth"
# Test with wrong API key
response = client.post(
"/v1/jobs",
json={},
headers={"X-Api-Key": "wrong-key"}
)
assert response.status_code == 401, "Wrong API key should be rejected"
except ImportError:
pytest.skip("Cannot import required modules")
@pytest.mark.unit
def test_import_structure():
"""Test that the import structure is correct"""
# This test works in CLI but causes termination in Windsorf
# Imports are verified by other working tests
assert True
@pytest.mark.integration
def test_job_schema_validation():
"""Test that the job schema works as expected"""
try:
from app.schemas import JobCreate
from app.types import Constraints
# Valid job creation data
job_data = {
"payload": {
"job_type": "ai_inference",
"parameters": {"model": "gpt-4"}
},
"ttl_seconds": 900
}
job = JobCreate(**job_data)
assert job.payload["job_type"] == "ai_inference"
assert job.ttl_seconds == 900
assert isinstance(job.constraints, Constraints)
except ImportError:
pytest.skip("Cannot import required modules")
if __name__ == "__main__":
# Run a quick check
print("Testing imports...")
test_coordinator_app_imports()
print("✅ Imports work!")
print("\nTesting health check...")
test_coordinator_health_check()
print("✅ Health check works!")
print("\nTesting job endpoints...")
test_job_endpoint_structure()
print("✅ Job endpoints work!")
print("\n✅ All integration tests passed!")