136 lines
4.4 KiB
Python
Raw Permalink Normal View History

"""
Unified data transformer class.
This module provides a unified transformer implementation that can be used
across different exchanges with consistent field mappings.
"""
from typing import Dict, Any, Optional, List
import logging
from ..data_types import StandardizedTrade
from .base import BaseDataTransformer
class UnifiedDataTransformer(BaseDataTransformer):
"""
Unified transformer for consistent data transformation across exchanges.
This class provides a standardized way to transform data by using
consistent field mappings across different exchanges.
"""
def __init__(
self,
base_transformer: BaseDataTransformer,
component_name: str = "unified_transformer",
logger: Optional[logging.Logger] = None
):
"""
Initialize unified transformer.
Args:
base_transformer: Base transformer instance to wrap
component_name: Component name for logging
logger: Optional logger instance
"""
super().__init__(
exchange=base_transformer.exchange,
component_name=component_name,
logger=logger or base_transformer.logger
)
self.base_transformer = base_transformer
def transform_trade_data(self, raw_data: Dict[str, Any], symbol: str) -> Optional[StandardizedTrade]:
"""
Transform raw trade data using base transformer.
Args:
raw_data: Raw trade data dictionary
symbol: Trading symbol
Returns:
StandardizedTrade object or None if transformation fails
"""
try:
return self.base_transformer.transform_trade_data(raw_data, symbol)
except Exception as e:
self._log_error(f"Failed to transform trade data", e)
return None
def transform_orderbook_data(self, raw_data: Dict[str, Any], symbol: str) -> Dict[str, Any]:
"""
Transform orderbook data using base transformer.
Args:
raw_data: Raw orderbook data dictionary
symbol: Trading symbol
Returns:
Transformed orderbook data
"""
try:
return self.base_transformer.transform_orderbook_data(raw_data, symbol)
except Exception as e:
self._log_error(f"Failed to transform orderbook data", e)
return {}
def transform_ticker_data(self, raw_data: Dict[str, Any], symbol: str) -> Dict[str, Any]:
"""
Transform ticker data using base transformer.
Args:
raw_data: Raw ticker data dictionary
symbol: Trading symbol
Returns:
Transformed ticker data
"""
try:
return self.base_transformer.transform_ticker_data(raw_data, symbol)
except Exception as e:
self._log_error(f"Failed to transform ticker data", e)
return {}
def batch_transform_trades(
self,
raw_trades: List[Dict[str, Any]],
symbol: str,
field_mapping: Optional[Dict[str, str]] = None
) -> List[StandardizedTrade]:
"""
Transform a batch of raw trades.
Args:
raw_trades: List of raw trade dictionaries
symbol: Trading symbol
field_mapping: Optional field mapping for raw data
Returns:
List of StandardizedTrade objects
"""
try:
return [
self.transform_trade_data(raw_trade, symbol)
for raw_trade in raw_trades
if raw_trade is not None
]
except Exception as e:
self._log_error(f"Failed to batch transform trades", e)
return []
def get_transformer_info(self) -> Dict[str, Any]:
"""Get transformer information."""
base_info = self.base_transformer.get_transformer_info()
return {
"exchange": base_info["exchange"],
"component": base_info["component"],
"unified_component": self.component_name,
"batch_processing": True,
"candle_aggregation": True,
"capabilities": {
**base_info["capabilities"],
"unified_transformation": True,
"candle_aggregation": True
}
}