- Add Prometheus metrics for marketplace API throughput and error rates with new dashboard panels - Implement confidential transaction models with encryption support and access control - Add key management system with registration, rotation, and audit logging - Create services and registry routers for service discovery and management - Integrate ZK proof generation for privacy-preserving receipts - Add metrics instru
278 lines
9.6 KiB
Python
Executable File
278 lines
9.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Blockchain Node Throughput Benchmark
|
|
|
|
This script simulates sustained load on the blockchain node to measure:
|
|
- Transactions per second (TPS)
|
|
- Latency percentiles (p50, p95, p99)
|
|
- CPU and memory usage
|
|
- Queue depth and saturation points
|
|
|
|
Usage:
|
|
python benchmark_throughput.py --concurrent-clients 100 --duration 60 --target-url http://localhost:8080
|
|
"""
|
|
|
|
import asyncio
|
|
import aiohttp
|
|
import time
|
|
import statistics
|
|
import psutil
|
|
import argparse
|
|
import json
|
|
from typing import List, Dict, Any
|
|
from dataclasses import dataclass
|
|
import logging
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class BenchmarkResult:
|
|
"""Results from a benchmark run"""
|
|
total_transactions: int
|
|
duration: float
|
|
tps: float
|
|
latency_p50: float
|
|
latency_p95: float
|
|
latency_p99: float
|
|
cpu_usage: float
|
|
memory_usage: float
|
|
errors: int
|
|
|
|
|
|
class BlockchainBenchmark:
|
|
"""Benchmark client for blockchain node"""
|
|
|
|
def __init__(self, base_url: str):
|
|
self.base_url = base_url.rstrip('/')
|
|
self.session = None
|
|
|
|
async def __aenter__(self):
|
|
self.session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30))
|
|
return self
|
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
|
if self.session:
|
|
await self.session.close()
|
|
|
|
async def submit_transaction(self, payload: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Submit a single transaction"""
|
|
start_time = time.time()
|
|
try:
|
|
async with self.session.post(
|
|
f"{self.base_url}/v1/transactions",
|
|
json=payload
|
|
) as response:
|
|
if response.status == 200:
|
|
result = await response.json()
|
|
latency = (time.time() - start_time) * 1000 # ms
|
|
return {"success": True, "latency": latency, "tx_id": result.get("tx_id")}
|
|
else:
|
|
return {"success": False, "error": f"HTTP {response.status}"}
|
|
except Exception as e:
|
|
return {"success": False, "error": str(e)}
|
|
|
|
async def get_block_height(self) -> int:
|
|
"""Get current block height"""
|
|
try:
|
|
async with self.session.get(f"{self.base_url}/v1/blocks/head") as response:
|
|
if response.status == 200:
|
|
data = await response.json()
|
|
return data.get("height", 0)
|
|
except Exception:
|
|
pass
|
|
return 0
|
|
|
|
|
|
def generate_test_transaction(i: int) -> Dict[str, Any]:
|
|
"""Generate a test transaction"""
|
|
return {
|
|
"from": f"0xtest_sender_{i % 100:040x}",
|
|
"to": f"0xtest_receiver_{i % 50:040x}",
|
|
"value": str((i + 1) * 1000),
|
|
"nonce": i,
|
|
"data": f"0x{hash(i) % 1000000:06x}",
|
|
"gas_limit": 21000,
|
|
"gas_price": "1000000000" # 1 gwei
|
|
}
|
|
|
|
|
|
async def worker_task(
|
|
benchmark: BlockchainBenchmark,
|
|
worker_id: int,
|
|
transactions_per_worker: int,
|
|
results: List[Dict[str, Any]]
|
|
) -> None:
|
|
"""Worker task that submits transactions"""
|
|
logger.info(f"Worker {worker_id} starting")
|
|
|
|
for i in range(transactions_per_worker):
|
|
tx = generate_test_transaction(worker_id * transactions_per_worker + i)
|
|
result = await benchmark.submit_transaction(tx)
|
|
results.append(result)
|
|
|
|
if not result["success"]:
|
|
logger.warning(f"Worker {worker_id} transaction failed: {result.get('error', 'unknown')}")
|
|
|
|
logger.info(f"Worker {worker_id} completed")
|
|
|
|
|
|
async def run_benchmark(
|
|
base_url: str,
|
|
concurrent_clients: int,
|
|
duration: int,
|
|
target_tps: int = None
|
|
) -> BenchmarkResult:
|
|
"""Run the benchmark"""
|
|
logger.info(f"Starting benchmark: {concurrent_clients} concurrent clients for {duration}s")
|
|
|
|
# Start resource monitoring
|
|
process = psutil.Process()
|
|
cpu_samples = []
|
|
memory_samples = []
|
|
|
|
async def monitor_resources():
|
|
while True:
|
|
cpu_samples.append(process.cpu_percent())
|
|
memory_samples.append(process.memory_info().rss / 1024 / 1024) # MB
|
|
await asyncio.sleep(1)
|
|
|
|
# Calculate transactions needed
|
|
if target_tps:
|
|
total_transactions = target_tps * duration
|
|
else:
|
|
total_transactions = concurrent_clients * 100 # Default: 100 tx per client
|
|
|
|
transactions_per_worker = total_transactions // concurrent_clients
|
|
results = []
|
|
|
|
async with BlockchainBenchmark(base_url) as benchmark:
|
|
# Start resource monitor
|
|
monitor_task = asyncio.create_task(monitor_resources())
|
|
|
|
# Record start block height
|
|
start_height = await benchmark.get_block_height()
|
|
|
|
# Start benchmark
|
|
start_time = time.time()
|
|
|
|
# Create worker tasks
|
|
tasks = [
|
|
worker_task(benchmark, i, transactions_per_worker, results)
|
|
for i in range(concurrent_clients)
|
|
]
|
|
|
|
# Wait for all tasks to complete or timeout
|
|
try:
|
|
await asyncio.wait_for(asyncio.gather(*tasks), timeout=duration)
|
|
except asyncio.TimeoutError:
|
|
logger.warning("Benchmark timed out")
|
|
for task in tasks:
|
|
task.cancel()
|
|
|
|
end_time = time.time()
|
|
actual_duration = end_time - start_time
|
|
|
|
# Stop resource monitor
|
|
monitor_task.cancel()
|
|
|
|
# Get final block height
|
|
end_height = await benchmark.get_block_height()
|
|
|
|
# Calculate metrics
|
|
successful_tx = [r for r in results if r["success"]]
|
|
latencies = [r["latency"] for r in successful_tx if "latency" in r]
|
|
|
|
if latencies:
|
|
latency_p50 = statistics.median(latencies)
|
|
latency_p95 = statistics.quantiles(latencies, n=20)[18] # 95th percentile
|
|
latency_p99 = statistics.quantiles(latencies, n=100)[98] # 99th percentile
|
|
else:
|
|
latency_p50 = latency_p95 = latency_p99 = 0
|
|
|
|
tps = len(successful_tx) / actual_duration if actual_duration > 0 else 0
|
|
avg_cpu = statistics.mean(cpu_samples) if cpu_samples else 0
|
|
avg_memory = statistics.mean(memory_samples) if memory_samples else 0
|
|
errors = len(results) - len(successful_tx)
|
|
|
|
logger.info(f"Benchmark completed:")
|
|
logger.info(f" Duration: {actual_duration:.2f}s")
|
|
logger.info(f" Transactions: {len(successful_tx)} successful, {errors} failed")
|
|
logger.info(f" TPS: {tps:.2f}")
|
|
logger.info(f" Latency p50/p95/p99: {latency_p50:.2f}/{latency_p95:.2f}/{latency_p99:.2f}ms")
|
|
logger.info(f" CPU Usage: {avg_cpu:.1f}%")
|
|
logger.info(f" Memory Usage: {avg_memory:.1f}MB")
|
|
logger.info(f" Blocks processed: {end_height - start_height}")
|
|
|
|
return BenchmarkResult(
|
|
total_transactions=len(successful_tx),
|
|
duration=actual_duration,
|
|
tps=tps,
|
|
latency_p50=latency_p50,
|
|
latency_p95=latency_p95,
|
|
latency_p99=latency_p99,
|
|
cpu_usage=avg_cpu,
|
|
memory_usage=avg_memory,
|
|
errors=errors
|
|
)
|
|
|
|
|
|
async def main():
|
|
parser = argparse.ArgumentParser(description="Blockchain Node Throughput Benchmark")
|
|
parser.add_argument("--target-url", default="http://localhost:8080",
|
|
help="Blockchain node RPC URL")
|
|
parser.add_argument("--concurrent-clients", type=int, default=50,
|
|
help="Number of concurrent client connections")
|
|
parser.add_argument("--duration", type=int, default=60,
|
|
help="Benchmark duration in seconds")
|
|
parser.add_argument("--target-tps", type=int,
|
|
help="Target TPS to achieve (calculates transaction count)")
|
|
parser.add_argument("--output", help="Output results to JSON file")
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Run benchmark
|
|
result = await run_benchmark(
|
|
base_url=args.target_url,
|
|
concurrent_clients=args.concurrent_clients,
|
|
duration=args.duration,
|
|
target_tps=args.target_tps
|
|
)
|
|
|
|
# Output results
|
|
if args.output:
|
|
with open(args.output, "w") as f:
|
|
json.dump({
|
|
"total_transactions": result.total_transactions,
|
|
"duration": result.duration,
|
|
"tps": result.tps,
|
|
"latency_p50": result.latency_p50,
|
|
"latency_p95": result.latency_p95,
|
|
"latency_p99": result.latency_p99,
|
|
"cpu_usage": result.cpu_usage,
|
|
"memory_usage": result.memory_usage,
|
|
"errors": result.errors
|
|
}, f, indent=2)
|
|
logger.info(f"Results saved to {args.output}")
|
|
|
|
# Provide scaling recommendations
|
|
logger.info("\n=== Scaling Recommendations ===")
|
|
if result.tps < 100:
|
|
logger.info("• Low TPS detected. Consider optimizing transaction processing")
|
|
if result.latency_p95 > 1000:
|
|
logger.info("• High latency detected. Consider increasing resources or optimizing database queries")
|
|
if result.cpu_usage > 80:
|
|
logger.info("• High CPU usage. Horizontal scaling recommended")
|
|
if result.memory_usage > 1024:
|
|
logger.info("• High memory usage. Monitor for memory leaks")
|
|
|
|
logger.info(f"\nRecommended minimum resources for current load:")
|
|
logger.info(f"• CPU: {result.cpu_usage * 1.5:.0f}% (with headroom)")
|
|
logger.info(f"• Memory: {result.memory_usage * 1.5:.0f}MB (with headroom)")
|
|
logger.info(f"• Horizontal scaling threshold: ~{result.tps * 0.7:.0f} TPS per node")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|