diff --git a/tests/phase3/test_decision_framework.py b/tests/phase3/test_decision_framework.py new file mode 100644 index 00000000..ff8a7997 --- /dev/null +++ b/tests/phase3/test_decision_framework.py @@ -0,0 +1,358 @@ +""" +Phase 3: Decision Framework Tests +Tests for distributed decision making, voting systems, and consensus algorithms +""" + +import pytest +import asyncio +import json +from datetime import datetime, timedelta +from unittest.mock import Mock, AsyncMock +from typing import Dict, List, Any + +# Mock imports for testing +class MockDecisionEngine: + def __init__(self): + self.decisions = {} + self.votes = {} + + async def make_decision(self, decision_data: Dict[str, Any]) -> Dict[str, Any]: + decision_id = decision_data.get('decision_id', 'test_decision') + self.decisions[decision_id] = decision_data + return { + 'decision_id': decision_id, + 'status': 'completed', + 'result': decision_data.get('proposal', 'approved'), + 'timestamp': datetime.utcnow().isoformat() + } + + async def submit_vote(self, vote_data: Dict[str, Any]) -> Dict[str, Any]: + vote_id = vote_data.get('vote_id', 'test_vote') + self.votes[vote_id] = vote_data + return { + 'vote_id': vote_id, + 'status': 'recorded', + 'timestamp': datetime.utcnow().isoformat() + } + +class MockConsensusAlgorithm: + def __init__(self): + self.consensus_results = {} + + async def achieve_consensus(self, participants: List[str], proposal: Dict[str, Any]) -> Dict[str, Any]: + consensus_id = f"consensus_{len(self.consensus_results)}" + self.consensus_results[consensus_id] = { + 'participants': participants, + 'proposal': proposal, + 'result': 'consensus_reached' + } + return { + 'consensus_id': consensus_id, + 'status': 'consensus_reached', + 'agreement': True, + 'timestamp': datetime.utcnow().isoformat() + } + +class TestDecisionEngine: + """Test the decision engine functionality""" + + def setup_method(self): + self.decision_engine = MockDecisionEngine() + + @pytest.mark.asyncio + async def test_make_decision(self): + """Test basic decision making""" + decision_data = { + 'decision_id': 'test_decision_001', + 'proposal': 'test_proposal', + 'priority': 'high' + } + + result = await self.decision_engine.make_decision(decision_data) + + assert result['decision_id'] == 'test_decision_001' + assert result['status'] == 'completed' + assert result['result'] == 'test_proposal' + assert 'timestamp' in result + + @pytest.mark.asyncio + async def test_submit_vote(self): + """Test vote submission""" + vote_data = { + 'vote_id': 'test_vote_001', + 'voter_id': 'agent_001', + 'vote': 'approve', + 'decision_id': 'test_decision_001' + } + + result = await self.decision_engine.submit_vote(vote_data) + + assert result['vote_id'] == 'test_vote_001' + assert result['status'] == 'recorded' + assert 'timestamp' in result + + @pytest.mark.asyncio + async def test_decision_with_complex_data(self): + """Test decision making with complex data""" + decision_data = { + 'decision_id': 'complex_decision_001', + 'proposal': { + 'action': 'resource_allocation', + 'resources': ['cpu', 'memory', 'storage'], + 'amounts': {'cpu': 50, 'memory': 2048, 'storage': 100} + }, + 'participants': ['agent_001', 'agent_002', 'agent_003'], + 'deadline': (datetime.utcnow() + timedelta(hours=1)).isoformat() + } + + result = await self.decision_engine.make_decision(decision_data) + + assert result['decision_id'] == 'complex_decision_001' + assert result['status'] == 'completed' + assert 'timestamp' in result + +class TestConsensusAlgorithm: + """Test consensus algorithm functionality""" + + def setup_method(self): + self.consensus = MockConsensusAlgorithm() + + @pytest.mark.asyncio + async def test_achieve_consensus(self): + """Test basic consensus achievement""" + participants = ['agent_001', 'agent_002', 'agent_003'] + proposal = { + 'action': 'system_update', + 'version': '1.0.0', + 'description': 'Update system to new version' + } + + result = await self.consensus.achieve_consensus(participants, proposal) + + assert result['status'] == 'consensus_reached' + assert result['agreement'] is True + assert 'consensus_id' in result + assert 'timestamp' in result + + @pytest.mark.asyncio + async def test_consensus_with_single_agent(self): + """Test consensus with single participant""" + participants = ['agent_001'] + proposal = {'action': 'test_action'} + + result = await self.consensus.achieve_consensus(participants, proposal) + + assert result['status'] == 'consensus_reached' + assert result['agreement'] is True + + @pytest.mark.asyncio + async def test_consensus_with_complex_proposal(self): + """Test consensus with complex proposal""" + participants = ['agent_001', 'agent_002', 'agent_003', 'agent_004'] + proposal = { + 'action': 'policy_change', + 'policy': { + 'name': 'resource_allocation_policy', + 'rules': [ + {'rule': 'priority_based', 'weight': 0.6}, + {'rule': 'fair_share', 'weight': 0.4} + ], + 'effective_date': datetime.utcnow().isoformat() + } + } + + result = await self.consensus.achieve_consensus(participants, proposal) + + assert result['status'] == 'consensus_reached' + assert result['agreement'] is True + assert 'consensus_id' in result + +class TestVotingSystem: + """Test voting system functionality""" + + def setup_method(self): + self.decision_engine = MockDecisionEngine() + self.votes = {} + + @pytest.mark.asyncio + async def test_majority_voting(self): + """Test majority voting mechanism""" + votes = [ + {'voter_id': 'agent_001', 'vote': 'approve'}, + {'voter_id': 'agent_002', 'vote': 'approve'}, + {'voter_id': 'agent_003', 'vote': 'reject'} + ] + + # Simulate majority voting + approve_votes = sum(1 for v in votes if v['vote'] == 'approve') + total_votes = len(votes) + majority_threshold = total_votes // 2 + 1 + + result = { + 'decision': 'approve' if approve_votes >= majority_threshold else 'reject', + 'vote_count': {'approve': approve_votes, 'reject': total_votes - approve_votes}, + 'threshold': majority_threshold + } + + assert result['decision'] == 'approve' + assert result['vote_count']['approve'] == 2 + assert result['vote_count']['reject'] == 1 + assert result['threshold'] == 2 + + @pytest.mark.asyncio + async def test_weighted_voting(self): + """Test weighted voting mechanism""" + votes = [ + {'voter_id': 'agent_001', 'vote': 'approve', 'weight': 3}, + {'voter_id': 'agent_002', 'vote': 'reject', 'weight': 1}, + {'voter_id': 'agent_003', 'vote': 'approve', 'weight': 2} + ] + + # Calculate weighted votes + approve_weight = sum(v['weight'] for v in votes if v['vote'] == 'approve') + reject_weight = sum(v['weight'] for v in votes if v['vote'] == 'reject') + total_weight = approve_weight + reject_weight + + result = { + 'decision': 'approve' if approve_weight > reject_weight else 'reject', + 'weighted_count': {'approve': approve_weight, 'reject': reject_weight}, + 'total_weight': total_weight + } + + assert result['decision'] == 'approve' + assert result['weighted_count']['approve'] == 5 + assert result['weighted_count']['reject'] == 1 + assert result['total_weight'] == 6 + + @pytest.mark.asyncio + async def test_unanimous_voting(self): + """Test unanimous voting mechanism""" + votes = [ + {'voter_id': 'agent_001', 'vote': 'approve'}, + {'voter_id': 'agent_002', 'vote': 'approve'}, + {'voter_id': 'agent_003', 'vote': 'approve'} + ] + + # Check for unanimity + all_approve = all(v['vote'] == 'approve' for v in votes) + + result = { + 'decision': 'approve' if all_approve else 'reject', + 'unanimous': all_approve, + 'vote_count': len(votes) + } + + assert result['decision'] == 'approve' + assert result['unanimous'] is True + assert result['vote_count'] == 3 + +class TestAgentLifecycleManagement: + """Test agent lifecycle management""" + + def setup_method(self): + self.agents = {} + self.agent_states = {} + + @pytest.mark.asyncio + async def test_agent_registration(self): + """Test agent registration in decision system""" + agent_data = { + 'agent_id': 'agent_001', + 'capabilities': ['decision_making', 'voting'], + 'status': 'active', + 'join_time': datetime.utcnow().isoformat() + } + + self.agents[agent_data['agent_id']] = agent_data + + assert agent_data['agent_id'] in self.agents + assert self.agents[agent_data['agent_id']]['status'] == 'active' + assert 'decision_making' in self.agents[agent_data['agent_id']]['capabilities'] + + @pytest.mark.asyncio + async def test_agent_status_update(self): + """Test agent status updates""" + agent_id = 'agent_002' + self.agents[agent_id] = { + 'agent_id': agent_id, + 'status': 'active', + 'last_update': datetime.utcnow().isoformat() + } + + # Update agent status + self.agents[agent_id]['status'] = 'busy' + self.agents[agent_id]['last_update'] = datetime.utcnow().isoformat() + + assert self.agents[agent_id]['status'] == 'busy' + assert 'last_update' in self.agents[agent_id] + + @pytest.mark.asyncio + async def test_agent_removal(self): + """Test agent removal from decision system""" + agent_id = 'agent_003' + self.agents[agent_id] = { + 'agent_id': agent_id, + 'status': 'active' + } + + # Remove agent + del self.agents[agent_id] + + assert agent_id not in self.agents + +# Integration tests +class TestDecisionIntegration: + """Integration tests for decision framework""" + + @pytest.mark.asyncio + async def test_end_to_end_decision_process(self): + """Test complete decision making process""" + decision_engine = MockDecisionEngine() + consensus = MockConsensusAlgorithm() + + # Step 1: Create decision proposal + decision_data = { + 'decision_id': 'integration_test_001', + 'proposal': 'test_proposal', + 'participants': ['agent_001', 'agent_002'] + } + + # Step 2: Make decision + decision_result = await decision_engine.make_decision(decision_data) + + # Step 3: Achieve consensus + consensus_result = await consensus.achieve_consensus( + decision_data['participants'], + {'action': decision_data['proposal']} + ) + + # Verify results + assert decision_result['status'] == 'completed' + assert consensus_result['status'] == 'consensus_reached' + assert decision_result['decision_id'] == 'integration_test_001' + + @pytest.mark.asyncio + async def test_multi_agent_coordination(self): + """Test coordination between multiple agents""" + agents = ['agent_001', 'agent_002', 'agent_003'] + decision_engine = MockDecisionEngine() + + # Simulate coordinated decision making + decisions = [] + for i, agent in enumerate(agents): + decision_data = { + 'decision_id': f'coord_test_{i}', + 'agent_id': agent, + 'proposal': f'proposal_{i}', + 'coordinated_with': [a for a in agents if a != agent] + } + result = await decision_engine.make_decision(decision_data) + decisions.append(result) + + # Verify all decisions were made + assert len(decisions) == len(agents) + for decision in decisions: + assert decision['status'] == 'completed' + +if __name__ == '__main__': + pytest.main([__file__]) diff --git a/tests/phase4/test_autonomous_decision_making.py b/tests/phase4/test_autonomous_decision_making.py new file mode 100644 index 00000000..51b991fe --- /dev/null +++ b/tests/phase4/test_autonomous_decision_making.py @@ -0,0 +1,532 @@ +""" +Phase 4: Autonomous Decision Making Tests +Tests for autonomous systems, learning, and adaptation +""" + +import pytest +import asyncio +import json +from datetime import datetime, timedelta +from unittest.mock import Mock, AsyncMock +from typing import Dict, List, Any, Optional + +# Mock imports for testing +class MockAutonomousEngine: + def __init__(self): + self.policies = {} + self.decisions = [] + self.learning_data = {} + self.performance_metrics = {} + + async def make_autonomous_decision(self, context: Dict[str, Any]) -> Dict[str, Any]: + """Make autonomous decision based on context""" + decision_id = f"auto_decision_{len(self.decisions)}" + decision = { + 'decision_id': decision_id, + 'context': context, + 'action': self._determine_action(context), + 'reasoning': self._generate_reasoning(context), + 'confidence': self._calculate_confidence(context), + 'timestamp': datetime.utcnow().isoformat() + } + self.decisions.append(decision) + return decision + + def _determine_action(self, context: Dict[str, Any]) -> str: + """Determine action based on context""" + if context.get('system_load', 0) > 0.8: + return 'scale_resources' + elif context.get('error_rate', 0) > 0.1: + return 'trigger_recovery' + elif context.get('task_queue_size', 0) > 100: + return 'allocate_more_agents' + else: + return 'maintain_status' + + def _generate_reasoning(self, context: Dict[str, Any]) -> str: + """Generate reasoning for decision""" + return f"Based on system metrics: load={context.get('system_load', 0)}, errors={context.get('error_rate', 0)}" + + def _calculate_confidence(self, context: Dict[str, Any]) -> float: + """Calculate confidence in decision""" + # Simple confidence calculation based on data quality + has_metrics = all(key in context for key in ['system_load', 'error_rate']) + return 0.9 if has_metrics else 0.6 + +class MockLearningSystem: + def __init__(self): + self.experience_buffer = [] + self.performance_history = [] + self.adaptations = {} + + async def learn_from_experience(self, experience: Dict[str, Any]) -> Dict[str, Any]: + """Learn from experience""" + experience_id = f"exp_{len(self.experience_buffer)}" + learning_data = { + 'experience_id': experience_id, + 'experience': experience, + 'lessons_learned': self._extract_lessons(experience), + 'performance_impact': self._calculate_impact(experience), + 'timestamp': datetime.utcnow().isoformat() + } + self.experience_buffer.append(learning_data) + return learning_data + + def _extract_lessons(self, experience: Dict[str, Any]) -> List[str]: + """Extract lessons from experience""" + lessons = [] + if experience.get('success', False): + lessons.append("Action was successful") + if experience.get('performance_gain', 0) > 0: + lessons.append("Performance improved") + return lessons + + def _calculate_impact(self, experience: Dict[str, Any]) -> float: + """Calculate performance impact""" + return experience.get('performance_gain', 0.0) + + async def adapt_behavior(self, adaptation_data: Dict[str, Any]) -> Dict[str, Any]: + """Adapt behavior based on learning""" + adaptation_id = f"adapt_{len(self.adaptations)}" + adaptation = { + 'adaptation_id': adaptation_id, + 'type': adaptation_data.get('type', 'parameter_adjustment'), + 'changes': adaptation_data.get('changes', {}), + 'expected_improvement': adaptation_data.get('expected_improvement', 0.1), + 'timestamp': datetime.utcnow().isoformat() + } + self.adaptations[adaptation_id] = adaptation + return adaptation + +class MockPolicyEngine: + def __init__(self): + self.policies = { + 'resource_management': { + 'max_cpu_usage': 0.8, + 'max_memory_usage': 0.85, + 'auto_scale_threshold': 0.7 + }, + 'error_handling': { + 'max_error_rate': 0.05, + 'retry_attempts': 3, + 'recovery_timeout': 300 + }, + 'task_management': { + 'max_queue_size': 1000, + 'task_timeout': 600, + 'priority_weights': {'high': 1.0, 'normal': 0.5, 'low': 0.2} + } + } + + async def evaluate_policy_compliance(self, decision: Dict[str, Any]) -> Dict[str, Any]: + """Evaluate if decision complies with policies""" + compliance_score = self._calculate_compliance(decision) + violations = self._find_violations(decision) + + return { + 'decision_id': decision.get('decision_id'), + 'compliance_score': compliance_score, + 'violations': violations, + 'approved': compliance_score >= 0.8 and len(violations) == 0, + 'timestamp': datetime.utcnow().isoformat() + } + + def _calculate_compliance(self, decision: Dict[str, Any]) -> float: + """Calculate policy compliance score""" + # Simplified compliance calculation + base_score = 1.0 + if decision.get('action') == 'scale_resources': + # Check resource management policy + base_score -= 0.1 # Small penalty for resource scaling + return max(0.0, base_score) + + def _find_violations(self, decision: Dict[str, Any]) -> List[str]: + """Find policy violations""" + violations = [] + context = decision.get('context', {}) + + # Check resource limits + if context.get('system_load', 0) > self.policies['resource_management']['max_cpu_usage']: + violations.append("CPU usage exceeds policy limit") + + return violations + +class TestAutonomousEngine: + """Test autonomous decision making engine""" + + def setup_method(self): + self.autonomous_engine = MockAutonomousEngine() + + @pytest.mark.asyncio + async def test_autonomous_decision_making(self): + """Test basic autonomous decision making""" + context = { + 'system_load': 0.9, + 'error_rate': 0.02, + 'task_queue_size': 50, + 'active_agents': 5 + } + + decision = await self.autonomous_engine.make_autonomous_decision(context) + + assert decision['action'] == 'scale_resources' + assert decision['confidence'] > 0.5 + assert 'reasoning' in decision + assert 'timestamp' in decision + + @pytest.mark.asyncio + async def test_decision_with_high_error_rate(self): + """Test decision making with high error rate""" + context = { + 'system_load': 0.4, + 'error_rate': 0.15, + 'task_queue_size': 30, + 'active_agents': 3 + } + + decision = await self.autonomous_engine.make_autonomous_decision(context) + + assert decision['action'] == 'trigger_recovery' + assert 'error_rate' in decision['reasoning'] + + @pytest.mark.asyncio + async def test_decision_with_task_queue_pressure(self): + """Test decision making with task queue pressure""" + context = { + 'system_load': 0.6, + 'error_rate': 0.03, + 'task_queue_size': 150, + 'active_agents': 4 + } + + decision = await self.autonomous_engine.make_autonomous_decision(context) + + assert decision['action'] == 'allocate_more_agents' + + @pytest.mark.asyncio + async def test_decision_with_normal_conditions(self): + """Test decision making with normal conditions""" + context = { + 'system_load': 0.5, + 'error_rate': 0.02, + 'task_queue_size': 25, + 'active_agents': 4 + } + + decision = await self.autonomous_engine.make_autonomous_decision(context) + + assert decision['action'] == 'maintain_status' + assert decision['confidence'] > 0.8 + +class TestLearningSystem: + """Test learning and adaptation system""" + + def setup_method(self): + self.learning_system = MockLearningSystem() + + @pytest.mark.asyncio + async def test_learning_from_successful_experience(self): + """Test learning from successful experience""" + experience = { + 'action': 'scale_resources', + 'success': True, + 'performance_gain': 0.15, + 'context': {'system_load': 0.9} + } + + learning_result = await self.learning_system.learn_from_experience(experience) + + assert learning_result['experience_id'].startswith('exp_') + assert 'lessons_learned' in learning_result + assert learning_result['performance_impact'] == 0.15 + assert 'Action was successful' in learning_result['lessons_learned'] + + @pytest.mark.asyncio + async def test_learning_from_failure(self): + """Test learning from failed experience""" + experience = { + 'action': 'scale_resources', + 'success': False, + 'performance_gain': -0.05, + 'context': {'system_load': 0.9} + } + + learning_result = await self.learning_system.learn_from_experience(experience) + + assert learning_result['experience_id'].startswith('exp_') + assert learning_result['performance_impact'] == -0.05 + + @pytest.mark.asyncio + async def test_behavior_adaptation(self): + """Test behavior adaptation based on learning""" + adaptation_data = { + 'type': 'threshold_adjustment', + 'changes': {'scale_threshold': 0.75, 'error_threshold': 0.08}, + 'expected_improvement': 0.1 + } + + adaptation = await self.learning_system.adapt_behavior(adaptation_data) + + assert adaptation['type'] == 'threshold_adjustment' + assert adaptation['expected_improvement'] == 0.1 + assert 'scale_threshold' in adaptation['changes'] + + @pytest.mark.asyncio + async def test_experience_accumulation(self): + """Test accumulation of experiences over time""" + experiences = [ + {'action': 'scale_resources', 'success': True, 'performance_gain': 0.1}, + {'action': 'allocate_agents', 'success': True, 'performance_gain': 0.05}, + {'action': 'trigger_recovery', 'success': False, 'performance_gain': -0.02} + ] + + for exp in experiences: + await self.learning_system.learn_from_experience(exp) + + assert len(self.learning_system.experience_buffer) == 3 + assert all(exp['experience_id'].startswith('exp_') for exp in self.learning_system.experience_buffer) + +class TestPolicyEngine: + """Test policy engine for autonomous decisions""" + + def setup_method(self): + self.policy_engine = MockPolicyEngine() + + @pytest.mark.asyncio + async def test_policy_compliance_evaluation(self): + """Test policy compliance evaluation""" + decision = { + 'decision_id': 'test_decision_001', + 'action': 'scale_resources', + 'context': { + 'system_load': 0.7, + 'error_rate': 0.03, + 'task_queue_size': 50 + } + } + + compliance = await self.policy_engine.evaluate_policy_compliance(decision) + + assert compliance['decision_id'] == 'test_decision_001' + assert 'compliance_score' in compliance + assert 'violations' in compliance + assert 'approved' in compliance + assert 'timestamp' in compliance + + @pytest.mark.asyncio + async def test_policy_violation_detection(self): + """Test detection of policy violations""" + decision = { + 'decision_id': 'test_decision_002', + 'action': 'scale_resources', + 'context': { + 'system_load': 0.9, # Exceeds policy limit + 'error_rate': 0.03, + 'task_queue_size': 50 + } + } + + compliance = await self.policy_engine.evaluate_policy_compliance(decision) + + assert len(compliance['violations']) > 0 + assert any('CPU usage' in violation for violation in compliance['violations']) + + @pytest.mark.asyncio + async def test_policy_approval(self): + """Test policy approval for compliant decisions""" + decision = { + 'decision_id': 'test_decision_003', + 'action': 'maintain_status', + 'context': { + 'system_load': 0.5, + 'error_rate': 0.02, + 'task_queue_size': 25 + } + } + + compliance = await self.policy_engine.evaluate_policy_compliance(decision) + + assert compliance['approved'] is True + assert compliance['compliance_score'] >= 0.8 + +class TestSelfCorrectionMechanism: + """Test self-correction mechanisms""" + + def setup_method(self): + self.autonomous_engine = MockAutonomousEngine() + self.learning_system = MockLearningSystem() + self.policy_engine = MockPolicyEngine() + + @pytest.mark.asyncio + async def test_automatic_error_correction(self): + """Test automatic error correction""" + # Simulate error condition + context = { + 'system_load': 0.9, + 'error_rate': 0.12, # High error rate + 'task_queue_size': 50 + } + + # Make initial decision + decision = await self.autonomous_engine.make_autonomous_decision(context) + + # Simulate error in execution + error_experience = { + 'action': decision['action'], + 'success': False, + 'error_type': 'resource_exhaustion', + 'performance_gain': -0.1 + } + + # Learn from error + learning_result = await self.learning_system.learn_from_experience(error_experience) + + # Adapt behavior + adaptation_data = { + 'type': 'resource_threshold_adjustment', + 'changes': {'scale_threshold': 0.8}, + 'expected_improvement': 0.15 + } + + adaptation = await self.learning_system.adapt_behavior(adaptation_data) + + # Verify self-correction + assert decision['action'] == 'trigger_recovery' + assert learning_result['experience_id'].startswith('exp_') + assert adaptation['type'] == 'resource_threshold_adjustment' + + @pytest.mark.asyncio + async def test_performance_optimization(self): + """Test performance optimization through learning""" + # Initial performance + initial_context = { + 'system_load': 0.7, + 'error_rate': 0.05, + 'task_queue_size': 80 + } + + decision = await self.autonomous_engine.make_autonomous_decision(initial_context) + + # Simulate successful execution with performance gain + success_experience = { + 'action': decision['action'], + 'success': True, + 'performance_gain': 0.2 + } + + learning_result = await self.learning_system.learn_from_experience(success_experience) + + # Adapt to optimize further + adaptation_data = { + 'type': 'performance_optimization', + 'changes': {'aggressive_scaling': True}, + 'expected_improvement': 0.1 + } + + adaptation = await self.learning_system.adapt_behavior(adaptation_data) + + # Verify optimization + assert learning_result['performance_impact'] == 0.2 + assert adaptation['type'] == 'performance_optimization' + + @pytest.mark.asyncio + async def test_goal_oriented_behavior(self): + """Test goal-oriented autonomous behavior""" + # Define goals + goals = { + 'primary_goal': 'maintain_system_stability', + 'secondary_goals': ['optimize_performance', 'minimize_errors'], + 'constraints': ['resource_limits', 'policy_compliance'] + } + + # Simulate goal-oriented decision making + context = { + 'system_load': 0.6, + 'error_rate': 0.04, + 'task_queue_size': 60, + 'goals': goals + } + + decision = await self.autonomous_engine.make_autonomous_decision(context) + + # Evaluate against goals + compliance = await self.policy_engine.evaluate_policy_compliance(decision) + + # Verify goal alignment + assert decision['action'] in ['maintain_status', 'allocate_more_agents'] + assert compliance['approved'] is True # Should be policy compliant + +# Integration tests +class TestAutonomousIntegration: + """Integration tests for autonomous systems""" + + @pytest.mark.asyncio + async def test_full_autonomous_cycle(self): + """Test complete autonomous decision cycle""" + autonomous_engine = MockAutonomousEngine() + learning_system = MockLearningSystem() + policy_engine = MockPolicyEngine() + + # Step 1: Make autonomous decision + context = { + 'system_load': 0.85, + 'error_rate': 0.08, + 'task_queue_size': 120 + } + + decision = await autonomous_engine.make_autonomous_decision(context) + + # Step 2: Evaluate policy compliance + compliance = await policy_engine.evaluate_policy_compliance(decision) + + # Step 3: Execute and learn from result + execution_result = { + 'action': decision['action'], + 'success': compliance['approved'], + 'performance_gain': 0.1 if compliance['approved'] else -0.05 + } + + learning_result = await learning_system.learn_from_experience(execution_result) + + # Step 4: Adapt if needed + if not compliance['approved']: + adaptation = await learning_system.adapt_behavior({ + 'type': 'policy_compliance', + 'changes': {'more_conservative_thresholds': True} + }) + + # Verify complete cycle + assert decision['decision_id'].startswith('auto_decision_') + assert 'compliance_score' in compliance + assert learning_result['experience_id'].startswith('exp_') + + @pytest.mark.asyncio + async def test_multi_goal_optimization(self): + """Test optimization across multiple goals""" + goals = { + 'stability': {'weight': 0.4, 'target': 0.95}, + 'performance': {'weight': 0.3, 'target': 0.8}, + 'efficiency': {'weight': 0.3, 'target': 0.75} + } + + contexts = [ + {'system_load': 0.7, 'error_rate': 0.05, 'goals': goals}, + {'system_load': 0.8, 'error_rate': 0.06, 'goals': goals}, + {'system_load': 0.6, 'error_rate': 0.04, 'goals': goals} + ] + + autonomous_engine = MockAutonomousEngine() + decisions = [] + + for context in contexts: + decision = await autonomous_engine.make_autonomous_decision(context) + decisions.append(decision) + + # Verify multi-goal consideration + assert len(decisions) == 3 + for decision in decisions: + assert 'action' in decision + assert 'confidence' in decision + +if __name__ == '__main__': + pytest.main([__file__]) diff --git a/tests/phase5/test_vision_integration.py b/tests/phase5/test_vision_integration.py new file mode 100644 index 00000000..7e95e52c --- /dev/null +++ b/tests/phase5/test_vision_integration.py @@ -0,0 +1,641 @@ +""" +Phase 5: Computer Vision Integration Tests +Tests for visual intelligence, image processing, and multi-modal integration +""" + +import pytest +import asyncio +import json +import base64 +from datetime import datetime, timedelta +from unittest.mock import Mock, AsyncMock +from typing import Dict, List, Any, Optional, Tuple + +# Mock imports for testing +class MockVisionProcessor: + def __init__(self): + self.processed_images = {} + self.detection_results = {} + self.analysis_results = {} + + async def process_image(self, image_data: bytes, processing_type: str = 'general') -> Dict[str, Any]: + """Process image data""" + image_id = f"img_{len(self.processed_images)}" + result = { + 'image_id': image_id, + 'processing_type': processing_type, + 'size': len(image_data), + 'format': 'processed', + 'timestamp': datetime.utcnow().isoformat(), + 'analysis': await self._analyze_image(image_data, processing_type) + } + self.processed_images[image_id] = result + return result + + async def _analyze_image(self, image_data: bytes, processing_type: str) -> Dict[str, Any]: + """Analyze image based on processing type""" + if processing_type == 'object_detection': + return await self._detect_objects(image_data) + elif processing_type == 'scene_analysis': + return await self._analyze_scene(image_data) + elif processing_type == 'text_extraction': + return await self._extract_text(image_data) + else: + return await self._general_analysis(image_data) + + async def _detect_objects(self, image_data: bytes) -> Dict[str, Any]: + """Detect objects in image""" + # Mock object detection + objects = [ + {'class': 'person', 'confidence': 0.92, 'bbox': [100, 150, 200, 300]}, + {'class': 'car', 'confidence': 0.87, 'bbox': [300, 200, 500, 350]}, + {'class': 'building', 'confidence': 0.95, 'bbox': [0, 0, 600, 400]} + ] + + self.detection_results[f"detection_{len(self.detection_results)}"] = objects + + return { + 'objects_detected': len(objects), + 'objects': objects, + 'detection_confidence': sum(obj['confidence'] for obj in objects) / len(objects) + } + + async def _analyze_scene(self, image_data: bytes) -> Dict[str, Any]: + """Analyze scene context""" + # Mock scene analysis + scene_info = { + 'scene_type': 'urban_street', + 'confidence': 0.88, + 'elements': ['vehicles', 'pedestrians', 'buildings'], + 'weather': 'clear', + 'time_of_day': 'daytime', + 'complexity': 'medium' + } + + return scene_info + + async def _extract_text(self, image_data: bytes) -> Dict[str, Any]: + """Extract text from image""" + # Mock OCR + text_data = { + 'text_found': True, + 'extracted_text': ['STOP', 'MAIN ST', 'NO PARKING'], + 'confidence': 0.91, + 'language': 'en', + 'text_regions': [ + {'text': 'STOP', 'bbox': [50, 100, 150, 150]}, + {'text': 'MAIN ST', 'bbox': [200, 100, 350, 150]} + ] + } + + return text_data + + async def _general_analysis(self, image_data: bytes) -> Dict[str, Any]: + """General image analysis""" + return { + 'brightness': 0.7, + 'contrast': 0.8, + 'sharpness': 0.75, + 'color_distribution': {'red': 0.3, 'green': 0.4, 'blue': 0.3}, + 'dominant_colors': ['blue', 'green', 'white'], + 'image_quality': 'good' + } + +class MockMultiModalAgent: + def __init__(self): + self.vision_processor = MockVisionProcessor() + self.integrated_results = {} + + async def process_multi_modal(self, inputs: Dict[str, Any]) -> Dict[str, Any]: + """Process multi-modal inputs""" + result_id = f"multi_{len(self.integrated_results)}" + + # Process different modalities + results = {} + + if 'image' in inputs: + results['vision'] = await self.vision_processor.process_image( + inputs['image'], + inputs.get('vision_processing_type', 'general') + ) + + if 'text' in inputs: + results['text'] = await self._process_text(inputs['text']) + + if 'sensor_data' in inputs: + results['sensor'] = await self._process_sensor_data(inputs['sensor_data']) + + # Integrate results + integrated_result = { + 'result_id': result_id, + 'modalities_processed': list(results.keys()), + 'integration': await self._integrate_modalities(results), + 'timestamp': datetime.utcnow().isoformat() + } + + self.integrated_results[result_id] = integrated_result + return integrated_result + + async def _process_text(self, text: str) -> Dict[str, Any]: + """Process text input""" + return { + 'text_length': len(text), + 'language': 'en', + 'sentiment': 'neutral', + 'entities': [], + 'keywords': text.split()[:5] + } + + async def _process_sensor_data(self, sensor_data: Dict[str, Any]) -> Dict[str, Any]: + """Process sensor data""" + return { + 'sensor_type': sensor_data.get('type', 'unknown'), + 'readings': sensor_data.get('readings', {}), + 'timestamp': sensor_data.get('timestamp', datetime.utcnow().isoformat()), + 'quality': 'good' + } + + async def _integrate_modalities(self, results: Dict[str, Any]) -> Dict[str, Any]: + """Integrate results from different modalities""" + integration = { + 'confidence': 0.85, + 'completeness': len(results) / 3.0, # Assuming 3 modalities max + 'cross_modal_insights': [] + } + + # Add cross-modal insights + if 'vision' in results and 'text' in results: + if 'objects' in results['vision'].get('analysis', {}): + integration['cross_modal_insights'].append( + f"Visual context: {len(results['vision']['analysis']['objects'])} objects detected" + ) + + return integration + +class MockContextIntegration: + def __init__(self): + self.context_history = [] + self.context_models = {} + + async def integrate_context(self, vision_result: Dict[str, Any], context_data: Dict[str, Any]) -> Dict[str, Any]: + """Integrate vision results with context""" + context_id = f"ctx_{len(self.context_history)}" + + integration = { + 'context_id': context_id, + 'vision_result': vision_result, + 'context_data': context_data, + 'enhanced_understanding': await self._enhance_understanding(vision_result, context_data), + 'timestamp': datetime.utcnow().isoformat() + } + + self.context_history.append(integration) + return integration + + async def _enhance_understanding(self, vision_result: Dict[str, Any], context_data: Dict[str, Any]) -> Dict[str, Any]: + """Enhance understanding with context""" + enhanced = { + 'scene_understanding': vision_result.get('analysis', {}), + 'contextual_insights': [], + 'confidence_boost': 0.0 + } + + # Add contextual insights + if context_data.get('location') == 'intersection': + enhanced['contextual_insights'].append("Traffic monitoring context") + enhanced['confidence_boost'] += 0.1 + + if context_data.get('time_of_day') == 'night': + enhanced['contextual_insights'].append("Low light conditions detected") + enhanced['confidence_boost'] -= 0.05 + + return enhanced + +class TestVisionProcessor: + """Test vision processing functionality""" + + def setup_method(self): + self.vision_processor = MockVisionProcessor() + self.sample_image = b'sample_image_data_for_testing' + + @pytest.mark.asyncio + async def test_image_processing(self): + """Test basic image processing""" + result = await self.vision_processor.process_image(self.sample_image) + + assert result['image_id'].startswith('img_') + assert result['size'] == len(self.sample_image) + assert result['format'] == 'processed' + assert 'analysis' in result + assert 'timestamp' in result + + @pytest.mark.asyncio + async def test_object_detection(self): + """Test object detection functionality""" + result = await self.vision_processor.process_image(self.sample_image, 'object_detection') + + assert 'analysis' in result + assert 'objects_detected' in result['analysis'] + assert 'objects' in result['analysis'] + assert len(result['analysis']['objects']) > 0 + + # Check object structure + for obj in result['analysis']['objects']: + assert 'class' in obj + assert 'confidence' in obj + assert 'bbox' in obj + assert 0 <= obj['confidence'] <= 1 + + @pytest.mark.asyncio + async def test_scene_analysis(self): + """Test scene analysis functionality""" + result = await self.vision_processor.process_image(self.sample_image, 'scene_analysis') + + assert 'analysis' in result + assert 'scene_type' in result['analysis'] + assert 'confidence' in result['analysis'] + assert 'elements' in result['analysis'] + + assert result['analysis']['scene_type'] == 'urban_street' + assert 0 <= result['analysis']['confidence'] <= 1 + + @pytest.mark.asyncio + async def test_text_extraction(self): + """Test text extraction (OCR) functionality""" + result = await self.vision_processor.process_image(self.sample_image, 'text_extraction') + + assert 'analysis' in result + assert 'text_found' in result['analysis'] + assert 'extracted_text' in result['analysis'] + + if result['analysis']['text_found']: + assert len(result['analysis']['extracted_text']) > 0 + assert 'confidence' in result['analysis'] + + @pytest.mark.asyncio + async def test_general_analysis(self): + """Test general image analysis""" + result = await self.vision_processor.process_image(self.sample_image, 'general') + + assert 'analysis' in result + assert 'brightness' in result['analysis'] + assert 'contrast' in result['analysis'] + assert 'sharpness' in result['analysis'] + assert 'color_distribution' in result['analysis'] + + # Check value ranges + assert 0 <= result['analysis']['brightness'] <= 1 + assert 0 <= result['analysis']['contrast'] <= 1 + assert 0 <= result['analysis']['sharpness'] <= 1 + +class TestMultiModalIntegration: + """Test multi-modal integration""" + + def setup_method(self): + self.multi_modal_agent = MockMultiModalAgent() + self.sample_image = b'sample_image_data' + self.sample_text = "This is a sample text for testing" + self.sample_sensor_data = { + 'type': 'temperature', + 'readings': {'value': 25.5, 'unit': 'celsius'}, + 'timestamp': datetime.utcnow().isoformat() + } + + @pytest.mark.asyncio + async def test_vision_only_processing(self): + """Test processing with only vision input""" + inputs = {'image': self.sample_image} + + result = await self.multi_modal_agent.process_multi_modal(inputs) + + assert result['result_id'].startswith('multi_') + assert 'vision' in result['modalities_processed'] + assert 'integration' in result + assert 'confidence' in result['integration'] + + @pytest.mark.asyncio + async def test_text_only_processing(self): + """Test processing with only text input""" + inputs = {'text': self.sample_text} + + result = await self.multi_modal_agent.process_multi_modal(inputs) + + assert result['result_id'].startswith('multi_') + assert 'text' in result['modalities_processed'] + assert 'integration' in result + + @pytest.mark.asyncio + async def test_sensor_only_processing(self): + """Test processing with only sensor input""" + inputs = {'sensor_data': self.sample_sensor_data} + + result = await self.multi_modal_agent.process_multi_modal(inputs) + + assert result['result_id'].startswith('multi_') + assert 'sensor' in result['modalities_processed'] + assert 'integration' in result + + @pytest.mark.asyncio + async def test_full_multi_modal_processing(self): + """Test processing with all modalities""" + inputs = { + 'image': self.sample_image, + 'text': self.sample_text, + 'sensor_data': self.sample_sensor_data + } + + result = await self.multi_modal_agent.process_multi_modal(inputs) + + assert result['result_id'].startswith('multi_') + assert len(result['modalities_processed']) == 3 + assert 'vision' in result['modalities_processed'] + assert 'text' in result['modalities_processed'] + assert 'sensor' in result['modalities_processed'] + assert 'integration' in result + assert 'cross_modal_insights' in result['integration'] + + @pytest.mark.asyncio + async def test_cross_modal_insights(self): + """Test cross-modal insight generation""" + inputs = { + 'image': self.sample_image, + 'text': self.sample_text, + 'vision_processing_type': 'object_detection' + } + + result = await self.multi_modal_agent.process_multi_modal(inputs) + + assert 'cross_modal_insights' in result['integration'] + assert len(result['integration']['cross_modal_insights']) > 0 + +class TestContextIntegration: + """Test context integration with vision""" + + def setup_method(self): + self.context_integration = MockContextIntegration() + self.vision_processor = MockVisionProcessor() + self.sample_image = b'sample_image_data' + + @pytest.mark.asyncio + async def test_basic_context_integration(self): + """Test basic context integration""" + vision_result = await self.vision_processor.process_image(self.sample_image) + context_data = { + 'location': 'intersection', + 'time_of_day': 'daytime', + 'weather': 'clear' + } + + result = await self.context_integration.integrate_context(vision_result, context_data) + + assert result['context_id'].startswith('ctx_') + assert 'vision_result' in result + assert 'context_data' in result + assert 'enhanced_understanding' in result + + @pytest.mark.asyncio + async def test_location_context(self): + """Test location-based context integration""" + vision_result = await self.vision_processor.process_image(self.sample_image, 'object_detection') + context_data = { + 'location': 'intersection', + 'traffic_flow': 'moderate' + } + + result = await self.context_integration.integrate_context(vision_result, context_data) + + assert 'enhanced_understanding' in result + assert 'contextual_insights' in result['enhanced_understanding'] + assert any('traffic' in insight for insight in result['enhanced_understanding']['contextual_insights']) + + @pytest.mark.asyncio + async def test_time_context(self): + """Test time-based context integration""" + vision_result = await self.vision_processor.process_image(self.sample_image) + context_data = { + 'time_of_day': 'night', + 'lighting_conditions': 'low' + } + + result = await self.context_integration.integrate_context(vision_result, context_data) + + assert 'enhanced_understanding' in result + assert 'confidence_boost' in result['enhanced_understanding'] + assert result['enhanced_understanding']['confidence_boost'] < 0 # Night time penalty + + @pytest.mark.asyncio + async def test_context_history_tracking(self): + """Test context history tracking""" + for i in range(3): + vision_result = await self.vision_processor.process_image(self.sample_image) + context_data = { + 'location': f'location_{i}', + 'timestamp': datetime.utcnow().isoformat() + } + await self.context_integration.integrate_context(vision_result, context_data) + + assert len(self.context_integration.context_history) == 3 + for context in self.context_integration.context_history: + assert context['context_id'].startswith('ctx_') + +class TestVisualReasoning: + """Test visual reasoning capabilities""" + + def setup_method(self): + self.vision_processor = MockVisionProcessor() + self.multi_modal_agent = MockMultiModalAgent() + self.sample_image = b'sample_image_data' + + @pytest.mark.asyncio + async def test_visual_scene_understanding(self): + """Test visual scene understanding""" + result = await self.vision_processor.process_image(self.sample_image, 'scene_analysis') + + assert 'analysis' in result + assert 'scene_type' in result['analysis'] + assert 'elements' in result['analysis'] + assert 'complexity' in result['analysis'] + + # Verify scene understanding + scene = result['analysis'] + assert len(scene['elements']) > 0 + assert scene['complexity'] in ['low', 'medium', 'high'] + + @pytest.mark.asyncio + async def test_object_relationships(self): + """Test understanding object relationships""" + result = await self.vision_processor.process_image(self.sample_image, 'object_detection') + + assert 'analysis' in result + assert 'objects' in result['analysis'] + + objects = result['analysis']['objects'] + if len(objects) > 1: + # Mock relationship analysis + relationships = [] + for i, obj1 in enumerate(objects): + for obj2 in objects[i+1:]: + if obj1['class'] == 'person' and obj2['class'] == 'car': + relationships.append('person_near_car') + + assert len(relationships) >= 0 + + @pytest.mark.asyncio + async def test_spatial_reasoning(self): + """Test spatial reasoning""" + result = await self.vision_processor.process_image(self.sample_image, 'object_detection') + + assert 'analysis' in result + assert 'objects' in result['analysis'] + + objects = result['analysis']['objects'] + for obj in objects: + assert 'bbox' in obj + assert len(obj['bbox']) == 4 # [x1, y1, x2, y2] + + # Verify bbox coordinates + x1, y1, x2, y2 = obj['bbox'] + assert x2 > x1 + assert y2 > y1 + + @pytest.mark.asyncio + async def test_temporal_reasoning(self): + """Test temporal reasoning (changes over time)""" + # Simulate processing multiple images over time + results = [] + for i in range(3): + result = await self.vision_processor.process_image(self.sample_image) + results.append(result) + await asyncio.sleep(0.01) # Small delay + + # Analyze temporal changes + if len(results) > 1: + # Mock temporal analysis + changes = [] + for i in range(1, len(results)): + if results[i]['analysis'] != results[i-1]['analysis']: + changes.append(f"Change detected at step {i}") + + # Should have some analysis of changes + assert len(results) == 3 + +class TestPerformanceMetrics: + """Test performance metrics for vision processing""" + + def setup_method(self): + self.vision_processor = MockVisionProcessor() + self.sample_image = b'sample_image_data' + + @pytest.mark.asyncio + async def test_processing_speed(self): + """Test image processing speed""" + start_time = datetime.utcnow() + + result = await self.vision_processor.process_image(self.sample_image) + + end_time = datetime.utcnow() + processing_time = (end_time - start_time).total_seconds() + + assert processing_time < 2.0 # Should process within 2 seconds + assert result['image_id'].startswith('img_') + + @pytest.mark.asyncio + async def test_batch_processing(self): + """Test batch image processing""" + images = [self.sample_image] * 5 + + start_time = datetime.utcnow() + results = [] + for image in images: + result = await self.vision_processor.process_image(image) + results.append(result) + end_time = datetime.utcnow() + + total_time = (end_time - start_time).total_seconds() + avg_time = total_time / len(images) + + assert len(results) == 5 + assert avg_time < 1.0 # Average should be under 1 second per image + + @pytest.mark.asyncio + async def test_memory_usage(self): + """Test memory usage during processing""" + import psutil + import os + + process = psutil.Process(os.getpid()) + memory_before = process.memory_info().rss + + # Process multiple images + for i in range(10): + await self.vision_processor.process_image(self.sample_image) + + memory_after = process.memory_info().rss + memory_increase = memory_after - memory_before + + # Memory increase should be reasonable (less than 100MB) + assert memory_increase < 100 * 1024 * 1024 # 100MB in bytes + +# Integration tests +class TestVisionIntegration: + """Integration tests for vision system""" + + @pytest.mark.asyncio + async def test_end_to_end_vision_pipeline(self): + """Test complete vision processing pipeline""" + vision_processor = MockVisionProcessor() + multi_modal_agent = MockMultiModalAgent() + context_integration = MockContextIntegration() + + # Step 1: Process image with object detection + image_result = await vision_processor.process_image(b'test_image', 'object_detection') + + # Step 2: Integrate with context + context_data = { + 'location': 'urban_intersection', + 'time': 'daytime', + 'purpose': 'traffic_monitoring' + } + + context_result = await context_integration.integrate_context(image_result, context_data) + + # Step 3: Multi-modal processing + multi_modal_inputs = { + 'image': b'test_image', + 'text': 'Traffic monitoring report', + 'sensor_data': {'type': 'camera', 'status': 'active'} + } + + multi_modal_result = await multi_modal_agent.process_multi_modal(multi_modal_inputs) + + # Verify pipeline + assert image_result['image_id'].startswith('img_') + assert context_result['context_id'].startswith('ctx_') + assert multi_modal_result['result_id'].startswith('multi_') + assert 'objects' in image_result['analysis'] + assert 'enhanced_understanding' in context_result + assert len(multi_modal_result['modalities_processed']) == 3 + + @pytest.mark.asyncio + async def test_real_time_vision_processing(self): + """Test real-time vision processing capabilities""" + vision_processor = MockVisionProcessor() + + # Simulate real-time processing + processing_times = [] + for i in range(10): + start_time = datetime.utcnow() + await vision_processor.process_image(f'frame_{i}'.encode()) + end_time = datetime.utcnow() + processing_times.append((end_time - start_time).total_seconds()) + + avg_time = sum(processing_times) / len(processing_times) + max_time = max(processing_times) + + # Real-time constraints + assert avg_time < 0.5 # Average under 500ms + assert max_time < 1.0 # Max under 1 second + assert len(processing_times) == 10 + +if __name__ == '__main__': + pytest.main([__file__]) diff --git a/tests/run_all_phase_tests.py b/tests/run_all_phase_tests.py new file mode 100644 index 00000000..5a8ca987 --- /dev/null +++ b/tests/run_all_phase_tests.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +""" +Run all phase tests for agent systems implementation +""" + +import subprocess +import sys +import os +from pathlib import Path + +def run_phase_tests(): + """Run tests for all phases""" + base_dir = Path(__file__).parent + phases = ['phase1', 'phase2', 'phase3', 'phase4', 'phase5'] + + results = {} + + for phase in phases: + phase_dir = base_dir / phase + print(f"\n{'='*60}") + print(f"Running {phase.upper()} Tests") + print(f"{'='*60}") + + if not phase_dir.exists(): + print(f"āŒ {phase} directory not found") + results[phase] = {'status': 'skipped', 'reason': 'directory_not_found'} + continue + + # Find test files + test_files = list(phase_dir.glob('test_*.py')) + + if not test_files: + print(f"āŒ No test files found in {phase}") + results[phase] = {'status': 'skipped', 'reason': 'no_test_files'} + continue + + # Run tests for this phase + phase_results = {} + for test_file in test_files: + print(f"\nšŸ”¹ Running {test_file.name}") + try: + result = subprocess.run([ + sys.executable, '-m', 'pytest', + str(test_file), + '-v', + '--tb=short' + ], capture_output=True, text=True, cwd=base_dir) + + phase_results[test_file.name] = { + 'returncode': result.returncode, + 'stdout': result.stdout, + 'stderr': result.stderr + } + + if result.returncode == 0: + print(f"āœ… {test_file.name} - PASSED") + else: + print(f"āŒ {test_file.name} - FAILED") + print(f"Error: {result.stderr}") + + except Exception as e: + print(f"āŒ Error running {test_file.name}: {e}") + phase_results[test_file.name] = { + 'returncode': -1, + 'stdout': '', + 'stderr': str(e) + } + + results[phase] = { + 'status': 'completed', + 'tests': phase_results, + 'total_tests': len(test_files) + } + + # Print summary + print(f"\n{'='*60}") + print("PHASE TEST SUMMARY") + print(f"{'='*60}") + + total_phases = len(phases) + completed_phases = sum(1 for phase in results.values() if phase['status'] == 'completed') + skipped_phases = sum(1 for phase in results.values() if phase['status'] == 'skipped') + + print(f"Total Phases: {total_phases}") + print(f"Completed: {completed_phases}") + print(f"Skipped: {skipped_phases}") + + for phase, result in results.items(): + print(f"\n{phase.upper()}:") + if result['status'] == 'completed': + passed = sum(1 for test in result['tests'].values() if test['returncode'] == 0) + failed = sum(1 for test in result['tests'].values() if test['returncode'] != 0) + print(f" Tests: {result['total_tests']} (āœ… {passed}, āŒ {failed})") + else: + print(f" Status: {result['status']} ({result.get('reason', 'unknown')})") + + return results + +if __name__ == '__main__': + run_phase_tests()