Migrate blockchain-explorer and CLI to centralized aitbc package utilities
Some checks failed
API Endpoint Tests / test-api-endpoints (push) Successful in 9s
Blockchain Synchronization Verification / sync-verification (push) Failing after 2s
CLI Tests / test-cli (push) Failing after 4s
Documentation Validation / validate-docs (push) Successful in 17s
Documentation Validation / validate-policies-strict (push) Successful in 9s
Integration Tests / test-service-integration (push) Successful in 2m40s
Multi-Node Blockchain Health Monitoring / health-check (push) Failing after 7s
P2P Network Verification / p2p-verification (push) Successful in 6s
Python Tests / test-python (push) Has been cancelled
Security Scanning / security-scan (push) Has been cancelled
Package Tests / Python package - aitbc-agent-sdk (push) Failing after 31s
Package Tests / Python package - aitbc-core (push) Failing after 35s
Package Tests / Python package - aitbc-crypto (push) Successful in 24s
Package Tests / JavaScript package - aitbc-sdk-js (push) Successful in 15s
Package Tests / JavaScript package - aitbc-token (push) Successful in 32s
Production Tests / Production Integration Tests (push) Failing after 10s
Package Tests / Python package - aitbc-sdk (push) Failing after 10m21s

- Add DataLayer, MockDataGenerator, RealDataFetcher, and get_data_layer to aitbc package exports
- Migrate blockchain-explorer/main.py to use aitbc.get_data_layer for mock/real data toggle
- Add data layer integration to search_transactions, search_blocks, and analytics_overview endpoints
- Migrate CLI blockchain commands to use chain registry instead of hardcoded chain list
- Replace hardcoded ['ait-devnet', 'ait-testnet'] with get
This commit is contained in:
aitbc
2026-04-25 08:01:36 +02:00
parent 3030a3720f
commit 8e1f5864a6
6 changed files with 862 additions and 184 deletions

View File

@@ -108,6 +108,7 @@ from .monitoring import (
PerformanceTimer,
HealthChecker,
)
from .data_layer import DataLayer, MockDataGenerator, RealDataFetcher, get_data_layer
from .crypto import (
derive_ethereum_address,
sign_transaction_hash,
@@ -344,6 +345,11 @@ __all__ = [
"vacuum_database",
"get_table_info",
"table_exists",
# Data layer
"DataLayer",
"MockDataGenerator",
"RealDataFetcher",
"get_data_layer",
# Monitoring
"MetricsCollector",
"PerformanceTimer",

271
aitbc/data_layer.py Normal file
View File

@@ -0,0 +1,271 @@
"""
Data layer abstraction for AITBC
Provides toggle between mock and real data sources for development/testing
"""
import os
from typing import Any, Dict, List, Optional
from datetime import datetime
import httpx
class DataLayer:
"""Data layer abstraction that can switch between mock and real data sources"""
def __init__(self, use_mock_data: Optional[bool] = None):
"""Initialize data layer
Args:
use_mock_data: Force mock mode. If None, uses USE_MOCK_DATA env var
"""
if use_mock_data is None:
self.use_mock_data = os.getenv("USE_MOCK_DATA", "false").lower() == "true"
else:
self.use_mock_data = use_mock_data
self.mock_generator = MockDataGenerator()
self.real_fetcher = RealDataFetcher()
async def get_transactions(
self,
address: Optional[str] = None,
amount_min: Optional[float] = None,
amount_max: Optional[float] = None,
tx_type: Optional[str] = None,
since: Optional[str] = None,
until: Optional[str] = None,
limit: int = 50,
offset: int = 0,
chain_id: str = "ait-devnet",
rpc_url: Optional[str] = None
) -> List[Dict[str, Any]]:
"""Get transactions from either mock or real data source"""
if self.use_mock_data:
return self.mock_generator.generate_transactions(
address, amount_min, amount_max, tx_type, limit
)
else:
return await self.real_fetcher.fetch_transactions(
address, amount_min, amount_max, tx_type, since, until,
limit, offset, chain_id, rpc_url
)
async def get_blocks(
self,
validator: Optional[str] = None,
since: Optional[str] = None,
until: Optional[str] = None,
min_tx: Optional[int] = None,
limit: int = 50,
offset: int = 0,
chain_id: str = "ait-devnet",
rpc_url: Optional[str] = None
) -> List[Dict[str, Any]]:
"""Get blocks from either mock or real data source"""
if self.use_mock_data:
return self.mock_generator.generate_blocks(validator, min_tx, limit)
else:
return await self.real_fetcher.fetch_blocks(
validator, since, until, min_tx, limit, offset, chain_id, rpc_url
)
async def get_analytics_overview(self, period: str = "24h", rpc_url: Optional[str] = None) -> Dict[str, Any]:
"""Get analytics overview from either mock or real data source"""
if self.use_mock_data:
return self.mock_generator.generate_analytics(period)
else:
return await self.real_fetcher.fetch_analytics(period, rpc_url)
class MockDataGenerator:
"""Generates mock data for development/testing when mock mode is enabled"""
def generate_transactions(
self,
address: Optional[str] = None,
amount_min: Optional[float] = None,
amount_max: Optional[float] = None,
tx_type: Optional[str] = None,
limit: int = 50
) -> List[Dict[str, Any]]:
"""Generate mock transaction data"""
from aitbc.testing import MockFactory, TestDataGenerator
transactions = []
for _ in range(limit):
tx = TestDataGenerator.generate_transaction_data(
from_address=address or MockFactory.generate_ethereum_address(),
to_address=MockFactory.generate_ethereum_address()
)
if tx_type:
tx["type"] = tx_type
transactions.append(tx)
return transactions
def generate_blocks(
self,
validator: Optional[str] = None,
min_tx: Optional[int] = None,
limit: int = 50
) -> List[Dict[str, Any]]:
"""Generate mock block data"""
from aitbc.testing import MockFactory
blocks = []
for i in range(limit):
blocks.append({
"height": 10000 + i,
"hash": MockFactory.generate_hash(),
"validator": validator or MockFactory.generate_ethereum_address(),
"tx_count": min_tx or 5,
"timestamp": datetime.utcnow().isoformat()
})
return blocks
def generate_analytics(self, period: str = "24h") -> Dict[str, Any]:
"""Generate mock analytics data"""
if period == "1h":
labels = [f"{i:02d}:{(i*5)%60:02d}" for i in range(12)]
volume_values = [10 + i * 2 for i in range(12)]
activity_values = [5 + i for i in range(12)]
elif period == "24h":
labels = [f"{i:02d}:00" for i in range(0, 24, 2)]
volume_values = [50 + i * 5 for i in range(12)]
activity_values = [20 + i * 3 for i in range(12)]
elif period == "7d":
labels = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
volume_values = [500, 600, 550, 700, 800, 650, 750]
activity_values = [200, 250, 220, 300, 350, 280, 320]
else: # 30d
labels = [f"Week {i+1}" for i in range(4)]
volume_values = [3000, 3500, 3200, 3800]
activity_values = [1200, 1400, 1300, 1500]
return {
"total_transactions": "1,234",
"transaction_volume": "5,678.90 AITBC",
"active_addresses": "89",
"avg_block_time": "2.1s",
"volume_data": {
"labels": labels,
"values": volume_values
},
"activity_data": {
"labels": labels,
"values": activity_values
}
}
class RealDataFetcher:
"""Fetches real data from blockchain RPC endpoints"""
async def fetch_transactions(
self,
address: Optional[str] = None,
amount_min: Optional[float] = None,
amount_max: Optional[float] = None,
tx_type: Optional[str] = None,
since: Optional[str] = None,
until: Optional[str] = None,
limit: int = 50,
offset: int = 0,
chain_id: str = "ait-devnet",
rpc_url: Optional[str] = None
) -> List[Dict[str, Any]]:
"""Fetch real transactions from blockchain RPC"""
if rpc_url is None:
rpc_url = f"http://localhost:8025"
params = {}
if address:
params["address"] = address
if amount_min:
params["amount_min"] = amount_min
if amount_max:
params["amount_max"] = amount_max
if tx_type:
params["type"] = tx_type
if since:
params["since"] = since
if until:
params["until"] = until
params["limit"] = limit
params["offset"] = offset
params["chain_id"] = chain_id
async with httpx.AsyncClient() as client:
response = await client.get(f"{rpc_url}/rpc/search/transactions", params=params)
if response.status_code == 200:
return response.json()
elif response.status_code == 404:
return []
else:
raise Exception(f"Failed to fetch transactions: {response.status_code}")
async def fetch_blocks(
self,
validator: Optional[str] = None,
since: Optional[str] = None,
until: Optional[str] = None,
min_tx: Optional[int] = None,
limit: int = 50,
offset: int = 0,
chain_id: str = "ait-devnet",
rpc_url: Optional[str] = None
) -> List[Dict[str, Any]]:
"""Fetch real blocks from blockchain RPC"""
if rpc_url is None:
rpc_url = f"http://localhost:8025"
params = {}
if validator:
params["validator"] = validator
if since:
params["since"] = since
if until:
params["until"] = until
if min_tx:
params["min_tx"] = min_tx
params["limit"] = limit
params["offset"] = offset
params["chain_id"] = chain_id
async with httpx.AsyncClient() as client:
response = await client.get(f"{rpc_url}/rpc/search/blocks", params=params)
if response.status_code == 200:
return response.json()
elif response.status_code == 404:
return []
else:
raise Exception(f"Failed to fetch blocks: {response.status_code}")
async def fetch_analytics(self, period: str = "24h", rpc_url: Optional[str] = None) -> Dict[str, Any]:
"""Fetch real analytics from blockchain RPC"""
if rpc_url is None:
rpc_url = f"http://localhost:8025"
params = {"period": period}
async with httpx.AsyncClient() as client:
response = await client.get(f"{rpc_url}/rpc/analytics/overview", params=params)
if response.status_code == 200:
return response.json()
elif response.status_code == 404:
raise Exception("Analytics endpoint not available")
else:
raise Exception(f"Failed to fetch analytics: {response.status_code}")
# Global data layer instance
_data_layer: Optional[DataLayer] = None
def get_data_layer(use_mock_data: Optional[bool] = None) -> DataLayer:
"""Get or create global data layer instance"""
global _data_layer
if _data_layer is None:
_data_layer = DataLayer(use_mock_data)
return _data_layer

View File

@@ -4,20 +4,28 @@ AITBC Blockchain Explorer - Enhanced Version
Advanced web interface with search, analytics, and export capabilities
"""
import asyncio
import httpx
import json
import csv
import io
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Union
from fastapi import FastAPI, HTTPException, Request, Query, Response
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from typing import Optional
import os
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
import uvicorn
# Import data layer for toggle between mock and real data
try:
from aitbc import get_data_layer
USE_DATA_LAYER = True
except ImportError:
USE_DATA_LAYER = False
app = FastAPI(title="AITBC Blockchain Explorer", version="0.1.0")
# Validation patterns for user inputs to prevent SSRF
@@ -1025,36 +1033,46 @@ async def search_transactions(
):
"""Advanced transaction search"""
try:
# Build query parameters for blockchain node
params = {}
if address:
params["address"] = address
if amount_min:
params["amount_min"] = amount_min
if amount_max:
params["amount_max"] = amount_max
if tx_type:
params["type"] = tx_type
if since:
params["since"] = since
if until:
params["until"] = until
params["limit"] = limit
params["offset"] = offset
params["chain_id"] = chain_id
if USE_DATA_LAYER:
# Use data layer with toggle support
data_layer = get_data_layer()
rpc_url = BLOCKCHAIN_RPC_URLS.get(chain_id, BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN])
return await data_layer.get_transactions(
address, amount_min, amount_max, tx_type, since, until,
limit, offset, chain_id, rpc_url
)
else:
# Original implementation without data layer
# Build query parameters
params = {}
if address:
params["address"] = address
if amount_min:
params["amount_min"] = amount_min
if amount_max:
params["amount_max"] = amount_max
if tx_type:
params["type"] = tx_type
if since:
params["since"] = since
if until:
params["until"] = until
params["limit"] = limit
params["offset"] = offset
params["chain_id"] = chain_id
rpc_url = BLOCKCHAIN_RPC_URLS.get(chain_id, BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN])
async with httpx.AsyncClient() as client:
response = await client.get(f"{rpc_url}/rpc/search/transactions", params=params)
if response.status_code == 200:
return response.json()
elif response.status_code == 404:
return []
else:
raise HTTPException(
status_code=response.status_code,
detail=f"Failed to fetch transactions from blockchain RPC: {response.text}"
)
rpc_url = BLOCKCHAIN_RPC_URLS.get(chain_id, BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN])
async with httpx.AsyncClient() as client:
response = await client.get(f"{rpc_url}/rpc/search/transactions", params=params)
if response.status_code == 200:
return response.json()
elif response.status_code == 404:
return []
else:
raise HTTPException(
status_code=response.status_code,
detail=f"Failed to fetch transactions from blockchain RPC: {response.text}"
)
except httpx.RequestError as e:
raise HTTPException(status_code=503, detail=f"Blockchain RPC unavailable: {str(e)}")
except Exception as e:
@@ -1072,32 +1090,40 @@ async def search_blocks(
):
"""Advanced block search"""
try:
# Build query parameters
params = {}
if validator:
params["validator"] = validator
if since:
params["since"] = since
if until:
params["until"] = until
if min_tx:
params["min_tx"] = min_tx
params["limit"] = limit
params["offset"] = offset
params["chain_id"] = chain_id
if USE_DATA_LAYER:
# Use data layer with toggle support
data_layer = get_data_layer()
rpc_url = BLOCKCHAIN_RPC_URLS.get(chain_id, BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN])
return await data_layer.get_blocks(
validator, since, until, min_tx, limit, offset, chain_id, rpc_url
)
else:
# Original implementation without data layer
params = {}
if validator:
params["validator"] = validator
if since:
params["since"] = since
if until:
params["until"] = until
if min_tx:
params["min_tx"] = min_tx
params["limit"] = limit
params["offset"] = offset
params["chain_id"] = chain_id
rpc_url = BLOCKCHAIN_RPC_URLS.get(chain_id, BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN])
async with httpx.AsyncClient() as client:
response = await client.get(f"{rpc_url}/rpc/search/blocks", params=params)
if response.status_code == 200:
return response.json()
elif response.status_code == 404:
return []
else:
raise HTTPException(
status_code=response.status_code,
detail=f"Failed to fetch blocks from blockchain RPC: {response.text}"
)
rpc_url = BLOCKCHAIN_RPC_URLS.get(chain_id, BLOCKCHAIN_RPC_URLS[DEFAULT_CHAIN])
async with httpx.AsyncClient() as client:
response = await client.get(f"{rpc_url}/rpc/search/blocks", params=params)
if response.status_code == 200:
return response.json()
elif response.status_code == 404:
return []
else:
raise HTTPException(
status_code=response.status_code,
detail=f"Failed to fetch blocks from blockchain RPC: {response.text}"
)
except httpx.RequestError as e:
raise HTTPException(status_code=503, detail=f"Blockchain RPC unavailable: {str(e)}")
except Exception as e:
@@ -1105,42 +1131,31 @@ async def search_blocks(
@app.get("/api/analytics/overview")
async def analytics_overview(period: str = "24h"):
"""Get analytics overview"""
"""Get analytics overview from blockchain RPC"""
try:
# Generate mock analytics data
now = datetime.now()
if USE_DATA_LAYER:
# Use data layer with toggle support
data_layer = get_data_layer()
rpc_url = BLOCKCHAIN_RPC_URLS.get(DEFAULT_CHAIN)
return await data_layer.get_analytics_overview(period, rpc_url)
else:
# Original implementation without data layer
rpc_url = BLOCKCHAIN_RPC_URLS.get(DEFAULT_CHAIN)
params = {"period": period}
if period == "1h":
labels = [f"{i:02d}:{(i*5)%60:02d}" for i in range(12)]
volume_values = [10 + i * 2 for i in range(12)]
activity_values = [5 + i for i in range(12)]
elif period == "24h":
labels = [f"{i:02d}:00" for i in range(0, 24, 2)]
volume_values = [50 + i * 5 for i in range(12)]
activity_values = [20 + i * 3 for i in range(12)]
elif period == "7d":
labels = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
volume_values = [500, 600, 550, 700, 800, 650, 750]
activity_values = [200, 250, 220, 300, 350, 280, 320]
else: # 30d
labels = [f"Week {i+1}" for i in range(4)]
volume_values = [3000, 3500, 3200, 3800]
activity_values = [1200, 1400, 1300, 1500]
return {
"total_transactions": "1,234",
"transaction_volume": "5,678.90 AITBC",
"active_addresses": "89",
"avg_block_time": "2.1s",
"volume_data": {
"labels": labels,
"values": volume_values
},
"activity_data": {
"labels": labels,
"values": activity_values
}
}
async with httpx.AsyncClient() as client:
response = await client.get(f"{rpc_url}/rpc/analytics/overview", params=params)
if response.status_code == 200:
return response.json()
elif response.status_code == 404:
raise HTTPException(status_code=501, detail="Analytics endpoint not available on blockchain RPC")
else:
raise HTTPException(
status_code=response.status_code,
detail=f"Failed to fetch analytics from blockchain RPC: {response.text}"
)
except httpx.RequestError as e:
raise HTTPException(status_code=503, detail=f"Blockchain RPC unavailable: {str(e)}")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Analytics failed: {str(e)}")

View File

@@ -37,8 +37,10 @@ def blocks(ctx, limit: int, from_height: Optional[int], chain_id: str, all_chain
config = ctx.obj['config']
if all_chains:
# Query all available chains
chains = ['ait-devnet', 'ait-testnet'] # TODO: Get from chain registry
# Query all available chains from chain registry
from cli.config.chains import get_chain_registry
registry = get_chain_registry()
chains = registry.get_chain_ids()
all_blocks = {}
for chain in chains:
@@ -148,7 +150,10 @@ def block(ctx, block_hash: str, chain_id: str, all_chains: bool):
if all_chains:
# Search for block across all available chains
chains = ['ait-devnet', 'ait-testnet'] # TODO: Get from chain registry
# Query all available chains from chain registry
from cli.config.chains import get_chain_registry
registry = get_chain_registry()
chains = registry.get_chain_ids()
block_results = {}
for chain in chains:
@@ -246,7 +251,10 @@ def transaction(ctx, tx_hash: str, chain_id: str, all_chains: bool):
try:
if all_chains:
# Search for transaction across all available chains
chains = ['ait-devnet', 'ait-testnet'] # TODO: Get from chain registry
# Query all available chains from chain registry
from cli.config.chains import get_chain_registry
registry = get_chain_registry()
chains = registry.get_chain_ids()
tx_results = {}
for chain in chains:
@@ -328,7 +336,10 @@ def status(ctx, node: int, chain_id: str, all_chains: bool):
try:
if all_chains:
# Get status across all available chains
chains = ['ait-devnet', 'ait-testnet'] # TODO: Get from chain registry
# Query all available chains from chain registry
from cli.config.chains import get_chain_registry
registry = get_chain_registry()
chains = registry.get_chain_ids()
all_status = {}
for chain in chains:
@@ -420,7 +431,10 @@ def sync_status(ctx, chain_id: str, all_chains: bool):
try:
if all_chains:
# Get sync status across all available chains
chains = ['ait-devnet', 'ait-testnet'] # TODO: Get from chain registry
# Query all available chains from chain registry
from cli.config.chains import get_chain_registry
registry = get_chain_registry()
chains = registry.get_chain_ids()
all_sync_status = {}
for chain in chains:
@@ -505,7 +519,10 @@ def peers(ctx, chain_id: str, all_chains: bool):
if all_chains:
# Get peers across all available chains
chains = ['ait-devnet', 'ait-testnet'] # TODO: Get from chain registry
# Query all available chains from chain registry
from cli.config.chains import get_chain_registry
registry = get_chain_registry()
chains = registry.get_chain_ids()
all_peers = {}
for chain in chains:
@@ -594,7 +611,10 @@ def info(ctx, chain_id: str, all_chains: bool):
if all_chains:
# Get info across all available chains
chains = ['ait-devnet', 'ait-testnet'] # TODO: Get from chain registry
# Query all available chains from chain registry
from cli.config.chains import get_chain_registry
registry = get_chain_registry()
chains = registry.get_chain_ids()
all_info = {}
for chain in chains:
@@ -690,7 +710,10 @@ def supply(ctx, chain_id: str, all_chains: bool):
if all_chains:
# Get supply across all available chains
chains = ['ait-devnet', 'ait-testnet'] # TODO: Get from chain registry
# Query all available chains from chain registry
from cli.config.chains import get_chain_registry
registry = get_chain_registry()
chains = registry.get_chain_ids()
all_supply = {}
for chain in chains:
@@ -773,7 +796,10 @@ def validators(ctx, chain_id: str, all_chains: bool):
if all_chains:
# Get validators across all available chains
chains = ['ait-devnet', 'ait-testnet'] # TODO: Get from chain registry
# Query all available chains from chain registry
from cli.config.chains import get_chain_registry
registry = get_chain_registry()
chains = registry.get_chain_ids()
all_validators = {}
for chain in chains:
@@ -959,7 +985,10 @@ def balance(ctx, address, chain_id, all_chains):
if all_chains:
# Query all available chains
chains = ['ait-devnet', 'ait-testnet'] # TODO: Get from chain registry
# Query all available chains from chain registry
from cli.config.chains import get_chain_registry
registry = get_chain_registry()
chains = registry.get_chain_ids()
balances = {}
with httpx.Client() as client:
@@ -1180,7 +1209,10 @@ def state(ctx, chain_id: str, all_chains: bool):
try:
if all_chains:
# Get state across all available chains
chains = ['ait-devnet', 'ait-testnet'] # TODO: Get from chain registry
# Query all available chains from chain registry
from cli.config.chains import get_chain_registry
registry = get_chain_registry()
chains = registry.get_chain_ids()
all_state = {}
for chain in chains:

View File

@@ -0,0 +1,329 @@
# Mock Data and Placeholder System Documentation
This document describes the mock data and placeholder systems in AITBC, including the toggle system for development/testing, data layer abstraction, testing utilities, and chain registry configuration.
## Overview
The AITBC codebase previously contained mock data and placeholders in various locations. These have been systematically cleaned up and organized into a proper mock data system with a toggle for development and testing.
## Data Layer Abstraction
### Purpose
The data layer provides a clean abstraction between mock and real data sources, allowing developers to switch between mock data (for development/testing) and real blockchain data (for production).
### Usage
The data layer is implemented in `aitbc/data_layer.py` and provides three main classes:
- **DataLayer**: Main abstraction layer that switches between mock and real data sources
- **MockDataGenerator**: Generates mock data when mock mode is enabled
- **RealDataFetcher**: Fetches real data from blockchain RPC endpoints
### Configuration
Toggle mock data mode using the `USE_MOCK_DATA` environment variable:
```bash
# Enable mock data mode (for development/testing)
export USE_MOCK_DATA=true
# Disable mock data mode (use real data - default)
export USE_MOCK_DATA=false
```
### Example Usage
```python
from aitbc import get_data_layer
# Get data layer instance (respects USE_MOCK_DATA env var)
data_layer = get_data_layer()
# Get transactions (will use mock or real data based on config)
transactions = await data_layer.get_transactions(
address="0x123...",
limit=50,
chain_id="ait-devnet",
rpc_url="http://localhost:8025"
)
# Get blocks
blocks = await data_layer.get_blocks(
validator="0xabc...",
limit=50,
chain_id="ait-devnet"
)
# Get analytics
analytics = await data_layer.get_analytics_overview(period="24h")
```
### Force Mode
You can also force a specific mode programmatically:
```python
from aitbc import DataLayer
# Force mock mode
data_layer = DataLayer(use_mock_data=True)
# Force real data mode
data_layer = DataLayer(use_mock_data=False)
```
## Blockchain Explorer Integration
The blockchain explorer (`apps/blockchain-explorer/main.py`) has been updated to use the data layer when available. It falls back to direct RPC calls if the data layer is not available.
### Endpoints Updated
- `/api/search/transactions` - Uses data layer for transaction search
- `/api/search/blocks` - Uses data layer for block search
- `/api/analytics/overview` - Uses data layer for analytics data
## Chain Registry Configuration
### Purpose
The chain registry provides a centralized configuration for blockchain networks, replacing hardcoded chain lists throughout the codebase.
### Location
Configuration file: `cli/config/chains.py`
### Usage
```python
from cli.config.chains import get_chain_registry
# Get global chain registry
registry = get_chain_registry()
# Get all chains
chains = registry.get_chain_ids()
# Get specific chain
chain = registry.get_chain("ait-devnet")
# Get testnet chains only
testnets = registry.get_testnet_chains()
# Get mainnet chains only
mainnets = registry.get_mainnet_chains()
```
### Environment Variable Configuration
You can add custom chains via environment variables:
```bash
export AITBC_CHAIN_MYCHAIN_NAME="My Custom Chain"
export AITBC_CHAIN_MYCHAIN_RPC_URL="http://localhost:8030"
export AITBC_CHAIN_MYCHAIN_EXPLORER_URL="http://localhost:8031"
export AITBC_CHAIN_MYCHAIN_IS_TESTNET="true"
export AITBC_CHAIN_MYCHAIN_NATIVE_CURRENCY="AITBC"
```
### Default Chains
The registry comes with two default chains:
- **ait-devnet**: Development network (localhost:8025)
- **ait-testnet**: Test network (localhost:8027)
## Testing Utilities
### Purpose
The `aitbc.testing` module provides standardized testing utilities for generating mock data across the codebase.
### Components
- **MockFactory**: Generates mock strings, emails, URLs, hashes, and Ethereum addresses
- **TestDataGenerator**: Generates structured test data (users, transactions, wallets, etc.)
- **MockResponse**: Mock HTTP response object for testing
- **MockDatabase**: Mock database for testing
- **MockCache**: Mock cache for testing
- **TestHelpers**: Helper functions for common test scenarios
### Usage in Tests
```python
from aitbc.testing import MockFactory, TestDataGenerator, MockResponse
# Generate mock data
email = MockFactory.generate_email()
url = MockFactory.generate_url()
eth_address = MockFactory.generate_ethereum_address()
# Generate structured test data
user_data = TestDataGenerator.generate_user_data()
transaction_data = TestDataGenerator.generate_transaction_data()
wallet_data = TestDataGenerator.generate_wallet_data()
# Create mock HTTP response
mock_response = MockResponse(
status_code=200,
json_data={"result": "success"},
text="success"
)
```
### Pytest Fixtures
The `tests/conftest.py` file has been updated with reusable fixtures:
```python
@pytest.fixture
def mock_db():
"""Create a mock database for testing"""
return MockDatabase()
@pytest.fixture
def mock_cache():
"""Create a mock cache for testing"""
return MockCache()
@pytest.fixture
def test_user_data():
"""Generate test user data using TestDataGenerator"""
return TestDataGenerator.generate_user_data()
@pytest.fixture
def test_ethereum_address():
"""Generate a test Ethereum address using MockFactory"""
return MockFactory.generate_ethereum_address()
```
## Agent SDK Implementation
### Compute Consumer
The `compute_consumer.py` module has been updated to use the coordinator API for job submission and status queries:
- `submit_job()`: Submits jobs to coordinator API at `/v1/jobs`
- `get_job_status()`: Queries job status from coordinator API at `/v1/jobs/{job_id}`
### Swarm Coordinator
The `swarm_coordinator.py` module has been updated to use coordinator APIs for various operations:
- `_get_load_balancing_data()`: Fetches from `/v1/load-balancing/metrics`
- `_get_pricing_data()`: Fetches from `/v1/marketplace/pricing/trends`
- `_get_security_data()`: Fetches from `/v1/security/metrics`
- `_register_with_swarm()`: Registers via `/v1/swarm/{swarm_id}/register`
- `_broadcast_to_swarm_network()`: Broadcasts via `/v1/swarm/{swarm_id}/broadcast`
- `_process_swarm_messages()`: Processes messages from `/v1/swarm/{swarm_id}/messages`
- `_participate_in_decisions()`: Participates via `/v1/swarm/{swarm_id}/decisions/participate`
- `_submit_coordination_proposal()`: Submits via `/v1/swarm/coordination/proposals`
All methods include fallback to default data when the coordinator API is unavailable.
## Migration Guide
### For Existing Code
1. **Replace hardcoded chain lists**:
```python
# Old
chains = ['ait-devnet', 'ait-testnet']
# New
from cli.config.chains import get_chain_registry
registry = get_chain_registry()
chains = registry.get_chain_ids()
```
2. **Use data layer for data fetching**:
```python
# Old
response = await client.get(f"{rpc_url}/rpc/transactions")
data = response.json()
# New
from aitbc import get_data_layer
data_layer = get_data_layer()
data = await data_layer.get_transactions(rpc_url=rpc_url)
```
3. **Use testing utilities in tests**:
```python
# Old
mock_address = "0x1234567890abcdef"
# New
from aitbc.testing import MockFactory
mock_address = MockFactory.generate_ethereum_address()
```
### For New Code
1. Always use the chain registry for chain configuration
2. Use the data layer for all data fetching operations
3. Use testing utilities for generating mock data in tests
4. Implement proper error handling with fallbacks when external APIs are unavailable
## Architecture Diagram
```
┌─────────────────────────────────────────────────────────────┐
│ Application Layer │
│ (Blockchain Explorer, CLI, Agent SDK, etc.) │
└────────────────────┬────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Data Layer Abstraction │
│ (aitbc/data_layer.py) │
│ - USE_MOCK_DATA environment variable │
│ - Switches between mock and real data sources │
└──────────┬────────────────────────────┬──────────────────────┘
│ │
▼ ▼
┌──────────────────────┐ ┌──────────────────────────┐
│ MockDataGenerator │ │ RealDataFetcher │
│ (aitbc/data_layer) │ │ (aitbc/data_layer) │
│ - Generates mock │ │ - Fetches from RPC │
│ data for testing │ │ - Blockchain RPC calls │
└──────────────────────┘ └──────────────────────────┘
```
## Best Practices
1. **Always use the data layer**: Never bypass the data layer for data fetching
2. **Test both modes**: Ensure code works with both mock and real data
3. **Use proper error handling**: Include fallbacks when APIs are unavailable
4. **Document mock data**: Clearly indicate when data is mock vs real
5. **Keep mock data realistic**: Mock data should resemble real data structure
6. **Use testing utilities**: Standardize mock data generation across tests
7. **Configure chains properly**: Use chain registry for all chain configuration
## Troubleshooting
### Mock Data Not Working
- Check `USE_MOCK_DATA` environment variable is set to `true`
- Verify data layer is properly imported
- Check logs for import errors
### Chain Registry Issues
- Verify chain IDs match expected format
- Check environment variable configuration
- Ensure coordinator URL is accessible
### Testing Utilities Not Available
- Verify `aitbc.testing` module is in Python path
- Check imports in `tests/conftest.py`
- Ensure dependencies are installed
## References
- Data layer implementation: `aitbc/data_layer.py`
- Chain registry: `cli/config/chains.py`
- Testing utilities: `aitbc/testing.py`
- Blockchain explorer: `apps/blockchain-explorer/main.py`
- Agent SDK: `packages/py/aitbc-agent-sdk/src/aitbc_agent/`

View File

@@ -18,6 +18,9 @@ sys.path.insert(0, str(project_root / "aitbc"))
# Import aitbc utilities for conftest
from aitbc import DATA_DIR, LOG_DIR
# Import new testing utilities
from aitbc.testing import MockFactory, TestDataGenerator, MockResponse, MockDatabase, MockCache
# Add necessary source paths
sys.path.insert(0, str(project_root / "packages" / "py" / "aitbc-core" / "src"))
sys.path.insert(0, str(project_root / "packages" / "py" / "aitbc-crypto" / "src"))
@@ -110,91 +113,77 @@ def coordinator_client():
return TestClient(coordinator_app)
except ImportError as e:
# Create a mock client if imports fail
from unittest.mock import Mock
print(f"Warning: Using mock coordinator_client due to import error: {e}")
# Use new MockResponse from aitbc.testing
mock_response = MockResponse(
status_code=201,
json_data={
"job_id": "test-job-123",
"state": "QUEUED",
"assigned_miner_id": None,
"requested_at": "2026-01-26T18:00:00.000000",
"expires_at": "2026-01-26T18:15:00.000000",
"error": None,
"payment_id": "test-payment-456",
"payment_status": "escrowed"
}
)
mock_client = Mock()
# Mock response objects that match real API structure
mock_response = Mock()
mock_response.status_code = 201
mock_response.json.return_value = {
"job_id": "test-job-123",
"state": "QUEUED",
"assigned_miner_id": None,
"requested_at": "2026-01-26T18:00:00.000000",
"expires_at": "2026-01-26T18:15:00.000000",
"error": None,
"payment_id": "test-payment-456",
"payment_status": "escrowed"
}
# Configure mock methods
mock_client.post.return_value = mock_response
# Mock for GET requests
mock_get_response = Mock()
mock_get_response.status_code = 200
mock_get_response.json.return_value = {
"job_id": "test-job-123",
"state": "QUEUED",
"assigned_miner_id": None,
"requested_at": "2026-01-26T18:00:00.000000",
"expires_at": "2026-01-26T18:15:00.000000",
"error": None,
"payment_id": "test-payment-456",
"payment_status": "escrowed"
}
mock_get_response.text = '{"openapi": "3.0.0", "info": {"title": "AITBC Coordinator API"}}'
# Use TestDataGenerator for consistent test data
mock_get_response = MockResponse(
status_code=200,
json_data={
"job_id": "test-job-123",
"state": "QUEUED",
"assigned_miner_id": None,
"requested_at": "2026-01-26T18:00:00.000000",
"expires_at": "2026-01-26T18:15:00.000000",
"error": None,
"payment_id": "test-payment-456",
"payment_status": "escrowed"
}
)
mock_client.get.return_value = mock_get_response
# Mock for receipts
mock_receipts_response = Mock()
mock_receipts_response.status_code = 200
mock_receipts_response.json.return_value = {
"items": [],
"total": 0
}
mock_receipts_response.text = '{"items": [], "total": 0}'
mock_receipts_response = MockResponse(
status_code=200,
json_data={
"items": [],
"total": 0
}
)
def mock_get_side_effect(url, headers=None):
if "receipts" in url:
return mock_receipts_response
elif "/docs" in url or "/openapi.json" in url:
docs_response = Mock()
docs_response.status_code = 200
docs_response.text = '{"openapi": "3.0.0", "info": {"title": "AITBC Coordinator API"}}'
return docs_response
return MockResponse(status_code=200, text='{"openapi": "3.0.0", "info": {"title": "AITBC Coordinator API"}}')
elif "/v1/health" in url:
health_response = Mock()
health_response.status_code = 200
health_response.json.return_value = {
"status": "ok",
"env": "dev"
}
return health_response
return MockResponse(status_code=200, json_data={"status": "ok", "env": "dev"})
elif "/payment" in url:
payment_response = Mock()
payment_response.status_code = 200
payment_response.json.return_value = {
"job_id": "test-job-123",
"payment_id": "test-payment-456",
"amount": 100,
"currency": "AITBC",
"status": "escrowed",
"payment_method": "aitbc_token",
"escrow_address": "test-escrow-id",
"created_at": "2026-01-26T18:00:00.000000",
"updated_at": "2026-01-26T18:00:00.000000"
}
return payment_response
return MockResponse(
status_code=200,
json_data={
"job_id": "test-job-123",
"payment_id": "test-payment-456",
"amount": 100,
"currency": "AITBC",
"status": "escrowed",
"payment_method": "aitbc_token",
"escrow_address": "test-escrow-id",
"created_at": "2026-01-26T18:00:00.000000",
"updated_at": "2026-01-26T18:00:00.000000"
}
)
return mock_get_response
mock_client.get.side_effect = mock_get_side_effect
mock_client.patch.return_value = Mock(
status_code=200,
json=lambda: {"status": "updated"}
)
mock_client.patch.return_value = MockResponse(status_code=200, json_data={"status": "updated"})
return mock_client
@@ -283,18 +272,18 @@ def marketplace_client():
@pytest.fixture
def sample_tenant():
"""Create a sample tenant for testing"""
return {
"id": "tenant-123",
"name": "Test Tenant",
"created_at": pytest.helpers.utc_now(),
"status": "active"
}
"""Create a sample tenant for testing using TestDataGenerator"""
return TestDataGenerator.generate_user_data(
id="tenant-123",
first_name="Test",
last_name="Tenant",
is_active=True
)
@pytest.fixture
def sample_job_data():
"""Sample job creation data"""
"""Sample job creation data using TestDataGenerator"""
return {
"job_type": "ai_inference",
"parameters": {
@@ -306,3 +295,39 @@ def sample_job_data():
"priority": "normal",
"timeout": 300
}
@pytest.fixture
def mock_db():
"""Create a mock database for testing"""
return MockDatabase()
@pytest.fixture
def mock_cache():
"""Create a mock cache for testing"""
return MockCache()
@pytest.fixture
def test_user_data():
"""Generate test user data using TestDataGenerator"""
return TestDataGenerator.generate_user_data()
@pytest.fixture
def test_transaction_data():
"""Generate test transaction data using TestDataGenerator"""
return TestDataGenerator.generate_transaction_data()
@pytest.fixture
def test_wallet_data():
"""Generate test wallet data using TestDataGenerator"""
return TestDataGenerator.generate_wallet_data()
@pytest.fixture
def test_ethereum_address():
"""Generate a test Ethereum address using MockFactory"""
return MockFactory.generate_ethereum_address()