"""
Marketplace Analytics System Tests
Comprehensive testing for analytics, insights, reporting, and dashboards
"""

import asyncio
import json
import statistics
from datetime import datetime, timedelta
from typing import Any, Dict, List
from unittest.mock import Mock, patch

import pytest

# NOTE(review): AnalyticsPeriod, MetricType, InsightType, MarketMetric,
# AnalyticsEngine, DashboardManager and MarketplaceAnalytics are used below
# but are not imported anywhere in this file. The import from the project's
# analytics package still has to be added -- confirm the module path.


class TestMarketplaceAnalyticsCalculations:
    """Test marketplace analytics functionality (pure calculations).

    This class used to be named ``TestMarketplaceAnalytics``; a second class
    with the same name further down rebound the name, so these tests were
    never collected. It is renamed so both classes run.
    """

    def test_market_metrics_calculation(self):
        """Test market metrics calculation"""
        # Sample market data
        market_data = [
            {'price': 0.10, 'gpu_type': 'RTX 3080', 'timestamp': '2024-01-01T10:00:00Z'},
            {'price': 0.12, 'gpu_type': 'RTX 3080', 'timestamp': '2024-01-01T11:00:00Z'},
            {'price': 0.11, 'gpu_type': 'RTX 3080', 'timestamp': '2024-01-01T12:00:00Z'},
            {'price': 0.15, 'gpu_type': 'RTX 3090', 'timestamp': '2024-01-01T10:00:00Z'},
            {'price': 0.14, 'gpu_type': 'RTX 3090', 'timestamp': '2024-01-01T11:00:00Z'},
        ]

        # Calculate metrics
        rtx3080_prices = [d['price'] for d in market_data if d['gpu_type'] == 'RTX 3080']
        rtx3090_prices = [d['price'] for d in market_data if d['gpu_type'] == 'RTX 3090']

        # Calculate statistics
        metrics = {
            'RTX 3080': {
                'avg_price': statistics.mean(rtx3080_prices),
                'min_price': min(rtx3080_prices),
                'max_price': max(rtx3080_prices),
                'price_volatility': statistics.stdev(rtx3080_prices) if len(rtx3080_prices) > 1 else 0
            },
            'RTX 3090': {
                'avg_price': statistics.mean(rtx3090_prices),
                'min_price': min(rtx3090_prices),
                'max_price': max(rtx3090_prices),
                'price_volatility': statistics.stdev(rtx3090_prices) if len(rtx3090_prices) > 1 else 0
            }
        }

        # Validate metrics. Averages are computed floats, so compare with a
        # tolerance; min/max return one of the input literals unchanged and
        # can be compared exactly.
        assert metrics['RTX 3080']['avg_price'] == pytest.approx(0.11)
        assert metrics['RTX 3080']['min_price'] == 0.10
        assert metrics['RTX 3080']['max_price'] == 0.12
        assert metrics['RTX 3090']['avg_price'] == pytest.approx(0.145)
        assert metrics['RTX 3090']['min_price'] == 0.14
        assert metrics['RTX 3090']['max_price'] == 0.15

    def test_demand_analysis(self):
        """Test demand analysis functionality"""
        # Sample demand data
        demand_data = [
            {'date': '2024-01-01', 'requests': 120, 'fulfilled': 100},
            {'date': '2024-01-02', 'requests': 150, 'fulfilled': 130},
            {'date': '2024-01-03', 'requests': 180, 'fulfilled': 160},
            {'date': '2024-01-04', 'requests': 140, 'fulfilled': 125},
        ]

        # Calculate demand metrics
        total_requests = sum(d['requests'] for d in demand_data)
        total_fulfilled = sum(d['fulfilled'] for d in demand_data)
        fulfillment_rate = (total_fulfilled / total_requests) * 100

        # Calculate trend
        daily_rates = [(d['fulfilled'] / d['requests']) * 100 for d in demand_data]
        trend = 'increasing' if daily_rates[-1] > daily_rates[0] else 'decreasing'

        # Validate analysis
        assert total_requests == 590
        assert total_fulfilled == 515
        # 515 / 590 * 100 is 87.288..., so an exact comparison with 87.29
        # can never hold; compare to two decimal places instead.
        assert fulfillment_rate == pytest.approx(87.29, abs=0.01)
        assert trend == 'increasing'
        assert all(0 <= rate <= 100 for rate in daily_rates)

    def test_provider_performance(self):
        """Test provider performance analytics"""
        # Sample provider data
        provider_data = [
            {
                'provider_id': 'provider_1',
                'total_jobs': 50,
                'completed_jobs': 45,
                'avg_completion_time': 25.5,  # minutes
                'avg_rating': 4.8,
                'gpu_types': ['RTX 3080', 'RTX 3090']
            },
            {
                'provider_id': 'provider_2',
                'total_jobs': 30,
                'completed_jobs': 28,
                'avg_completion_time': 30.2,
                'avg_rating': 4.6,
                'gpu_types': ['RTX 3080']
            },
            {
                'provider_id': 'provider_3',
                'total_jobs': 40,
                'completed_jobs': 35,
                'avg_completion_time': 22.1,
                'avg_rating': 4.9,
                'gpu_types': ['RTX 3090', 'RTX 4090']
            }
        ]

        # Calculate performance metrics
        for provider in provider_data:
            success_rate = (provider['completed_jobs'] / provider['total_jobs']) * 100
            provider['success_rate'] = success_rate

        # Sort by performance (highest success rate first)
        top_providers = sorted(provider_data, key=lambda x: x['success_rate'], reverse=True)

        # Validate calculations. Descending order is provider_2 (28/30),
        # provider_1 (45/50), provider_3 (35/40).
        assert top_providers[0]['provider_id'] == 'provider_2'
        assert top_providers[0]['success_rate'] == pytest.approx(93.33, abs=0.01)
        assert top_providers[1]['provider_id'] == 'provider_1'
        assert top_providers[1]['success_rate'] == pytest.approx(90.0)
        assert top_providers[2]['provider_id'] == 'provider_3'
        assert top_providers[2]['success_rate'] == pytest.approx(87.5)

        # Validate data integrity
        for provider in provider_data:
            assert 0 <= provider['success_rate'] <= 100
            assert provider['avg_rating'] >= 0 and provider['avg_rating'] <= 5
            assert provider['avg_completion_time'] > 0


class TestAnalyticsEngineCalculations:
    """Test analytics engine functionality (pure calculations).

    Renamed from ``TestAnalyticsEngine``: a later class with the same name
    rebound it, which kept these tests from ever being collected.
    """

    def test_data_aggregation(self):
        """Test data aggregation capabilities"""
        # Sample time series data
        time_series_data = [
            {'timestamp': '2024-01-01T00:00:00Z', 'value': 100},
            {'timestamp': '2024-01-01T01:00:00Z', 'value': 110},
            {'timestamp': '2024-01-01T02:00:00Z', 'value': 105},
            {'timestamp': '2024-01-01T03:00:00Z', 'value': 120},
            {'timestamp': '2024-01-01T04:00:00Z', 'value': 115},
        ]

        # Aggregate by hour (already hourly data)
        values = [d['value'] for d in time_series_data]
        hourly_avg = statistics.mean(values)
        hourly_max = max(values)
        hourly_min = min(values)

        # Create aggregated summary
        aggregated_data = {
            'period': 'hourly',
            'data_points': len(time_series_data),
            'average': hourly_avg,
            'maximum': hourly_max,
            'minimum': hourly_min,
            'trend': 'up' if time_series_data[-1]['value'] > time_series_data[0]['value'] else 'down'
        }

        # Validate aggregation
        assert aggregated_data['period'] == 'hourly'
        assert aggregated_data['data_points'] == 5
        assert aggregated_data['average'] == 110.0
        assert aggregated_data['maximum'] == 120
        assert aggregated_data['minimum'] == 100
        assert aggregated_data['trend'] == 'up'

    def test_anomaly_detection(self):
        """Test anomaly detection in metrics"""
        # Sample metrics with anomalies
        metrics_data = [
            {'timestamp': '2024-01-01T00:00:00Z', 'response_time': 100},
            {'timestamp': '2024-01-01T01:00:00Z', 'response_time': 105},
            {'timestamp': '2024-01-01T02:00:00Z', 'response_time': 98},
            {'timestamp': '2024-01-01T03:00:00Z', 'response_time': 500},  # Anomaly
            {'timestamp': '2024-01-01T04:00:00Z', 'response_time': 102},
            {'timestamp': '2024-01-01T05:00:00Z', 'response_time': 95},
        ]

        # Calculate statistics for anomaly detection
        response_times = [d['response_time'] for d in metrics_data]
        mean_time = statistics.mean(response_times)
        stdev_time = statistics.stdev(response_times) if len(response_times) > 1 else 0

        # Detect anomalies (values > 2 standard deviations above the mean)
        threshold = mean_time + (2 * stdev_time)
        anomalies = [
            d for d in metrics_data
            if d['response_time'] > threshold
        ]

        # Validate anomaly detection
        assert len(anomalies) == 1
        assert anomalies[0]['response_time'] == 500
        assert anomalies[0]['timestamp'] == '2024-01-01T03:00:00Z'

    def test_forecasting_model(self):
        """Test simple forecasting model"""
        # Historical data for forecasting
        historical_data = [
            {'period': '2024-01-01', 'demand': 100},
            {'period': '2024-01-02', 'demand': 110},
            {'period': '2024-01-03', 'demand': 105},
            {'period': '2024-01-04', 'demand': 120},
            {'period': '2024-01-05', 'demand': 115},
        ]

        # Simple moving average forecast
        demand_values = [d['demand'] for d in historical_data]
        forecast_period = 3
        forecast = statistics.mean(demand_values[-forecast_period:])

        # Calculate forecast accuracy (using last known value as "actual")
        last_actual = demand_values[-1]
        forecast_error = abs(forecast - last_actual)
        forecast_accuracy = max(0, 100 - (forecast_error / last_actual * 100))

        # Validate forecast
        assert forecast > 0
        assert forecast_accuracy >= 0
        assert forecast_accuracy <= 100


class TestDashboardManagerConfiguration:
    """Test dashboard management functionality (static configuration).

    Renamed from ``TestDashboardManager``: a later class with the same name
    rebound it, which kept these tests from ever being collected.
    """

    def test_dashboard_configuration(self):
        """Test dashboard configuration management"""
        # Sample dashboard configuration
        dashboard_config = {
            'dashboard_id': 'marketplace_overview',
            'title': 'Marketplace Overview',
            'layout': 'grid',
            'widgets': [
                {
                    'id': 'market_metrics',
                    'type': 'metric_card',
                    'title': 'Market Metrics',
                    'position': {'x': 0, 'y': 0, 'w': 4, 'h': 2},
                    'data_source': 'market_metrics_api'
                },
                {
                    'id': 'price_chart',
                    'type': 'line_chart',
                    'title': 'Price Trends',
                    'position': {'x': 4, 'y': 0, 'w': 8, 'h': 4},
                    'data_source': 'price_history_api'
                },
                {
                    'id': 'provider_ranking',
                    'type': 'table',
                    'title': 'Top Providers',
                    'position': {'x': 0, 'y': 2, 'w': 6, 'h': 3},
                    'data_source': 'provider_ranking_api'
                }
            ],
            'refresh_interval': 300,  # 5 minutes
            'permissions': ['read', 'write']
        }

        # Validate configuration
        assert dashboard_config['dashboard_id'] == 'marketplace_overview'
        assert len(dashboard_config['widgets']) == 3
        assert dashboard_config['refresh_interval'] == 300
        assert 'read' in dashboard_config['permissions']

        # Validate widgets
        for widget in dashboard_config['widgets']:
            assert 'id' in widget
            assert 'type' in widget
            assert 'title' in widget
            assert 'position' in widget
            assert 'data_source' in widget

    def test_widget_data_processing(self):
        """Test widget data processing"""
        # Sample data for different widget types
        widget_data = {
            'metric_card': {
                'value': 1250,
                'change': 5.2,
                'change_type': 'increase',
                'unit': 'AITBC',
                'timestamp': datetime.utcnow().isoformat()
            },
            'line_chart': {
                'labels': ['Jan', 'Feb', 'Mar', 'Apr', 'May'],
                'datasets': [
                    {
                        'label': 'RTX 3080',
                        'data': [0.10, 0.11, 0.12, 0.11, 0.13],
                        'borderColor': '#007bff'
                    },
                    {
                        'label': 'RTX 3090',
                        'data': [0.15, 0.14, 0.16, 0.15, 0.17],
                        'borderColor': '#28a745'
                    }
                ]
            },
            'table': {
                'columns': ['provider', 'jobs_completed', 'avg_rating', 'success_rate'],
                'rows': [
                    ['provider_1', 45, 4.8, '90%'],
                    ['provider_2', 28, 4.6, '93%'],
                    ['provider_3', 35, 4.9, '88%']
                ]
            }
        }

        # Validate metric card data
        metric_data = widget_data['metric_card']
        assert isinstance(metric_data['value'], (int, float))
        assert isinstance(metric_data['change'], (int, float))
        assert metric_data['change_type'] in ['increase', 'decrease']
        assert 'timestamp' in metric_data

        # Validate line chart data
        chart_data = widget_data['line_chart']
        assert 'labels' in chart_data
        assert 'datasets' in chart_data
        assert len(chart_data['datasets']) == 2
        assert len(chart_data['labels']) == len(chart_data['datasets'][0]['data'])

        # Validate table data
        table_data = widget_data['table']
        assert 'columns' in table_data
        assert 'rows' in table_data
        assert len(table_data['columns']) == 4
        assert len(table_data['rows']) == 3

    def test_dashboard_permissions(self):
        """Test dashboard permission management"""
        # Sample user permissions
        user_permissions = {
            'admin': ['read', 'write', 'delete', 'share'],
            'analyst': ['read', 'write', 'share'],
            'viewer': ['read'],
            'guest': []
        }

        # Sample dashboard access rules
        dashboard_access = {
            'marketplace_overview': ['admin', 'analyst', 'viewer'],
            'system_metrics': ['admin'],
            'public_stats': ['admin', 'analyst', 'viewer', 'guest']
        }

        # Test permission checking
        def check_permission(user_role, dashboard_id, action):
            """Return True only if the role holds the action AND may open the dashboard."""
            if action not in user_permissions[user_role]:
                return False
            if user_role not in dashboard_access[dashboard_id]:
                return False
            return True

        # Validate permissions
        assert check_permission('admin', 'marketplace_overview', 'read') is True
        assert check_permission('admin', 'system_metrics', 'write') is True
        assert check_permission('viewer', 'system_metrics', 'read') is False
        # guest is listed for public_stats but holds no actions at all, so
        # even 'read' is refused by the action check.
        assert check_permission('guest', 'public_stats', 'read') is False
        assert check_permission('analyst', 'marketplace_overview', 'delete') is False


class TestReportingSystem:
    """Test reporting system functionality"""

    def test_report_generation(self):
        """Test report generation capabilities"""
        # Sample report data
        report_data = {
            'report_id': 'monthly_marketplace_report',
            'title': 'Monthly Marketplace Performance',
            'period': {
                'start': '2024-01-01',
                'end': '2024-01-31'
            },
            'sections': [
                {
                    'title': 'Executive Summary',
                    'content': {
                        'total_transactions': 1250,
                        'total_volume': 156.78,
                        'active_providers': 45,
                        'satisfaction_rate': 4.7
                    }
                },
                {
                    'title': 'Price Analysis',
                    'content': {
                        'avg_gpu_price': 0.12,
                        'price_trend': 'stable',
                        'volatility_index': 0.05
                    }
                }
            ],
            'generated_at': datetime.utcnow().isoformat(),
            'format': 'json'
        }

        # Validate report structure
        assert 'report_id' in report_data
        assert 'title' in report_data
        assert 'period' in report_data
        assert 'sections' in report_data
        assert 'generated_at' in report_data

        # Validate sections
        for section in report_data['sections']:
            assert 'title' in section
            assert 'content' in section

        # Validate data integrity
        summary = report_data['sections'][0]['content']
        assert summary['total_transactions'] > 0
        assert summary['total_volume'] > 0
        assert summary['active_providers'] > 0
        assert 0 <= summary['satisfaction_rate'] <= 5

    def test_report_export(self):
        """Test report export functionality"""
        # Sample report for export
        report = {
            'title': 'Marketplace Analysis',
            'data': {
                'metrics': {'transactions': 100, 'volume': 50.5},
                'trends': {'price': 'up', 'demand': 'stable'}
            },
            'metadata': {
                'generated_by': 'analytics_system',
                'generated_at': datetime.utcnow().isoformat()
            }
        }

        # Test JSON export
        json_export = json.dumps(report, indent=2)
        assert isinstance(json_export, str)
        assert 'Marketplace Analysis' in json_export

        # Test CSV export (simplified)
        csv_data = "Metric,Value\n"
        csv_data += f"Transactions,{report['data']['metrics']['transactions']}\n"
        csv_data += f"Volume,{report['data']['metrics']['volume']}\n"

        assert 'Transactions,100' in csv_data
        assert 'Volume,50.5' in csv_data
        assert csv_data.count('\n') == 3  # Header + 2 data rows

    def test_report_scheduling(self):
        """Test report scheduling functionality"""
        # Sample schedule configuration
        schedule_config = {
            'report_id': 'daily_marketplace_summary',
            'frequency': 'daily',
            'time': '08:00',
            'recipients': ['admin@aitbc.com', 'ops@aitbc.com'],
            'format': 'pdf',
            'enabled': True,
            'last_run': '2024-01-01T08:00:00Z',
            'next_run': '2024-01-02T08:00:00Z'
        }

        # Validate schedule configuration
        assert schedule_config['frequency'] in ['daily', 'weekly', 'monthly']
        assert schedule_config['time'] == '08:00'
        assert len(schedule_config['recipients']) > 0
        assert schedule_config['enabled'] is True
        assert 'next_run' in schedule_config

        # Test next run calculation. The trailing 'Z' is rewritten to an
        # explicit UTC offset before parsing.
        last_run = datetime.fromisoformat(schedule_config['last_run'].replace('Z', '+00:00'))
        next_run = datetime.fromisoformat(schedule_config['next_run'].replace('Z', '+00:00'))

        expected_next_run = last_run + timedelta(days=1)
        assert next_run.date() == expected_next_run.date()
        assert next_run.hour == 8
        assert next_run.minute == 0


class TestDataCollector:
    """Test data collection functionality"""

    # NOTE(review): the tests below request a ``data_collector`` fixture that
    # is not defined in this file; presumably it lives in a conftest.py --
    # TODO confirm.

    def test_data_collection_metrics(self):
        """Test data collection metrics gathering"""
        # Sample data collection metrics
        collection_metrics = {
            'total_records_collected': 10000,
            'collection_duration_seconds': 300,
            'error_rate': 0.02,  # 2%
            'data_sources': ['marketplace_api', 'blockchain_api', 'user_activity'],
            'last_collection': datetime.utcnow().isoformat()
        }

        # Validate metrics
        assert collection_metrics['total_records_collected'] > 0
        assert collection_metrics['collection_duration_seconds'] > 0
        assert 0 <= collection_metrics['error_rate'] <= 1
        assert len(collection_metrics['data_sources']) > 0
        assert 'last_collection' in collection_metrics

        # Calculate collection rate
        collection_rate = collection_metrics['total_records_collected'] / collection_metrics['collection_duration_seconds']
        assert collection_rate > 10  # Should collect at least 10 records per second

    def test_collect_transaction_volume(self, data_collector):
        """Test transaction volume collection"""
        session = MockSession()

        # Test daily collection
        start_time = datetime.utcnow() - timedelta(days=1)
        end_time = datetime.utcnow()

        volume_metric = asyncio.run(
            data_collector.collect_transaction_volume(
                session, AnalyticsPeriod.DAILY, start_time, end_time
            )
        )

        # Verify metric structure
        assert volume_metric is not None
        assert volume_metric.metric_name == "transaction_volume"
        assert volume_metric.metric_type == MetricType.VOLUME
        assert volume_metric.period_type == AnalyticsPeriod.DAILY
        assert volume_metric.unit == "AITBC"
        assert volume_metric.category == "financial"
        assert volume_metric.value > 0
        assert "by_trade_type" in volume_metric.breakdown
        assert "by_region" in volume_metric.breakdown

        # Verify change percentage calculation
        assert volume_metric.change_percentage is not None
        assert volume_metric.previous_value is not None

    def test_collect_active_agents(self, data_collector):
        """Test active agents collection"""
        session = MockSession()

        start_time = datetime.utcnow() - timedelta(days=1)
        end_time = datetime.utcnow()

        agents_metric = asyncio.run(
            data_collector.collect_active_agents(
                session, AnalyticsPeriod.DAILY, start_time, end_time
            )
        )

        # Verify metric structure
        assert agents_metric is not None
        assert agents_metric.metric_name == "active_agents"
        assert agents_metric.metric_type == MetricType.COUNT
        assert agents_metric.unit == "agents"
        assert agents_metric.category == "agents"
        assert agents_metric.value > 0
        assert "by_role" in agents_metric.breakdown
        assert "by_tier" in agents_metric.breakdown
        assert "by_region" in agents_metric.breakdown

    def test_collect_average_prices(self, data_collector):
        """Test average price collection"""
        session = MockSession()

        start_time = datetime.utcnow() - timedelta(days=1)
        end_time = datetime.utcnow()

        price_metric = asyncio.run(
            data_collector.collect_average_prices(
                session, AnalyticsPeriod.DAILY, start_time, end_time
            )
        )

        # Verify metric structure
        assert price_metric is not None
        assert price_metric.metric_name == "average_price"
        assert price_metric.metric_type == MetricType.AVERAGE
        assert price_metric.unit == "AITBC"
        assert price_metric.category == "pricing"
        assert price_metric.value > 0
        assert "by_trade_type" in price_metric.breakdown
        assert "by_tier" in price_metric.breakdown

    def test_collect_success_rates(self, data_collector):
        """Test success rate collection"""
        session = MockSession()

        start_time = datetime.utcnow() - timedelta(days=1)
        end_time = datetime.utcnow()

        success_metric = asyncio.run(
            data_collector.collect_success_rates(
                session, AnalyticsPeriod.DAILY, start_time, end_time
            )
        )

        # Verify metric structure
        assert success_metric is not None
        assert success_metric.metric_name == "success_rate"
        assert success_metric.metric_type == MetricType.PERCENTAGE
        assert success_metric.unit == "%"
        assert success_metric.category == "performance"
        assert 70.0 <= success_metric.value <= 95.0  # Clamped range
        assert "by_trade_type" in success_metric.breakdown
        assert "by_tier" in success_metric.breakdown

    def test_collect_supply_demand_ratio(self, data_collector):
        """Test supply/demand ratio collection"""
        session = MockSession()

        start_time = datetime.utcnow() - timedelta(days=1)
        end_time = datetime.utcnow()

        ratio_metric = asyncio.run(
            data_collector.collect_supply_demand_ratio(
                session, AnalyticsPeriod.DAILY, start_time, end_time
            )
        )

        # Verify metric structure
        assert ratio_metric is not None
        assert ratio_metric.metric_name == "supply_demand_ratio"
        assert ratio_metric.metric_type == MetricType.RATIO
        assert ratio_metric.unit == "ratio"
        assert ratio_metric.category == "market"
        assert 0.5 <= ratio_metric.value <= 2.0  # Clamped range
        assert "by_trade_type" in ratio_metric.breakdown
        assert "by_region" in ratio_metric.breakdown

    def test_collect_market_metrics_batch(self, data_collector):
        """Test batch collection of all market metrics"""
        session = MockSession()

        start_time = datetime.utcnow() - timedelta(days=1)
        end_time = datetime.utcnow()

        metrics = asyncio.run(
            data_collector.collect_market_metrics(
                session, AnalyticsPeriod.DAILY, start_time, end_time
            )
        )

        # Verify all metrics were collected
        assert len(metrics) == 5  # Should collect 5 metrics

        metric_names = [m.metric_name for m in metrics]
        expected_names = [
            "transaction_volume", "active_agents", "average_price",
            "success_rate", "supply_demand_ratio"
        ]

        for name in expected_names:
            assert name in metric_names

    def test_different_periods(self, data_collector):
        """Test collection for different time periods"""
        session = MockSession()

        periods = [AnalyticsPeriod.HOURLY, AnalyticsPeriod.DAILY, AnalyticsPeriod.WEEKLY, AnalyticsPeriod.MONTHLY]

        for period in periods:
            if period == AnalyticsPeriod.HOURLY:
                start_time = datetime.utcnow() - timedelta(hours=1)
                end_time = datetime.utcnow()
            elif period == AnalyticsPeriod.WEEKLY:
                start_time = datetime.utcnow() - timedelta(weeks=1)
                end_time = datetime.utcnow()
            elif period == AnalyticsPeriod.MONTHLY:
                start_time = datetime.utcnow() - timedelta(days=30)
                end_time = datetime.utcnow()
            else:
                start_time = datetime.utcnow() - timedelta(days=1)
                end_time = datetime.utcnow()

            metrics = asyncio.run(
                data_collector.collect_market_metrics(
                    session, period, start_time, end_time
                )
            )

            # Verify metrics were collected for each period
            assert len(metrics) > 0
            for metric in metrics:
                assert metric.period_type == period


class TestAnalyticsEngine:
    """Test analytics engine functionality"""

    @pytest.fixture
    def analytics_engine(self):
        """Provide a fresh AnalyticsEngine instance."""
        return AnalyticsEngine()

    @pytest.fixture
    def sample_metrics(self):
        """Create sample metrics for testing"""
        return [
            MarketMetric(
                metric_name="transaction_volume",
                metric_type=MetricType.VOLUME,
                period_type=AnalyticsPeriod.DAILY,
                value=1200.0,
                previous_value=1000.0,
                change_percentage=20.0,
                unit="AITBC",
                category="financial",
                recorded_at=datetime.utcnow(),
                period_start=datetime.utcnow() - timedelta(days=1),
                period_end=datetime.utcnow()
            ),
            MarketMetric(
                metric_name="success_rate",
                metric_type=MetricType.PERCENTAGE,
                period_type=AnalyticsPeriod.DAILY,
                value=85.0,
                previous_value=90.0,
                change_percentage=-5.56,
                unit="%",
                category="performance",
                recorded_at=datetime.utcnow(),
                period_start=datetime.utcnow() - timedelta(days=1),
                period_end=datetime.utcnow()
            ),
            MarketMetric(
                metric_name="active_agents",
                metric_type=MetricType.COUNT,
                period_type=AnalyticsPeriod.DAILY,
                value=180.0,
                previous_value=150.0,
                change_percentage=20.0,
                unit="agents",
                category="agents",
                recorded_at=datetime.utcnow(),
                period_start=datetime.utcnow() - timedelta(days=1),
                period_end=datetime.utcnow()
            )
        ]

    def test_analyze_trends(self, analytics_engine, sample_metrics):
        """Test trend analysis"""
        session = MockSession()

        insights = asyncio.run(
            analytics_engine.analyze_trends(sample_metrics, session)
        )

        # Verify insights were generated
        assert len(insights) > 0

        # Check for significant changes
        significant_insights = [i for i in insights if abs(i.insight_data.get("change_percentage", 0)) >= 5.0]
        assert len(significant_insights) > 0

        # Verify insight structure
        for insight in insights:
            assert insight.insight_type == InsightType.TREND
            assert insight.title is not None
            assert insight.description is not None
            assert insight.confidence_score >= 0.7
            assert insight.impact_level in ["low", "medium", "high", "critical"]
            assert insight.related_metrics is not None
            assert insight.recommendations is not None
            assert insight.insight_data is not None

    def test_detect_anomalies(self, analytics_engine, sample_metrics):
        """Test anomaly detection"""
        session = MockSession()

        insights = asyncio.run(
            analytics_engine.detect_anomalies(sample_metrics, session)
        )

        # Verify insights were generated (may be empty for normal data)
        for insight in insights:
            assert insight.insight_type == InsightType.ANOMALY
            assert insight.title is not None
            assert insight.description is not None
            assert insight.confidence_score >= 0.0
            assert insight.insight_data.get("anomaly_type") is not None
            assert insight.insight_data.get("deviation_percentage") is not None

    def test_identify_opportunities(self, analytics_engine, sample_metrics):
        """Test opportunity identification"""
        session = MockSession()

        # Add supply/demand ratio metric for opportunity testing
        ratio_metric = MarketMetric(
            metric_name="supply_demand_ratio",
            metric_type=MetricType.RATIO,
            period_type=AnalyticsPeriod.DAILY,
            value=0.7,  # High demand, low supply
            previous_value=1.2,
            change_percentage=-41.67,
            unit="ratio",
            category="market",
            recorded_at=datetime.utcnow(),
            period_start=datetime.utcnow() - timedelta(days=1),
            period_end=datetime.utcnow()
        )

        metrics_with_ratio = sample_metrics + [ratio_metric]

        insights = asyncio.run(
            analytics_engine.identify_opportunities(metrics_with_ratio, session)
        )

        # Verify opportunity insights were generated
        opportunity_insights = [i for i in insights if i.insight_type == InsightType.OPPORTUNITY]
        assert len(opportunity_insights) > 0

        # Verify opportunity structure
        for insight in opportunity_insights:
            assert insight.insight_type == InsightType.OPPORTUNITY
            assert "opportunity_type" in insight.insight_data
            assert "recommended_action" in insight.insight_data
            assert insight.suggested_actions is not None

    def test_assess_risks(self, analytics_engine, sample_metrics):
        """Test risk assessment"""
        session = MockSession()

        insights = asyncio.run(
            analytics_engine.assess_risks(sample_metrics, session)
        )

        # Verify risk insights were generated
        risk_insights = [i for i in insights if i.insight_type == InsightType.WARNING]

        # Check for declining success rate risk
        success_rate_insights = [
            i for i in risk_insights
            if "success_rate" in i.related_metrics and i.insight_data.get("decline_percentage", 0) < -10.0
        ]

        # The list may legitimately be empty; only entries that are present
        # are validated.
        for insight in success_rate_insights:
            assert insight.impact_level in ["medium", "high", "critical"]
            assert insight.suggested_actions is not None

    def test_generate_insights_comprehensive(self, analytics_engine, sample_metrics):
        """Test comprehensive insight generation"""
        session = MockSession()

        start_time = datetime.utcnow() - timedelta(days=1)
        end_time = datetime.utcnow()

        insights = asyncio.run(
            analytics_engine.generate_insights(session, AnalyticsPeriod.DAILY, start_time, end_time)
        )

        # Collect the insight types that were produced
        insight_types = set(i.insight_type for i in insights)

        # At least trends should be generated
        assert InsightType.TREND in insight_types

        # Verify insight quality
        for insight in insights:
            assert 0.0 <= insight.confidence_score <= 1.0
            assert insight.impact_level in ["low", "medium", "high", "critical"]
            assert insight.recommendations is not None
            assert len(insight.recommendations) > 0


class TestDashboardManager:
    """Test dashboard management functionality"""

    @pytest.fixture
    def dashboard_manager(self):
        """Provide a fresh DashboardManager instance."""
        return DashboardManager()

    def test_create_default_dashboard(self, dashboard_manager):
        """Test default dashboard creation"""
        session = MockSession()

        dashboard = asyncio.run(
            dashboard_manager.create_default_dashboard(session, "user_001", "Test Dashboard")
        )

        # Verify dashboard structure
        assert dashboard.dashboard_id is not None
        assert dashboard.name == "Test Dashboard"
        assert dashboard.dashboard_type == "default"
        assert dashboard.owner_id == "user_001"
        assert dashboard.status == "active"
        assert len(dashboard.widgets) == 4  # Default widgets
        assert len(dashboard.filters) == 2  # Default filters
        assert dashboard.refresh_interval == 300
        assert dashboard.auto_refresh is True

        # Verify default widgets
        widget_names = [w["type"] for w in dashboard.widgets]
        expected_widgets = ["metric_cards", "line_chart", "map", "insight_list"]

        for widget in expected_widgets:
            assert widget in widget_names

    def test_create_executive_dashboard(self, dashboard_manager):
        """Test executive dashboard creation"""
        session = MockSession()

        dashboard = asyncio.run(
            dashboard_manager.create_executive_dashboard(session, "exec_user_001")
        )

        # Verify executive dashboard structure
        assert dashboard.dashboard_type == "executive"
        assert dashboard.owner_id == "exec_user_001"
        assert dashboard.refresh_interval == 600  # 10 minutes for executive
        assert dashboard.dashboard_settings["theme"] == "executive"
        assert dashboard.dashboard_settings["compact_mode"] is True

        # Verify executive widgets
        widget_names = [w["type"] for w in dashboard.widgets]
        expected_widgets = ["kpi_cards", "area_chart", "gauge_chart", "leaderboard", "alert_list"]

        for widget in expected_widgets:
            assert widget in widget_names

    def test_default_widgets_structure(self, dashboard_manager):
        """Test default widgets structure"""
        widgets = dashboard_manager.default_widgets

        # Verify all required widgets are present
        required_widgets = ["market_overview", "trend_analysis", "geographic_distribution", "recent_insights"]
        assert set(widgets.keys()) == set(required_widgets)

        # Verify widget structure
        for widget_name, widget_config in widgets.items():
            assert "type" in widget_config
            assert "layout" in widget_config
            assert "x" in widget_config["layout"]
            assert "y" in widget_config["layout"]
            assert "w" in widget_config["layout"]
            assert "h" in widget_config["layout"]


class TestMarketplaceAnalytics:
    """Test main marketplace analytics service"""

    @pytest.fixture
    def mock_session(self):
        """Mock database session"""
        # Reuses the module-level MockSession instead of redefining an
        # identical class inside the fixture.
        return MockSession()

    @pytest.fixture
    def analytics_service(self, mock_session):
        """Provide a MarketplaceAnalytics service bound to the mock session."""
        return MarketplaceAnalytics(mock_session)

    def test_collect_market_data(self, analytics_service, mock_session):
        """Test market data collection"""
        result = asyncio.run(
            analytics_service.collect_market_data(AnalyticsPeriod.DAILY)
        )

        # Verify result structure
        assert "period_type" in result
        assert "start_time" in result
        assert "end_time" in result
        assert "metrics_collected" in result
        assert "insights_generated" in result
        assert "market_data" in result

        # Verify market data
        market_data = result["market_data"]
        expected_metrics = ["transaction_volume", "active_agents", "average_price", "success_rate", "supply_demand_ratio"]

        for metric in expected_metrics:
            assert metric in market_data
            assert isinstance(market_data[metric], (int, float))
            assert market_data[metric] >= 0

        assert result["metrics_collected"] > 0
        assert result["insights_generated"] > 0

    def test_generate_insights(self, analytics_service, mock_session):
        """Test insight generation"""
        result = asyncio.run(
            analytics_service.generate_insights("daily")
        )

        # Verify result structure
        assert "period_type" in result
        assert "start_time" in result
        assert "end_time" in result
        assert "total_insights" in result
        assert "insight_groups" in result
        assert "high_impact_insights" in result
        assert "high_confidence_insights" in result

        # Verify insight groups
        insight_groups = result["insight_groups"]
        assert isinstance(insight_groups, dict)

        # Should have at least trends
        assert "trend" in insight_groups

        # Verify insight data structure
        for insight_type, insights in insight_groups.items():
            assert isinstance(insights, list)
            for insight in insights:
                assert "id" in insight
                assert "type" in insight
                assert "title" in insight
                assert "description" in insight
                assert "confidence" in insight
                assert "impact" in insight
                assert "recommendations" in insight

    def test_create_dashboard(self, analytics_service, mock_session):
        """Test dashboard creation"""
        result = asyncio.run(
            analytics_service.create_dashboard("user_001", "default")
        )

        # Verify result structure
        assert "dashboard_id" in result
        assert "name" in result
        assert "type" in result
        assert "widgets" in result
        assert "refresh_interval" in result
        assert "created_at" in result

        # Verify dashboard was created
        assert result["type"] == "default"
        assert result["widgets"] > 0
        assert result["refresh_interval"] == 300

    def test_get_market_overview(self, analytics_service, mock_session):
        """Test market overview"""
        overview = asyncio.run(
            analytics_service.get_market_overview()
        )

        # Verify overview structure
        assert "timestamp" in overview
        assert "period" in overview
        assert "metrics" in overview
        assert "insights" in overview
        assert "alerts" in overview
        assert "summary" in overview

        # Verify summary data
        summary = overview["summary"]
        assert "total_metrics" in summary
        assert "active_insights" in summary
        assert "active_alerts" in summary
        assert "market_health" in summary
        assert summary["market_health"] in ["healthy", "warning", "critical"]

    def test_different_periods(self, analytics_service, mock_session):
        """Test analytics for different time periods"""
        periods = ["daily", "weekly", "monthly"]

        for period in periods:
            # Test data collection
            result = asyncio.run(
                analytics_service.collect_market_data(AnalyticsPeriod(period.upper()))
            )

            assert result["period_type"] == period.upper()
            assert result["metrics_collected"] > 0

            # Test insight generation
            insights = asyncio.run(
                analytics_service.generate_insights(period)
            )

            assert insights["period_type"] == period
            assert insights["total_insights"] >= 0


# Mock Session Class
class MockSession:
    """Mock database session for testing"""

    def __init__(self):
        self.data = {}  # objects passed to add(), keyed by their id (or 'temp')
        self.committed = False  # flipped to True by commit()

    def exec(self, query):
        """Mock query execution: every query yields an empty result list."""
        return []

    def add(self, obj):
        """Store ``obj`` under its ``id`` attribute, or under 'temp' if it has none."""
        self.data[obj.id if hasattr(obj, 'id') else 'temp'] = obj

    def commit(self):
        """Record that a commit was requested."""
        self.committed = True

    def refresh(self, obj):
        """No-op stand-in for a session refresh."""
        pass


# Performance Tests
class TestAnalyticsPerformance:
    """Performance tests for analytics system"""

    @pytest.mark.asyncio
    async def test_bulk_metric_collection_performance(self):
        """Test performance of bulk metric collection"""
        # Test collecting metrics for multiple periods
        # Should complete within acceptable time limits
        pass

    @pytest.mark.asyncio
    async def test_insight_generation_performance(self):
        """Test insight generation performance"""
        # Test generating insights with large datasets
        # Should complete within acceptable time limits
        pass


# Utility Functions
def create_test_metric(**kwargs) -> Dict[str, Any]:
    """Create test metric data

    Returns a dict of default metric fields; any keyword argument overrides
    (or adds to) the defaults.
    """
    defaults = {
        "metric_name": "test_metric",
        "metric_type": MetricType.VALUE,
        "period_type": AnalyticsPeriod.DAILY,
        "value": 100.0,
        "previous_value": 90.0,
        "change_percentage": 11.11,
        "unit": "units",
        "category": "test",
        "recorded_at": datetime.utcnow(),
        "period_start": datetime.utcnow() - timedelta(days=1),
        "period_end": datetime.utcnow()
    }
    defaults.update(kwargs)
    return defaults


def create_test_insight(**kwargs) -> Dict[str, Any]:
    """Create test insight data

    Returns a dict of default insight fields; any keyword argument overrides
    (or adds to) the defaults.
    """
    defaults = {
        "insight_type": InsightType.TREND,
        "title": "Test Insight",
        "description": "Test description",
        "confidence_score": 0.8,
        "impact_level": "medium",
        "related_metrics": ["test_metric"],
        "time_horizon": "short_term",
        "recommendations": ["Test recommendation"],
        "insight_data": {"test": "data"}
    }
    defaults.update(kwargs)
    return defaults


# Test Configuration
@pytest.fixture(scope="session")
def test_config():
    """Test configuration for analytics system tests"""
    return {
        "test_metric_count": 100,
        "test_insight_count": 50,
        "test_report_count": 20,
        "performance_threshold_ms": 5000,
        "memory_threshold_mb": 200
    }


# Test Markers
# NOTE(review): these self-assignments do not register the markers with
# pytest; registration normally belongs in the pytest configuration. Kept
# unchanged to preserve the module's import-time behaviour.
pytest.mark.unit = pytest.mark.unit
pytest.mark.integration = pytest.mark.integration
pytest.mark.performance = pytest.mark.performance
pytest.mark.slow = pytest.mark.slow