"""
Base classes for the incremental strategy system.

This module contains the fundamental building blocks for all incremental
trading strategies:

- IncStrategySignal: Represents trading signals with confidence and metadata
- IncStrategyBase: Abstract base class that all incremental strategies must
  inherit from
"""

import logging
from abc import ABC, abstractmethod
from collections import deque
from typing import Dict, Optional, List, Union, Any

import pandas as pd

# Import the original signal class for compatibility
from ..strategies.base import StrategySignal

# Create alias for consistency
IncStrategySignal = StrategySignal


class IncStrategyBase(ABC):
    """
    Abstract base class for all incremental trading strategies.

    This class defines the interface that all incremental strategies must
    implement:

    - get_minimum_buffer_size(): Specify minimum data requirements
    - calculate_on_data(): Process new data points incrementally
    - supports_incremental_calculation(): Whether strategy supports
      incremental mode
    - get_entry_signal(): Generate entry signals
    - get_exit_signal(): Generate exit signals

    The incremental approach allows strategies to:

    - Process new data points without full recalculation
    - Maintain bounded memory usage regardless of data history length
    - Provide real-time performance with minimal latency
    - Support both initialization and incremental modes

    Attributes:
        name (str): Strategy name
        weight (float): Strategy weight for combination
        params (Dict): Strategy parameters
        calculation_mode (str): Current mode ('initialization' or
            'incremental')
        is_warmed_up (bool): Whether strategy has sufficient data for
            reliable signals
        timeframe_buffers (Dict): Rolling buffers for different timeframes
            (stored on ``_timeframe_buffers``)
        indicator_states (Dict): Internal indicator calculation states
            (stored on ``_indicator_states``)

    Example:
        class MyIncStrategy(IncStrategyBase):
            def get_minimum_buffer_size(self):
                return {"15min": 50, "1min": 750}

            def calculate_on_data(self, new_data_point, timestamp):
                # Process new data incrementally
                self._update_indicators(new_data_point)

            def get_entry_signal(self):
                # Generate signal based on current state
                if self._should_enter():
                    return IncStrategySignal("ENTRY", confidence=0.8)
                return IncStrategySignal("HOLD", confidence=0.0)
    """

    def __init__(self, name: str, weight: float = 1.0, params: Optional[Dict] = None):
        """
        Initialize the incremental strategy base.

        Recognised ``params`` keys (all optional):

        - ``buffer_size_multiplier`` (default 2.0): factor applied to each
          minimum buffer size to get the rolling buffer capacity.
        - ``max_acceptable_gap`` (default "5min"): anything accepted by
          ``pd.Timedelta``; larger gaps trigger a reinitialization.
        - ``enable_state_validation`` (default True): toggles
          ``_validate_calculation_state``.

        Args:
            name: Strategy name/identifier
            weight: Strategy weight for combination (default: 1.0)
            params: Strategy-specific parameters
        """
        self.name = name
        self.weight = weight
        self.params = params or {}

        # Calculation state
        self._calculation_mode = "initialization"
        self._is_warmed_up = False
        self._data_points_received = 0

        # Timeframe management
        self._timeframe_buffers = {}
        self._timeframe_last_update = {}
        self._buffer_size_multiplier = self.params.get("buffer_size_multiplier", 2.0)

        # Indicator states (strategy-specific)
        self._indicator_states = {}

        # Signal generation state
        self._last_signals = {}
        self._signal_history = deque(maxlen=100)

        # Error handling
        self._max_acceptable_gap = pd.Timedelta(self.params.get("max_acceptable_gap", "5min"))
        self._state_validation_enabled = self.params.get("enable_state_validation", True)

        # Performance monitoring
        self._performance_metrics = {
            'update_times': deque(maxlen=1000),
            'signal_generation_times': deque(maxlen=1000),
            'state_validation_failures': 0,
            'data_gaps_handled': 0
        }

        # Compatibility with original strategy interface
        self.initialized = False
        self.timeframes_data = {}

    @property
    def calculation_mode(self) -> str:
        """Current calculation mode: 'initialization' or 'incremental'"""
        return self._calculation_mode

    @property
    def is_warmed_up(self) -> bool:
        """Whether strategy has sufficient data for reliable signals"""
        return self._is_warmed_up

    @abstractmethod
    def get_minimum_buffer_size(self) -> Dict[str, int]:
        """
        Return minimum data points needed for each timeframe.

        This method must be implemented by each strategy to specify how much
        historical data is required for reliable calculations.

        Returns:
            Dict[str, int]: {timeframe: min_points} mapping

        Example:
            return {"15min": 50, "1min": 750}  # 50 15min candles = 750 1min candles
        """
        pass

    @abstractmethod
    def calculate_on_data(self, new_data_point: Dict[str, float], timestamp: pd.Timestamp) -> None:
        """
        Process a single new data point incrementally.

        This method is called for each new data point and should update the
        strategy's internal state incrementally.

        Args:
            new_data_point: OHLCV data point {open, high, low, close, volume}
            timestamp: Timestamp of the data point
        """
        pass

    @abstractmethod
    def supports_incremental_calculation(self) -> bool:
        """
        Whether strategy supports incremental calculation.

        Returns:
            bool: True if incremental mode supported, False for fallback to
            batch mode
        """
        pass

    @abstractmethod
    def get_entry_signal(self) -> IncStrategySignal:
        """
        Generate entry signal based on current strategy state.

        This method should use the current internal state to determine
        whether an entry signal should be generated.

        Returns:
            IncStrategySignal: Entry signal with confidence level
        """
        pass

    @abstractmethod
    def get_exit_signal(self) -> IncStrategySignal:
        """
        Generate exit signal based on current strategy state.

        This method should use the current internal state to determine
        whether an exit signal should be generated.

        Returns:
            IncStrategySignal: Exit signal with confidence level
        """
        pass

    def get_confidence(self) -> float:
        """
        Get strategy confidence for the current market state.

        Default implementation returns 1.0. Strategies can override this to
        provide dynamic confidence based on market conditions.

        Returns:
            float: Confidence level (0.0 to 1.0)
        """
        return 1.0

    def reset_calculation_state(self) -> None:
        """
        Reset internal calculation state for reinitialization.

        Clears all buffers, indicator states and signal history, returns the
        strategy to 'initialization' mode, and resets every performance
        metric (timing deques are emptied, counters are set to 0).
        """
        self._calculation_mode = "initialization"
        self._is_warmed_up = False
        self._data_points_received = 0
        self._timeframe_buffers.clear()
        self._timeframe_last_update.clear()
        self._indicator_states.clear()
        self._last_signals.clear()
        self._signal_history.clear()

        # Reset performance metrics
        for key in self._performance_metrics:
            if isinstance(self._performance_metrics[key], deque):
                self._performance_metrics[key].clear()
            else:
                self._performance_metrics[key] = 0

    def get_current_state_summary(self) -> Dict[str, Any]:
        """
        Get summary of current calculation state for debugging.

        Returns:
            Dict[str, Any]: Snapshot containing the strategy name, mode,
            warm-up flag, data point count, buffer names and lengths, a
            per-indicator summary (``get_state_summary()`` when the state
            object provides it, otherwise ``str(state)``), the last signals
            and averaged performance metrics (0 when no timings recorded).
        """
        update_times = self._performance_metrics['update_times']
        signal_times = self._performance_metrics['signal_generation_times']

        return {
            'strategy_name': self.name,
            'calculation_mode': self._calculation_mode,
            'is_warmed_up': self._is_warmed_up,
            'data_points_received': self._data_points_received,
            'timeframes': list(self._timeframe_buffers.keys()),
            'buffer_sizes': {tf: len(buf) for tf, buf in self._timeframe_buffers.items()},
            'indicator_states': {
                name: state.get_state_summary() if hasattr(state, 'get_state_summary') else str(state)
                for name, state in self._indicator_states.items()
            },
            'last_signals': self._last_signals,
            'performance_metrics': {
                # Guard against division by zero when nothing was timed yet
                'avg_update_time': sum(update_times) / len(update_times) if update_times else 0,
                'avg_signal_time': sum(signal_times) / len(signal_times) if signal_times else 0,
                'validation_failures': self._performance_metrics['state_validation_failures'],
                'data_gaps_handled': self._performance_metrics['data_gaps_handled']
            }
        }

    def _update_timeframe_buffers(self, new_data_point: Dict[str, float], timestamp: pd.Timestamp) -> None:
        """
        Update all timeframe buffers with new data point.

        Buffers are created lazily, one per timeframe returned by
        ``get_minimum_buffer_size()``. The "1min" buffer receives a copy of
        every data point; every other timeframe is fed through
        ``_aggregate_to_timeframe`` on every call so that data points inside
        an open period are merged into the current candle.

        Args:
            new_data_point: OHLCV data point {open, high, low, close, volume}
            timestamp: Timestamp of the data point
        """
        # Get minimum buffer sizes
        min_buffer_sizes = self.get_minimum_buffer_size()

        for timeframe, min_size in min_buffer_sizes.items():
            # Calculate actual buffer size with multiplier. Never go below the
            # declared minimum: a multiplier < 1 would otherwise create a
            # buffer that cannot hold the data the strategy requires.
            actual_buffer_size = max(min_size, int(min_size * self._buffer_size_multiplier))

            # Initialize buffer if needed
            if timeframe not in self._timeframe_buffers:
                self._timeframe_buffers[timeframe] = deque(maxlen=actual_buffer_size)
                self._timeframe_last_update[timeframe] = None

            if timeframe == "1min":
                # For 1min timeframe, add data directly
                if self._should_update_timeframe(timeframe, timestamp):
                    data_point = new_data_point.copy()
                    data_point['timestamp'] = timestamp
                    self._timeframe_buffers[timeframe].append(data_point)
                    self._timeframe_last_update[timeframe] = timestamp
            else:
                # For other timeframes, aggregate from 1min data. This must be
                # called for EVERY data point: the helper itself decides
                # whether to open a new candle or fold the point into the
                # current one. Gating the call on _should_update_timeframe
                # (as was done before) made the merge branch unreachable and
                # silently discarded all but the first bar of each period.
                self._aggregate_to_timeframe(timeframe, new_data_point, timestamp)

    def _should_update_timeframe(self, timeframe: str, timestamp: pd.Timestamp) -> bool:
        """
        Check if timeframe should be updated based on timestamp.

        For aggregated timeframes "update" means "a new candle starts". The
        period is measured from the timestamp at which the current candle was
        opened, not from clock-aligned boundaries.

        Args:
            timeframe: Timeframe label such as "1min", "15min" or "4h"
            timestamp: Timestamp of the incoming data point

        Returns:
            bool: True for "1min", for the first update of a timeframe, for
            labels that end in neither "min" nor "h", or when at least one
            full interval has elapsed since the last update.
        """
        if timeframe == "1min":
            return True  # Always update 1min

        last_update = self._timeframe_last_update.get(timeframe)
        if last_update is None:
            return True  # First update

        # Calculate timeframe interval
        if timeframe.endswith("min"):
            minutes = int(timeframe[:-3])
            interval = pd.Timedelta(minutes=minutes)
        elif timeframe.endswith("h"):
            hours = int(timeframe[:-1])
            interval = pd.Timedelta(hours=hours)
        else:
            return True  # Unknown timeframe, update anyway

        # Check if enough time has passed
        return timestamp >= last_update + interval

    def _aggregate_to_timeframe(self, timeframe: str, new_data_point: Dict[str, float], timestamp: pd.Timestamp) -> None:
        """
        Aggregate 1min data to specified timeframe.

        Either opens a new candle (copy of the data point, stamped with
        ``timestamp``) or merges the data point into the most recent candle:
        high/low are extended, close is replaced and volume is accumulated.

        Args:
            timeframe: Target timeframe; its buffer must already exist
            new_data_point: OHLCV data point {open, high, low, close, volume}
            timestamp: Timestamp of the data point
        """
        # This is a simplified aggregation - in practice, you might want more sophisticated logic
        buffer = self._timeframe_buffers[timeframe]

        # If buffer is empty or we're starting a new period, add new candle
        if not buffer or self._should_update_timeframe(timeframe, timestamp):
            aggregated_point = new_data_point.copy()
            aggregated_point['timestamp'] = timestamp
            buffer.append(aggregated_point)
            self._timeframe_last_update[timeframe] = timestamp
        else:
            # Update the last candle in the buffer (open and timestamp stay
            # those of the first data point in the period)
            last_candle = buffer[-1]
            last_candle['high'] = max(last_candle['high'], new_data_point['high'])
            last_candle['low'] = min(last_candle['low'], new_data_point['low'])
            last_candle['close'] = new_data_point['close']
            last_candle['volume'] += new_data_point['volume']

    def _get_timeframe_buffer(self, timeframe: str) -> pd.DataFrame:
        """
        Get current buffer for specific timeframe as DataFrame.

        Args:
            timeframe: Timeframe label to look up

        Returns:
            pd.DataFrame: One row per buffered candle, indexed by 'timestamp'
            when that column is present; an empty DataFrame when the
            timeframe is unknown or its buffer is empty.
        """
        if timeframe not in self._timeframe_buffers:
            return pd.DataFrame()

        buffer_data = list(self._timeframe_buffers[timeframe])
        if not buffer_data:
            return pd.DataFrame()

        df = pd.DataFrame(buffer_data)
        if 'timestamp' in df.columns:
            df = df.set_index('timestamp')
        return df

    def _validate_calculation_state(self) -> bool:
        """
        Validate internal calculation state consistency.

        Returns:
            bool: True when validation is disabled or every required buffer
            exists and no indicator state reports ``is_initialized`` as
            falsy; False otherwise. An exception raised during the checks is
            logged, counted in 'state_validation_failures' and reported as
            False.
        """
        if not self._state_validation_enabled:
            return True

        try:
            # Check that all required buffers exist
            min_buffer_sizes = self.get_minimum_buffer_size()
            for timeframe in min_buffer_sizes.keys():
                if timeframe not in self._timeframe_buffers:
                    logging.warning(f"Missing buffer for timeframe {timeframe}")
                    return False

            # Check that indicator states are valid
            for name, state in self._indicator_states.items():
                if hasattr(state, 'is_initialized') and not state.is_initialized:
                    logging.warning(f"Indicator {name} not initialized")
                    return False

            return True
        except Exception as e:
            logging.error(f"State validation failed: {e}")
            self._performance_metrics['state_validation_failures'] += 1
            return False

    def _recover_from_state_corruption(self) -> None:
        """
        Recover from corrupted calculation state.

        Switches back to 'initialization' mode and asks the strategy to
        rebuild itself from the buffers; if that raises, falls back to a
        complete ``reset_calculation_state()``.
        """
        logging.warning(f"Recovering from state corruption in strategy {self.name}")

        # Reset to initialization mode
        self._calculation_mode = "initialization"
        self._is_warmed_up = False

        # Try to recalculate from available buffer data
        try:
            self._reinitialize_from_buffers()
        except Exception as e:
            logging.error(f"Failed to recover from buffers: {e}")
            # Complete reset as last resort
            self.reset_calculation_state()

    def _reinitialize_from_buffers(self) -> None:
        """Reinitialize indicators from available buffer data."""
        # This method should be overridden by specific strategies
        # to implement their own recovery logic
        pass

    def handle_data_gap(self, gap_duration: pd.Timedelta) -> None:
        """
        Handle gaps in data stream.

        Every call is counted in the 'data_gaps_handled' metric. Gaps longer
        than ``max_acceptable_gap`` trigger a full reinitialization; shorter
        gaps are only logged and the current state is kept.

        Args:
            gap_duration: Length of the detected gap
        """
        gaps_handled = self._performance_metrics['data_gaps_handled'] + 1
        self._performance_metrics['data_gaps_handled'] = gaps_handled

        if gap_duration > self._max_acceptable_gap:
            logging.warning(f"Data gap {gap_duration} exceeds maximum acceptable gap {self._max_acceptable_gap}")
            self._trigger_reinitialization()
            # The reinitialization resets every counter to 0; restore the gap
            # count so the gap that caused the reset is not lost.
            self._performance_metrics['data_gaps_handled'] = gaps_handled
        else:
            logging.info(f"Handling acceptable data gap: {gap_duration}")
            # For small gaps, continue with current state

    def _trigger_reinitialization(self) -> None:
        """Trigger strategy reinitialization due to data gap or corruption."""
        logging.info(f"Triggering reinitialization for strategy {self.name}")
        self.reset_calculation_state()

    # Compatibility methods for original strategy interface
    def get_timeframes(self) -> List[str]:
        """Get required timeframes (compatibility method)."""
        return list(self.get_minimum_buffer_size().keys())

    def initialize(self, backtester) -> None:
        """
        Initialize strategy (compatibility method).

        Args:
            backtester: Accepted for interface compatibility; not used here.
        """
        # This method provides compatibility with the original strategy interface
        # The actual initialization happens through the incremental interface
        self.initialized = True
        logging.info(f"Incremental strategy {self.name} initialized in compatibility mode")

    def __repr__(self) -> str:
        """String representation of the strategy."""
        return (f"{self.__class__.__name__}(name={self.name}, "
                f"weight={self.weight}, mode={self._calculation_mode}, "
                f"warmed_up={self._is_warmed_up}, "
                f"data_points={self._data_points_received})")