TCPDashboard/data/common/transformation.py
Vasily.onl e7ede7f329 Refactor aggregation module and enhance structure
- Split the `aggregation.py` file into a dedicated sub-package, improving modularity and maintainability.
- Moved `TimeframeBucket`, `RealTimeCandleProcessor`, and `BatchCandleProcessor` classes into their respective files within the new `aggregation` sub-package.
- Introduced utility functions for trade aggregation and validation, enhancing code organization.
- Updated import paths throughout the codebase to reflect the new structure, ensuring compatibility.
- Added safety net tests for the aggregation package to verify core functionality and prevent regressions during refactoring.

These changes enhance the overall architecture of the aggregation module, making it more scalable and easier to manage.
2025-06-07 01:17:22 +08:00

484 lines
17 KiB
Python

"""
Base transformation utilities for all exchanges.
This module provides common transformation patterns and base classes
for converting exchange-specific data to standardized formats.
"""
from datetime import datetime, timezone
from decimal import Decimal
from typing import Dict, List, Optional, Any, Iterator
from abc import ABC, abstractmethod
from .data_types import StandardizedTrade, OHLCVCandle, DataValidationResult
from .aggregation.batch import BatchCandleProcessor
class BaseDataTransformer(ABC):
"""
Abstract base class for exchange data transformers.
This class provides common transformation patterns that can be
extended by exchange-specific implementations.
"""
def __init__(self,
exchange_name: str,
component_name: str = "base_data_transformer",
logger = None):
"""
Initialize base data transformer.
Args:
exchange_name: Name of the exchange (e.g., 'okx', 'binance')
component_name: Name for logging
"""
self.exchange_name = exchange_name
self.component_name = component_name
self.logger = logger
if self.logger:
self.logger.info(f"{self.component_name}: Initialized base data transformer for {exchange_name}")
# Abstract methods that must be implemented by subclasses
@abstractmethod
def transform_trade_data(self, raw_data: Dict[str, Any], symbol: str) -> Optional[StandardizedTrade]:
"""Transform exchange-specific trade data to standardized format."""
pass
@abstractmethod
def transform_orderbook_data(self, raw_data: Dict[str, Any], symbol: str) -> Optional[Dict[str, Any]]:
"""Transform exchange-specific orderbook data to standardized format."""
pass
@abstractmethod
def transform_ticker_data(self, raw_data: Dict[str, Any], symbol: str) -> Optional[Dict[str, Any]]:
"""Transform exchange-specific ticker data to standardized format."""
pass
# Common transformation utilities available to all subclasses
def timestamp_to_datetime(self, timestamp: Any, is_milliseconds: bool = True) -> datetime:
"""
Convert various timestamp formats to timezone-aware datetime.
Args:
timestamp: Timestamp in various formats
is_milliseconds: True if timestamp is in milliseconds
Returns:
Timezone-aware datetime object
"""
try:
# Convert to int/float
if isinstance(timestamp, str):
timestamp_num = float(timestamp)
elif isinstance(timestamp, (int, float)):
timestamp_num = float(timestamp)
else:
raise ValueError(f"Invalid timestamp type: {type(timestamp)}")
# Convert to seconds if needed
if is_milliseconds:
timestamp_num = timestamp_num / 1000
# Create timezone-aware datetime
dt = datetime.fromtimestamp(timestamp_num, tz=timezone.utc)
return dt
except Exception as e:
if self.logger:
self.logger.error(f"{self.component_name}: Error converting timestamp {timestamp}: {e}")
# Return current time as fallback
return datetime.now(timezone.utc)
def safe_decimal_conversion(self, value: Any, field_name: str = "value") -> Optional[Decimal]:
"""
Safely convert value to Decimal with error handling.
Args:
value: Value to convert
field_name: Name of field for error logging
Returns:
Decimal value or None if conversion failed
"""
try:
if value is None or value == "":
return None
return Decimal(str(value))
except Exception as e:
if self.logger:
self.logger.warning(f"{self.component_name}: Failed to convert {field_name} '{value}' to Decimal: {e}")
return None
def normalize_trade_side(self, side: str) -> str:
"""
Normalize trade side to standard format.
Args:
side: Raw trade side string
Returns:
Normalized side ('buy' or 'sell')
"""
normalized = side.lower().strip()
# Handle common variations
if normalized in ['buy', 'bid', 'b', '1']:
return 'buy'
elif normalized in ['sell', 'ask', 's', '0']:
return 'sell'
else:
if self.logger:
self.logger.warning(f"{self.component_name}: Unknown trade side: {side}, defaulting to 'buy'")
return 'buy'
def validate_symbol_format(self, symbol: str) -> str:
"""
Validate and normalize symbol format.
Args:
symbol: Raw symbol string
Returns:
Normalized symbol string
"""
if not symbol or not isinstance(symbol, str):
raise ValueError(f"Invalid symbol: {symbol}")
# Basic normalization
normalized = symbol.upper().strip()
if not normalized:
raise ValueError("Empty symbol after normalization")
return normalized
def transform_database_record(self, record: Any) -> Optional[StandardizedTrade]:
"""
Transform database record to standardized format.
This method should be overridden by subclasses to handle
their specific database schema.
Args:
record: Database record
Returns:
StandardizedTrade or None if transformation failed
"""
if self.logger:
self.logger.warning(f"{self.component_name}: transform_database_record not implemented for this exchange")
return None
def get_transformer_info(self) -> Dict[str, Any]:
"""Get transformer information."""
return {
'exchange': self.exchange_name,
'component': self.component_name,
'capabilities': {
'trade_transformation': True,
'orderbook_transformation': True,
'ticker_transformation': True,
'database_transformation': hasattr(self, 'transform_database_record')
}
}
class UnifiedDataTransformer:
"""
Unified data transformation system for all scenarios.
This class provides a common interface for transforming data from
various sources (real-time, historical, backfill) into standardized
formats for further processing.
TRANSFORMATION PROCESS:
1. Raw Data Input (exchange format, database records, etc.)
2. Validation (using exchange-specific validators)
3. Transformation to StandardizedTrade format
4. Optional aggregation to candles
5. Output in consistent format
"""
def __init__(self,
exchange_transformer: BaseDataTransformer,
component_name: str = "unified_data_transformer",
logger = None):
"""
Initialize unified data transformer.
Args:
exchange_transformer: Exchange-specific transformer instance
component_name: Name for logging
"""
self.exchange_transformer = exchange_transformer
self.component_name = component_name
self.logger = logger
if self.logger:
self.logger.info(f"{self.component_name}: Initialized unified data transformer with {exchange_transformer.exchange_name} transformer")
def transform_trade_data(self, raw_data: Dict[str, Any], symbol: str) -> Optional[StandardizedTrade]:
"""
Transform trade data using exchange-specific transformer.
Args:
raw_data: Raw trade data from exchange
symbol: Trading symbol
Returns:
Standardized trade or None if transformation failed
"""
try:
return self.exchange_transformer.transform_trade_data(raw_data, symbol)
except Exception as e:
if self.logger:
self.logger.error(f"{self.component_name}: Error in trade transformation: {e}")
return None
def transform_orderbook_data(self, raw_data: Dict[str, Any], symbol: str) -> Optional[Dict[str, Any]]:
"""
Transform orderbook data using exchange-specific transformer.
Args:
raw_data: Raw orderbook data from exchange
symbol: Trading symbol
Returns:
Standardized orderbook data or None if transformation failed
"""
try:
return self.exchange_transformer.transform_orderbook_data(raw_data, symbol)
except Exception as e:
if self.logger:
self.logger.error(f"{self.component_name}: Error in orderbook transformation: {e}")
return None
def transform_ticker_data(self, raw_data: Dict[str, Any], symbol: str) -> Optional[Dict[str, Any]]:
"""
Transform ticker data using exchange-specific transformer.
Args:
raw_data: Raw ticker data from exchange
symbol: Trading symbol
Returns:
Standardized ticker data or None if transformation failed
"""
try:
return self.exchange_transformer.transform_ticker_data(raw_data, symbol)
except Exception as e:
if self.logger:
self.logger.error(f"{self.component_name}: Error in ticker transformation: {e}")
return None
def process_trades_to_candles(self,
trades: Iterator[StandardizedTrade],
timeframes: List[str],
symbol: str) -> List[OHLCVCandle]:
"""
Process any trade iterator to candles - unified processing function.
This function handles ALL scenarios:
- Real-time: Single trade iterators
- Historical: Batch trade iterators
- Backfill: API trade iterators
Args:
trades: Iterator of standardized trades
timeframes: List of timeframes to generate
symbol: Trading symbol
Returns:
List of completed candles
"""
try:
processor = BatchCandleProcessor(
symbol,
self.exchange_transformer.exchange_name,
timeframes,
f"unified_batch_processor_{symbol}"
)
candles = processor.process_trades_to_candles(trades)
if self.logger:
self.logger.info(f"{self.component_name}: Processed {processor.get_stats()['trades_processed']} trades to {len(candles)} candles")
return candles
except Exception as e:
if self.logger:
self.logger.error(f"{self.component_name}: Error processing trades to candles: {e}")
return []
def batch_transform_trades(self,
raw_trades: List[Dict[str, Any]],
symbol: str) -> List[StandardizedTrade]:
"""
Transform multiple trade records in batch.
Args:
raw_trades: List of raw trade data
symbol: Trading symbol
Returns:
List of successfully transformed trades
"""
transformed_trades = []
errors = 0
for raw_trade in raw_trades:
try:
trade = self.transform_trade_data(raw_trade, symbol)
if trade:
transformed_trades.append(trade)
else:
errors += 1
except Exception as e:
if self.logger:
self.logger.error(f"{self.component_name}: Error transforming trade: {e}")
errors += 1
if self.logger:
self.logger.info(f"{self.component_name}: Batch transformed {len(transformed_trades)} trades successfully, {errors} errors")
return transformed_trades
def get_transformer_info(self) -> Dict[str, Any]:
"""Get comprehensive transformer information."""
base_info = self.exchange_transformer.get_transformer_info()
base_info.update({
'unified_component': self.component_name,
'batch_processing': True,
'candle_aggregation': True
})
return base_info
# Utility functions for common transformation patterns
def create_standardized_trade(symbol: str,
trade_id: str,
price: Any,
size: Any,
side: str,
timestamp: Any,
exchange: str,
raw_data: Optional[Dict[str, Any]] = None,
is_milliseconds: bool = True) -> StandardizedTrade:
"""
Utility function to create StandardizedTrade with proper validation.
Args:
symbol: Trading symbol
trade_id: Trade identifier
price: Trade price (any numeric type)
size: Trade size (any numeric type)
side: Trade side ('buy' or 'sell')
timestamp: Trade timestamp
exchange: Exchange name
raw_data: Original raw data
is_milliseconds: True if timestamp is in milliseconds
Returns:
StandardizedTrade object
Raises:
ValueError: If data is invalid
"""
# Convert timestamp
if isinstance(timestamp, (int, float, str)):
timestamp_num = float(timestamp)
if is_milliseconds:
timestamp_num = timestamp_num / 1000
dt = datetime.fromtimestamp(timestamp_num, tz=timezone.utc)
elif isinstance(timestamp, datetime):
dt = timestamp
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
else:
raise ValueError(f"Invalid timestamp type: {type(timestamp)}")
# Convert price and size to Decimal
try:
decimal_price = Decimal(str(price))
decimal_size = Decimal(str(size))
except Exception as e:
raise ValueError(f"Invalid price or size: {e}")
# Normalize side
normalized_side = side.lower().strip()
if normalized_side not in ['buy', 'sell']:
raise ValueError(f"Invalid trade side: {side}")
return StandardizedTrade(
symbol=symbol.upper().strip(),
trade_id=str(trade_id),
price=decimal_price,
size=decimal_size,
side=normalized_side,
timestamp=dt,
exchange=exchange.lower(),
raw_data=raw_data
)
def batch_create_standardized_trades(raw_trades: List[Dict[str, Any]],
symbol: str,
exchange: str,
field_mapping: Dict[str, str],
is_milliseconds: bool = True) -> List[StandardizedTrade]:
"""
Batch create standardized trades from raw data.
Args:
raw_trades: List of raw trade dictionaries
symbol: Trading symbol
exchange: Exchange name
field_mapping: Mapping of StandardizedTrade fields to raw data fields
is_milliseconds: True if timestamps are in milliseconds
Returns:
List of successfully created StandardizedTrade objects
Example field_mapping:
{
'trade_id': 'id',
'price': 'px',
'size': 'sz',
'side': 'side',
'timestamp': 'ts'
}
"""
trades = []
for raw_trade in raw_trades:
try:
trade = create_standardized_trade(
symbol=symbol,
trade_id=raw_trade[field_mapping['trade_id']],
price=raw_trade[field_mapping['price']],
size=raw_trade[field_mapping['size']],
side=raw_trade[field_mapping['side']],
timestamp=raw_trade[field_mapping['timestamp']],
exchange=exchange,
raw_data=raw_trade,
is_milliseconds=is_milliseconds
)
trades.append(trade)
except Exception as e:
# Log error but continue processing
print(f"Failed to transform trade: {e}")
return trades
__all__ = [
'BaseDataTransformer',
'UnifiedDataTransformer',
'create_standardized_trade',
'batch_create_standardized_trades'
]