- Added optional logger parameter to various classes including `BaseDataCollector`, `CollectorManager`, `RealTimeCandleProcessor`, and `BatchCandleProcessor` to support conditional logging.
- Implemented error-only logging mode, allowing components to log only error and critical messages when specified.
- Updated logging calls to utilize new helper methods for improved readability and maintainability.
- Enhanced documentation to include details on the new logging system and its usage across components.
- Ensured that child components inherit the logger from their parent components for consistent logging behavior.

"""
|
|
Common aggregation utilities for all exchanges.
|
|
|
|
This module provides shared functionality for building OHLCV candles
|
|
from trade data, regardless of the source exchange.
|
|
|
|
AGGREGATION STRATEGY:
|
|
- Uses RIGHT-ALIGNED timestamps (industry standard)
|
|
- Candle timestamp = end time of the interval (close time)
|
|
- 5-minute candle with timestamp 09:05:00 represents data from 09:00:01 to 09:05:00
|
|
- Prevents future leakage by only completing candles when time boundary is crossed
|
|
- Aligns with major exchanges (Binance, OKX, Coinbase)
|
|
|
|
PROCESS FLOW:
|
|
1. Trade arrives with timestamp T
|
|
2. Calculate which time bucket this trade belongs to
|
|
3. If bucket doesn't exist or time boundary crossed, complete previous bucket
|
|
4. Add trade to current bucket
|
|
5. Only emit completed candles (never future data)
|
|
"""
|
|
|
|
from datetime import datetime, timezone, timedelta
|
|
from decimal import Decimal
|
|
from typing import Dict, List, Optional, Any, Iterator, Callable
|
|
from collections import defaultdict
|
|
|
|
from .data_types import (
|
|
StandardizedTrade,
|
|
OHLCVCandle,
|
|
CandleProcessingConfig,
|
|
ProcessingStats
|
|
)
|
|
|
|
|
|


class TimeframeBucket:
    """
    Time bucket for building OHLCV candles from trades.

    This class accumulates trades within a specific time period
    and calculates OHLCV data incrementally.

    IMPORTANT: Uses RIGHT-ALIGNED timestamps
    - start_time: Beginning of the interval (inclusive)
    - end_time: End of the interval (exclusive) - this becomes the candle timestamp
    - Example: 09:00:00 - 09:05:00 bucket -> candle timestamp = 09:05:00
    """

    def __init__(self, symbol: str, timeframe: str, start_time: datetime, exchange: str = "unknown"):
        """
        Initialize time bucket for candle aggregation.

        Args:
            symbol: Trading symbol (e.g., 'BTC-USDT')
            timeframe: Time period (e.g., '1m', '5m', '1h')
            start_time: Start time for this bucket (inclusive)
            exchange: Exchange name
        """
        self.symbol = symbol
        self.timeframe = timeframe
        self.start_time = start_time
        self.end_time = self._calculate_end_time(start_time, timeframe)
        self.exchange = exchange

        # OHLCV data
        self.open: Optional[Decimal] = None
        self.high: Optional[Decimal] = None
        self.low: Optional[Decimal] = None
        self.close: Optional[Decimal] = None
        self.volume: Decimal = Decimal('0')
        self.trade_count: int = 0

        # Tracking
        self.first_trade_time: Optional[datetime] = None
        self.last_trade_time: Optional[datetime] = None
        self.trades: List[StandardizedTrade] = []

    def add_trade(self, trade: StandardizedTrade) -> bool:
        """
        Add trade to this bucket if it belongs to this time period.

        Args:
            trade: Standardized trade data

        Returns:
            True if trade was added, False if outside time range
        """
        # Check if trade belongs in this bucket (start_time <= trade.timestamp < end_time)
        if not (self.start_time <= trade.timestamp < self.end_time):
            return False

        # First trade sets open price
        if self.open is None:
            self.open = trade.price
            self.high = trade.price
            self.low = trade.price
            self.first_trade_time = trade.timestamp

        # Update OHLCV
        self.high = max(self.high, trade.price)
        self.low = min(self.low, trade.price)
        self.close = trade.price  # Last trade sets close
        self.volume += trade.size
        self.trade_count += 1
        self.last_trade_time = trade.timestamp

        # Store trade for detailed analysis if needed
        self.trades.append(trade)

        return True

    def to_candle(self, is_complete: bool = True) -> OHLCVCandle:
        """
        Convert bucket to OHLCV candle.

        IMPORTANT: Candle timestamp = end_time (right-aligned, industry standard)
        """
        return OHLCVCandle(
            symbol=self.symbol,
            timeframe=self.timeframe,
            start_time=self.start_time,
            end_time=self.end_time,
            open=self.open or Decimal('0'),
            high=self.high or Decimal('0'),
            low=self.low or Decimal('0'),
            close=self.close or Decimal('0'),
            volume=self.volume,
            trade_count=self.trade_count,
            exchange=self.exchange,
            is_complete=is_complete,
            first_trade_time=self.first_trade_time,
            last_trade_time=self.last_trade_time
        )

    def _calculate_end_time(self, start_time: datetime, timeframe: str) -> datetime:
        """Calculate end time for this timeframe (right-aligned timestamp)."""
        if timeframe == '1m':
            return start_time + timedelta(minutes=1)
        elif timeframe == '5m':
            return start_time + timedelta(minutes=5)
        elif timeframe == '15m':
            return start_time + timedelta(minutes=15)
        elif timeframe == '30m':
            return start_time + timedelta(minutes=30)
        elif timeframe == '1h':
            return start_time + timedelta(hours=1)
        elif timeframe == '4h':
            return start_time + timedelta(hours=4)
        elif timeframe == '1d':
            return start_time + timedelta(days=1)
        else:
            raise ValueError(f"Unsupported timeframe: {timeframe}")
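
# Illustrative sketch (kept as a comment so importing this module has no side
# effects): how a single 5-minute bucket produces a right-aligned candle.
# StandardizedTrade is assumed here to accept timestamp/price/size keyword
# arguments; check data_types.py for the actual fields.
#
#   bucket = TimeframeBucket("BTC-USDT", "5m", datetime(2024, 1, 1, 9, 0), "okx")
#   bucket.add_trade(StandardizedTrade(timestamp=datetime(2024, 1, 1, 9, 3, 45),
#                                      price=Decimal("42000.5"), size=Decimal("0.01")))
#   candle = bucket.to_candle(is_complete=True)
#   # candle.start_time == 09:00:00, candle.end_time == 09:05:00 (the candle timestamp)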


class RealTimeCandleProcessor:
    """
    Real-time candle processor for live trade data.

    This class processes trades immediately as they arrive from WebSocket,
    building candles incrementally and emitting completed candles when
    time boundaries are crossed.

    AGGREGATION PROCESS (NO FUTURE LEAKAGE):

    1. Trade arrives from WebSocket/API with timestamp T
    2. For each configured timeframe (1m, 5m, etc.):
       a. Calculate which time bucket this trade belongs to
       b. Get current bucket for this timeframe
       c. Check if trade timestamp crosses time boundary
       d. If boundary crossed: complete and emit previous bucket, create new bucket
       e. Add trade to current bucket (updates OHLCV)
    3. Only emit candles when time boundary is definitively crossed
    4. Never emit incomplete/future candles during real-time processing

    TIMESTAMP ALIGNMENT:
    - Uses RIGHT-ALIGNED timestamps (industry standard)
    - 1-minute candle covering 09:00:00-09:01:00 gets timestamp 09:01:00
    - 5-minute candle covering 09:00:00-09:05:00 gets timestamp 09:05:00
    - Candle represents PAST data, never future
    """

    def __init__(self,
                 symbol: str,
                 exchange: str,
                 config: Optional[CandleProcessingConfig] = None,
                 component_name: str = "realtime_candle_processor",
                 logger=None):
        """
        Initialize real-time candle processor.

        Args:
            symbol: Trading symbol (e.g., 'BTC-USDT')
            exchange: Exchange name (e.g., 'okx', 'binance')
            config: Processing configuration
            component_name: Name for logging
            logger: Optional logger; if None, the processor does not log
        """
        self.symbol = symbol
        self.exchange = exchange
        self.config = config or CandleProcessingConfig()
        self.component_name = component_name
        self.logger = logger

        # Current buckets for each timeframe
        self.current_buckets: Dict[str, TimeframeBucket] = {}

        # Callback functions for completed candles
        self.candle_callbacks: List[Callable[[OHLCVCandle], None]] = []

        # Statistics
        self.stats = ProcessingStats(active_timeframes=len(self.config.timeframes))

        if self.logger:
            self.logger.info(
                f"{self.component_name}: Initialized real-time candle processor for {symbol} "
                f"on {exchange} with timeframes: {self.config.timeframes}"
            )
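
    # Illustrative wiring (comment only): a parent component such as a collector
    # can pass its own logging.Logger so this processor logs under the same name;
    # with logger=None the processor stays silent.
    #
    #   import logging
    #   processor = RealTimeCandleProcessor("BTC-USDT", "okx",
    #                                       logger=logging.getLogger("collector"))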

    def add_candle_callback(self, callback: Callable[[OHLCVCandle], None]) -> None:
        """Add callback function to receive completed candles."""
        self.candle_callbacks.append(callback)
        if self.logger:
            self.logger.debug(
                f"{self.component_name}: Added candle callback: "
                f"{callback.__name__ if hasattr(callback, '__name__') else str(callback)}"
            )

    def process_trade(self, trade: StandardizedTrade) -> List[OHLCVCandle]:
        """
        Process single trade - main entry point for real-time processing.

        This is called for each trade as it arrives from WebSocket.

        CRITICAL: Only returns completed candles (time boundary crossed).
        Never returns incomplete/future candles to prevent leakage.

        Args:
            trade: Standardized trade data

        Returns:
            List of completed candles (if any time boundaries were crossed)
        """
        try:
            completed_candles = []

            # Process trade for each timeframe
            for timeframe in self.config.timeframes:
                candle = self._process_trade_for_timeframe(trade, timeframe)
                if candle:
                    completed_candles.append(candle)

            # Update statistics
            self.stats.trades_processed += 1
            self.stats.last_trade_time = trade.timestamp

            # Emit completed candles to callbacks
            for candle in completed_candles:
                self._emit_candle(candle)

            return completed_candles

        except Exception as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Error processing trade for {self.symbol}: {e}")
            self.stats.errors_count += 1
            return []
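
    # Illustrative real-time loop (comment only; trade_stream() is a hypothetical
    # source of StandardizedTrade objects, e.g. a WebSocket consumer):
    #
    #   processor = RealTimeCandleProcessor("BTC-USDT", "okx")
    #   processor.add_candle_callback(lambda c: print(c.timeframe, c.end_time, c.close))
    #   for trade in trade_stream():
    #       processor.process_trade(trade)   # returns only boundary-crossed candles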

    def _process_trade_for_timeframe(self, trade: StandardizedTrade, timeframe: str) -> Optional[OHLCVCandle]:
        """
        Process trade for specific timeframe.

        CRITICAL LOGIC FOR PREVENTING FUTURE LEAKAGE:
        1. Calculate which bucket this trade belongs to
        2. Check if current bucket exists and matches
        3. If bucket mismatch (time boundary crossed), complete current bucket first
        4. Create new bucket and add trade
        5. Only return completed candles, never incomplete ones
        """
        try:
            # Calculate which bucket this trade belongs to
            trade_bucket_start = self._get_bucket_start_time(trade.timestamp, timeframe)

            # Check if we have a current bucket for this timeframe
            current_bucket = self.current_buckets.get(timeframe)
            completed_candle = None

            # If no bucket exists or time boundary crossed, handle transition
            if current_bucket is None:
                # First bucket for this timeframe
                current_bucket = TimeframeBucket(self.symbol, timeframe, trade_bucket_start, self.exchange)
                self.current_buckets[timeframe] = current_bucket
            elif current_bucket.start_time != trade_bucket_start:
                # Time boundary crossed - complete previous bucket
                if current_bucket.trade_count > 0:  # Only complete if it has trades
                    completed_candle = current_bucket.to_candle(is_complete=True)
                    self.stats.candles_emitted += 1
                    self.stats.last_candle_time = completed_candle.end_time

                # Create new bucket for current time period
                current_bucket = TimeframeBucket(self.symbol, timeframe, trade_bucket_start, self.exchange)
                self.current_buckets[timeframe] = current_bucket

            # Add trade to current bucket
            if not current_bucket.add_trade(trade):
                # This should never happen if the bucket logic is correct
                if self.logger:
                    self.logger.warning(
                        f"{self.component_name}: Trade {trade.timestamp} could not be added to bucket "
                        f"{current_bucket.start_time}-{current_bucket.end_time}"
                    )

            return completed_candle

        except Exception as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Error processing trade for timeframe {timeframe}: {e}")
            self.stats.errors_count += 1
            return None

    def _get_bucket_start_time(self, timestamp: datetime, timeframe: str) -> datetime:
        """
        Calculate bucket start time for given timestamp and timeframe.

        This function determines which time bucket a trade belongs to.
        The start time is the LEFT boundary of the interval.

        EXAMPLES:
        - Trade at 09:03:45 for 5m timeframe -> bucket start = 09:00:00
        - Trade at 09:07:23 for 5m timeframe -> bucket start = 09:05:00
        - Trade at 14:00:00 for 1h timeframe -> bucket start = 14:00:00

        Args:
            timestamp: Trade timestamp
            timeframe: Target timeframe

        Returns:
            Bucket start time (left boundary)
        """
        # Zero out seconds and microseconds for clean bucket boundaries
        # (timestamps are expected to already be in UTC)
        dt = timestamp.replace(second=0, microsecond=0)

        if timeframe == '1m':
            # 1-minute buckets align to minute boundaries
            return dt
        elif timeframe == '5m':
            # 5-minute buckets: 00:00, 00:05, 00:10, etc.
            return dt.replace(minute=(dt.minute // 5) * 5)
        elif timeframe == '15m':
            # 15-minute buckets: 00:00, 00:15, 00:30, 00:45
            return dt.replace(minute=(dt.minute // 15) * 15)
        elif timeframe == '30m':
            # 30-minute buckets: 00:00, 00:30
            return dt.replace(minute=(dt.minute // 30) * 30)
        elif timeframe == '1h':
            # 1-hour buckets align to hour boundaries
            return dt.replace(minute=0)
        elif timeframe == '4h':
            # 4-hour buckets: 00:00, 04:00, 08:00, 12:00, 16:00, 20:00
            return dt.replace(minute=0, hour=(dt.hour // 4) * 4)
        elif timeframe == '1d':
            # 1-day buckets align to day boundaries (midnight UTC)
            return dt.replace(minute=0, hour=0)
        else:
            raise ValueError(f"Unsupported timeframe: {timeframe}")
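
    # Worked example of the two helpers above (comment only): a trade at
    # 13:47:10 UTC in the '4h' timeframe falls into the bucket starting at
    # 12:00:00; the resulting candle carries end_time 16:00:00 (right-aligned).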

    def _emit_candle(self, candle: OHLCVCandle) -> None:
        """Emit completed candle to all callbacks."""
        for callback in self.candle_callbacks:
            try:
                callback(candle)
            except Exception as e:
                # A failing callback must not prevent the remaining callbacks
                # from receiving the candle.
                if self.logger:
                    self.logger.error(f"{self.component_name}: Error in candle callback: {e}")
                self.stats.errors_count += 1

    def get_current_candles(self, incomplete: bool = True) -> List[OHLCVCandle]:
        """
        Get current incomplete candles for all timeframes.

        WARNING: These are incomplete candles and should NOT be used for trading decisions.
        They are useful for monitoring/debugging only.
        """
        candles = []
        for bucket in self.current_buckets.values():
            if bucket.trade_count > 0:  # Only return buckets with trades
                candles.append(bucket.to_candle(is_complete=False))
        return candles

    def force_complete_all_candles(self) -> List[OHLCVCandle]:
        """
        Force completion of all current candles (useful for shutdown/batch processing).

        WARNING: This should only be used during shutdown or batch processing,
        not during live trading, as it forces incomplete candles to be marked complete.
        """
        completed_candles = []
        for bucket in self.current_buckets.values():
            if bucket.trade_count > 0:
                candle = bucket.to_candle(is_complete=True)
                completed_candles.append(candle)
                self._emit_candle(candle)

        # Clear buckets
        self.current_buckets.clear()
        return completed_candles

    def get_stats(self) -> Dict[str, Any]:
        """Get processing statistics."""
        stats_dict = self.stats.to_dict()
        stats_dict['current_buckets'] = {
            tf: bucket.trade_count for tf, bucket in self.current_buckets.items()
        }
        return stats_dict


class BatchCandleProcessor:
    """
    Batch candle processor for historical data processing.

    This class processes large batches of historical trades efficiently,
    building candles for multiple timeframes simultaneously.
    """

    def __init__(self,
                 symbol: str,
                 exchange: str,
                 timeframes: List[str],
                 component_name: str = "batch_candle_processor",
                 logger=None):
        """
        Initialize batch candle processor.

        Args:
            symbol: Trading symbol
            exchange: Exchange name
            timeframes: List of timeframes to process
            component_name: Name for logging
            logger: Optional logger; if None, the processor does not log
        """
        self.symbol = symbol
        self.exchange = exchange
        self.timeframes = timeframes
        self.component_name = component_name
        self.logger = logger

        # Statistics
        self.stats = ProcessingStats(active_timeframes=len(timeframes))

        if self.logger:
            self.logger.info(f"{self.component_name}: Initialized batch candle processor for {symbol} on {exchange}")

    def process_trades_to_candles(self, trades: Iterator[StandardizedTrade]) -> List[OHLCVCandle]:
        """
        Process trade iterator to candles - optimized for batch processing.

        This function handles ALL scenarios:
        - Historical: Batch trade iterators
        - Backfill: API trade iterators
        - Real-time batch: Multiple trades at once

        Args:
            trades: Iterator of standardized trades

        Returns:
            List of completed candles
        """
        try:
            # Create temporary processor for this batch; it inherits this
            # processor's logger for consistent logging behavior.
            config = CandleProcessingConfig(timeframes=self.timeframes, auto_save_candles=False)
            processor = RealTimeCandleProcessor(
                self.symbol, self.exchange, config,
                f"batch_processor_{self.symbol}_{self.exchange}",
                logger=self.logger
            )

            all_candles = []

            # Process all trades
            for trade in trades:
                completed_candles = processor.process_trade(trade)
                all_candles.extend(completed_candles)
                self.stats.trades_processed += 1

            # Force complete any remaining candles
            remaining_candles = processor.force_complete_all_candles()
            all_candles.extend(remaining_candles)

            # Update stats
            self.stats.candles_emitted = len(all_candles)
            if all_candles:
                self.stats.last_candle_time = max(candle.end_time for candle in all_candles)

            if self.logger:
                self.logger.info(
                    f"{self.component_name}: Batch processed {self.stats.trades_processed} trades "
                    f"to {len(all_candles)} candles"
                )
            return all_candles

        except Exception as e:
            if self.logger:
                self.logger.error(f"{self.component_name}: Error in batch processing trades to candles: {e}")
            self.stats.errors_count += 1
            return []

    def get_stats(self) -> Dict[str, Any]:
        """Get processing statistics."""
        return self.stats.to_dict()


# Utility functions for common aggregation operations

def aggregate_trades_to_candles(trades: List[StandardizedTrade],
                                timeframes: List[str],
                                symbol: str,
                                exchange: str) -> List[OHLCVCandle]:
    """
    Simple utility function to aggregate a list of trades to candles.

    Args:
        trades: List of standardized trades
        timeframes: List of timeframes to generate
        symbol: Trading symbol
        exchange: Exchange name

    Returns:
        List of completed candles
    """
    processor = BatchCandleProcessor(symbol, exchange, timeframes)
    return processor.process_trades_to_candles(iter(trades))
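
# Illustrative batch usage (comment only): aggregating already-collected trades
# into 1m and 5m candles. load_trades() is a hypothetical function returning a
# list of StandardizedTrade objects.
#
#   candles = aggregate_trades_to_candles(load_trades(), ['1m', '5m'], "BTC-USDT", "okx")
#   one_minute = [c for c in candles if c.timeframe == '1m']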


def validate_timeframe(timeframe: str) -> bool:
    """
    Validate if timeframe is supported.

    Args:
        timeframe: Timeframe string (e.g., '1m', '5m', '1h')

    Returns:
        True if supported, False otherwise
    """
    supported = ['1m', '5m', '15m', '30m', '1h', '4h', '1d']
    return timeframe in supported


def parse_timeframe(timeframe: str) -> tuple[int, str]:
    """
    Parse timeframe string into number and unit.

    Args:
        timeframe: Timeframe string (e.g., '5m', '1h')

    Returns:
        Tuple of (number, unit)

    Examples:
        '5m' -> (5, 'm')
        '1h' -> (1, 'h')
        '1d' -> (1, 'd')
    """
    import re
    match = re.match(r'^(\d+)([mhd])$', timeframe.lower())
    if not match:
        raise ValueError(f"Invalid timeframe format: {timeframe}")

    number = int(match.group(1))
    unit = match.group(2)
    return number, unit


__all__ = [
    'TimeframeBucket',
    'RealTimeCandleProcessor',
    'BatchCandleProcessor',
    'aggregate_trades_to_candles',
    'validate_timeframe',
    'parse_timeframe'
]
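

# Minimal self-check sketch when this file is run as a module (e.g. via
# `python -m <package>.aggregation`; the exact module path is project-specific).
# It only exercises the pure utility functions above, so it needs no exchange
# connectivity and constructs no trades.
if __name__ == "__main__":
    for tf in ['1m', '5m', '2h', '1d']:
        print(tf, "supported:", validate_timeframe(tf))
    print("parse_timeframe('5m') ->", parse_timeframe('5m'))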