- Restructure .env.example with security-focused documentation, service-specific environment file references, and AWS Secrets Manager integration - Update CLI tests workflow to single Python 3.13 version, add pytest-mock dependency, and consolidate test execution with coverage - Add comprehensive security validation to package publishing workflow with manual approval gates, secret scanning, and release
675 lines
24 KiB
Python
675 lines
24 KiB
Python
"""
|
|
Tests for Event-Driven Redis Cache System
|
|
|
|
Comprehensive test suite for distributed caching with event-driven invalidation
|
|
ensuring immediate propagation of GPU availability and pricing changes.
|
|
"""
|
|
|
|
import pytest
|
|
import asyncio
|
|
import json
|
|
import time
|
|
from unittest.mock import Mock, AsyncMock, patch
|
|
from datetime import datetime, timedelta
|
|
|
|
from aitbc_cache.event_driven_cache import (
|
|
EventDrivenCacheManager,
|
|
CacheEventType,
|
|
CacheEvent,
|
|
CacheConfig,
|
|
cached_result
|
|
)
|
|
|
|
from aitbc_cache.gpu_marketplace_cache import (
|
|
GPUMarketplaceCacheManager,
|
|
GPUInfo,
|
|
BookingInfo,
|
|
MarketStats,
|
|
init_marketplace_cache,
|
|
get_marketplace_cache
|
|
)
|
|
|
|
|
|
class TestEventDrivenCacheManager:
|
|
"""Test the core event-driven cache manager"""
|
|
|
|
@pytest.fixture
|
|
async def cache_manager(self):
|
|
"""Create a cache manager for testing"""
|
|
manager = EventDrivenCacheManager(
|
|
redis_url="redis://localhost:6379/1", # Use different DB for testing
|
|
node_id="test_node_123"
|
|
)
|
|
|
|
# Mock Redis connection for testing
|
|
with patch('redis.asyncio.Redis') as mock_redis:
|
|
mock_client = AsyncMock()
|
|
mock_redis.return_value = mock_client
|
|
|
|
# Mock ping response
|
|
mock_client.ping.return_value = True
|
|
|
|
# Mock pubsub
|
|
mock_pubsub = AsyncMock()
|
|
mock_client.pubsub.return_value = mock_pubsub
|
|
|
|
await manager.connect()
|
|
|
|
yield manager
|
|
|
|
await manager.disconnect()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cache_connection(self, cache_manager):
|
|
"""Test cache manager connection"""
|
|
assert cache_manager.is_connected is True
|
|
assert cache_manager.node_id == "test_node_123"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cache_set_and_get(self, cache_manager):
|
|
"""Test basic cache set and get operations"""
|
|
test_data = {"gpu_id": "gpu_123", "status": "available"}
|
|
|
|
# Set data
|
|
await cache_manager.set('gpu_availability', {'gpu_id': 'gpu_123'}, test_data)
|
|
|
|
# Get data
|
|
result = await cache_manager.get('gpu_availability', {'gpu_id': 'gpu_123'})
|
|
|
|
assert result is not None
|
|
assert result['gpu_id'] == 'gpu_123'
|
|
assert result['status'] == 'available'
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_l1_cache_fallback(self, cache_manager):
|
|
"""Test L1 cache fallback when Redis is unavailable"""
|
|
test_data = {"message": "test data"}
|
|
|
|
# Mock Redis failure
|
|
cache_manager.redis_client = None
|
|
|
|
# Should still work with L1 cache
|
|
await cache_manager.set('test_cache', {'key': 'value'}, test_data)
|
|
result = await cache_manager.get('test_cache', {'key': 'value'})
|
|
|
|
assert result is not None
|
|
assert result['message'] == 'test data'
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cache_invalidation(self, cache_manager):
|
|
"""Test cache invalidation"""
|
|
test_data = {"gpu_id": "gpu_456", "status": "busy"}
|
|
|
|
# Set data
|
|
await cache_manager.set('gpu_availability', {'gpu_id': 'gpu_456'}, test_data)
|
|
|
|
# Verify it's cached
|
|
result = await cache_manager.get('gpu_availability', {'gpu_id': 'gpu_456'})
|
|
assert result is not None
|
|
|
|
# Invalidate cache
|
|
await cache_manager.invalidate_cache('gpu_availability')
|
|
|
|
# Should be gone from L1 cache
|
|
assert len(cache_manager.l1_cache) == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_event_publishing(self, cache_manager):
|
|
"""Test event publishing for cache invalidation"""
|
|
# Mock Redis publish
|
|
cache_manager.redis_client.publish = AsyncMock()
|
|
|
|
# Publish GPU availability change event
|
|
await cache_manager.notify_gpu_availability_change('gpu_789', 'offline')
|
|
|
|
# Verify event was published
|
|
cache_manager.redis_client.publish.assert_called_once()
|
|
|
|
# Check event data
|
|
call_args = cache_manager.redis_client.publish.call_args
|
|
event_data = json.loads(call_args[0][1])
|
|
|
|
assert event_data['event_type'] == 'gpu_availability_changed'
|
|
assert event_data['resource_id'] == 'gpu_789'
|
|
assert event_data['data']['gpu_id'] == 'gpu_789'
|
|
assert event_data['data']['status'] == 'offline'
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_event_handling(self, cache_manager):
|
|
"""Test handling of incoming invalidation events"""
|
|
test_data = {"gpu_id": "gpu_event", "status": "available"}
|
|
|
|
# Set data in L1 cache
|
|
cache_key = cache_manager._generate_cache_key('gpu_avail', {'gpu_id': 'gpu_event'})
|
|
cache_manager.l1_cache[cache_key] = {
|
|
'data': test_data,
|
|
'expires_at': time.time() + 300
|
|
}
|
|
|
|
# Simulate incoming event
|
|
event_data = {
|
|
'event_type': 'gpu_availability_changed',
|
|
'resource_id': 'gpu_event',
|
|
'data': {'gpu_id': 'gpu_event', 'status': 'busy'},
|
|
'timestamp': time.time(),
|
|
'source_node': 'other_node',
|
|
'event_id': 'event_123',
|
|
'affected_namespaces': ['gpu_avail']
|
|
}
|
|
|
|
# Process event
|
|
await cache_manager._process_invalidation_event(event_data)
|
|
|
|
# L1 cache should be invalidated
|
|
assert cache_key not in cache_manager.l1_cache
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cache_statistics(self, cache_manager):
|
|
"""Test cache statistics tracking"""
|
|
# Perform some cache operations
|
|
await cache_manager.set('test_cache', {'key': 'value'}, {'data': 'test'})
|
|
await cache_manager.get('test_cache', {'key': 'value'})
|
|
await cache_manager.get('nonexistent_cache', {'key': 'value'})
|
|
|
|
stats = await cache_manager.get_cache_stats()
|
|
|
|
assert 'cache_hits' in stats
|
|
assert 'cache_misses' in stats
|
|
assert 'events_processed' in stats
|
|
assert 'l1_cache_size' in stats
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_health_check(self, cache_manager):
|
|
"""Test cache health check"""
|
|
health = await cache_manager.health_check()
|
|
|
|
assert 'status' in health
|
|
assert 'redis_connected' in health
|
|
assert 'pubsub_active' in health
|
|
assert 'event_queue_size' in health
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cached_decorator(self, cache_manager):
|
|
"""Test the cached result decorator"""
|
|
call_count = 0
|
|
|
|
@cached_result('test_cache', ttl=60)
|
|
async def expensive_function(param1, param2):
|
|
nonlocal call_count
|
|
call_count += 1
|
|
return f"result_{param1}_{param2}"
|
|
|
|
# First call should execute function
|
|
result1 = await expensive_function('a', 'b')
|
|
assert result1 == "result_a_b"
|
|
assert call_count == 1
|
|
|
|
# Second call should use cache
|
|
result2 = await expensive_function('a', 'b')
|
|
assert result2 == "result_a_b"
|
|
assert call_count == 1 # Should not increment
|
|
|
|
# Different parameters should execute function
|
|
result3 = await expensive_function('c', 'd')
|
|
assert result3 == "result_c_d"
|
|
assert call_count == 2
|
|
|
|
|
|
class TestGPUMarketplaceCacheManager:
|
|
"""Test the GPU marketplace cache manager"""
|
|
|
|
@pytest.fixture
|
|
async def marketplace_cache(self):
|
|
"""Create a marketplace cache manager for testing"""
|
|
# Mock cache manager
|
|
mock_cache_manager = AsyncMock()
|
|
mock_cache_manager.get = AsyncMock()
|
|
mock_cache_manager.set = AsyncMock()
|
|
mock_cache_manager.invalidate_cache = AsyncMock()
|
|
mock_cache_manager.notify_gpu_availability_change = AsyncMock()
|
|
mock_cache_manager.notify_pricing_update = AsyncMock()
|
|
mock_cache_manager.notify_booking_created = AsyncMock()
|
|
mock_cache_manager.notify_booking_cancelled = AsyncMock()
|
|
|
|
manager = GPUMarketplaceCacheManager(mock_cache_manager)
|
|
yield manager
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_gpu_availability_caching(self, marketplace_cache):
|
|
"""Test GPU availability caching"""
|
|
gpus = [
|
|
GPUInfo(
|
|
gpu_id="gpu_001",
|
|
provider_id="provider_1",
|
|
gpu_type="RTX 3080",
|
|
memory_gb=10,
|
|
cuda_cores=8704,
|
|
base_price_per_hour=0.1,
|
|
current_price_per_hour=0.12,
|
|
availability_status="available",
|
|
region="us-east",
|
|
performance_score=95.0,
|
|
last_updated=datetime.utcnow()
|
|
),
|
|
GPUInfo(
|
|
gpu_id="gpu_002",
|
|
provider_id="provider_2",
|
|
gpu_type="RTX 3090",
|
|
memory_gb=24,
|
|
cuda_cores=10496,
|
|
base_price_per_hour=0.15,
|
|
current_price_per_hour=0.18,
|
|
availability_status="busy",
|
|
region="us-west",
|
|
performance_score=98.0,
|
|
last_updated=datetime.utcnow()
|
|
)
|
|
]
|
|
|
|
# Set GPU availability
|
|
await marketplace_cache.set_gpu_availability(gpus)
|
|
|
|
# Verify cache.set was called
|
|
assert marketplace_cache.cache.set.call_count > 0
|
|
|
|
# Test filtering
|
|
marketplace_cache.cache.get.return_value = [gpus[0].__dict__]
|
|
result = await marketplace_cache.get_gpu_availability(region="us-east")
|
|
|
|
assert len(result) == 1
|
|
assert result[0].gpu_id == "gpu_001"
|
|
assert result[0].region == "us-east"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_gpu_status_update(self, marketplace_cache):
|
|
"""Test GPU status update with event notification"""
|
|
# Mock existing GPU
|
|
existing_gpu = GPUInfo(
|
|
gpu_id="gpu_003",
|
|
provider_id="provider_3",
|
|
gpu_type="A100",
|
|
memory_gb=40,
|
|
cuda_cores=6912,
|
|
base_price_per_hour=0.5,
|
|
current_price_per_hour=0.5,
|
|
availability_status="available",
|
|
region="eu-central",
|
|
performance_score=99.0,
|
|
last_updated=datetime.utcnow()
|
|
)
|
|
|
|
marketplace_cache.cache.get.return_value = [existing_gpu.__dict__]
|
|
|
|
# Update status
|
|
await marketplace_cache.update_gpu_status("gpu_003", "maintenance")
|
|
|
|
# Verify notification was sent
|
|
marketplace_cache.cache.notify_gpu_availability_change.assert_called_once_with(
|
|
"gpu_003", "maintenance"
|
|
)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_dynamic_pricing(self, marketplace_cache):
|
|
"""Test dynamic pricing calculation"""
|
|
# Mock GPU data with low availability
|
|
gpus = [
|
|
GPUInfo(
|
|
gpu_id="gpu_004",
|
|
provider_id="provider_4",
|
|
gpu_type="RTX 3080",
|
|
memory_gb=10,
|
|
cuda_cores=8704,
|
|
base_price_per_hour=0.1,
|
|
current_price_per_hour=0.1,
|
|
availability_status="available",
|
|
region="us-east",
|
|
performance_score=95.0,
|
|
last_updated=datetime.utcnow()
|
|
)
|
|
# Only 1 GPU available (low availability scenario)
|
|
]
|
|
|
|
marketplace_cache.cache.get.return_value = [gpus[0].__dict__]
|
|
|
|
# Calculate dynamic pricing
|
|
price = await marketplace_cache.get_dynamic_pricing("gpu_004")
|
|
|
|
# Should be higher than base price due to low availability
|
|
assert price > gpus[0].base_price_per_hour
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_booking_creation(self, marketplace_cache):
|
|
"""Test booking creation with cache updates"""
|
|
booking = BookingInfo(
|
|
booking_id="booking_001",
|
|
gpu_id="gpu_005",
|
|
user_id="user_123",
|
|
start_time=datetime.utcnow(),
|
|
end_time=datetime.utcnow() + timedelta(hours=2),
|
|
status="active",
|
|
total_cost=0.2,
|
|
created_at=datetime.utcnow()
|
|
)
|
|
|
|
# Mock GPU data
|
|
gpu = GPUInfo(
|
|
gpu_id="gpu_005",
|
|
provider_id="provider_5",
|
|
gpu_type="RTX 3080",
|
|
memory_gb=10,
|
|
cuda_cores=8704,
|
|
base_price_per_hour=0.1,
|
|
current_price_per_hour=0.1,
|
|
availability_status="available",
|
|
region="us-east",
|
|
performance_score=95.0,
|
|
last_updated=datetime.utcnow()
|
|
)
|
|
|
|
marketplace_cache.cache.get.return_value = [gpu.__dict__]
|
|
|
|
# Create booking
|
|
result = await marketplace_cache.create_booking(booking)
|
|
|
|
assert result is True
|
|
|
|
# Verify GPU status was updated
|
|
marketplace_cache.cache.notify_gpu_availability_change.assert_called()
|
|
|
|
# Verify booking event was published
|
|
marketplace_cache.cache.notify_booking_created.assert_called_with(
|
|
"booking_001", "gpu_005"
|
|
)
|
|
|
|
# Verify relevant caches were invalidated
|
|
marketplace_cache.cache.invalidate_cache.assert_any_call('order_book')
|
|
marketplace_cache.cache.invalidate_cache.assert_any_call('market_stats')
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_booking_cancellation(self, marketplace_cache):
|
|
"""Test booking cancellation with cache updates"""
|
|
# Mock GPU data
|
|
gpu = GPUInfo(
|
|
gpu_id="gpu_006",
|
|
provider_id="provider_6",
|
|
gpu_type="RTX 3090",
|
|
memory_gb=24,
|
|
cuda_cores=10496,
|
|
base_price_per_hour=0.15,
|
|
current_price_per_hour=0.15,
|
|
availability_status="busy",
|
|
region="us-west",
|
|
performance_score=98.0,
|
|
last_updated=datetime.utcnow()
|
|
)
|
|
|
|
marketplace_cache.cache.get.return_value = [gpu.__dict__]
|
|
|
|
# Cancel booking
|
|
result = await marketplace_cache.cancel_booking("booking_002", "gpu_006")
|
|
|
|
assert result is True
|
|
|
|
# Verify GPU status was updated to available
|
|
marketplace_cache.cache.notify_gpu_availability_change.assert_called()
|
|
|
|
# Verify cancellation event was published
|
|
marketplace_cache.cache.notify_booking_cancelled.assert_called_with(
|
|
"booking_002", "gpu_006"
|
|
)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_market_statistics(self, marketplace_cache):
|
|
"""Test market statistics calculation"""
|
|
# Mock GPU data
|
|
gpus = [
|
|
GPUInfo(
|
|
gpu_id="gpu_007",
|
|
provider_id="provider_7",
|
|
gpu_type="RTX 3080",
|
|
memory_gb=10,
|
|
cuda_cores=8704,
|
|
base_price_per_hour=0.1,
|
|
current_price_per_hour=0.12,
|
|
availability_status="available",
|
|
region="us-east",
|
|
performance_score=95.0,
|
|
last_updated=datetime.utcnow()
|
|
),
|
|
GPUInfo(
|
|
gpu_id="gpu_008",
|
|
provider_id="provider_8",
|
|
gpu_type="RTX 3090",
|
|
memory_gb=24,
|
|
cuda_cores=10496,
|
|
base_price_per_hour=0.15,
|
|
current_price_per_hour=0.18,
|
|
availability_status="busy",
|
|
region="us-west",
|
|
performance_score=98.0,
|
|
last_updated=datetime.utcnow()
|
|
)
|
|
]
|
|
|
|
marketplace_cache.cache.get.return_value = [gpu.__dict__ for gpu in gpus]
|
|
|
|
# Get market stats
|
|
stats = await marketplace_cache.get_market_stats()
|
|
|
|
assert isinstance(stats, MarketStats)
|
|
assert stats.total_gpus == 2
|
|
assert stats.available_gpus == 1
|
|
assert stats.busy_gpus == 1
|
|
assert stats.utilization_rate == 0.5
|
|
assert stats.average_price_per_hour == 0.12 # Average of available GPUs
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_gpu_search(self, marketplace_cache):
|
|
"""Test GPU search functionality"""
|
|
# Mock GPU data
|
|
gpus = [
|
|
GPUInfo(
|
|
gpu_id="gpu_009",
|
|
provider_id="provider_9",
|
|
gpu_type="RTX 3080",
|
|
memory_gb=10,
|
|
cuda_cores=8704,
|
|
base_price_per_hour=0.1,
|
|
current_price_per_hour=0.1,
|
|
availability_status="available",
|
|
region="us-east",
|
|
performance_score=95.0,
|
|
last_updated=datetime.utcnow()
|
|
),
|
|
GPUInfo(
|
|
gpu_id="gpu_010",
|
|
provider_id="provider_10",
|
|
gpu_type="RTX 3090",
|
|
memory_gb=24,
|
|
cuda_cores=10496,
|
|
base_price_per_hour=0.15,
|
|
current_price_per_hour=0.15,
|
|
availability_status="available",
|
|
region="us-west",
|
|
performance_score=98.0,
|
|
last_updated=datetime.utcnow()
|
|
)
|
|
]
|
|
|
|
marketplace_cache.cache.get.return_value = [gpu.__dict__ for gpu in gpus]
|
|
|
|
# Search with criteria
|
|
results = await marketplace_cache.search_gpus(
|
|
min_memory=16,
|
|
max_price=0.2
|
|
)
|
|
|
|
# Should only return RTX 3090 (24GB memory, $0.15/hour)
|
|
assert len(results) == 1
|
|
assert results[0].gpu_type == "RTX 3090"
|
|
assert results[0].memory_gb == 24
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_top_performing_gpus(self, marketplace_cache):
|
|
"""Test getting top performing GPUs"""
|
|
# Mock GPU data with different performance scores
|
|
gpus = [
|
|
GPUInfo(
|
|
gpu_id="gpu_011",
|
|
provider_id="provider_11",
|
|
gpu_type="A100",
|
|
memory_gb=40,
|
|
cuda_cores=6912,
|
|
base_price_per_hour=0.5,
|
|
current_price_per_hour=0.5,
|
|
availability_status="available",
|
|
region="us-east",
|
|
performance_score=99.0,
|
|
last_updated=datetime.utcnow()
|
|
),
|
|
GPUInfo(
|
|
gpu_id="gpu_012",
|
|
provider_id="provider_12",
|
|
gpu_type="RTX 3080",
|
|
memory_gb=10,
|
|
cuda_cores=8704,
|
|
base_price_per_hour=0.1,
|
|
current_price_per_hour=0.1,
|
|
availability_status="available",
|
|
region="us-west",
|
|
performance_score=95.0,
|
|
last_updated=datetime.utcnow()
|
|
)
|
|
]
|
|
|
|
marketplace_cache.cache.get.return_value = [gpu.__dict__ for gpu in gpus]
|
|
|
|
# Get top performing GPUs
|
|
top_gpus = await marketplace_cache.get_top_performing_gpus(limit=2)
|
|
|
|
assert len(top_gpus) == 2
|
|
assert top_gpus[0].performance_score >= top_gpus[1].performance_score
|
|
assert top_gpus[0].gpu_type == "A100"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cheapest_gpus(self, marketplace_cache):
|
|
"""Test getting cheapest GPUs"""
|
|
# Mock GPU data with different prices
|
|
gpus = [
|
|
GPUInfo(
|
|
gpu_id="gpu_013",
|
|
provider_id="provider_13",
|
|
gpu_type="RTX 3060",
|
|
memory_gb=12,
|
|
cuda_cores=3584,
|
|
base_price_per_hour=0.05,
|
|
current_price_per_hour=0.05,
|
|
availability_status="available",
|
|
region="us-east",
|
|
performance_score=85.0,
|
|
last_updated=datetime.utcnow()
|
|
),
|
|
GPUInfo(
|
|
gpu_id="gpu_014",
|
|
provider_id="provider_14",
|
|
gpu_type="RTX 3080",
|
|
memory_gb=10,
|
|
cuda_cores=8704,
|
|
base_price_per_hour=0.1,
|
|
current_price_per_hour=0.1,
|
|
availability_status="available",
|
|
region="us-west",
|
|
performance_score=95.0,
|
|
last_updated=datetime.utcnow()
|
|
)
|
|
]
|
|
|
|
marketplace_cache.cache.get.return_value = [gpu.__dict__ for gpu in gpus]
|
|
|
|
# Get cheapest GPUs
|
|
cheapest_gpus = await marketplace_cache.get_cheapest_gpus(limit=2)
|
|
|
|
assert len(cheapest_gpus) == 2
|
|
assert cheapest_gpus[0].current_price_per_hour <= cheapest_gpus[1].current_price_per_hour
|
|
assert cheapest_gpus[0].gpu_type == "RTX 3060"
|
|
|
|
|
|
class TestCacheIntegration:
|
|
"""Test integration between cache components"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_marketplace_cache_initialization(self):
|
|
"""Test marketplace cache manager initialization"""
|
|
with patch('aitbc_cache.gpu_marketplace_cache.EventDrivenCacheManager') as mock_cache:
|
|
mock_manager = AsyncMock()
|
|
mock_cache.return_value = mock_manager
|
|
mock_manager.connect = AsyncMock()
|
|
|
|
# Initialize marketplace cache
|
|
manager = await init_marketplace_cache(
|
|
redis_url="redis://localhost:6379/2",
|
|
node_id="test_node",
|
|
region="test_region"
|
|
)
|
|
|
|
assert isinstance(manager, GPUMarketplaceCacheManager)
|
|
mock_cache.assert_called_once()
|
|
mock_manager.connect.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_global_marketplace_cache_access(self):
|
|
"""Test global marketplace cache access"""
|
|
# Mock the global cache
|
|
with patch('aitbc_cache.gpu_marketplace_cache.marketplace_cache') as mock_global:
|
|
mock_global.get = AsyncMock()
|
|
|
|
# Should work when initialized
|
|
result = await get_marketplace_cache()
|
|
assert result is not None
|
|
|
|
# Should raise error when not initialized
|
|
with patch('aitbc_cache.gpu_marketplace_cache.marketplace_cache', None):
|
|
with pytest.raises(RuntimeError, match="Marketplace cache not initialized"):
|
|
await get_marketplace_cache()
|
|
|
|
|
|
class TestCacheEventTypes:
|
|
"""Test different cache event types"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_all_event_types(self):
|
|
"""Test all supported cache event types"""
|
|
event_types = [
|
|
CacheEventType.GPU_AVAILABILITY_CHANGED,
|
|
CacheEventType.PRICING_UPDATED,
|
|
CacheEventType.BOOKING_CREATED,
|
|
CacheEventType.BOOKING_CANCELLED,
|
|
CacheEventType.PROVIDER_STATUS_CHANGED,
|
|
CacheEventType.MARKET_STATS_UPDATED,
|
|
CacheEventType.ORDER_BOOK_UPDATED,
|
|
CacheEventType.MANUAL_INVALIDATION
|
|
]
|
|
|
|
for event_type in event_types:
|
|
# Verify event type can be serialized
|
|
event = CacheEvent(
|
|
event_type=event_type,
|
|
resource_id="test_resource",
|
|
data={"test": "data"},
|
|
timestamp=time.time(),
|
|
source_node="test_node",
|
|
event_id="test_event",
|
|
affected_namespaces=["test_namespace"]
|
|
)
|
|
|
|
# Test JSON serialization
|
|
event_json = json.dumps(event.__dict__, default=str)
|
|
parsed_event = json.loads(event_json)
|
|
|
|
assert parsed_event['event_type'] == event_type.value
|
|
assert parsed_event['resource_id'] == "test_resource"
|
|
|
|
|
|
if __name__ == "__main__":
|
|
pytest.main([__file__, "-v"])
|