Files
aitbc/aitbc/alerting.py
aitbc a266b3b70e ci: replace artifact upload with Gitea release API and add structured logging with rate limiting
- Replaced actions/upload-artifact with Gitea API release creation in build-miner-binary.yml
- Added separate steps for uploading binary, package, and checksums to Gitea release
- Added StructuredFormatter class for JSON log output in aitbc_logging.py
- Added structured logging support with log_context() context manager and LogContext class
- Added structured parameter to setup_logger() and configure_logging()
2026-05-12 21:33:20 +02:00

415 lines
12 KiB
Python

"""
AITBC Alerting Module
Alerting and notification system for AITBC applications
"""
import asyncio
from typing import Callable, Dict, Any, List, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from enum import Enum
import json
from .aitbc_logging import get_logger
logger = get_logger(__name__)
class AlertSeverity(Enum):
    """Severity levels that can be attached to an alert."""

    # Each value is the lowercase string used when an alert is serialized.
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"
class AlertStatus(Enum):
    """Lifecycle states an alert can be in."""

    # Each value is the lowercase string used when an alert is serialized.
    ACTIVE = "active"
    ACKNOWLEDGED = "acknowledged"
    RESOLVED = "resolved"
@dataclass
class Alert:
    """
    Single alert record together with its lifecycle bookkeeping.

    The first five fields are required; the remaining ones default to a
    freshly created, active, unacknowledged and unresolved alert.
    """

    id: str
    severity: AlertSeverity
    title: str
    message: str
    source: str
    # Naive UTC creation time, taken when the instance is built
    timestamp: datetime = field(default_factory=datetime.utcnow)
    status: AlertStatus = AlertStatus.ACTIVE
    metadata: Dict[str, Any] = field(default_factory=dict)
    acknowledged_by: Optional[str] = None
    acknowledged_at: Optional[datetime] = None
    resolved_at: Optional[datetime] = None

    def to_dict(self) -> Dict[str, Any]:
        """
        Build a plain-dictionary view of the alert.

        Enum fields are emitted as their string values and datetimes as
        ISO-8601 strings; unset datetimes are emitted as None. The metadata
        dictionary is included as-is (not copied).

        Returns:
            Dictionary representation of the alert
        """

        def iso_or_none(moment: Optional[datetime]) -> Optional[str]:
            # Optional timestamps stay None until the matching event happens
            return moment.isoformat() if moment else None

        serialized: Dict[str, Any] = {
            "id": self.id,
            "severity": self.severity.value,
            "title": self.title,
            "message": self.message,
            "source": self.source,
            "timestamp": self.timestamp.isoformat(),
            "status": self.status.value,
            "metadata": self.metadata,
            "acknowledged_by": self.acknowledged_by,
            "acknowledged_at": iso_or_none(self.acknowledged_at),
            "resolved_at": iso_or_none(self.resolved_at),
        }
        return serialized
class AlertChannel:
    """Common interface for alert delivery channels"""

    async def send(self, alert: Alert) -> bool:
        """
        Deliver an alert over this channel

        This base implementation does nothing useful and always raises;
        concrete channels override it.

        Args:
            alert: Alert to deliver

        Returns:
            True if sent successfully, False otherwise

        Raises:
            NotImplementedError: Always, when called on the base class
        """
        raise NotImplementedError
class LogAlertChannel(AlertChannel):
    """Alert channel that writes alerts to the module logger"""

    async def send(self, alert: Alert) -> bool:
        """
        Log the alert at a level matching its severity

        Args:
            alert: Alert to log

        Returns:
            True if the alert was logged, False if anything raised
        """
        try:
            emitters = {
                AlertSeverity.INFO: logger.info,
                AlertSeverity.WARNING: logger.warning,
                AlertSeverity.ERROR: logger.error,
                AlertSeverity.CRITICAL: logger.critical,
            }
            # Severities missing from the table are logged at info level
            emit = emitters.get(alert.severity, logger.info)

            headline = f"Alert [{alert.severity.value.upper()}]: {alert.title}"
            # Structured fields passed alongside the log message
            context = {
                "alert_id": alert.id,
                "severity": alert.severity.value,
                "source": alert.source,
                "metadata": alert.metadata,
            }
            emit(headline, extra=context)
        except Exception as e:
            logger.error(f"Failed to send log alert: {e}")
            return False
        return True
class WebhookAlertChannel(AlertChannel):
    """Alert channel that POSTs alerts as JSON to a webhook URL"""

    def __init__(self, url: str, headers: Optional[Dict[str, str]] = None):
        """
        Store the webhook target

        Args:
            url: Webhook URL
            headers: HTTP headers sent with every request (defaults to none)
        """
        self.url = url
        self.headers = headers if headers else {}

    async def send(self, alert: Alert) -> bool:
        """
        POST the alert's dictionary form to the webhook

        Args:
            alert: Alert to send

        Returns:
            True if the request completed with a non-error status,
            False if anything raised (the error is logged)
        """
        try:
            # Imported here so a missing httpx is reported as a failed send
            import httpx

            async with httpx.AsyncClient() as client:
                payload = alert.to_dict()
                response = await client.post(
                    self.url,
                    json=payload,
                    headers=self.headers,
                    timeout=10.0,
                )
                # Turn 4xx/5xx responses into exceptions handled below
                response.raise_for_status()
        except Exception as e:
            logger.error(f"Failed to send webhook alert: {e}")
            return False
        return True
class AlertRule:
    """
    Alert rule definition

    Pairs a zero-argument condition callable with the static fields used to
    build an Alert, and tracks when the rule last fired so a cooldown can be
    enforced.
    """

    def __init__(
        self,
        name: str,
        condition: Callable[[], bool],
        severity: AlertSeverity,
        title_template: str,
        message_template: str,
        source: str,
        check_interval: int = 60,
        cooldown: int = 300,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """
        Initialize alert rule

        Args:
            name: Rule name
            condition: Function that returns True if alert should fire
            severity: Alert severity
            title_template: Template for alert title (used verbatim)
            message_template: Template for alert message (used verbatim)
            source: Alert source
            check_interval: Check interval in seconds
            cooldown: Cooldown period in seconds
            metadata: Additional metadata
        """
        self.name = name
        self.condition = condition
        self.severity = severity
        self.title_template = title_template
        self.message_template = message_template
        self.source = source
        self.check_interval = check_interval
        self.cooldown = cooldown
        self.metadata = metadata or {}
        self.last_fired: Optional[datetime] = None
        self.enabled = True

    def should_fire(self) -> bool:
        """
        Check if alert should fire

        Returns:
            False when the rule is disabled or still inside its cooldown
            window; otherwise the result of calling the condition.
        """
        if not self.enabled:
            return False
        if self.last_fired:
            time_since_last = (datetime.utcnow() - self.last_fired).total_seconds()
            if time_since_last < self.cooldown:
                return False
        return self.condition()

    def fire(self) -> Alert:
        """
        Create alert from this rule and record the firing time

        Returns:
            New Alert whose id is "<rule name>-<epoch seconds>"
        """
        # Read the clock once so last_fired, the id and the alert timestamp
        # all describe the same instant.
        now = datetime.utcnow()
        self.last_fired = now
        # `now` is naive UTC; naive.timestamp() would interpret it as local
        # time, so derive the epoch offset explicitly.
        epoch_seconds = int((now - datetime(1970, 1, 1)).total_seconds())
        return Alert(
            id=f"{self.name}-{epoch_seconds}",
            severity=self.severity,
            title=self.title_template,
            message=self.message_template,
            source=self.source,
            timestamp=now,
            # Shallow copy: an alert's metadata must not alias the rule's
            # dict, or edits to one alert would leak into every other.
            metadata=dict(self.metadata),
        )
class AlertManager:
    """
    Alert manager for handling alerts and rules

    Holds the registered rules and channels, tracks active alerts plus a
    bounded history, and can run a background task that periodically
    evaluates the rules.
    """

    def __init__(self):
        """Initialize alert manager"""
        self.rules: Dict[str, AlertRule] = {}
        self.channels: List[AlertChannel] = []
        self.active_alerts: Dict[str, Alert] = {}
        self.alert_history: List[Alert] = []
        self._running = False
        self._task: Optional[asyncio.Task] = None

    def add_rule(self, rule: AlertRule) -> None:
        """
        Add alert rule (replaces any existing rule with the same name)

        Args:
            rule: Alert rule to add
        """
        self.rules[rule.name] = rule
        logger.info(f"Added alert rule: {rule.name}")

    def remove_rule(self, name: str) -> None:
        """
        Remove alert rule; unknown names are ignored

        Args:
            name: Rule name
        """
        if name in self.rules:
            del self.rules[name]
            logger.info(f"Removed alert rule: {name}")

    def add_channel(self, channel: AlertChannel) -> None:
        """
        Add alert channel

        Args:
            channel: Alert channel to add
        """
        self.channels.append(channel)
        logger.info(f"Added alert channel: {channel.__class__.__name__}")

    async def check_rules(self) -> None:
        """
        Check all alert rules and fire if needed

        An exception raised while checking or sending for one rule is logged
        and does not stop the remaining rules from being checked.
        """
        # Iterate over a snapshot: send_alert awaits, so another coroutine
        # may add or remove rules mid-pass, and mutating a dict while
        # iterating its live view raises RuntimeError.
        for rule in list(self.rules.values()):
            try:
                if rule.should_fire():
                    alert = rule.fire()
                    await self.send_alert(alert)
            except Exception as e:
                logger.error(f"Error checking rule {rule.name}: {e}")

    async def send_alert(self, alert: Alert) -> None:
        """
        Record the alert and send it through all channels

        Args:
            alert: Alert to send
        """
        self.active_alerts[alert.id] = alert
        self.alert_history.append(alert)

        # Keep history limited
        if len(self.alert_history) > 1000:
            self.alert_history = self.alert_history[-1000:]

        # Send through all channels; one failing channel must not block the rest
        for channel in self.channels:
            try:
                await channel.send(alert)
            except Exception as e:
                logger.error(f"Failed to send alert through channel: {e}")

    async def acknowledge_alert(self, alert_id: str, acknowledged_by: str) -> bool:
        """
        Acknowledge an alert

        Args:
            alert_id: Alert ID
            acknowledged_by: User acknowledging the alert

        Returns:
            True if acknowledged successfully, False if the alert is not active
        """
        if alert_id in self.active_alerts:
            alert = self.active_alerts[alert_id]
            alert.status = AlertStatus.ACKNOWLEDGED
            alert.acknowledged_by = acknowledged_by
            alert.acknowledged_at = datetime.utcnow()
            logger.info(f"Alert acknowledged: {alert_id} by {acknowledged_by}")
            return True
        return False

    async def resolve_alert(self, alert_id: str) -> bool:
        """
        Resolve an alert and drop it from the active set

        Args:
            alert_id: Alert ID

        Returns:
            True if resolved successfully, False if the alert is not active
        """
        if alert_id in self.active_alerts:
            alert = self.active_alerts[alert_id]
            alert.status = AlertStatus.RESOLVED
            alert.resolved_at = datetime.utcnow()
            del self.active_alerts[alert_id]
            logger.info(f"Alert resolved: {alert_id}")
            return True
        return False

    def get_active_alerts(self) -> List[Alert]:
        """Get all active alerts"""
        return list(self.active_alerts.values())

    def get_alert_history(self, limit: int = 100) -> List[Alert]:
        """
        Get the most recent alerts from the history

        Args:
            limit: Maximum number of alerts to return

        Returns:
            List of at most `limit` alerts, oldest first; empty when
            `limit` is zero or negative
        """
        # history[-0:] is the whole list, so a non-positive limit needs
        # an explicit guard to honour "at most `limit`".
        if limit <= 0:
            return []
        return self.alert_history[-limit:]

    async def start(self) -> None:
        """Start alert manager background task (no-op if already running)"""
        if self._running:
            return

        self._running = True
        self._task = asyncio.create_task(self._run_checks())
        logger.info("Alert manager started")

    async def stop(self) -> None:
        """Stop alert manager background task (no-op if not running)"""
        if not self._running:
            return

        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
        logger.info("Alert manager stopped")

    async def _run_checks(self) -> None:
        """Background task to check alert rules until stopped or cancelled"""
        while self._running:
            try:
                await self.check_rules()

                # Calculate sleep time based on minimum check interval
                min_interval = min((rule.check_interval for rule in self.rules.values()), default=60)
                await asyncio.sleep(min_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in alert check loop: {e}")
                # Back off before retrying after an unexpected failure
                await asyncio.sleep(60)
# Shared module-level alert manager; created on first request
_alert_manager: Optional[AlertManager] = None


def get_alert_manager() -> AlertManager:
    """
    Return the shared alert manager, creating it on first use

    A newly created manager gets a LogAlertChannel registered as its
    default channel.

    Returns:
        Alert manager instance
    """
    global _alert_manager

    if _alert_manager is not None:
        return _alert_manager

    _alert_manager = manager = AlertManager()
    # Add default log channel
    manager.add_channel(LogAlertChannel())
    return manager
def setup_alerting(
    webhook_url: Optional[str] = None,
    webhook_headers: Optional[Dict[str, str]] = None
) -> AlertManager:
    """
    Configure the shared alert manager

    Args:
        webhook_url: Optional webhook URL for alerts; when given, a
            webhook channel is added to the manager
        webhook_headers: Optional webhook headers

    Returns:
        Alert manager instance
    """
    manager = get_alert_manager()

    # Nothing more to configure without a webhook target
    if not webhook_url:
        return manager

    webhook_channel = WebhookAlertChannel(webhook_url, webhook_headers)
    manager.add_channel(webhook_channel)
    return manager