fix: replace datetime.UTC with timezone.utc for Python 3.13 compatibility
Some checks failed
Cross-Node Transaction Testing / transaction-test (push) Successful in 2s
Deploy to Testnet / deploy-testnet (push) Has been cancelled
Multi-Node Stress Testing / stress-test (push) Has been cancelled
Node Failover Simulation / failover-test (push) Failing after 13s

This commit is contained in:
aitbc
2026-05-06 07:58:33 +02:00
parent 68d69b4c1d
commit 274dd81c3d
7 changed files with 30 additions and 30 deletions

View File

@@ -4,7 +4,7 @@ Provides standard response formatters, pagination helpers, error response builde
"""
from typing import Any, Optional, List, Dict, Union
from datetime import datetime, UTC
from datetime import datetime, timezone
from fastapi import HTTPException, status
from pydantic import BaseModel
@@ -19,7 +19,7 @@ class APIResponse(BaseModel):
def __init__(self, **data):
if 'timestamp' not in data:
data['timestamp'] = datetime.now(datetime.UTC).isoformat()
data['timestamp'] = datetime.now(timezone.utc).isoformat()
super().__init__(**data)
@@ -33,7 +33,7 @@ class PaginatedResponse(BaseModel):
def __init__(self, **data):
if 'timestamp' not in data:
data['timestamp'] = datetime.now(datetime.UTC).isoformat()
data['timestamp'] = datetime.now(timezone.utc).isoformat()
super().__init__(**data)
@@ -318,5 +318,5 @@ def build_request_metadata(request) -> Dict[str, str]:
"client_ip": get_client_ip(request),
"user_agent": get_user_agent(request),
"request_id": request.headers.get("X-Request-ID", "unknown"),
"timestamp": datetime.now(datetime.UTC).isoformat()
"timestamp": datetime.now(timezone.utc).isoformat()
}

View File

@@ -5,7 +5,7 @@ Provides toggle between mock and real data sources for development/testing
import os
from typing import Any, Dict, List, Optional
from datetime import datetime, UTC
from datetime import datetime, timezone
import httpx
@@ -119,7 +119,7 @@ class MockDataGenerator:
"hash": MockFactory.generate_hash(),
"validator": validator or MockFactory.generate_ethereum_address(),
"tx_count": min_tx or 5,
"timestamp": datetime.now(datetime.UTC).isoformat()
"timestamp": datetime.now(timezone.utc).isoformat()
})
return blocks

View File

@@ -6,7 +6,7 @@ Provides event bus implementation, pub/sub patterns, and event decorators
import asyncio
from typing import Any, Callable, Dict, List, Optional, TypeVar, Generic
from dataclasses import dataclass
from datetime import datetime, UTC
from datetime import datetime, timezone
from enum import Enum
import inspect
import functools
@@ -34,7 +34,7 @@ class Event:
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now(datetime.UTC)
self.timestamp = datetime.now(timezone.utc)
class EventBus:
@@ -199,7 +199,7 @@ class EventAggregator:
def add_event(self, event: Event) -> None:
"""Add event to aggregation"""
key = event.event_type
now = datetime.now(datetime.UTC)
now = datetime.now(timezone.utc)
if key not in self.aggregated_events:
self.aggregated_events[key] = {
@@ -223,7 +223,7 @@ class EventAggregator:
def get_aggregated_events(self) -> Dict[str, Dict[str, Any]]:
"""Get aggregated events"""
# Remove old events
now = datetime.now(datetime.UTC)
now = datetime.now(timezone.utc)
cutoff = now.timestamp() - self.window_seconds
to_remove = []

View File

@@ -8,7 +8,7 @@ import heapq
import time
from typing import Any, Callable, Dict, List, Optional, TypeVar
from dataclasses import dataclass, field
from datetime import datetime, UTC, timedelta
from datetime import datetime, timezone, timedelta
from enum import Enum
import uuid
@@ -42,7 +42,7 @@ class Job:
args: tuple = field(default_factory=tuple, compare=False)
kwargs: dict = field(default_factory=dict, compare=False)
status: JobStatus = field(default=JobStatus.PENDING, compare=False)
created_at: datetime = field(default_factory=datetime.now(datetime.UTC), compare=False)
created_at: datetime = field(default_factory=datetime.now(timezone.utc), compare=False)
started_at: Optional[datetime] = field(default=None, compare=False)
completed_at: Optional[datetime] = field(default=None, compare=False)
result: Any = field(default=None, compare=False)
@@ -244,7 +244,7 @@ class BackgroundTaskManager:
async with self.semaphore:
try:
self.task_info[task_id]["status"] = "running"
self.task_info[task_id]["started_at"] = datetime.now(datetime.UTC)
self.task_info[task_id]["started_at"] = datetime.now(timezone.utc)
if asyncio.iscoroutinefunction(func):
result = await func(*args, **kwargs)
@@ -253,18 +253,18 @@ class BackgroundTaskManager:
self.task_info[task_id]["status"] = "completed"
self.task_info[task_id]["result"] = result
self.task_info[task_id]["completed_at"] = datetime.now(datetime.UTC)
self.task_info[task_id]["completed_at"] = datetime.now(timezone.utc)
except Exception as e:
self.task_info[task_id]["status"] = "failed"
self.task_info[task_id]["error"] = str(e)
self.task_info[task_id]["completed_at"] = datetime.now(datetime.UTC)
self.task_info[task_id]["completed_at"] = datetime.now(timezone.utc)
finally:
if task_id in self.tasks:
del self.tasks[task_id]
self.task_info[task_id] = {
"status": "pending",
"created_at": datetime.now(datetime.UTC),
"created_at": datetime.now(timezone.utc),
"started_at": None,
"completed_at": None,
"result": None,
@@ -286,7 +286,7 @@ class BackgroundTaskManager:
pass
self.task_info[task_id]["status"] = "cancelled"
self.task_info[task_id]["completed_at"] = datetime.now(datetime.UTC)
self.task_info[task_id]["completed_at"] = datetime.now(timezone.utc)
del self.tasks[task_id]
return True
return False

View File

@@ -9,7 +9,7 @@ import hashlib
import time
import json
from typing import Optional, Dict, Any
from datetime import datetime, UTC, timedelta
from datetime import datetime, timezone, timedelta
from cryptography.fernet import Fernet
@@ -118,7 +118,7 @@ class APIKeyManager:
"user_id": user_id,
"scopes": scopes or ["read"],
"name": name,
"created_at": datetime.now(datetime.UTC).isoformat(),
"created_at": datetime.now(timezone.utc).isoformat(),
"last_used": None
}
@@ -134,7 +134,7 @@ class APIKeyManager:
return None
# Update last used
key_data["last_used"] = datetime.now(datetime.UTC).isoformat()
key_data["last_used"] = datetime.now(timezone.utc).isoformat()
if self.storage_path:
self._save_keys()

View File

@@ -7,7 +7,7 @@ import json
import os
from typing import Any, Callable, Dict, Optional, TypeVar, Generic, List
from dataclasses import dataclass, field
from datetime import datetime, UTC
from datetime import datetime, timezone
from enum import Enum
from abc import ABC, abstractmethod
import asyncio
@@ -31,7 +31,7 @@ class StateTransition:
"""Record of a state transition"""
from_state: str
to_state: str
timestamp: datetime = field(default_factory=datetime.now(datetime.UTC))
timestamp: datetime = field(default_factory=datetime.now(timezone.utc))
data: Dict[str, Any] = field(default_factory=dict)
@@ -304,7 +304,7 @@ class StateSnapshot:
self.current_state = state_machine.current_state
self.state_data = state_machine.state_data.copy()
self.transitions = state_machine.transitions.copy()
self.timestamp = datetime.now(datetime.UTC)
self.timestamp = datetime.now(timezone.utc)
def restore(self, state_machine: StateMachine) -> None:
"""Restore state machine from snapshot"""

View File

@@ -6,7 +6,7 @@ Provides mock factories, test data generators, and test helpers
import secrets
import json
from typing import Any, Dict, List, Optional, Type, TypeVar, Callable
from datetime import datetime, UTC, timedelta
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass, field
from decimal import Decimal
import uuid
@@ -72,8 +72,8 @@ class TestDataGenerator:
"username": MockFactory.generate_string(8),
"first_name": MockFactory.generate_string(6),
"last_name": MockFactory.generate_string(6),
"created_at": datetime.now(datetime.UTC).isoformat(),
"updated_at": datetime.now(datetime.UTC).isoformat(),
"created_at": datetime.now(timezone.utc).isoformat(),
"updated_at": datetime.now(timezone.utc).isoformat(),
"is_active": True,
"role": "user"
}
@@ -91,7 +91,7 @@ class TestDataGenerator:
"gas_price": str(secrets.randbelow(100000000000)),
"gas_limit": secrets.randbelow(100000),
"nonce": secrets.randbelow(1000),
"timestamp": datetime.now(datetime.UTC).isoformat(),
"timestamp": datetime.now(timezone.utc).isoformat(),
"status": "pending"
}
data.update(overrides)
@@ -104,7 +104,7 @@ class TestDataGenerator:
"number": secrets.randbelow(10000000),
"hash": MockFactory.generate_hash(),
"parent_hash": MockFactory.generate_hash(),
"timestamp": datetime.now(datetime.UTC).isoformat(),
"timestamp": datetime.now(timezone.utc).isoformat(),
"transactions": [],
"gas_used": str(secrets.randbelow(10000000)),
"gas_limit": str(15000000),
@@ -122,7 +122,7 @@ class TestDataGenerator:
"user_id": MockFactory.generate_uuid(),
"name": MockFactory.generate_string(10),
"scopes": ["read", "write"],
"created_at": datetime.now(datetime.UTC).isoformat(),
"created_at": datetime.now(timezone.utc).isoformat(),
"last_used": None,
"is_active": True
}
@@ -137,7 +137,7 @@ class TestDataGenerator:
"address": MockFactory.generate_ethereum_address(),
"chain_id": 1,
"balance": str(secrets.randbelow(1000000000000000000)),
"created_at": datetime.now(datetime.UTC).isoformat(),
"created_at": datetime.now(timezone.utc).isoformat(),
"is_active": True
}
data.update(overrides)