228 lines
6.4 KiB
Python
Raw Permalink Normal View History

"""
Base data transformer class.
This module provides the base class for all data transformers
with common functionality and interface definitions.
"""
import logging
from typing import Dict, Any, Optional, List
from datetime import datetime
from decimal import Decimal
from ..data_types import StandardizedTrade
from .trade import create_standardized_trade, batch_create_standardized_trades
from .time_utils import timestamp_to_datetime
from .numeric_utils import safe_decimal_conversion
from .normalization import normalize_trade_side, validate_symbol_format
class BaseDataTransformer:
"""Base class for all data transformers."""
def __init__(
self,
exchange: str,
component_name: str = "base_transformer",
logger: Optional[logging.Logger] = None
):
"""
Initialize base transformer.
Args:
exchange: Exchange name
component_name: Component name for logging
logger: Optional logger instance
"""
self.exchange = exchange
self.component_name = component_name
self.logger = logger or logging.getLogger(component_name)
def timestamp_to_datetime(
self,
timestamp: Any,
is_milliseconds: bool = True
) -> datetime:
"""Convert timestamp to datetime."""
return timestamp_to_datetime(
timestamp,
is_milliseconds,
logger=self.logger,
component_name=self.component_name
)
def safe_decimal_conversion(
self,
value: Any,
field_name: str = "value"
) -> Optional[Decimal]:
"""Convert value to Decimal safely."""
return safe_decimal_conversion(
value,
field_name,
logger=self.logger,
component_name=self.component_name
)
def normalize_trade_side(
self,
side: str
) -> str:
"""Normalize trade side."""
try:
return normalize_trade_side(
side,
logger=self.logger,
component_name=self.component_name
)
except ValueError as e:
self.logger.warning(
f"{self.component_name}: Unknown trade side: {side}, defaulting to 'buy'"
)
return 'buy'
def validate_symbol_format(
self,
symbol: str
) -> str:
"""Validate symbol format."""
return validate_symbol_format(
symbol,
logger=self.logger,
component_name=self.component_name
)
def get_transformer_info(self) -> Dict[str, Any]:
"""Get transformer information."""
return {
"exchange": self.exchange,
"component": self.component_name,
"capabilities": {
"trade_transformation": True,
"orderbook_transformation": True,
"ticker_transformation": True,
"batch_processing": True
}
}
def transform_trade_data(
self,
raw_data: Dict[str, Any],
symbol: str
) -> StandardizedTrade:
"""
Transform raw trade data to standardized format.
Args:
raw_data: Raw trade data
symbol: Trading symbol
Returns:
StandardizedTrade object
Raises:
ValueError: If data is invalid
"""
raise NotImplementedError("Subclasses must implement transform_trade_data")
def transform_orderbook_data(
self,
raw_data: Dict[str, Any],
symbol: str
) -> Dict[str, Any]:
"""
Transform raw orderbook data to standardized format.
Args:
raw_data: Raw orderbook data
symbol: Trading symbol
Returns:
Standardized orderbook data
Raises:
ValueError: If data is invalid
"""
raise NotImplementedError("Subclasses must implement transform_orderbook_data")
def transform_ticker_data(
self,
raw_data: Dict[str, Any],
symbol: str
) -> Dict[str, Any]:
"""
Transform raw ticker data to standardized format.
Args:
raw_data: Raw ticker data
symbol: Trading symbol
Returns:
Standardized ticker data
Raises:
ValueError: If data is invalid
"""
raise NotImplementedError("Subclasses must implement transform_ticker_data")
def batch_transform_trades(
self,
raw_trades: List[Dict[str, Any]],
symbol: str
) -> List[StandardizedTrade]:
"""
Transform multiple trades in batch.
Args:
raw_trades: List of raw trade data
symbol: Trading symbol
Returns:
List of StandardizedTrade objects
Raises:
ValueError: If data is invalid
"""
return [
self.transform_trade_data(trade, symbol)
for trade in raw_trades
]
def transform_trades_batch(
self,
raw_trades: List[Dict[str, Any]],
symbol: str,
field_mapping: Dict[str, str]
) -> List[StandardizedTrade]:
"""
Transform a batch of raw trades.
Args:
raw_trades: List of raw trade dictionaries
symbol: Trading symbol
field_mapping: Field mapping for raw data
Returns:
List of StandardizedTrade objects
"""
return batch_create_standardized_trades(
raw_trades=raw_trades,
symbol=symbol,
exchange=self.exchange,
field_mapping=field_mapping
)
def _log_error(self, message: str, error: Optional[Exception] = None) -> None:
"""Log error with component context."""
if error:
self.logger.error(f"{self.component_name}: {message}: {error}")
else:
self.logger.error(f"{self.component_name}: {message}")
def _log_warning(self, message: str) -> None:
"""Log warning with component context."""
self.logger.warning(f"{self.component_name}: {message}")
def _log_info(self, message: str) -> None:
"""Log info with component context."""
self.logger.info(f"{self.component_name}: {message}")