""" Unified data transformer class. This module provides a unified transformer implementation that can be used across different exchanges with consistent field mappings. """ from typing import Dict, Any, Optional, List import logging from ..data_types import StandardizedTrade from .base import BaseDataTransformer class UnifiedDataTransformer(BaseDataTransformer): """ Unified transformer for consistent data transformation across exchanges. This class provides a standardized way to transform data by using consistent field mappings across different exchanges. """ def __init__( self, base_transformer: BaseDataTransformer, component_name: str = "unified_transformer", logger: Optional[logging.Logger] = None ): """ Initialize unified transformer. Args: base_transformer: Base transformer instance to wrap component_name: Component name for logging logger: Optional logger instance """ super().__init__( exchange=base_transformer.exchange, component_name=component_name, logger=logger or base_transformer.logger ) self.base_transformer = base_transformer def transform_trade_data(self, raw_data: Dict[str, Any], symbol: str) -> Optional[StandardizedTrade]: """ Transform raw trade data using base transformer. Args: raw_data: Raw trade data dictionary symbol: Trading symbol Returns: StandardizedTrade object or None if transformation fails """ try: return self.base_transformer.transform_trade_data(raw_data, symbol) except Exception as e: self._log_error(f"Failed to transform trade data", e) return None def transform_orderbook_data(self, raw_data: Dict[str, Any], symbol: str) -> Dict[str, Any]: """ Transform orderbook data using base transformer. Args: raw_data: Raw orderbook data dictionary symbol: Trading symbol Returns: Transformed orderbook data """ try: return self.base_transformer.transform_orderbook_data(raw_data, symbol) except Exception as e: self._log_error(f"Failed to transform orderbook data", e) return {} def transform_ticker_data(self, raw_data: Dict[str, Any], symbol: str) -> Dict[str, Any]: """ Transform ticker data using base transformer. Args: raw_data: Raw ticker data dictionary symbol: Trading symbol Returns: Transformed ticker data """ try: return self.base_transformer.transform_ticker_data(raw_data, symbol) except Exception as e: self._log_error(f"Failed to transform ticker data", e) return {} def batch_transform_trades( self, raw_trades: List[Dict[str, Any]], symbol: str, field_mapping: Optional[Dict[str, str]] = None ) -> List[StandardizedTrade]: """ Transform a batch of raw trades. Args: raw_trades: List of raw trade dictionaries symbol: Trading symbol field_mapping: Optional field mapping for raw data Returns: List of StandardizedTrade objects """ try: return [ self.transform_trade_data(raw_trade, symbol) for raw_trade in raw_trades if raw_trade is not None ] except Exception as e: self._log_error(f"Failed to batch transform trades", e) return [] def get_transformer_info(self) -> Dict[str, Any]: """Get transformer information.""" base_info = self.base_transformer.get_transformer_info() return { "exchange": base_info["exchange"], "component": base_info["component"], "unified_component": self.component_name, "batch_processing": True, "candle_aggregation": True, "capabilities": { **base_info["capabilities"], "unified_transformation": True, "candle_aggregation": True } }