Files
aitbc/apps/blockchain-event-bridge/src/blockchain_event_bridge/polling/conditions.py
aitbc 136364298c Migrate blockchain-event-bridge app to centralized aitbc package utilities
- Migrate polling/batch.py and polling/conditions.py from logging to aitbc.get_logger
2026-04-25 07:20:45 +02:00

52 lines
1.6 KiB
Python

"""Condition-based polling for batch operations."""
import asyncio
from typing import Any, Dict
from aitbc import get_logger
logger = get_logger(__name__)
class ConditionPoller:
"""Polls for specific conditions that should trigger OpenClaw actions."""
def __init__(self, settings: Any) -> None:
self.settings = settings
self._running = False
async def run(self) -> None:
"""Run the condition poller."""
if not self.settings.enable_polling:
logger.info("Condition polling disabled")
return
self._running = True
logger.info("Starting condition poller...")
while self._running:
try:
await self._check_conditions()
await asyncio.sleep(self.settings.polling_interval_seconds)
except asyncio.CancelledError:
logger.info("Condition poller cancelled")
break
except Exception as e:
logger.error(f"Error in condition poller: {e}", exc_info=True)
await asyncio.sleep(5)
async def _check_conditions(self) -> None:
"""Check for conditions that should trigger actions."""
# Placeholder for Phase 3 implementation
# Examples:
# - Agent performance thresholds (SLA violations)
# - Marketplace capacity planning
# - Governance proposal voting deadlines
# - Cross-chain bridge status
pass
async def stop(self) -> None:
"""Stop the condition poller."""
self._running = False
logger.info("Condition poller stopped")