- Introduced a new transformation module that includes safety limits for trade operations, enhancing data integrity and preventing errors. - Refactored existing transformation logic into dedicated classes and functions, improving modularity and maintainability. - Added detailed validation for trade sizes, prices, and symbol formats, ensuring compliance with trading rules. - Implemented logging for significant operations and validation checks, aiding in monitoring and debugging. - Created a changelog to document the new features and changes, providing clarity for future development. - Developed extensive unit tests to cover the new functionality, ensuring reliability and preventing regressions. These changes significantly enhance the architecture of the transformation module, making it more robust and easier to manage.
249 lines
9.4 KiB
Python
249 lines
9.4 KiB
Python
"""
|
|
Real-time candle processor for live trade data.
|
|
|
|
This module provides the RealTimeCandleProcessor class for building OHLCV candles
|
|
from live trade data in real-time.
|
|
"""
|
|
|
|
from datetime import datetime, timezone, timedelta
|
|
from decimal import Decimal
|
|
from typing import Dict, List, Optional, Any, Callable
|
|
from collections import defaultdict
|
|
|
|
from ..data_types import (
|
|
StandardizedTrade,
|
|
OHLCVCandle,
|
|
CandleProcessingConfig,
|
|
ProcessingStats
|
|
)
|
|
from .bucket import TimeframeBucket
|
|
|
|
|
|
class RealTimeCandleProcessor:
    """
    Real-time candle processor for live trade data.

    Trades are processed one at a time as they arrive. The processor keeps at
    most one in-progress ``TimeframeBucket`` per configured timeframe and turns
    a bucket into a completed candle as soon as a trade arrives whose bucket
    start is at or after the current bucket's ``end_time``.

    AGGREGATION PROCESS (NO FUTURE LEAKAGE):

    1. Trade arrives from WebSocket/API with timestamp T
    2. For each configured timeframe (1m, 5m, etc.):
        a. Calculate which time bucket this trade belongs to
        b. Get current bucket for this timeframe
        c. Check if trade timestamp crosses time boundary
        d. If boundary crossed: complete and emit previous bucket, create new bucket
        e. Add trade to current bucket (updates OHLCV)
    3. Only emit candles when time boundary is definitively crossed
    4. Never emit incomplete/future candles during real-time processing
       (``force_complete_all_candles`` is the explicit exception)

    TIMESTAMP ALIGNMENT:
    - Uses RIGHT-ALIGNED timestamps (industry standard)
    - 1-minute candle covering 09:00:00-09:01:00 gets timestamp 09:01:00
    - 5-minute candle covering 09:00:00-09:05:00 gets timestamp 09:05:00
    - Candle represents PAST data, never future

    NOTE(review): the right-alignment itself is applied when a bucket is
    converted to a candle, which happens in ``TimeframeBucket.to_candle`` and
    is not visible in this class. This class only computes bucket START times.
    """

    # Supported timeframes, grouped by the datetime field that is floored.
    # Each value is the step (in that field's unit) that buckets align to.
    _SECOND_STEPS = {'1s': 1, '5s': 5, '10s': 10, '15s': 15, '30s': 30}
    _MINUTE_STEPS = {'1m': 1, '5m': 5, '15m': 15, '30m': 30}
    _HOUR_STEPS = {'1h': 1, '4h': 4}

    def __init__(self,
                 symbol: str,
                 exchange: str,
                 config: Optional[CandleProcessingConfig] = None,
                 component_name: str = "realtime_candle_processor",
                 logger: Optional[Any] = None):
        """
        Initialize real-time candle processor.

        Args:
            symbol: Trading symbol (e.g., 'BTC-USDT')
            exchange: Exchange name
            config: Candle processing configuration; a default
                ``CandleProcessingConfig()`` is created when omitted
            component_name: Name for logging/stats
            logger: Optional logger instance; when ``None`` callback errors
                are counted but not logged
        """
        self.symbol = symbol
        self.exchange = exchange
        # Explicit None check: a caller-supplied config must never be replaced
        # just because it happens to evaluate as falsy.
        self.config = config if config is not None else CandleProcessingConfig()
        self.component_name = component_name
        self.logger = logger

        # Current (in-progress) bucket for each timeframe
        self.current_buckets: Dict[str, TimeframeBucket] = {}

        # Callbacks invoked for every completed candle
        self.candle_callbacks: List[Callable[[OHLCVCandle], None]] = []

        # Stats tracking
        self.stats = ProcessingStats()
        self.stats.active_timeframes = len(self.config.timeframes)

    def add_candle_callback(self, callback: Callable[[OHLCVCandle], None]) -> None:
        """Register a callback to be called with each completed candle."""
        self.candle_callbacks.append(callback)

    def process_trade(self, trade: StandardizedTrade) -> List[OHLCVCandle]:
        """
        Process a single trade and return any completed candles.

        Updates ``stats.trades_processed`` / ``stats.last_trade_time`` for the
        trade, and ``stats.candles_emitted`` / ``stats.last_candle_time`` for
        every candle completed by it.

        Args:
            trade: Standardized trade data

        Returns:
            List of completed candles (if any time boundaries were crossed),
            in the order of ``config.timeframes``

        Raises:
            ValueError: If a configured timeframe is not supported
        """
        self.stats.trades_processed += 1
        self.stats.last_trade_time = trade.timestamp

        completed_candles: List[OHLCVCandle] = []
        for timeframe in self.config.timeframes:
            candle = self._process_trade_for_timeframe(trade, timeframe)
            if candle is None:
                continue
            completed_candles.append(candle)
            self.stats.candles_emitted += 1
            self.stats.last_candle_time = candle.end_time

        return completed_candles

    def _process_trade_for_timeframe(self, trade: StandardizedTrade, timeframe: str) -> Optional[OHLCVCandle]:
        """
        Process trade for a specific timeframe and return completed candle if boundary crossed.

        Args:
            trade: Trade to process
            timeframe: Timeframe to process for (e.g., '1m', '5m')

        Returns:
            Completed candle if time boundary crossed, None otherwise

        Raises:
            ValueError: If ``timeframe`` is not supported
        """
        # Calculate which bucket this trade belongs to
        bucket_start = self._get_bucket_start_time(trade.timestamp, timeframe)

        current_bucket = self.current_buckets.get(timeframe)
        completed_candle = None

        # The trade belongs to a later bucket: close out the current one and
        # emit it before the trade is recorded anywhere.
        if current_bucket is not None and bucket_start >= current_bucket.end_time:
            completed_candle = current_bucket.to_candle(is_complete=True)
            self._emit_candle(completed_candle)
            current_bucket = None

        # First trade for this timeframe, or the previous bucket was just closed
        if current_bucket is None:
            current_bucket = TimeframeBucket(
                symbol=self.symbol,
                timeframe=timeframe,
                start_time=bucket_start,
                exchange=self.exchange
            )
            self.current_buckets[timeframe] = current_bucket

        # Add trade to current bucket
        current_bucket.add_trade(trade)

        return completed_candle

    def _get_bucket_start_time(self, timestamp: datetime, timeframe: str) -> datetime:
        """
        Calculate the start time for the bucket that this timestamp belongs to.

        The timestamp is floored to the timeframe's step using
        ``datetime.replace``, so the result keeps the timestamp's date and
        ``tzinfo`` and is aligned to that timestamp's own wall clock.

        - For 5m timeframe, buckets start at 00:00, 00:05, 00:10, etc.
        - Trade at 09:03:45 belongs to 09:00-09:05 bucket
        - Trade at 09:07:30 belongs to 09:05-09:10 bucket

        Args:
            timestamp: Trade timestamp
            timeframe: Time period; one of '1s', '5s', '10s', '15s', '30s',
                '1m', '5m', '15m', '30m', '1h', '4h', '1d'

        Returns:
            Start time for the appropriate bucket

        Raises:
            ValueError: If ``timeframe`` is not one of the supported values
        """
        step = self._SECOND_STEPS.get(timeframe)
        if step is not None:
            return timestamp.replace(
                second=(timestamp.second // step) * step,
                microsecond=0,
            )

        step = self._MINUTE_STEPS.get(timeframe)
        if step is not None:
            return timestamp.replace(
                minute=(timestamp.minute // step) * step,
                second=0,
                microsecond=0,
            )

        step = self._HOUR_STEPS.get(timeframe)
        if step is not None:
            return timestamp.replace(
                hour=(timestamp.hour // step) * step,
                minute=0,
                second=0,
                microsecond=0,
            )

        if timeframe == '1d':
            return timestamp.replace(hour=0, minute=0, second=0, microsecond=0)

        raise ValueError(f"Unsupported timeframe: {timeframe}")

    def _emit_candle(self, candle: OHLCVCandle) -> None:
        """
        Emit completed candle to all registered callbacks.

        A failing callback never prevents the remaining callbacks from
        running: the exception is logged (when a logger is configured) and
        counted in ``stats.errors_count``.
        """
        for callback in self.candle_callbacks:
            try:
                callback(candle)
            except Exception as e:
                # Best-effort delivery: isolate each callback's failure.
                if self.logger:
                    self.logger.error(f"Error in candle callback: {e}")
                self.stats.errors_count += 1

    def get_current_candles(self, incomplete: bool = True) -> List[OHLCVCandle]:
        """
        Get current (in-progress) candles for all timeframes.

        Buckets are left untouched; nothing is emitted to callbacks.

        Args:
            incomplete: Whether to mark candles as incomplete (default True)

        Returns:
            One candle per timeframe that currently has a bucket
        """
        return [
            bucket.to_candle(is_complete=not incomplete)
            for bucket in self.current_buckets.values()
        ]

    def force_complete_all_candles(self) -> List[OHLCVCandle]:
        """
        Force completion of all current candles (e.g., on connection close).

        Every in-progress bucket is converted to a completed candle, emitted
        to the callbacks and reflected in ``stats`` the same way a candle
        completed by ``process_trade`` is. All buckets are then discarded.

        Returns:
            List of completed candles
        """
        completed: List[OHLCVCandle] = []
        for bucket in self.current_buckets.values():
            candle = bucket.to_candle(is_complete=True)
            completed.append(candle)
            self._emit_candle(candle)
            self.stats.candles_emitted += 1
            # Keep the stats consistent with process_trade().
            self.stats.last_candle_time = candle.end_time
        self.current_buckets.clear()
        return completed

    def get_stats(self) -> Dict[str, Any]:
        """
        Get processing statistics.

        Returns:
            ``stats.to_dict()`` extended with ``component``, ``symbol``,
            ``exchange`` and ``active_timeframes``. The latter is the list of
            timeframes that currently have an open bucket and replaces any
            key of the same name coming from ``to_dict()``.
        """
        stats_dict = self.stats.to_dict()
        stats_dict.update({
            'component': self.component_name,
            'symbol': self.symbol,
            'exchange': self.exchange,
            'active_timeframes': list(self.current_buckets.keys())
        })
        return stats_dict
|
|
|
|
|
|
# Public API of this module.
__all__ = ['RealTimeCandleProcessor']