Files
aitbc/tests/performance/test_performance_benchmarks.py
oib 15427c96c0 chore: update file permissions to executable across repository
- Change file mode from 644 to 755 for all project files
- Add chain_id parameter to get_balance RPC endpoint with default "ait-devnet"
- Rename Miner.extra_meta_data to extra_metadata for consistency
2026-03-06 22:17:54 +01:00

513 lines
19 KiB
Python
Executable File

"""
Performance Benchmark Tests for AITBC
Tests system performance under various loads and conditions
"""
import pytest
import time
import asyncio
import threading
from datetime import datetime, timedelta
from unittest.mock import Mock, patch
from concurrent.futures import ThreadPoolExecutor
import statistics
class TestAPIPerformance:
"""Test API endpoint performance"""
def test_response_time_benchmarks(self):
"""Test API response time benchmarks"""
# Mock API client
client = Mock()
# Simulate different response times
response_times = [0.05, 0.08, 0.12, 0.06, 0.09, 0.11, 0.07, 0.10]
# Calculate performance metrics
avg_response_time = statistics.mean(response_times)
max_response_time = max(response_times)
min_response_time = min(response_times)
# Performance assertions
assert avg_response_time < 0.1 # Average should be under 100ms
assert max_response_time < 0.2 # Max should be under 200ms
assert min_response_time > 0.01 # Should be reasonable minimum
# Test performance thresholds
performance_thresholds = {
'excellent': 0.05, # < 50ms
'good': 0.1, # < 100ms
'acceptable': 0.2, # < 200ms
'poor': 0.5 # > 500ms
}
# Classify performance
if avg_response_time < performance_thresholds['excellent']:
performance_rating = 'excellent'
elif avg_response_time < performance_thresholds['good']:
performance_rating = 'good'
elif avg_response_time < performance_thresholds['acceptable']:
performance_rating = 'acceptable'
else:
performance_rating = 'poor'
assert performance_rating in ['excellent', 'good', 'acceptable']
def test_concurrent_request_handling(self):
"""Test handling of concurrent requests"""
# Mock API endpoint
def mock_api_call(request_id):
time.sleep(0.01) # Simulate 10ms processing time
return {'request_id': request_id, 'status': 'success'}
# Test concurrent execution
num_requests = 50
start_time = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [
executor.submit(mock_api_call, i)
for i in range(num_requests)
]
results = [future.result() for future in futures]
end_time = time.time()
total_time = end_time - start_time
# Performance assertions
assert len(results) == num_requests
assert all(result['status'] == 'success' for result in results)
assert total_time < 1.0 # Should complete in under 1 second
# Calculate throughput
throughput = num_requests / total_time
assert throughput > 50 # Should handle at least 50 requests per second
def test_memory_usage_under_load(self):
"""Test memory usage under load"""
import psutil
import os
# Get initial memory usage
process = psutil.Process(os.getpid())
initial_memory = process.memory_info().rss / 1024 / 1024 # MB
# Simulate memory-intensive operations
data_store = []
for i in range(1000):
data_store.append({
'id': i,
'data': 'x' * 1000, # 1KB per item
'timestamp': datetime.utcnow().isoformat()
})
# Get peak memory usage
peak_memory = process.memory_info().rss / 1024 / 1024 # MB
memory_increase = peak_memory - initial_memory
# Memory assertions
assert memory_increase < 100 # Should not increase by more than 100MB
assert len(data_store) == 1000
# Cleanup
del data_store
class TestDatabasePerformance:
"""Test database operation performance"""
def test_query_performance(self):
"""Test database query performance"""
# Mock database operations
def mock_query(query_type):
if query_type == 'simple':
time.sleep(0.001) # 1ms
elif query_type == 'complex':
time.sleep(0.01) # 10ms
elif query_type == 'aggregate':
time.sleep(0.05) # 50ms
return {'results': ['data'], 'query_type': query_type}
# Test different query types
query_types = ['simple', 'complex', 'aggregate']
query_times = {}
for query_type in query_types:
start_time = time.time()
result = mock_query(query_type)
end_time = time.time()
query_times[query_type] = end_time - start_time
assert result['query_type'] == query_type
# Performance assertions
assert query_times['simple'] < 0.005 # < 5ms
assert query_times['complex'] < 0.02 # < 20ms
assert query_times['aggregate'] < 0.1 # < 100ms
def test_batch_operation_performance(self):
"""Test batch operation performance"""
# Mock batch insert
def mock_batch_insert(items):
time.sleep(len(items) * 0.001) # 1ms per item
return {'inserted_count': len(items)}
# Test different batch sizes
batch_sizes = [10, 50, 100, 500]
performance_results = {}
for batch_size in batch_sizes:
items = [{'id': i, 'data': f'item_{i}'} for i in range(batch_size)]
start_time = time.time()
result = mock_batch_insert(items)
end_time = time.time()
performance_results[batch_size] = {
'time': end_time - start_time,
'throughput': batch_size / (end_time - start_time)
}
assert result['inserted_count'] == batch_size
# Performance analysis
for batch_size, metrics in performance_results.items():
assert metrics['throughput'] > 100 # Should handle at least 100 items/second
assert metrics['time'] < 5.0 # Should complete in under 5 seconds
def test_connection_pool_performance(self):
"""Test database connection pool performance"""
# Mock connection pool
class MockConnectionPool:
def __init__(self, max_connections=10):
self.max_connections = max_connections
self.active_connections = 0
self.lock = threading.Lock()
def get_connection(self):
with self.lock:
if self.active_connections < self.max_connections:
self.active_connections += 1
return MockConnection()
else:
raise Exception("Connection pool exhausted")
def release_connection(self, conn):
with self.lock:
self.active_connections -= 1
class MockConnection:
def execute(self, query):
time.sleep(0.01) # 10ms query time
return {'result': 'success'}
# Test connection pool under load
pool = MockConnectionPool(max_connections=5)
def worker_task():
try:
conn = pool.get_connection()
result = conn.execute("SELECT * FROM test")
pool.release_connection(conn)
return result
except Exception as e:
return {'error': str(e)}
# Test concurrent access
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(worker_task) for _ in range(20)]
results = [future.result() for future in futures]
# Analyze results
successful_results = [r for r in results if 'error' not in r]
error_results = [r for r in results if 'error' in r]
# Should have some successful and some error results (pool exhaustion)
assert len(successful_results) > 0
assert len(error_results) > 0
assert len(successful_results) + len(error_results) == 20
class TestBlockchainPerformance:
"""Test blockchain operation performance"""
def test_transaction_processing_speed(self):
"""Test transaction processing speed"""
# Mock transaction processing
def mock_process_transaction(tx):
processing_time = 0.1 + (len(tx['data']) * 0.001) # Base 100ms + data size
time.sleep(processing_time)
return {
'tx_hash': f'0x{hash(str(tx)) % 1000000:x}',
'processing_time': processing_time
}
# Test transactions of different sizes
transactions = [
{'data': 'small', 'amount': 1.0},
{'data': 'x' * 100, 'amount': 10.0}, # 100 bytes
{'data': 'x' * 1000, 'amount': 100.0}, # 1KB
{'data': 'x' * 10000, 'amount': 1000.0}, # 10KB
]
processing_times = []
for tx in transactions:
start_time = time.time()
result = mock_process_transaction(tx)
end_time = time.time()
processing_times.append(result['processing_time'])
assert 'tx_hash' in result
assert result['processing_time'] > 0
# Performance assertions
assert processing_times[0] < 0.2 # Small transaction < 200ms
assert processing_times[-1] < 1.0 # Large transaction < 1 second
def test_block_validation_performance(self):
"""Test block validation performance"""
# Mock block validation
def mock_validate_block(block):
num_transactions = len(block['transactions'])
validation_time = num_transactions * 0.01 # 10ms per transaction
time.sleep(validation_time)
return {
'valid': True,
'validation_time': validation_time,
'transactions_validated': num_transactions
}
# Test blocks with different transaction counts
blocks = [
{'transactions': [f'tx_{i}' for i in range(10)]}, # 10 transactions
{'transactions': [f'tx_{i}' for i in range(50)]}, # 50 transactions
{'transactions': [f'tx_{i}' for i in range(100)]}, # 100 transactions
]
validation_results = []
for block in blocks:
start_time = time.time()
result = mock_validate_block(block)
end_time = time.time()
validation_results.append(result)
assert result['valid'] is True
assert result['transactions_validated'] == len(block['transactions'])
# Performance analysis
for i, result in enumerate(validation_results):
expected_time = len(blocks[i]['transactions']) * 0.01
assert abs(result['validation_time'] - expected_time) < 0.01
def test_sync_performance(self):
"""Test blockchain sync performance"""
# Mock blockchain sync
def mock_sync_blocks(start_block, end_block):
num_blocks = end_block - start_block
sync_time = num_blocks * 0.05 # 50ms per block
time.sleep(sync_time)
return {
'synced_blocks': num_blocks,
'sync_time': sync_time,
'blocks_per_second': num_blocks / sync_time
}
# Test different sync ranges
sync_ranges = [
(1000, 1010), # 10 blocks
(1000, 1050), # 50 blocks
(1000, 1100), # 100 blocks
]
sync_results = []
for start, end in sync_ranges:
result = mock_sync_blocks(start, end)
sync_results.append(result)
assert result['synced_blocks'] == (end - start)
assert result['blocks_per_second'] > 10 # Should sync at least 10 blocks/second
# Performance consistency
sync_rates = [result['blocks_per_second'] for result in sync_results]
avg_sync_rate = statistics.mean(sync_rates)
assert avg_sync_rate > 15 # Average should be at least 15 blocks/second
class TestSystemResourcePerformance:
"""Test system resource utilization"""
def test_cpu_utilization(self):
"""Test CPU utilization under load"""
import psutil
import os
# Get initial CPU usage
initial_cpu = psutil.cpu_percent(interval=0.1)
# CPU-intensive task
def cpu_intensive_task():
result = 0
for i in range(1000000):
result += i * i
return result
# Run CPU-intensive task
start_time = time.time()
cpu_intensive_task()
end_time = time.time()
# Get CPU usage during task
cpu_usage = psutil.cpu_percent(interval=0.1)
# Performance assertions
execution_time = end_time - start_time
assert execution_time < 5.0 # Should complete in under 5 seconds
assert cpu_usage > 0 # Should show CPU usage
def test_disk_io_performance(self):
"""Test disk I/O performance"""
import tempfile
from pathlib import Path
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
# Test write performance
test_data = 'x' * (1024 * 1024) # 1MB of data
write_times = []
for i in range(10):
file_path = temp_path / f"test_file_{i}.txt"
start_time = time.time()
with open(file_path, 'w') as f:
f.write(test_data)
end_time = time.time()
write_times.append(end_time - start_time)
# Test read performance
read_times = []
for i in range(10):
file_path = temp_path / f"test_file_{i}.txt"
start_time = time.time()
with open(file_path, 'r') as f:
data = f.read()
end_time = time.time()
read_times.append(end_time - start_time)
assert len(data) == len(test_data)
# Performance analysis
avg_write_time = statistics.mean(write_times)
avg_read_time = statistics.mean(read_times)
assert avg_write_time < 0.1 # Write should be under 100ms per MB
assert avg_read_time < 0.05 # Read should be under 50ms per MB
def test_network_performance(self):
"""Test network I/O performance"""
# Mock network operations
def mock_network_request(size_kb):
# Simulate network latency and bandwidth
latency = 0.01 # 10ms latency
bandwidth_time = size_kb / 1000 # 1MB/s bandwidth
total_time = latency + bandwidth_time
time.sleep(total_time)
return {'size': size_kb, 'time': total_time}
# Test different request sizes
request_sizes = [10, 100, 1000] # KB
network_results = []
for size in request_sizes:
result = mock_network_request(size)
network_results.append(result)
assert result['size'] == size
assert result['time'] > 0
# Performance analysis
throughputs = [size / result['time'] for size, result in zip(request_sizes, network_results)]
avg_throughput = statistics.mean(throughputs)
assert avg_throughput > 500 # Should achieve at least 500 KB/s
class TestScalabilityMetrics:
"""Test system scalability metrics"""
def test_load_scaling(self):
"""Test system behavior under increasing load"""
# Mock system under different loads
def mock_system_load(load_factor):
# Simulate increasing response times with load
base_response_time = 0.1
load_response_time = base_response_time * (1 + load_factor * 0.1)
time.sleep(load_response_time)
return {
'load_factor': load_factor,
'response_time': load_response_time,
'throughput': 1 / load_response_time
}
# Test different load factors
load_factors = [1, 2, 5, 10] # 1x, 2x, 5x, 10x load
scaling_results = []
for load in load_factors:
result = mock_system_load(load)
scaling_results.append(result)
assert result['load_factor'] == load
assert result['response_time'] > 0
assert result['throughput'] > 0
# Scalability analysis
response_times = [r['response_time'] for r in scaling_results]
throughputs = [r['throughput'] for r in scaling_results]
# Check that response times increase reasonably
assert response_times[-1] < response_times[0] * 5 # Should not be 5x slower at 10x load
# Check that throughput degrades gracefully
assert throughputs[-1] > throughputs[0] / 5 # Should maintain at least 20% of peak throughput
def test_resource_efficiency(self):
"""Test resource efficiency metrics"""
# Mock resource usage
def mock_resource_usage(requests_per_second):
# Simulate resource usage scaling
cpu_usage = min(90, requests_per_second * 2) # 2% CPU per request/sec
memory_usage = min(80, 50 + requests_per_second * 0.5) # Base 50% + 0.5% per request/sec
return {
'requests_per_second': requests_per_second,
'cpu_usage': cpu_usage,
'memory_usage': memory_usage,
'efficiency': requests_per_second / max(cpu_usage, memory_usage)
}
# Test different request rates
request_rates = [10, 25, 50, 100] # requests per second
efficiency_results = []
for rate in request_rates:
result = mock_resource_usage(rate)
efficiency_results.append(result)
assert result['requests_per_second'] == rate
assert result['cpu_usage'] <= 100
assert result['memory_usage'] <= 100
# Efficiency analysis
efficiencies = [r['efficiency'] for r in efficiency_results]
max_efficiency = max(efficiencies)
assert max_efficiency > 1.0 # Should achieve reasonable efficiency