""" Module for managing data callbacks and notifications for data collectors. This module encapsulates the logic for registering, removing, and notifying callback functions when new market data points are received, promoting a clean separation of concerns within the data collector architecture. """ import asyncio from typing import Dict, List, Optional, Any, Callable from data.common.data_types import DataType, MarketDataPoint class CallbackDispatcher: """ Manages the dispatching of market data points to registered callbacks. """ def __init__(self, component_name: str, logger=None): self.component_name = component_name self.logger = logger self._data_callbacks: Dict[DataType, List[Callable]] = { data_type: [] for data_type in DataType } def _log_debug(self, message: str) -> None: if self.logger: self.logger.debug(f"{self.component_name}: {message}") def _log_info(self, message: str) -> None: if self.logger: self.logger.info(f"{self.component_name}: {message}") def _log_warning(self, message: str) -> None: if self.logger: self.logger.warning(f"{self.component_name}: {message}") def _log_error(self, message: str, exc_info: bool = False) -> None: if self.logger: self.logger.error(f"{self.component_name}: {message}", exc_info=exc_info) def add_data_callback(self, data_type: DataType, callback: Callable[[MarketDataPoint], None]) -> None: """ Add a callback function for specific data type. Args: data_type: Type of data to monitor callback: Function to call when data is received """ if callback not in self._data_callbacks[data_type]: self._data_callbacks[data_type].append(callback) self._log_debug(f"Added callback for {data_type.value} data") def remove_data_callback(self, data_type: DataType, callback: Callable[[MarketDataPoint], None]) -> None: """ Remove a callback function for specific data type. Args: data_type: Type of data to stop monitoring callback: Function to remove """ if callback in self._data_callbacks[data_type]: self._data_callbacks[data_type].remove(callback) self._log_debug(f"Removed callback for {data_type.value} data") async def notify_callbacks(self, data_point: MarketDataPoint) -> None: """ Notify all registered callbacks for a data point. Args: data_point: Market data to distribute """ callbacks = self._data_callbacks.get(data_point.data_type, []) for callback in callbacks: try: # Handle both sync and async callbacks if asyncio.iscoroutinefunction(callback): await callback(data_point) else: callback(data_point) except Exception as e: self._log_error(f"Error in data callback for {data_point.data_type.value} {data_point.symbol}: {e}", exc_info=True)