""" Timeframe aggregation utilities for the IncrementalTrader framework. This module provides utilities for aggregating minute-level OHLCV data to higher timeframes with mathematical correctness and proper timestamp handling. Key Features: - Uses pandas resampling for mathematical correctness - Supports bar end timestamps (default) to prevent future data leakage - Proper OHLCV aggregation rules (first/max/min/last/sum) - MinuteDataBuffer for efficient real-time data management - Comprehensive error handling and validation Critical Fixes: 1. Bar timestamps represent END of period (no future data leakage) 2. Correct OHLCV aggregation matching pandas resampling 3. Proper handling of incomplete bars and edge cases """ import pandas as pd import numpy as np from typing import Dict, List, Optional, Union, Any from collections import deque import logging import re logger = logging.getLogger(__name__) class TimeframeError(Exception): """Exception raised for timeframe-related errors.""" pass def parse_timeframe_to_minutes(timeframe: str) -> int: """ Parse timeframe string to minutes. Args: timeframe: Timeframe string (e.g., "1min", "5min", "15min", "1h", "4h", "1d") Returns: Number of minutes in the timeframe Raises: TimeframeError: If timeframe format is invalid Examples: >>> parse_timeframe_to_minutes("15min") 15 >>> parse_timeframe_to_minutes("1h") 60 >>> parse_timeframe_to_minutes("1d") 1440 """ if not isinstance(timeframe, str): raise TimeframeError(f"Timeframe must be a string, got {type(timeframe)}") timeframe = timeframe.lower().strip() # Handle common timeframe formats patterns = { r'^(\d+)min$': lambda m: int(m.group(1)), r'^(\d+)h$': lambda m: int(m.group(1)) * 60, r'^(\d+)d$': lambda m: int(m.group(1)) * 1440, r'^(\d+)w$': lambda m: int(m.group(1)) * 10080, # 7 * 24 * 60 } for pattern, converter in patterns.items(): match = re.match(pattern, timeframe) if match: minutes = converter(match) if minutes <= 0: raise TimeframeError(f"Timeframe must be positive, got {minutes} minutes") return minutes raise TimeframeError(f"Invalid timeframe format: {timeframe}. " f"Supported formats: Nmin, Nh, Nd, Nw (e.g., 15min, 1h, 1d)") def aggregate_minute_data_to_timeframe( minute_data: List[Dict[str, Union[float, pd.Timestamp]]], timeframe: str, timestamp_mode: str = "end" ) -> List[Dict[str, Union[float, pd.Timestamp]]]: """ Aggregate minute-level OHLCV data to specified timeframe using pandas resampling. This function provides mathematically correct aggregation that matches pandas resampling behavior, with proper timestamp handling to prevent future data leakage. Args: minute_data: List of minute OHLCV dictionaries with 'timestamp' field timeframe: Target timeframe ("1min", "5min", "15min", "1h", "4h", "1d") timestamp_mode: "end" (default) for bar end timestamps, "start" for bar start Returns: List of aggregated OHLCV dictionaries with proper timestamps Raises: TimeframeError: If timeframe format is invalid or data is malformed ValueError: If minute_data is empty or contains invalid data Examples: >>> minute_data = [ ... {'timestamp': pd.Timestamp('2024-01-01 09:00'), 'open': 100, 'high': 102, 'low': 99, 'close': 101, 'volume': 1000}, ... {'timestamp': pd.Timestamp('2024-01-01 09:01'), 'open': 101, 'high': 103, 'low': 100, 'close': 102, 'volume': 1200}, ... ] >>> result = aggregate_minute_data_to_timeframe(minute_data, "15min") >>> len(result) 1 >>> result[0]['timestamp'] # Bar end timestamp Timestamp('2024-01-01 09:15:00') """ if not minute_data: return [] if not isinstance(minute_data, list): raise ValueError("minute_data must be a list of dictionaries") if timestamp_mode not in ["end", "start"]: raise ValueError("timestamp_mode must be 'end' or 'start'") # Validate timeframe timeframe_minutes = parse_timeframe_to_minutes(timeframe) # If requesting 1min data, return as-is (with timestamp mode adjustment) if timeframe_minutes == 1: if timestamp_mode == "end": # Adjust timestamps to represent bar end (add 1 minute) result = [] for data_point in minute_data: adjusted_point = data_point.copy() adjusted_point['timestamp'] = data_point['timestamp'] + pd.Timedelta(minutes=1) result.append(adjusted_point) return result else: return minute_data.copy() # Validate data structure required_fields = ['timestamp', 'open', 'high', 'low', 'close', 'volume'] for i, data_point in enumerate(minute_data): if not isinstance(data_point, dict): raise ValueError(f"Data point {i} must be a dictionary") for field in required_fields: if field not in data_point: raise ValueError(f"Data point {i} missing required field: {field}") # Validate timestamp if not isinstance(data_point['timestamp'], pd.Timestamp): try: data_point['timestamp'] = pd.Timestamp(data_point['timestamp']) except Exception as e: raise ValueError(f"Invalid timestamp in data point {i}: {e}") try: # Convert to DataFrame for pandas resampling df = pd.DataFrame(minute_data) df = df.set_index('timestamp') # Sort by timestamp to ensure proper ordering df = df.sort_index() # Use pandas resampling for mathematical correctness freq_str = f'{timeframe_minutes}min' # Use trading industry standard grouping: label='left', closed='left' # This means 5min bar starting at 09:00 includes minutes 09:00-09:04 resampled = df.resample(freq_str, label='left', closed='left').agg({ 'open': 'first', # First open in the period 'high': 'max', # Maximum high in the period 'low': 'min', # Minimum low in the period 'close': 'last', # Last close in the period 'volume': 'sum' # Sum of volume in the period }) # Remove any rows with NaN values (incomplete periods) resampled = resampled.dropna() # Convert back to list of dictionaries result = [] for timestamp, row in resampled.iterrows(): # Adjust timestamp based on mode if timestamp_mode == "end": # Convert bar start timestamp to bar end timestamp bar_end_timestamp = timestamp + pd.Timedelta(minutes=timeframe_minutes) final_timestamp = bar_end_timestamp else: # Keep bar start timestamp final_timestamp = timestamp result.append({ 'timestamp': final_timestamp, 'open': float(row['open']), 'high': float(row['high']), 'low': float(row['low']), 'close': float(row['close']), 'volume': float(row['volume']) }) return result except Exception as e: raise TimeframeError(f"Failed to aggregate data to {timeframe}: {e}") def get_latest_complete_bar( minute_data: List[Dict[str, Union[float, pd.Timestamp]]], timeframe: str, timestamp_mode: str = "end" ) -> Optional[Dict[str, Union[float, pd.Timestamp]]]: """ Get the latest complete bar from minute data for the specified timeframe. This function is useful for real-time processing where you only want to process complete bars and avoid using incomplete/future data. Args: minute_data: List of minute OHLCV dictionaries with 'timestamp' field timeframe: Target timeframe ("1min", "5min", "15min", "1h", "4h", "1d") timestamp_mode: "end" (default) for bar end timestamps, "start" for bar start Returns: Latest complete bar dictionary, or None if no complete bars available Examples: >>> minute_data = [...] # 30 minutes of data >>> latest_15m = get_latest_complete_bar(minute_data, "15min") >>> latest_15m['timestamp'] # Will be 15 minutes ago (complete bar) """ if not minute_data: return None # Get all aggregated bars aggregated_bars = aggregate_minute_data_to_timeframe(minute_data, timeframe, timestamp_mode) if not aggregated_bars: return None # For real-time processing, we need to ensure the bar is truly complete # This means the bar's end time should be before the current time latest_minute_timestamp = max(data['timestamp'] for data in minute_data) # Filter out incomplete bars complete_bars = [] for bar in aggregated_bars: if timestamp_mode == "end": # Bar timestamp is the end time, so it should be <= latest minute + 1 minute if bar['timestamp'] <= latest_minute_timestamp + pd.Timedelta(minutes=1): complete_bars.append(bar) else: # Bar timestamp is the start time, check if enough time has passed timeframe_minutes = parse_timeframe_to_minutes(timeframe) bar_end_time = bar['timestamp'] + pd.Timedelta(minutes=timeframe_minutes) if bar_end_time <= latest_minute_timestamp + pd.Timedelta(minutes=1): complete_bars.append(bar) return complete_bars[-1] if complete_bars else None class MinuteDataBuffer: """ Helper class for managing minute data buffers in real-time strategies. This class provides efficient buffer management for minute-level data with automatic aggregation capabilities. It's designed for use in incremental strategies that need to maintain a rolling window of minute data. Features: - Automatic buffer size management with configurable limits - Efficient data access and aggregation methods - Memory-bounded operation (doesn't grow indefinitely) - Thread-safe operations for real-time use - Comprehensive validation and error handling Example: >>> buffer = MinuteDataBuffer(max_size=1440) # 24 hours >>> buffer.add(timestamp, {'open': 100, 'high': 102, 'low': 99, 'close': 101, 'volume': 1000}) >>> bars_15m = buffer.aggregate_to_timeframe("15min", lookback_bars=4) >>> latest_bar = buffer.get_latest_complete_bar("15min") """ def __init__(self, max_size: int = 1440): """ Initialize minute data buffer. Args: max_size: Maximum number of minute data points to keep (default: 1440 = 24 hours) """ if max_size <= 0: raise ValueError("max_size must be positive") self.max_size = max_size self._buffer = deque(maxlen=max_size) self._last_timestamp = None logger.debug(f"Initialized MinuteDataBuffer with max_size={max_size}") def add(self, timestamp: pd.Timestamp, ohlcv_data: Dict[str, float]) -> None: """ Add new minute data point to the buffer. Args: timestamp: Timestamp of the data point ohlcv_data: OHLCV data dictionary (open, high, low, close, volume) Raises: ValueError: If data is invalid or timestamp is out of order """ if not isinstance(timestamp, pd.Timestamp): try: timestamp = pd.Timestamp(timestamp) except Exception as e: raise ValueError(f"Invalid timestamp: {e}") # Validate OHLCV data required_fields = ['open', 'high', 'low', 'close', 'volume'] for field in required_fields: if field not in ohlcv_data: raise ValueError(f"Missing required field: {field}") # Accept both Python numeric types and numpy numeric types if not isinstance(ohlcv_data[field], (int, float, np.number)): raise ValueError(f"Field {field} must be numeric, got {type(ohlcv_data[field])}") # Convert numpy types to Python types to ensure compatibility if isinstance(ohlcv_data[field], np.number): ohlcv_data[field] = float(ohlcv_data[field]) # Check timestamp ordering (allow equal timestamps for updates) if self._last_timestamp is not None and timestamp < self._last_timestamp: logger.warning(f"Out-of-order timestamp: {timestamp} < {self._last_timestamp}") # Create data point data_point = ohlcv_data.copy() data_point['timestamp'] = timestamp # Add to buffer self._buffer.append(data_point) self._last_timestamp = timestamp logger.debug(f"Added data point at {timestamp}, buffer size: {len(self._buffer)}") def get_data(self, lookback_minutes: Optional[int] = None) -> List[Dict[str, Union[float, pd.Timestamp]]]: """ Get data from buffer. Args: lookback_minutes: Number of minutes to look back (None for all data) Returns: List of minute data dictionaries """ if not self._buffer: return [] if lookback_minutes is None: return list(self._buffer) if lookback_minutes <= 0: raise ValueError("lookback_minutes must be positive") # Get data from the last N minutes if len(self._buffer) <= lookback_minutes: return list(self._buffer) return list(self._buffer)[-lookback_minutes:] def aggregate_to_timeframe( self, timeframe: str, lookback_bars: Optional[int] = None, timestamp_mode: str = "end" ) -> List[Dict[str, Union[float, pd.Timestamp]]]: """ Aggregate buffer data to specified timeframe. Args: timeframe: Target timeframe ("5min", "15min", "1h", etc.) lookback_bars: Number of bars to return (None for all available) timestamp_mode: "end" (default) for bar end timestamps, "start" for bar start Returns: List of aggregated OHLCV bars """ if not self._buffer: return [] # Get all buffer data minute_data = list(self._buffer) # Aggregate to timeframe aggregated_bars = aggregate_minute_data_to_timeframe(minute_data, timeframe, timestamp_mode) # Apply lookback limit if lookback_bars is not None and lookback_bars > 0: aggregated_bars = aggregated_bars[-lookback_bars:] return aggregated_bars def get_latest_complete_bar( self, timeframe: str, timestamp_mode: str = "end" ) -> Optional[Dict[str, Union[float, pd.Timestamp]]]: """ Get the latest complete bar for the specified timeframe. Args: timeframe: Target timeframe ("5min", "15min", "1h", etc.) timestamp_mode: "end" (default) for bar end timestamps, "start" for bar start Returns: Latest complete bar dictionary, or None if no complete bars available """ if not self._buffer: return None minute_data = list(self._buffer) return get_latest_complete_bar(minute_data, timeframe, timestamp_mode) def size(self) -> int: """Get current buffer size.""" return len(self._buffer) def is_full(self) -> bool: """Check if buffer is at maximum capacity.""" return len(self._buffer) >= self.max_size def clear(self) -> None: """Clear all data from buffer.""" self._buffer.clear() self._last_timestamp = None logger.debug("Buffer cleared") def get_time_range(self) -> Optional[tuple]: """ Get the time range of data in the buffer. Returns: Tuple of (start_time, end_time) or None if buffer is empty """ if not self._buffer: return None timestamps = [data['timestamp'] for data in self._buffer] return (min(timestamps), max(timestamps)) def __len__(self) -> int: """Get buffer size.""" return len(self._buffer) def __repr__(self) -> str: """String representation of buffer.""" time_range = self.get_time_range() if time_range: start, end = time_range return f"MinuteDataBuffer(size={len(self._buffer)}, range={start} to {end})" else: return f"MinuteDataBuffer(size=0, empty)"