""" Incremental BBRS Strategy (Bollinger Bands + RSI Strategy) This module implements an incremental version of the Bollinger Bands + RSI Strategy (BBRS) for real-time data processing. It maintains constant memory usage and provides identical results to the batch implementation after the warm-up period. Key Features: - Accepts minute-level data input for real-time compatibility - Internal timeframe aggregation (1min, 5min, 15min, 1h, etc.) - Incremental Bollinger Bands calculation - Incremental RSI calculation with Wilder's smoothing - Market regime detection (trending vs sideways) - Real-time signal generation - Constant memory usage """ import pandas as pd import numpy as np from typing import Dict, Optional, List, Any, Tuple, Union import logging from collections import deque from .base import IncStrategyBase, IncStrategySignal from .indicators.bollinger_bands import BollingerBandsState from .indicators.rsi import RSIState logger = logging.getLogger(__name__) class BBRSStrategy(IncStrategyBase): """ Incremental BBRS (Bollinger Bands + RSI) strategy implementation. This strategy combines Bollinger Bands and RSI indicators to detect market conditions and generate trading signals. It adapts its behavior based on market regime detection (trending vs sideways markets). The strategy uses different Bollinger Band multipliers and RSI thresholds for different market regimes: - Trending markets: Breakout strategy with higher BB multiplier - Sideways markets: Mean reversion strategy with lower BB multiplier Parameters: timeframe (str): Primary timeframe for analysis (default: "1h") bb_period (int): Bollinger Bands period (default: 20) rsi_period (int): RSI period (default: 14) bb_width_threshold (float): BB width threshold for regime detection (default: 0.05) trending_bb_multiplier (float): BB multiplier for trending markets (default: 2.5) sideways_bb_multiplier (float): BB multiplier for sideways markets (default: 1.8) trending_rsi_thresholds (list): RSI thresholds for trending markets (default: [30, 70]) sideways_rsi_thresholds (list): RSI thresholds for sideways markets (default: [40, 60]) squeeze_strategy (bool): Enable squeeze strategy (default: True) enable_logging (bool): Enable detailed logging (default: False) Example: strategy = BBRSStrategy("bbrs", weight=1.0, params={ "timeframe": "1h", "bb_period": 20, "rsi_period": 14, "bb_width_threshold": 0.05, "trending_bb_multiplier": 2.5, "sideways_bb_multiplier": 1.8, "trending_rsi_thresholds": [30, 70], "sideways_rsi_thresholds": [40, 60], "squeeze_strategy": True }) """ def __init__(self, name: str = "bbrs", weight: float = 1.0, params: Optional[Dict] = None): """Initialize the incremental BBRS strategy.""" super().__init__(name, weight, params) # Strategy configuration self.primary_timeframe = self.params.get("timeframe", "1h") self.bb_period = self.params.get("bb_period", 20) self.rsi_period = self.params.get("rsi_period", 14) self.bb_width_threshold = self.params.get("bb_width_threshold", 0.05) # Market regime specific parameters self.trending_bb_multiplier = self.params.get("trending_bb_multiplier", 2.5) self.sideways_bb_multiplier = self.params.get("sideways_bb_multiplier", 1.8) self.trending_rsi_thresholds = tuple(self.params.get("trending_rsi_thresholds", [30, 70])) self.sideways_rsi_thresholds = tuple(self.params.get("sideways_rsi_thresholds", [40, 60])) self.squeeze_strategy = self.params.get("squeeze_strategy", True) self.enable_logging = self.params.get("enable_logging", False) # Configure logging level if self.enable_logging: logger.setLevel(logging.DEBUG) # Initialize indicators with different multipliers for regime detection self.bb_trending = BollingerBandsState(self.bb_period, self.trending_bb_multiplier) self.bb_sideways = BollingerBandsState(self.bb_period, self.sideways_bb_multiplier) self.bb_reference = BollingerBandsState(self.bb_period, 2.0) # For regime detection self.rsi = RSIState(self.rsi_period) # Volume tracking for volume analysis self.volume_history = deque(maxlen=20) # 20-period volume MA self.volume_sum = 0.0 self.volume_ma = None # Strategy state self.current_price = None self.current_volume = None self.current_market_regime = "trending" # Default to trending self.last_bb_result = None self.last_rsi_value = None # Signal generation state self._last_entry_signal = None self._last_exit_signal = None self._signal_count = {"entry": 0, "exit": 0} # Performance tracking self._update_count = 0 self._last_update_time = None logger.info(f"BBRSStrategy initialized: timeframe={self.primary_timeframe}, " f"bb_period={self.bb_period}, rsi_period={self.rsi_period}, " f"aggregation_enabled={self._timeframe_aggregator is not None}") if self.enable_logging: logger.info(f"Using new timeframe utilities with mathematically correct aggregation") logger.info(f"Volume aggregation now uses proper sum() for accurate volume spike detection") if self._timeframe_aggregator: stats = self.get_timeframe_aggregator_stats() logger.debug(f"Timeframe aggregator stats: {stats}") def get_minimum_buffer_size(self) -> Dict[str, int]: """ Return minimum data points needed for reliable BBRS calculations. Returns: Dict[str, int]: {timeframe: min_points} mapping """ # Need enough data for BB, RSI, and volume MA min_buffer_size = max(self.bb_period, self.rsi_period, 20) * 2 + 10 return {self.primary_timeframe: min_buffer_size} def calculate_on_data(self, new_data_point: Dict[str, float], timestamp: pd.Timestamp) -> None: """ Process a single new data point incrementally. Args: new_data_point: OHLCV data point {open, high, low, close, volume} timestamp: Timestamp of the data point """ try: self._update_count += 1 self._last_update_time = timestamp if self.enable_logging: logger.debug(f"Processing data point {self._update_count} at {timestamp}") close_price = float(new_data_point['close']) volume = float(new_data_point['volume']) # Update indicators bb_trending_result = self.bb_trending.update(close_price) bb_sideways_result = self.bb_sideways.update(close_price) bb_reference_result = self.bb_reference.update(close_price) rsi_value = self.rsi.update(close_price) # Update volume tracking self._update_volume_tracking(volume) # Determine market regime self.current_market_regime = self._determine_market_regime(bb_reference_result) # Select appropriate BB values based on regime if self.current_market_regime == "sideways": self.last_bb_result = bb_sideways_result else: # trending self.last_bb_result = bb_trending_result # Store current state self.current_price = close_price self.current_volume = volume self.last_rsi_value = rsi_value self._data_points_received += 1 # Update warm-up status if not self._is_warmed_up and self.is_warmed_up(): self._is_warmed_up = True logger.info(f"BBRSStrategy warmed up after {self._update_count} data points") if self.enable_logging and self._update_count % 10 == 0: logger.debug(f"BBRS state: price=${close_price:.2f}, " f"regime={self.current_market_regime}, " f"rsi={rsi_value:.1f}, " f"bb_width={bb_reference_result.get('bandwidth', 0):.4f}") except Exception as e: logger.error(f"Error in calculate_on_data: {e}") raise def supports_incremental_calculation(self) -> bool: """ Whether strategy supports incremental calculation. Returns: bool: True (this strategy is fully incremental) """ return True def get_entry_signal(self) -> IncStrategySignal: """ Generate entry signal based on BBRS strategy logic. Returns: IncStrategySignal: Entry signal if conditions are met, hold signal otherwise """ if not self.is_warmed_up(): return IncStrategySignal.HOLD() # Check for entry condition if self._check_entry_condition(): self._signal_count["entry"] += 1 self._last_entry_signal = { 'timestamp': self._last_update_time, 'price': self.current_price, 'market_regime': self.current_market_regime, 'rsi': self.last_rsi_value, 'update_count': self._update_count } if self.enable_logging: logger.info(f"ENTRY SIGNAL generated at {self._last_update_time} " f"(signal #{self._signal_count['entry']})") return IncStrategySignal.BUY(confidence=1.0, metadata={ "market_regime": self.current_market_regime, "rsi": self.last_rsi_value, "bb_position": self._get_bb_position(), "signal_count": self._signal_count["entry"] }) return IncStrategySignal.HOLD() def get_exit_signal(self) -> IncStrategySignal: """ Generate exit signal based on BBRS strategy logic. Returns: IncStrategySignal: Exit signal if conditions are met, hold signal otherwise """ if not self.is_warmed_up(): return IncStrategySignal.HOLD() # Check for exit condition if self._check_exit_condition(): self._signal_count["exit"] += 1 self._last_exit_signal = { 'timestamp': self._last_update_time, 'price': self.current_price, 'market_regime': self.current_market_regime, 'rsi': self.last_rsi_value, 'update_count': self._update_count } if self.enable_logging: logger.info(f"EXIT SIGNAL generated at {self._last_update_time} " f"(signal #{self._signal_count['exit']})") return IncStrategySignal.SELL(confidence=1.0, metadata={ "market_regime": self.current_market_regime, "rsi": self.last_rsi_value, "bb_position": self._get_bb_position(), "signal_count": self._signal_count["exit"] }) return IncStrategySignal.HOLD() def get_confidence(self) -> float: """ Get strategy confidence based on signal strength. Returns: float: Confidence level (0.0 to 1.0) """ if not self.is_warmed_up(): return 0.0 # Higher confidence when signals are clear if self._check_entry_condition() or self._check_exit_condition(): return 1.0 # Medium confidence during normal operation return 0.5 def _update_volume_tracking(self, volume: float) -> None: """Update volume moving average tracking.""" # Update rolling sum if len(self.volume_history) == 20: # maxlen reached self.volume_sum -= self.volume_history[0] self.volume_history.append(volume) self.volume_sum += volume # Calculate moving average if len(self.volume_history) > 0: self.volume_ma = self.volume_sum / len(self.volume_history) else: self.volume_ma = volume def _determine_market_regime(self, bb_reference: Dict[str, float]) -> str: """ Determine market regime based on Bollinger Band width. Args: bb_reference: Reference BB result for regime detection Returns: "sideways" or "trending" """ if not self.bb_reference.is_warmed_up(): return "trending" # Default to trending during warm-up bb_width = bb_reference['bandwidth'] if bb_width < self.bb_width_threshold: return "sideways" else: return "trending" def _check_volume_spike(self) -> bool: """Check if current volume represents a spike (≥1.5× average).""" if self.volume_ma is None or self.volume_ma == 0 or self.current_volume is None: return False return self.current_volume >= 1.5 * self.volume_ma def _get_bb_position(self) -> str: """Get current price position relative to Bollinger Bands.""" if not self.last_bb_result or self.current_price is None: return 'unknown' upper_band = self.last_bb_result['upper_band'] lower_band = self.last_bb_result['lower_band'] if self.current_price > upper_band: return 'above_upper' elif self.current_price < lower_band: return 'below_lower' else: return 'between_bands' def _check_entry_condition(self) -> bool: """ Check if entry condition is met based on market regime. Returns: bool: True if entry condition is met """ if not self.is_warmed_up() or self.last_bb_result is None: return False if np.isnan(self.last_rsi_value): return False upper_band = self.last_bb_result['upper_band'] lower_band = self.last_bb_result['lower_band'] if self.current_market_regime == "sideways": # Sideways market (Mean Reversion) rsi_low, rsi_high = self.sideways_rsi_thresholds buy_condition = (self.current_price <= lower_band) and (self.last_rsi_value <= rsi_low) if self.squeeze_strategy: # Add volume contraction filter for sideways markets volume_contraction = self.current_volume < 0.7 * (self.volume_ma or self.current_volume) buy_condition = buy_condition and volume_contraction return buy_condition else: # trending # Trending market (Breakout Mode) volume_spike = self._check_volume_spike() buy_condition = (self.current_price < lower_band) and (self.last_rsi_value < 50) and volume_spike return buy_condition def _check_exit_condition(self) -> bool: """ Check if exit condition is met based on market regime. Returns: bool: True if exit condition is met """ if not self.is_warmed_up() or self.last_bb_result is None: return False if np.isnan(self.last_rsi_value): return False upper_band = self.last_bb_result['upper_band'] lower_band = self.last_bb_result['lower_band'] if self.current_market_regime == "sideways": # Sideways market (Mean Reversion) rsi_low, rsi_high = self.sideways_rsi_thresholds sell_condition = (self.current_price >= upper_band) and (self.last_rsi_value >= rsi_high) if self.squeeze_strategy: # Add volume contraction filter for sideways markets volume_contraction = self.current_volume < 0.7 * (self.volume_ma or self.current_volume) sell_condition = sell_condition and volume_contraction return sell_condition else: # trending # Trending market (Breakout Mode) volume_spike = self._check_volume_spike() sell_condition = (self.current_price > upper_band) and (self.last_rsi_value > 50) and volume_spike return sell_condition def is_warmed_up(self) -> bool: """ Check if strategy is warmed up and ready for reliable signals. Returns: True if all indicators are warmed up """ return (self.bb_trending.is_warmed_up() and self.bb_sideways.is_warmed_up() and self.bb_reference.is_warmed_up() and self.rsi.is_warmed_up() and len(self.volume_history) >= 20) def reset_calculation_state(self) -> None: """Reset internal calculation state for reinitialization.""" super().reset_calculation_state() # Reset indicators self.bb_trending.reset() self.bb_sideways.reset() self.bb_reference.reset() self.rsi.reset() # Reset volume tracking self.volume_history.clear() self.volume_sum = 0.0 self.volume_ma = None # Reset strategy state self.current_price = None self.current_volume = None self.current_market_regime = "trending" self.last_bb_result = None self.last_rsi_value = None # Reset signal state self._last_entry_signal = None self._last_exit_signal = None self._signal_count = {"entry": 0, "exit": 0} # Reset performance tracking self._update_count = 0 self._last_update_time = None logger.info("BBRSStrategy state reset") def get_current_state_summary(self) -> Dict[str, Any]: """Get detailed state summary for debugging and monitoring.""" base_summary = super().get_current_state_summary() # Add BBRS-specific state base_summary.update({ 'primary_timeframe': self.primary_timeframe, 'current_price': self.current_price, 'current_volume': self.current_volume, 'volume_ma': self.volume_ma, 'current_market_regime': self.current_market_regime, 'last_rsi_value': self.last_rsi_value, 'bb_position': self._get_bb_position(), 'volume_spike': self._check_volume_spike(), 'signal_counts': self._signal_count.copy(), 'update_count': self._update_count, 'last_update_time': str(self._last_update_time) if self._last_update_time else None, 'last_entry_signal': self._last_entry_signal, 'last_exit_signal': self._last_exit_signal, 'indicators_warmed_up': { 'bb_trending': self.bb_trending.is_warmed_up(), 'bb_sideways': self.bb_sideways.is_warmed_up(), 'bb_reference': self.bb_reference.is_warmed_up(), 'rsi': self.rsi.is_warmed_up(), 'volume_tracking': len(self.volume_history) >= 20 }, 'config': { 'bb_period': self.bb_period, 'rsi_period': self.rsi_period, 'bb_width_threshold': self.bb_width_threshold, 'trending_bb_multiplier': self.trending_bb_multiplier, 'sideways_bb_multiplier': self.sideways_bb_multiplier, 'trending_rsi_thresholds': self.trending_rsi_thresholds, 'sideways_rsi_thresholds': self.sideways_rsi_thresholds, 'squeeze_strategy': self.squeeze_strategy } }) return base_summary def __repr__(self) -> str: """String representation of the strategy.""" return (f"BBRSStrategy(timeframe={self.primary_timeframe}, " f"bb_period={self.bb_period}, rsi_period={self.rsi_period}, " f"regime={self.current_market_regime}, " f"warmed_up={self.is_warmed_up()}, " f"updates={self._update_count})") # Compatibility alias for easier imports IncBBRSStrategy = BBRSStrategy