✅ Phase 3: Decision Framework Tests - Decision engine functionality tests - Voting system tests (majority, weighted, unanimous) - Consensus algorithm tests - Agent lifecycle management tests - Integration tests for decision processes ✅ Phase 4: Autonomous Decision Making Tests - Autonomous decision engine tests - Learning system tests (experience-based learning) - Policy engine tests (compliance evaluation) - Self-correction mechanism tests - Goal-oriented behavior tests - Full autonomous cycle integration tests ✅ Phase 5: Computer Vision Integration Tests - Vision processor tests (object detection, scene analysis, OCR) - Multi-modal integration tests - Context integration tests - Visual reasoning tests (spatial, temporal) - Performance metrics tests - End-to-end vision pipeline tests ✅ Test Infrastructure - Comprehensive test runner for all phases - Mock implementations for testing - Performance testing capabilities - Integration test coverage - Phase-based test organization 🚀 All Phase Tests Now Implemented and Ready for Execution!
642 lines
24 KiB
Python
642 lines
24 KiB
Python
"""
|
|
Phase 5: Computer Vision Integration Tests
|
|
Tests for visual intelligence, image processing, and multi-modal integration
|
|
"""
|
|
|
|
import pytest
|
|
import asyncio
|
|
import json
|
|
import base64
|
|
from datetime import datetime, timedelta
|
|
from unittest.mock import Mock, AsyncMock
|
|
from typing import Dict, List, Any, Optional, Tuple
|
|
|
|
# Mock imports for testing
|
|
class MockVisionProcessor:
|
|
def __init__(self):
|
|
self.processed_images = {}
|
|
self.detection_results = {}
|
|
self.analysis_results = {}
|
|
|
|
async def process_image(self, image_data: bytes, processing_type: str = 'general') -> Dict[str, Any]:
|
|
"""Process image data"""
|
|
image_id = f"img_{len(self.processed_images)}"
|
|
result = {
|
|
'image_id': image_id,
|
|
'processing_type': processing_type,
|
|
'size': len(image_data),
|
|
'format': 'processed',
|
|
'timestamp': datetime.utcnow().isoformat(),
|
|
'analysis': await self._analyze_image(image_data, processing_type)
|
|
}
|
|
self.processed_images[image_id] = result
|
|
return result
|
|
|
|
async def _analyze_image(self, image_data: bytes, processing_type: str) -> Dict[str, Any]:
|
|
"""Analyze image based on processing type"""
|
|
if processing_type == 'object_detection':
|
|
return await self._detect_objects(image_data)
|
|
elif processing_type == 'scene_analysis':
|
|
return await self._analyze_scene(image_data)
|
|
elif processing_type == 'text_extraction':
|
|
return await self._extract_text(image_data)
|
|
else:
|
|
return await self._general_analysis(image_data)
|
|
|
|
async def _detect_objects(self, image_data: bytes) -> Dict[str, Any]:
|
|
"""Detect objects in image"""
|
|
# Mock object detection
|
|
objects = [
|
|
{'class': 'person', 'confidence': 0.92, 'bbox': [100, 150, 200, 300]},
|
|
{'class': 'car', 'confidence': 0.87, 'bbox': [300, 200, 500, 350]},
|
|
{'class': 'building', 'confidence': 0.95, 'bbox': [0, 0, 600, 400]}
|
|
]
|
|
|
|
self.detection_results[f"detection_{len(self.detection_results)}"] = objects
|
|
|
|
return {
|
|
'objects_detected': len(objects),
|
|
'objects': objects,
|
|
'detection_confidence': sum(obj['confidence'] for obj in objects) / len(objects)
|
|
}
|
|
|
|
async def _analyze_scene(self, image_data: bytes) -> Dict[str, Any]:
|
|
"""Analyze scene context"""
|
|
# Mock scene analysis
|
|
scene_info = {
|
|
'scene_type': 'urban_street',
|
|
'confidence': 0.88,
|
|
'elements': ['vehicles', 'pedestrians', 'buildings'],
|
|
'weather': 'clear',
|
|
'time_of_day': 'daytime',
|
|
'complexity': 'medium'
|
|
}
|
|
|
|
return scene_info
|
|
|
|
async def _extract_text(self, image_data: bytes) -> Dict[str, Any]:
|
|
"""Extract text from image"""
|
|
# Mock OCR
|
|
text_data = {
|
|
'text_found': True,
|
|
'extracted_text': ['STOP', 'MAIN ST', 'NO PARKING'],
|
|
'confidence': 0.91,
|
|
'language': 'en',
|
|
'text_regions': [
|
|
{'text': 'STOP', 'bbox': [50, 100, 150, 150]},
|
|
{'text': 'MAIN ST', 'bbox': [200, 100, 350, 150]}
|
|
]
|
|
}
|
|
|
|
return text_data
|
|
|
|
async def _general_analysis(self, image_data: bytes) -> Dict[str, Any]:
|
|
"""General image analysis"""
|
|
return {
|
|
'brightness': 0.7,
|
|
'contrast': 0.8,
|
|
'sharpness': 0.75,
|
|
'color_distribution': {'red': 0.3, 'green': 0.4, 'blue': 0.3},
|
|
'dominant_colors': ['blue', 'green', 'white'],
|
|
'image_quality': 'good'
|
|
}
|
|
|
|
class MockMultiModalAgent:
|
|
def __init__(self):
|
|
self.vision_processor = MockVisionProcessor()
|
|
self.integrated_results = {}
|
|
|
|
async def process_multi_modal(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Process multi-modal inputs"""
|
|
result_id = f"multi_{len(self.integrated_results)}"
|
|
|
|
# Process different modalities
|
|
results = {}
|
|
|
|
if 'image' in inputs:
|
|
results['vision'] = await self.vision_processor.process_image(
|
|
inputs['image'],
|
|
inputs.get('vision_processing_type', 'general')
|
|
)
|
|
|
|
if 'text' in inputs:
|
|
results['text'] = await self._process_text(inputs['text'])
|
|
|
|
if 'sensor_data' in inputs:
|
|
results['sensor'] = await self._process_sensor_data(inputs['sensor_data'])
|
|
|
|
# Integrate results
|
|
integrated_result = {
|
|
'result_id': result_id,
|
|
'modalities_processed': list(results.keys()),
|
|
'integration': await self._integrate_modalities(results),
|
|
'timestamp': datetime.utcnow().isoformat()
|
|
}
|
|
|
|
self.integrated_results[result_id] = integrated_result
|
|
return integrated_result
|
|
|
|
async def _process_text(self, text: str) -> Dict[str, Any]:
|
|
"""Process text input"""
|
|
return {
|
|
'text_length': len(text),
|
|
'language': 'en',
|
|
'sentiment': 'neutral',
|
|
'entities': [],
|
|
'keywords': text.split()[:5]
|
|
}
|
|
|
|
async def _process_sensor_data(self, sensor_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Process sensor data"""
|
|
return {
|
|
'sensor_type': sensor_data.get('type', 'unknown'),
|
|
'readings': sensor_data.get('readings', {}),
|
|
'timestamp': sensor_data.get('timestamp', datetime.utcnow().isoformat()),
|
|
'quality': 'good'
|
|
}
|
|
|
|
async def _integrate_modalities(self, results: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Integrate results from different modalities"""
|
|
integration = {
|
|
'confidence': 0.85,
|
|
'completeness': len(results) / 3.0, # Assuming 3 modalities max
|
|
'cross_modal_insights': []
|
|
}
|
|
|
|
# Add cross-modal insights
|
|
if 'vision' in results and 'text' in results:
|
|
if 'objects' in results['vision'].get('analysis', {}):
|
|
integration['cross_modal_insights'].append(
|
|
f"Visual context: {len(results['vision']['analysis']['objects'])} objects detected"
|
|
)
|
|
|
|
return integration
|
|
|
|
class MockContextIntegration:
|
|
def __init__(self):
|
|
self.context_history = []
|
|
self.context_models = {}
|
|
|
|
async def integrate_context(self, vision_result: Dict[str, Any], context_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Integrate vision results with context"""
|
|
context_id = f"ctx_{len(self.context_history)}"
|
|
|
|
integration = {
|
|
'context_id': context_id,
|
|
'vision_result': vision_result,
|
|
'context_data': context_data,
|
|
'enhanced_understanding': await self._enhance_understanding(vision_result, context_data),
|
|
'timestamp': datetime.utcnow().isoformat()
|
|
}
|
|
|
|
self.context_history.append(integration)
|
|
return integration
|
|
|
|
async def _enhance_understanding(self, vision_result: Dict[str, Any], context_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Enhance understanding with context"""
|
|
enhanced = {
|
|
'scene_understanding': vision_result.get('analysis', {}),
|
|
'contextual_insights': [],
|
|
'confidence_boost': 0.0
|
|
}
|
|
|
|
# Add contextual insights
|
|
if context_data.get('location') == 'intersection':
|
|
enhanced['contextual_insights'].append("Traffic monitoring context")
|
|
enhanced['confidence_boost'] += 0.1
|
|
|
|
if context_data.get('time_of_day') == 'night':
|
|
enhanced['contextual_insights'].append("Low light conditions detected")
|
|
enhanced['confidence_boost'] -= 0.05
|
|
|
|
return enhanced
|
|
|
|
class TestVisionProcessor:
|
|
"""Test vision processing functionality"""
|
|
|
|
def setup_method(self):
|
|
self.vision_processor = MockVisionProcessor()
|
|
self.sample_image = b'sample_image_data_for_testing'
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_image_processing(self):
|
|
"""Test basic image processing"""
|
|
result = await self.vision_processor.process_image(self.sample_image)
|
|
|
|
assert result['image_id'].startswith('img_')
|
|
assert result['size'] == len(self.sample_image)
|
|
assert result['format'] == 'processed'
|
|
assert 'analysis' in result
|
|
assert 'timestamp' in result
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_object_detection(self):
|
|
"""Test object detection functionality"""
|
|
result = await self.vision_processor.process_image(self.sample_image, 'object_detection')
|
|
|
|
assert 'analysis' in result
|
|
assert 'objects_detected' in result['analysis']
|
|
assert 'objects' in result['analysis']
|
|
assert len(result['analysis']['objects']) > 0
|
|
|
|
# Check object structure
|
|
for obj in result['analysis']['objects']:
|
|
assert 'class' in obj
|
|
assert 'confidence' in obj
|
|
assert 'bbox' in obj
|
|
assert 0 <= obj['confidence'] <= 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_scene_analysis(self):
|
|
"""Test scene analysis functionality"""
|
|
result = await self.vision_processor.process_image(self.sample_image, 'scene_analysis')
|
|
|
|
assert 'analysis' in result
|
|
assert 'scene_type' in result['analysis']
|
|
assert 'confidence' in result['analysis']
|
|
assert 'elements' in result['analysis']
|
|
|
|
assert result['analysis']['scene_type'] == 'urban_street'
|
|
assert 0 <= result['analysis']['confidence'] <= 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_text_extraction(self):
|
|
"""Test text extraction (OCR) functionality"""
|
|
result = await self.vision_processor.process_image(self.sample_image, 'text_extraction')
|
|
|
|
assert 'analysis' in result
|
|
assert 'text_found' in result['analysis']
|
|
assert 'extracted_text' in result['analysis']
|
|
|
|
if result['analysis']['text_found']:
|
|
assert len(result['analysis']['extracted_text']) > 0
|
|
assert 'confidence' in result['analysis']
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_general_analysis(self):
|
|
"""Test general image analysis"""
|
|
result = await self.vision_processor.process_image(self.sample_image, 'general')
|
|
|
|
assert 'analysis' in result
|
|
assert 'brightness' in result['analysis']
|
|
assert 'contrast' in result['analysis']
|
|
assert 'sharpness' in result['analysis']
|
|
assert 'color_distribution' in result['analysis']
|
|
|
|
# Check value ranges
|
|
assert 0 <= result['analysis']['brightness'] <= 1
|
|
assert 0 <= result['analysis']['contrast'] <= 1
|
|
assert 0 <= result['analysis']['sharpness'] <= 1
|
|
|
|
class TestMultiModalIntegration:
|
|
"""Test multi-modal integration"""
|
|
|
|
def setup_method(self):
|
|
self.multi_modal_agent = MockMultiModalAgent()
|
|
self.sample_image = b'sample_image_data'
|
|
self.sample_text = "This is a sample text for testing"
|
|
self.sample_sensor_data = {
|
|
'type': 'temperature',
|
|
'readings': {'value': 25.5, 'unit': 'celsius'},
|
|
'timestamp': datetime.utcnow().isoformat()
|
|
}
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_vision_only_processing(self):
|
|
"""Test processing with only vision input"""
|
|
inputs = {'image': self.sample_image}
|
|
|
|
result = await self.multi_modal_agent.process_multi_modal(inputs)
|
|
|
|
assert result['result_id'].startswith('multi_')
|
|
assert 'vision' in result['modalities_processed']
|
|
assert 'integration' in result
|
|
assert 'confidence' in result['integration']
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_text_only_processing(self):
|
|
"""Test processing with only text input"""
|
|
inputs = {'text': self.sample_text}
|
|
|
|
result = await self.multi_modal_agent.process_multi_modal(inputs)
|
|
|
|
assert result['result_id'].startswith('multi_')
|
|
assert 'text' in result['modalities_processed']
|
|
assert 'integration' in result
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_sensor_only_processing(self):
|
|
"""Test processing with only sensor input"""
|
|
inputs = {'sensor_data': self.sample_sensor_data}
|
|
|
|
result = await self.multi_modal_agent.process_multi_modal(inputs)
|
|
|
|
assert result['result_id'].startswith('multi_')
|
|
assert 'sensor' in result['modalities_processed']
|
|
assert 'integration' in result
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_full_multi_modal_processing(self):
|
|
"""Test processing with all modalities"""
|
|
inputs = {
|
|
'image': self.sample_image,
|
|
'text': self.sample_text,
|
|
'sensor_data': self.sample_sensor_data
|
|
}
|
|
|
|
result = await self.multi_modal_agent.process_multi_modal(inputs)
|
|
|
|
assert result['result_id'].startswith('multi_')
|
|
assert len(result['modalities_processed']) == 3
|
|
assert 'vision' in result['modalities_processed']
|
|
assert 'text' in result['modalities_processed']
|
|
assert 'sensor' in result['modalities_processed']
|
|
assert 'integration' in result
|
|
assert 'cross_modal_insights' in result['integration']
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cross_modal_insights(self):
|
|
"""Test cross-modal insight generation"""
|
|
inputs = {
|
|
'image': self.sample_image,
|
|
'text': self.sample_text,
|
|
'vision_processing_type': 'object_detection'
|
|
}
|
|
|
|
result = await self.multi_modal_agent.process_multi_modal(inputs)
|
|
|
|
assert 'cross_modal_insights' in result['integration']
|
|
assert len(result['integration']['cross_modal_insights']) > 0
|
|
|
|
class TestContextIntegration:
|
|
"""Test context integration with vision"""
|
|
|
|
def setup_method(self):
|
|
self.context_integration = MockContextIntegration()
|
|
self.vision_processor = MockVisionProcessor()
|
|
self.sample_image = b'sample_image_data'
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_basic_context_integration(self):
|
|
"""Test basic context integration"""
|
|
vision_result = await self.vision_processor.process_image(self.sample_image)
|
|
context_data = {
|
|
'location': 'intersection',
|
|
'time_of_day': 'daytime',
|
|
'weather': 'clear'
|
|
}
|
|
|
|
result = await self.context_integration.integrate_context(vision_result, context_data)
|
|
|
|
assert result['context_id'].startswith('ctx_')
|
|
assert 'vision_result' in result
|
|
assert 'context_data' in result
|
|
assert 'enhanced_understanding' in result
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_location_context(self):
|
|
"""Test location-based context integration"""
|
|
vision_result = await self.vision_processor.process_image(self.sample_image, 'object_detection')
|
|
context_data = {
|
|
'location': 'intersection',
|
|
'traffic_flow': 'moderate'
|
|
}
|
|
|
|
result = await self.context_integration.integrate_context(vision_result, context_data)
|
|
|
|
assert 'enhanced_understanding' in result
|
|
assert 'contextual_insights' in result['enhanced_understanding']
|
|
assert any('traffic' in insight for insight in result['enhanced_understanding']['contextual_insights'])
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_time_context(self):
|
|
"""Test time-based context integration"""
|
|
vision_result = await self.vision_processor.process_image(self.sample_image)
|
|
context_data = {
|
|
'time_of_day': 'night',
|
|
'lighting_conditions': 'low'
|
|
}
|
|
|
|
result = await self.context_integration.integrate_context(vision_result, context_data)
|
|
|
|
assert 'enhanced_understanding' in result
|
|
assert 'confidence_boost' in result['enhanced_understanding']
|
|
assert result['enhanced_understanding']['confidence_boost'] < 0 # Night time penalty
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_context_history_tracking(self):
|
|
"""Test context history tracking"""
|
|
for i in range(3):
|
|
vision_result = await self.vision_processor.process_image(self.sample_image)
|
|
context_data = {
|
|
'location': f'location_{i}',
|
|
'timestamp': datetime.utcnow().isoformat()
|
|
}
|
|
await self.context_integration.integrate_context(vision_result, context_data)
|
|
|
|
assert len(self.context_integration.context_history) == 3
|
|
for context in self.context_integration.context_history:
|
|
assert context['context_id'].startswith('ctx_')
|
|
|
|
class TestVisualReasoning:
|
|
"""Test visual reasoning capabilities"""
|
|
|
|
def setup_method(self):
|
|
self.vision_processor = MockVisionProcessor()
|
|
self.multi_modal_agent = MockMultiModalAgent()
|
|
self.sample_image = b'sample_image_data'
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_visual_scene_understanding(self):
|
|
"""Test visual scene understanding"""
|
|
result = await self.vision_processor.process_image(self.sample_image, 'scene_analysis')
|
|
|
|
assert 'analysis' in result
|
|
assert 'scene_type' in result['analysis']
|
|
assert 'elements' in result['analysis']
|
|
assert 'complexity' in result['analysis']
|
|
|
|
# Verify scene understanding
|
|
scene = result['analysis']
|
|
assert len(scene['elements']) > 0
|
|
assert scene['complexity'] in ['low', 'medium', 'high']
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_object_relationships(self):
|
|
"""Test understanding object relationships"""
|
|
result = await self.vision_processor.process_image(self.sample_image, 'object_detection')
|
|
|
|
assert 'analysis' in result
|
|
assert 'objects' in result['analysis']
|
|
|
|
objects = result['analysis']['objects']
|
|
if len(objects) > 1:
|
|
# Mock relationship analysis
|
|
relationships = []
|
|
for i, obj1 in enumerate(objects):
|
|
for obj2 in objects[i+1:]:
|
|
if obj1['class'] == 'person' and obj2['class'] == 'car':
|
|
relationships.append('person_near_car')
|
|
|
|
assert len(relationships) >= 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_spatial_reasoning(self):
|
|
"""Test spatial reasoning"""
|
|
result = await self.vision_processor.process_image(self.sample_image, 'object_detection')
|
|
|
|
assert 'analysis' in result
|
|
assert 'objects' in result['analysis']
|
|
|
|
objects = result['analysis']['objects']
|
|
for obj in objects:
|
|
assert 'bbox' in obj
|
|
assert len(obj['bbox']) == 4 # [x1, y1, x2, y2]
|
|
|
|
# Verify bbox coordinates
|
|
x1, y1, x2, y2 = obj['bbox']
|
|
assert x2 > x1
|
|
assert y2 > y1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_temporal_reasoning(self):
|
|
"""Test temporal reasoning (changes over time)"""
|
|
# Simulate processing multiple images over time
|
|
results = []
|
|
for i in range(3):
|
|
result = await self.vision_processor.process_image(self.sample_image)
|
|
results.append(result)
|
|
await asyncio.sleep(0.01) # Small delay
|
|
|
|
# Analyze temporal changes
|
|
if len(results) > 1:
|
|
# Mock temporal analysis
|
|
changes = []
|
|
for i in range(1, len(results)):
|
|
if results[i]['analysis'] != results[i-1]['analysis']:
|
|
changes.append(f"Change detected at step {i}")
|
|
|
|
# Should have some analysis of changes
|
|
assert len(results) == 3
|
|
|
|
class TestPerformanceMetrics:
|
|
"""Test performance metrics for vision processing"""
|
|
|
|
def setup_method(self):
|
|
self.vision_processor = MockVisionProcessor()
|
|
self.sample_image = b'sample_image_data'
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_processing_speed(self):
|
|
"""Test image processing speed"""
|
|
start_time = datetime.utcnow()
|
|
|
|
result = await self.vision_processor.process_image(self.sample_image)
|
|
|
|
end_time = datetime.utcnow()
|
|
processing_time = (end_time - start_time).total_seconds()
|
|
|
|
assert processing_time < 2.0 # Should process within 2 seconds
|
|
assert result['image_id'].startswith('img_')
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_batch_processing(self):
|
|
"""Test batch image processing"""
|
|
images = [self.sample_image] * 5
|
|
|
|
start_time = datetime.utcnow()
|
|
results = []
|
|
for image in images:
|
|
result = await self.vision_processor.process_image(image)
|
|
results.append(result)
|
|
end_time = datetime.utcnow()
|
|
|
|
total_time = (end_time - start_time).total_seconds()
|
|
avg_time = total_time / len(images)
|
|
|
|
assert len(results) == 5
|
|
assert avg_time < 1.0 # Average should be under 1 second per image
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_memory_usage(self):
|
|
"""Test memory usage during processing"""
|
|
import psutil
|
|
import os
|
|
|
|
process = psutil.Process(os.getpid())
|
|
memory_before = process.memory_info().rss
|
|
|
|
# Process multiple images
|
|
for i in range(10):
|
|
await self.vision_processor.process_image(self.sample_image)
|
|
|
|
memory_after = process.memory_info().rss
|
|
memory_increase = memory_after - memory_before
|
|
|
|
# Memory increase should be reasonable (less than 100MB)
|
|
assert memory_increase < 100 * 1024 * 1024 # 100MB in bytes
|
|
|
|
# Integration tests
|
|
class TestVisionIntegration:
|
|
"""Integration tests for vision system"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_end_to_end_vision_pipeline(self):
|
|
"""Test complete vision processing pipeline"""
|
|
vision_processor = MockVisionProcessor()
|
|
multi_modal_agent = MockMultiModalAgent()
|
|
context_integration = MockContextIntegration()
|
|
|
|
# Step 1: Process image with object detection
|
|
image_result = await vision_processor.process_image(b'test_image', 'object_detection')
|
|
|
|
# Step 2: Integrate with context
|
|
context_data = {
|
|
'location': 'urban_intersection',
|
|
'time': 'daytime',
|
|
'purpose': 'traffic_monitoring'
|
|
}
|
|
|
|
context_result = await context_integration.integrate_context(image_result, context_data)
|
|
|
|
# Step 3: Multi-modal processing
|
|
multi_modal_inputs = {
|
|
'image': b'test_image',
|
|
'text': 'Traffic monitoring report',
|
|
'sensor_data': {'type': 'camera', 'status': 'active'}
|
|
}
|
|
|
|
multi_modal_result = await multi_modal_agent.process_multi_modal(multi_modal_inputs)
|
|
|
|
# Verify pipeline
|
|
assert image_result['image_id'].startswith('img_')
|
|
assert context_result['context_id'].startswith('ctx_')
|
|
assert multi_modal_result['result_id'].startswith('multi_')
|
|
assert 'objects' in image_result['analysis']
|
|
assert 'enhanced_understanding' in context_result
|
|
assert len(multi_modal_result['modalities_processed']) == 3
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_real_time_vision_processing(self):
|
|
"""Test real-time vision processing capabilities"""
|
|
vision_processor = MockVisionProcessor()
|
|
|
|
# Simulate real-time processing
|
|
processing_times = []
|
|
for i in range(10):
|
|
start_time = datetime.utcnow()
|
|
await vision_processor.process_image(f'frame_{i}'.encode())
|
|
end_time = datetime.utcnow()
|
|
processing_times.append((end_time - start_time).total_seconds())
|
|
|
|
avg_time = sum(processing_times) / len(processing_times)
|
|
max_time = max(processing_times)
|
|
|
|
# Real-time constraints
|
|
assert avg_time < 0.5 # Average under 500ms
|
|
assert max_time < 1.0 # Max under 1 second
|
|
assert len(processing_times) == 10
|
|
|
|
if __name__ == '__main__':
|
|
pytest.main([__file__])
|