2025-06-07 01:17:22 +08:00
|
|
|
"""
Real-time candle processor for live trade data.

This module provides the RealTimeCandleProcessor class, which builds OHLCV
candles incrementally from live trade data as each trade arrives.
"""
|
|
|
|
|
|
|
|
|
|
from datetime import datetime, timezone, timedelta
|
|
|
|
|
from decimal import Decimal
|
|
|
|
|
from typing import Dict, List, Optional, Any, Callable
|
|
|
|
|
from collections import defaultdict
|
|
|
|
|
|
2025-06-07 13:23:59 +08:00
|
|
|
from ..data_types import (
|
|
|
|
|
StandardizedTrade,
|
|
|
|
|
OHLCVCandle,
|
|
|
|
|
CandleProcessingConfig,
|
|
|
|
|
ProcessingStats
|
|
|
|
|
)
|
2025-06-07 01:17:22 +08:00
|
|
|
from .bucket import TimeframeBucket
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class RealTimeCandleProcessor:
    """
    Real-time candle processor for live trade data.

    Trades are processed immediately as they arrive (e.g. from a WebSocket
    feed). Candles are built incrementally, with one open bucket per
    configured timeframe. A candle is emitted only once a later trade shows
    that its time window has closed.

    AGGREGATION PROCESS (NO FUTURE LEAKAGE):
        1. A trade arrives with timestamp T.
        2. For each configured timeframe (1m, 5m, etc.):
           a. Compute the start of the bucket that T falls into.
           b. Look up the currently open bucket for this timeframe.
           c. If T falls into an EARLIER bucket (late / out-of-order trade),
              the trade is dropped for this timeframe. That candle was
              already emitted, and merging the trade into the open bucket
              would corrupt a different time window.
           d. If T falls into a LATER bucket, the open bucket's window has
              closed: complete and emit it, then open a new bucket.
           e. Add the trade to the open bucket (updates OHLCV).
        3. Candles are emitted only when a time boundary is definitively
           crossed, or when force_complete_all_candles() is called.
        4. Incomplete candles are never emitted to callbacks during normal
           processing; get_current_candles() exposes them on demand.

    TIMESTAMP ALIGNMENT:
        - Candles use RIGHT-ALIGNED timestamps (industry standard).
        - A 1-minute candle covering 09:00:00-09:01:00 is stamped 09:01:00.
        - A 5-minute candle covering 09:00:00-09:05:00 is stamped 09:05:00.
        - A candle therefore represents PAST data, never future data.
    """

    # Supported timeframe -> (calendar field to floor, step within that field).
    # Buckets are aligned to wall-clock boundaries in the timestamp's own
    # timezone (datetime.replace keeps tzinfo). For example, '5m' buckets
    # start at :00, :05, :10, ... and '4h' buckets at 00:00, 04:00, 08:00, ...
    _TIMEFRAME_ALIGNMENT: Dict[str, tuple] = {
        '1s': ('second', 1),
        '5s': ('second', 5),
        '10s': ('second', 10),
        '15s': ('second', 15),
        '30s': ('second', 30),
        '1m': ('minute', 1),
        '5m': ('minute', 5),
        '15m': ('minute', 15),
        '30m': ('minute', 30),
        '1h': ('hour', 1),
        '4h': ('hour', 4),
        '1d': ('day', 1),
    }

    def __init__(self,
                 symbol: str,
                 exchange: str,
                 config: Optional[CandleProcessingConfig] = None,
                 component_name: str = "realtime_candle_processor",
                 logger: Optional[Any] = None):
        """
        Initialize the real-time candle processor.

        Args:
            symbol: Trading symbol (e.g., 'BTC-USDT')
            exchange: Exchange name
            config: Candle processing configuration; a default
                CandleProcessingConfig() is created when omitted.
            component_name: Name used for logging/stats
            logger: Optional logger instance. When None, callback errors and
                dropped late trades are not logged.
        """
        self.symbol = symbol
        self.exchange = exchange
        self.config = config or CandleProcessingConfig()
        self.component_name = component_name
        self.logger = logger

        # Currently open (incomplete) bucket for each timeframe.
        self.current_buckets: Dict[str, TimeframeBucket] = {}

        # Callbacks invoked with every completed candle.
        self.candle_callbacks: List[Callable[[OHLCVCandle], None]] = []

        # Stats tracking
        self.stats = ProcessingStats()
        self.stats.active_timeframes = len(self.config.timeframes)

    def add_candle_callback(self, callback: Callable[[OHLCVCandle], None]) -> None:
        """Register a callback to be invoked with each completed candle."""
        self.candle_callbacks.append(callback)

    def process_trade(self, trade: StandardizedTrade) -> List[OHLCVCandle]:
        """
        Process a single trade and return any completed candles.

        Args:
            trade: Standardized trade data

        Returns:
            Candles completed by this trade, in the order of
            ``config.timeframes``. The list is empty when no time boundary
            was crossed.
        """
        self.stats.trades_processed += 1
        self.stats.last_trade_time = trade.timestamp

        completed_candles = []
        for timeframe in self.config.timeframes:
            completed = self._process_trade_for_timeframe(trade, timeframe)
            if completed:
                completed_candles.append(completed)
                self.stats.candles_emitted += 1
                self.stats.last_candle_time = completed.end_time

        return completed_candles

    def _process_trade_for_timeframe(self, trade: StandardizedTrade, timeframe: str) -> Optional[OHLCVCandle]:
        """
        Process a trade for one timeframe and return a completed candle, if any.

        Args:
            trade: Trade to process
            timeframe: Timeframe to process for (e.g., '1m', '5m')

        Returns:
            The previously open candle if this trade crossed its time
            boundary, otherwise None.

        Raises:
            ValueError: If ``timeframe`` is not supported.
        """
        bucket_start = self._get_bucket_start_time(trade.timestamp, timeframe)
        current_bucket = self.current_buckets.get(timeframe)
        completed_candle = None

        if current_bucket is not None:
            if bucket_start < current_bucket.start_time:
                # Late / out-of-order trade: the candle it belongs to was
                # already emitted. Adding it to the open bucket would merge
                # data from an earlier window into a later candle, so the
                # trade is dropped for this timeframe.
                if self.logger:
                    self.logger.warning(
                        f"Dropping late trade for {self.symbol} {timeframe}: "
                        f"{trade.timestamp} precedes open bucket start "
                        f"{current_bucket.start_time}"
                    )
                return None

            if bucket_start >= current_bucket.end_time:
                # The trade belongs to a later window, so the open bucket is
                # definitively finished: complete it, emit it, and start fresh.
                completed_candle = current_bucket.to_candle(is_complete=True)
                self._emit_candle(completed_candle)
                current_bucket = None

        if current_bucket is None:
            current_bucket = TimeframeBucket(
                symbol=self.symbol,
                timeframe=timeframe,
                start_time=bucket_start,
                exchange=self.exchange
            )
            self.current_buckets[timeframe] = current_bucket

        current_bucket.add_trade(trade)
        return completed_candle

    def _get_bucket_start_time(self, timestamp: datetime, timeframe: str) -> datetime:
        """
        Return the start (left edge) of the bucket that ``timestamp`` falls into.

        Buckets are aligned to wall-clock boundaries. For a 5m timeframe:
            - buckets start at 00:00, 00:05, 00:10, ...
            - a trade at 09:03:45 belongs to the 09:00-09:05 bucket (returns 09:00)
            - a trade at 09:07:30 belongs to the 09:05-09:10 bucket (returns 09:05)

        The returned value is the bucket START. The right-aligned candle
        timestamp is derived from the bucket later, when it is converted to
        a candle.

        Args:
            timestamp: Trade timestamp
            timeframe: Time period (e.g., '1m', '5m', '1h')

        Returns:
            Start time of the appropriate bucket, with the same tzinfo as
            ``timestamp``.

        Raises:
            ValueError: If ``timeframe`` is not supported.
        """
        alignment = self._TIMEFRAME_ALIGNMENT.get(timeframe)
        if alignment is None:
            raise ValueError(f"Unsupported timeframe: {timeframe}")

        unit, step = alignment
        if unit == 'second':
            return timestamp.replace(second=(timestamp.second // step) * step,
                                     microsecond=0)
        if unit == 'minute':
            return timestamp.replace(minute=(timestamp.minute // step) * step,
                                     second=0, microsecond=0)
        if unit == 'hour':
            return timestamp.replace(hour=(timestamp.hour // step) * step,
                                     minute=0, second=0, microsecond=0)
        # unit == 'day': floor to midnight.
        return timestamp.replace(hour=0, minute=0, second=0, microsecond=0)

    def _emit_candle(self, candle: OHLCVCandle) -> None:
        """
        Deliver a completed candle to every registered callback.

        A failing callback is isolated: its exception is logged (if a logger
        is configured) and counted in ``stats.errors_count``. The remaining
        callbacks still run.
        """
        for callback in self.candle_callbacks:
            try:
                callback(candle)
            except Exception as e:
                if self.logger:
                    self.logger.error(f"Error in candle callback: {e}")
                self.stats.errors_count += 1

    def get_current_candles(self, incomplete: bool = True) -> List[OHLCVCandle]:
        """
        Snapshot the currently open candles for all timeframes.

        Callbacks are not invoked and the open buckets are not modified.

        Args:
            incomplete: Whether to mark candles as incomplete (default True)

        Returns:
            One candle per currently open bucket.
        """
        return [
            bucket.to_candle(is_complete=not incomplete)
            for bucket in self.current_buckets.values()
        ]

    def force_complete_all_candles(self) -> List[OHLCVCandle]:
        """
        Force completion of all open candles (e.g., on connection close).

        Each open bucket is converted to a complete candle, emitted to the
        callbacks and counted in the stats. All open buckets are then cleared.

        Returns:
            List of the candles that were completed.
        """
        completed = []
        for bucket in self.current_buckets.values():
            candle = bucket.to_candle(is_complete=True)
            completed.append(candle)
            self._emit_candle(candle)
            self.stats.candles_emitted += 1
            # Keep stats consistent with the regular emission path.
            self.stats.last_candle_time = candle.end_time

        self.current_buckets.clear()
        return completed

    def get_stats(self) -> Dict[str, Any]:
        """
        Return processing statistics plus component identification.

        Note: the 'active_timeframes' entry here lists the timeframes that
        currently have an open bucket. It replaces the numeric counter of the
        same name stored on ``self.stats``.
        """
        stats_dict = self.stats.to_dict()
        stats_dict.update({
            'component': self.component_name,
            'symbol': self.symbol,
            'exchange': self.exchange,
            'active_timeframes': list(self.current_buckets.keys())
        })
        return stats_dict
|
2025-06-07 01:17:22 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# Public API of this module: only the real-time processor class is exported.
__all__ = ['RealTimeCandleProcessor']
|