# NOTE: removed non-code residue captured during extraction (a commit
# message and repeated "548 lines / 21 KiB / Python" page metadata);
# it was not part of this script.
#!/usr/bin/env python3
|
|
"""
|
|
AITBC Performance Baseline Testing
|
|
|
|
This script establishes performance baselines for the AITBC platform,
|
|
including API response times, throughput, resource usage, and user experience metrics.
|
|
"""
|
|
|
|
import asyncio
import json
import logging
import random
import statistics
import subprocess
import sys
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any

import aiohttp
import psutil
|
|
|
|
|
|
@dataclass
|
|
class PerformanceMetric:
|
|
"""Individual performance measurement."""
|
|
timestamp: float
|
|
metric_name: str
|
|
value: float
|
|
unit: str
|
|
context: Dict[str, Any]
|
|
|
|
|
|
@dataclass
|
|
class BaselineResult:
|
|
"""Performance baseline result."""
|
|
metric_name: str
|
|
baseline_value: float
|
|
unit: str
|
|
samples: int
|
|
min_value: float
|
|
max_value: float
|
|
mean_value: float
|
|
median_value: float
|
|
std_deviation: float
|
|
percentile_95: float
|
|
percentile_99: float
|
|
status: str # "pass", "warning", "fail"
|
|
threshold: Optional[float]
|
|
|
|
|
|
class PerformanceBaseline:
|
|
"""Performance baseline testing system."""
|
|
|
|
def __init__(self, config_path: str = "config/performance_config.json"):
|
|
self.config = self._load_config(config_path)
|
|
self.logger = self._setup_logging()
|
|
self.baselines = self._load_baselines()
|
|
self.current_metrics = []
|
|
|
|
def _load_config(self, config_path: str) -> Dict:
|
|
"""Load performance testing configuration."""
|
|
default_config = {
|
|
"test_duration": 300, # 5 minutes
|
|
"concurrent_users": 10,
|
|
"ramp_up_time": 60, # 1 minute
|
|
"endpoints": {
|
|
"health": "https://api.aitbc.dev/health",
|
|
"users": "https://api.aitbc.dev/api/v1/users",
|
|
"transactions": "https://api.aitbc.dev/api/v1/transactions",
|
|
"blockchain": "https://api.aitbc.dev/api/v1/blockchain/status",
|
|
"marketplace": "https://api.aitbc.dev/api/v1/marketplace/listings"
|
|
},
|
|
"thresholds": {
|
|
"response_time_p95": 2000, # ms
|
|
"response_time_p99": 5000, # ms
|
|
"error_rate": 1.0, # %
|
|
"throughput_min": 100, # requests/second
|
|
"cpu_max": 80, # %
|
|
"memory_max": 85, # %
|
|
"disk_io_max": 100 # MB/s
|
|
},
|
|
"scenarios": {
|
|
"light_load": {"users": 5, "duration": 60},
|
|
"medium_load": {"users": 20, "duration": 120},
|
|
"heavy_load": {"users": 50, "duration": 180},
|
|
"stress_test": {"users": 100, "duration": 300}
|
|
}
|
|
}
|
|
|
|
config_file = Path(config_path)
|
|
if config_file.exists():
|
|
with open(config_file, 'r') as f:
|
|
user_config = json.load(f)
|
|
default_config.update(user_config)
|
|
|
|
return default_config
|
|
|
|
def _setup_logging(self) -> logging.Logger:
|
|
"""Setup logging for performance testing."""
|
|
logger = logging.getLogger("performance_baseline")
|
|
logger.setLevel(logging.INFO)
|
|
|
|
if not logger.handlers:
|
|
handler = logging.StreamHandler()
|
|
formatter = logging.Formatter(
|
|
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
handler.setFormatter(formatter)
|
|
logger.addHandler(handler)
|
|
|
|
return logger
|
|
|
|
def _load_baselines(self) -> Dict:
|
|
"""Load existing baselines."""
|
|
baseline_file = Path("data/performance_baselines.json")
|
|
if baseline_file.exists():
|
|
with open(baseline_file, 'r') as f:
|
|
return json.load(f)
|
|
return {}
|
|
|
|
def _save_baselines(self) -> None:
|
|
"""Save baselines to file."""
|
|
baseline_file = Path("data/performance_baselines.json")
|
|
baseline_file.parent.mkdir(exist_ok=True)
|
|
with open(baseline_file, 'w') as f:
|
|
json.dump(self.baselines, f, indent=2)
|
|
|
|
async def measure_api_response_time(self, endpoint: str, method: str = "GET",
|
|
payload: Dict = None) -> float:
|
|
"""Measure API response time."""
|
|
start_time = time.time()
|
|
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
if method.upper() == "GET":
|
|
async with session.get(endpoint) as response:
|
|
await response.text()
|
|
elif method.upper() == "POST":
|
|
async with session.post(endpoint, json=payload) as response:
|
|
await response.text()
|
|
else:
|
|
raise ValueError(f"Unsupported method: {method}")
|
|
|
|
end_time = time.time()
|
|
return (end_time - start_time) * 1000 # Convert to ms
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error measuring {endpoint}: {e}")
|
|
return -1 # Indicate error
|
|
|
|
async def run_load_test(self, scenario: str) -> Dict[str, Any]:
|
|
"""Run load test scenario."""
|
|
scenario_config = self.config["scenarios"][scenario]
|
|
users = scenario_config["users"]
|
|
duration = scenario_config["duration"]
|
|
|
|
self.logger.info(f"Running {scenario} load test: {users} users for {duration}s")
|
|
|
|
results = {
|
|
"scenario": scenario,
|
|
"users": users,
|
|
"duration": duration,
|
|
"start_time": time.time(),
|
|
"metrics": {},
|
|
"system_metrics": []
|
|
}
|
|
|
|
# Start system monitoring
|
|
monitoring_task = asyncio.create_task(self._monitor_system_resources(results))
|
|
|
|
# Run concurrent requests
|
|
tasks = []
|
|
for i in range(users):
|
|
task = asyncio.create_task(self._simulate_user(duration))
|
|
tasks.append(task)
|
|
|
|
# Wait for all tasks to complete
|
|
user_results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
# Stop monitoring
|
|
monitoring_task.cancel()
|
|
|
|
# Process results
|
|
all_response_times = []
|
|
error_count = 0
|
|
total_requests = 0
|
|
|
|
for user_result in user_results:
|
|
if isinstance(user_result, Exception):
|
|
error_count += 1
|
|
continue
|
|
|
|
for metric in user_result:
|
|
if metric.metric_name == "response_time" and metric.value > 0:
|
|
all_response_times.append(metric.value)
|
|
elif metric.metric_name == "error":
|
|
error_count += 1
|
|
total_requests += 1
|
|
|
|
# Calculate statistics
|
|
if all_response_times:
|
|
results["metrics"]["response_time"] = {
|
|
"samples": len(all_response_times),
|
|
"min": min(all_response_times),
|
|
"max": max(all_response_times),
|
|
"mean": statistics.mean(all_response_times),
|
|
"median": statistics.median(all_response_times),
|
|
"std_dev": statistics.stdev(all_response_times) if len(all_response_times) > 1 else 0,
|
|
"p95": self._percentile(all_response_times, 95),
|
|
"p99": self._percentile(all_response_times, 99)
|
|
}
|
|
|
|
results["metrics"]["error_rate"] = (error_count / total_requests * 100) if total_requests > 0 else 0
|
|
results["metrics"]["throughput"] = total_requests / duration
|
|
results["end_time"] = time.time()
|
|
|
|
return results
|
|
|
|
async def _simulate_user(self, duration: int) -> List[PerformanceMetric]:
|
|
"""Simulate a single user's activity."""
|
|
metrics = []
|
|
end_time = time.time() + duration
|
|
|
|
endpoints = list(self.config["endpoints"].keys())
|
|
|
|
while time.time() < end_time:
|
|
# Random endpoint selection
|
|
endpoint_name = endpoints[hash(str(time.time())) % len(endpoints)]
|
|
endpoint_url = self.config["endpoints"][endpoint_name]
|
|
|
|
# Measure response time
|
|
response_time = await self.measure_api_response_time(endpoint_url)
|
|
|
|
if response_time > 0:
|
|
metrics.append(PerformanceMetric(
|
|
timestamp=time.time(),
|
|
metric_name="response_time",
|
|
value=response_time,
|
|
unit="ms",
|
|
context={"endpoint": endpoint_name}
|
|
))
|
|
else:
|
|
metrics.append(PerformanceMetric(
|
|
timestamp=time.time(),
|
|
metric_name="error",
|
|
value=1,
|
|
unit="count",
|
|
context={"endpoint": endpoint_name}
|
|
))
|
|
|
|
# Random think time (1-5 seconds)
|
|
await asyncio.sleep(1 + (hash(str(time.time())) % 5))
|
|
|
|
return metrics
|
|
|
|
async def _monitor_system_resources(self, results: Dict) -> None:
|
|
"""Monitor system resources during test."""
|
|
try:
|
|
while True:
|
|
# Collect system metrics
|
|
cpu_percent = psutil.cpu_percent(interval=1)
|
|
memory = psutil.virtual_memory()
|
|
disk_io = psutil.disk_io_counters()
|
|
|
|
system_metric = {
|
|
"timestamp": time.time(),
|
|
"cpu_percent": cpu_percent,
|
|
"memory_percent": memory.percent,
|
|
"disk_read_bytes": disk_io.read_bytes,
|
|
"disk_write_bytes": disk_io.write_bytes
|
|
}
|
|
|
|
results["system_metrics"].append(system_metric)
|
|
|
|
await asyncio.sleep(5) # Sample every 5 seconds
|
|
|
|
except asyncio.CancelledError:
|
|
self.logger.info("System monitoring stopped")
|
|
except Exception as e:
|
|
self.logger.error(f"Error in system monitoring: {e}")
|
|
|
|
def _percentile(self, values: List[float], percentile: float) -> float:
|
|
"""Calculate percentile of values."""
|
|
if not values:
|
|
return 0
|
|
|
|
sorted_values = sorted(values)
|
|
index = (percentile / 100) * (len(sorted_values) - 1)
|
|
|
|
if index.is_integer():
|
|
return sorted_values[int(index)]
|
|
else:
|
|
lower = sorted_values[int(index)]
|
|
upper = sorted_values[int(index) + 1]
|
|
return lower + (upper - lower) * (index - int(index))
|
|
|
|
async def establish_baseline(self, scenario: str) -> BaselineResult:
|
|
"""Establish performance baseline for a scenario."""
|
|
self.logger.info(f"Establishing baseline for {scenario}")
|
|
|
|
# Run load test
|
|
test_results = await self.run_load_test(scenario)
|
|
|
|
# Extract key metrics
|
|
response_time_data = test_results["metrics"].get("response_time", {})
|
|
error_rate = test_results["metrics"].get("error_rate", 0)
|
|
throughput = test_results["metrics"].get("throughput", 0)
|
|
|
|
# Create baseline result for response time
|
|
if response_time_data:
|
|
baseline = BaselineResult(
|
|
metric_name=f"{scenario}_response_time_p95",
|
|
baseline_value=response_time_data["p95"],
|
|
unit="ms",
|
|
samples=response_time_data["samples"],
|
|
min_value=response_time_data["min"],
|
|
max_value=response_time_data["max"],
|
|
mean_value=response_time_data["mean"],
|
|
median_value=response_time_data["median"],
|
|
std_deviation=response_time_data["std_dev"],
|
|
percentile_95=response_time_data["p95"],
|
|
percentile_99=response_time_data["p99"],
|
|
status="pass",
|
|
threshold=self.config["thresholds"]["response_time_p95"]
|
|
)
|
|
|
|
# Check against threshold
|
|
if baseline.percentile_95 > baseline.threshold:
|
|
baseline.status = "fail"
|
|
elif baseline.percentile_95 > baseline.threshold * 0.8:
|
|
baseline.status = "warning"
|
|
|
|
# Store baseline
|
|
self.baselines[f"{scenario}_response_time_p95"] = asdict(baseline)
|
|
self._save_baselines()
|
|
|
|
return baseline
|
|
|
|
return None
|
|
|
|
async def compare_with_baseline(self, scenario: str) -> Dict[str, Any]:
|
|
"""Compare current performance with established baseline."""
|
|
self.logger.info(f"Comparing {scenario} with baseline")
|
|
|
|
# Run current test
|
|
current_results = await self.run_load_test(scenario)
|
|
|
|
# Get baseline
|
|
baseline_key = f"{scenario}_response_time_p95"
|
|
baseline_data = self.baselines.get(baseline_key)
|
|
|
|
if not baseline_data:
|
|
return {"error": "No baseline found for scenario"}
|
|
|
|
comparison = {
|
|
"scenario": scenario,
|
|
"baseline": baseline_data,
|
|
"current": current_results["metrics"],
|
|
"comparison": {},
|
|
"status": "unknown"
|
|
}
|
|
|
|
# Compare response times
|
|
current_p95 = current_results["metrics"].get("response_time", {}).get("p95", 0)
|
|
baseline_p95 = baseline_data["baseline_value"]
|
|
|
|
if current_p95 > 0:
|
|
percent_change = ((current_p95 - baseline_p95) / baseline_p95) * 100
|
|
comparison["comparison"]["response_time_p95"] = {
|
|
"baseline": baseline_p95,
|
|
"current": current_p95,
|
|
"percent_change": percent_change,
|
|
"status": "pass" if percent_change < 10 else "warning" if percent_change < 25 else "fail"
|
|
}
|
|
|
|
# Compare error rates
|
|
current_error_rate = current_results["metrics"].get("error_rate", 0)
|
|
baseline_error_rate = baseline_data.get("error_rate", 0)
|
|
|
|
error_change = current_error_rate - baseline_error_rate
|
|
comparison["comparison"]["error_rate"] = {
|
|
"baseline": baseline_error_rate,
|
|
"current": current_error_rate,
|
|
"change": error_change,
|
|
"status": "pass" if error_change < 0.5 else "warning" if error_change < 2.0 else "fail"
|
|
}
|
|
|
|
# Compare throughput
|
|
current_throughput = current_results["metrics"].get("throughput", 0)
|
|
baseline_throughput = baseline_data.get("throughput", 0)
|
|
|
|
if baseline_throughput > 0:
|
|
throughput_change = ((current_throughput - baseline_throughput) / baseline_throughput) * 100
|
|
comparison["comparison"]["throughput"] = {
|
|
"baseline": baseline_throughput,
|
|
"current": current_throughput,
|
|
"percent_change": throughput_change,
|
|
"status": "pass" if throughput_change > -10 else "warning" if throughput_change > -25 else "fail"
|
|
}
|
|
|
|
# Overall status
|
|
statuses = [cmp.get("status") for cmp in comparison["comparison"].values()]
|
|
if "fail" in statuses:
|
|
comparison["status"] = "fail"
|
|
elif "warning" in statuses:
|
|
comparison["status"] = "warning"
|
|
else:
|
|
comparison["status"] = "pass"
|
|
|
|
return comparison
|
|
|
|
async def run_all_scenarios(self) -> Dict[str, Any]:
|
|
"""Run all performance test scenarios."""
|
|
results = {}
|
|
|
|
for scenario in self.config["scenarios"].keys():
|
|
try:
|
|
self.logger.info(f"Running scenario: {scenario}")
|
|
|
|
# Establish baseline if not exists
|
|
if f"{scenario}_response_time_p95" not in self.baselines:
|
|
baseline = await self.establish_baseline(scenario)
|
|
results[scenario] = {"baseline": asdict(baseline)}
|
|
else:
|
|
# Compare with existing baseline
|
|
comparison = await self.compare_with_baseline(scenario)
|
|
results[scenario] = comparison
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error running scenario {scenario}: {e}")
|
|
results[scenario] = {"error": str(e)}
|
|
|
|
return results
|
|
|
|
async def generate_performance_report(self) -> Dict[str, Any]:
|
|
"""Generate comprehensive performance report."""
|
|
self.logger.info("Generating performance report")
|
|
|
|
# Run all scenarios
|
|
scenario_results = await self.run_all_scenarios()
|
|
|
|
# Calculate overall metrics
|
|
total_scenarios = len(scenario_results)
|
|
passed_scenarios = len([r for r in scenario_results.values() if r.get("status") == "pass"])
|
|
warning_scenarios = len([r for r in scenario_results.values() if r.get("status") == "warning"])
|
|
failed_scenarios = len([r for r in scenario_results.values() if r.get("status") == "fail"])
|
|
|
|
report = {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"summary": {
|
|
"total_scenarios": total_scenarios,
|
|
"passed": passed_scenarios,
|
|
"warnings": warning_scenarios,
|
|
"failed": failed_scenarios,
|
|
"success_rate": (passed_scenarios / total_scenarios * 100) if total_scenarios > 0 else 0,
|
|
"overall_status": "pass" if failed_scenarios == 0 else "warning" if failed_scenarios == 0 else "fail"
|
|
},
|
|
"scenarios": scenario_results,
|
|
"baselines": self.baselines,
|
|
"thresholds": self.config["thresholds"],
|
|
"recommendations": self._generate_recommendations(scenario_results)
|
|
}
|
|
|
|
# Save report
|
|
report_file = Path("data/performance_report.json")
|
|
report_file.parent.mkdir(exist_ok=True)
|
|
with open(report_file, 'w') as f:
|
|
json.dump(report, f, indent=2)
|
|
|
|
return report
|
|
|
|
def _generate_recommendations(self, scenario_results: Dict) -> List[str]:
|
|
"""Generate performance recommendations."""
|
|
recommendations = []
|
|
|
|
for scenario, result in scenario_results.items():
|
|
if result.get("status") == "fail":
|
|
recommendations.append(f"URGENT: {scenario} scenario failed performance tests")
|
|
elif result.get("status") == "warning":
|
|
recommendations.append(f"Review {scenario} scenario performance degradation")
|
|
|
|
# Check for common issues
|
|
high_response_times = []
|
|
high_error_rates = []
|
|
|
|
for scenario, result in scenario_results.items():
|
|
if "comparison" in result:
|
|
comp = result["comparison"]
|
|
if comp.get("response_time_p95", {}).get("status") == "fail":
|
|
high_response_times.append(scenario)
|
|
if comp.get("error_rate", {}).get("status") == "fail":
|
|
high_error_rates.append(scenario)
|
|
|
|
if high_response_times:
|
|
recommendations.append(f"High response times detected in: {', '.join(high_response_times)}")
|
|
|
|
if high_error_rates:
|
|
recommendations.append(f"High error rates detected in: {', '.join(high_error_rates)}")
|
|
|
|
if not recommendations:
|
|
recommendations.append("All performance tests passed. System is performing within expected parameters.")
|
|
|
|
return recommendations
|
|
|
|
|
|
# CLI interface
|
|
async def main():
|
|
"""Main CLI interface."""
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(description="AITBC Performance Baseline Testing")
|
|
parser.add_argument("--scenario", help="Run specific scenario")
|
|
parser.add_argument("--baseline", help="Establish baseline for scenario")
|
|
parser.add_argument("--compare", help="Compare scenario with baseline")
|
|
parser.add_argument("--all", action="store_true", help="Run all scenarios")
|
|
parser.add_argument("--report", action="store_true", help="Generate performance report")
|
|
|
|
args = parser.parse_args()
|
|
|
|
baseline = PerformanceBaseline()
|
|
|
|
if args.scenario:
|
|
if args.baseline:
|
|
result = await baseline.establish_baseline(args.scenario)
|
|
print(f"Baseline established: {result}")
|
|
elif args.compare:
|
|
comparison = await baseline.compare_with_baseline(args.scenario)
|
|
print(json.dumps(comparison, indent=2))
|
|
else:
|
|
result = await baseline.run_load_test(args.scenario)
|
|
print(json.dumps(result, indent=2, default=str))
|
|
|
|
elif args.all:
|
|
results = await baseline.run_all_scenarios()
|
|
print(json.dumps(results, indent=2, default=str))
|
|
|
|
elif args.report:
|
|
report = await baseline.generate_performance_report()
|
|
print(json.dumps(report, indent=2))
|
|
|
|
else:
|
|
print("Use --help to see available options")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|