fix: replace deprecated datetime.utcnow() with datetime.now(datetime.UTC)

- Replace all 2,087 uses of datetime.utcnow() across 294 files
- Add UTC import to datetime statements where needed
- Addresses Python 3.12+ deprecation warning (report item #3)
This commit is contained in:
aitbc
2026-04-30 08:36:55 +02:00
parent 4d76bf4d97
commit 5f03ded7ff
294 changed files with 1997 additions and 1997 deletions

View File

@@ -11,7 +11,7 @@ import logging
import subprocess
import sys
import time
from datetime import datetime, timedelta
from datetime import datetime, UTC, timedelta
from pathlib import Path
from typing import Dict, List, Optional
@@ -108,7 +108,7 @@ class ChaosOrchestrator:
def generate_report(self, output_file: Optional[str] = None):
"""Generate a comprehensive chaos test report"""
report = {
"report_generated": datetime.utcnow().isoformat(),
"report_generated": datetime.now(datetime.UTC).isoformat(),
"namespace": self.namespace,
"orchestration": self.results,
"recommendations": []
@@ -204,7 +204,7 @@ class ChaosOrchestrator:
async def run_all_scenarios(self, scenarios: List[str], scenario_args: Dict[str, List[str]]):
"""Run all specified chaos test scenarios"""
logger.info("Starting chaos testing orchestration")
self.results["orchestration_start"] = datetime.utcnow().isoformat()
self.results["orchestration_start"] = datetime.now(datetime.UTC).isoformat()
for scenario in scenarios:
args = scenario_args.get(scenario, [])
@@ -215,7 +215,7 @@ class ChaosOrchestrator:
if result:
self.results["scenarios"].append(result)
self.results["orchestration_end"] = datetime.utcnow().isoformat()
self.results["orchestration_end"] = datetime.now(datetime.UTC).isoformat()
# Calculate summary metrics
self.calculate_summary_metrics()

View File

@@ -12,7 +12,7 @@ import time
import logging
import subprocess
import sys
from datetime import datetime
from datetime import datetime, UTC
from typing import Dict, List, Optional
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -192,7 +192,7 @@ class ChaosTestCoordinator:
async def run_test(self, outage_duration: int = 60, load_duration: int = 120):
"""Run the complete chaos test"""
logger.info("Starting coordinator outage chaos test")
self.metrics["test_start"] = datetime.utcnow().isoformat()
self.metrics["test_start"] = datetime.now(datetime.UTC).isoformat()
# Phase 1: Generate initial load
logger.info("Phase 1: Generating initial load")
@@ -200,7 +200,7 @@ class ChaosTestCoordinator:
# Phase 2: Induce outage
logger.info("Phase 2: Inducing coordinator outage")
self.metrics["outage_start"] = datetime.utcnow().isoformat()
self.metrics["outage_start"] = datetime.now(datetime.UTC).isoformat()
if not self.delete_coordinator_pods():
logger.error("Failed to induce outage")
@@ -216,7 +216,7 @@ class ChaosTestCoordinator:
# Phase 3: Monitor recovery
logger.info("Phase 3: Monitoring service recovery")
self.metrics["outage_end"] = datetime.utcnow().isoformat()
self.metrics["outage_end"] = datetime.now(datetime.UTC).isoformat()
if not await self.wait_for_recovery():
logger.error("Service did not recover")
@@ -227,7 +227,7 @@ class ChaosTestCoordinator:
await self.generate_load(load_duration)
# Calculate metrics
self.metrics["test_end"] = datetime.utcnow().isoformat()
self.metrics["test_end"] = datetime.now(datetime.UTC).isoformat()
self.metrics["mttr"] = self.metrics["recovery_time"]
# Save results

View File

@@ -12,7 +12,7 @@ import time
import logging
import subprocess
import sys
from datetime import datetime
from datetime import datetime, UTC
from typing import Dict, List, Optional
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -263,7 +263,7 @@ class ChaosTestDatabase:
async def run_test(self, failure_type: str = "connection", failure_duration: int = 60):
"""Run the complete database chaos test"""
logger.info(f"Starting database chaos test - failure type: {failure_type}")
self.metrics["test_start"] = datetime.utcnow().isoformat()
self.metrics["test_start"] = datetime.now(datetime.UTC).isoformat()
# Phase 1: Baseline test
logger.info("Phase 1: Baseline connectivity test")
@@ -282,7 +282,7 @@ class ChaosTestDatabase:
# Phase 3: Induce database failure
logger.info("Phase 3: Inducing database failure")
self.metrics["failure_start"] = datetime.utcnow().isoformat()
self.metrics["failure_start"] = datetime.now(datetime.UTC).isoformat()
if failure_type == "connection":
if not self.simulate_database_connection_failure():
@@ -311,7 +311,7 @@ class ChaosTestDatabase:
# Phase 5: Restore database and monitor recovery
logger.info("Phase 5: Restoring database")
self.metrics["failure_end"] = datetime.utcnow().isoformat()
self.metrics["failure_end"] = datetime.now(datetime.UTC).isoformat()
if not self.restore_database():
logger.error("Failed to restore database")
@@ -327,7 +327,7 @@ class ChaosTestDatabase:
await self.generate_load(60)
# Final metrics
self.metrics["test_end"] = datetime.utcnow().isoformat()
self.metrics["test_end"] = datetime.now(datetime.UTC).isoformat()
self.metrics["mttr"] = self.metrics["recovery_time"]
# Save results

View File

@@ -12,7 +12,7 @@ import time
import logging
import subprocess
import sys
from datetime import datetime
from datetime import datetime, UTC
from typing import Dict, List, Optional
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -244,7 +244,7 @@ class ChaosTestNetwork:
async def run_test(self, partition_duration: int = 60, partition_ratio: float = 0.5):
"""Run the complete network partition chaos test"""
logger.info("Starting network partition chaos test")
self.metrics["test_start"] = datetime.utcnow().isoformat()
self.metrics["test_start"] = datetime.now(datetime.UTC).isoformat()
# Get all blockchain pods
all_pods = self.get_blockchain_pods()
@@ -270,7 +270,7 @@ class ChaosTestNetwork:
# Phase 3: Apply network partition
logger.info("Phase 3: Applying network partition")
self.metrics["partition_start"] = datetime.utcnow().isoformat()
self.metrics["partition_start"] = datetime.now(datetime.UTC).isoformat()
if not self.apply_network_partition(remaining_pods, partition_pods):
logger.error("Failed to apply network partition")
@@ -287,7 +287,7 @@ class ChaosTestNetwork:
# Phase 5: Remove partition and monitor recovery
logger.info("Phase 5: Removing network partition")
self.metrics["partition_end"] = datetime.utcnow().isoformat()
self.metrics["partition_end"] = datetime.now(datetime.UTC).isoformat()
if not self.remove_network_partition(all_pods):
logger.error("Failed to remove network partition")
@@ -312,7 +312,7 @@ class ChaosTestNetwork:
await self.generate_load(60)
# Final metrics
self.metrics["test_end"] = datetime.utcnow().isoformat()
self.metrics["test_end"] = datetime.now(datetime.UTC).isoformat()
self.metrics["mttr"] = self.metrics["recovery_time"]
# Save results