import pandas as pd import numpy as np class BollingerBands: """ Calculates Bollinger Bands for given financial data. """ def __init__(self, config): """ Initializes the BollingerBands calculator. Args: period (int): The period for the moving average and standard deviation. std_dev_multiplier (float): The number of standard deviations for the upper and lower bands. bb_width (float): The width of the Bollinger Bands. """ if config['bb_period'] <= 0: raise ValueError("Period must be a positive integer.") if config['trending']['bb_std_dev_multiplier'] <= 0 or config['sideways']['bb_std_dev_multiplier'] <= 0: raise ValueError("Standard deviation multiplier must be positive.") if config['bb_width'] <= 0: raise ValueError("BB width must be positive.") self.config = config def calculate(self, data_df: pd.DataFrame, price_column: str = 'close', squeeze = False) -> pd.DataFrame: """ Calculates Bollinger Bands and adds them to the DataFrame. Args: data_df (pd.DataFrame): DataFrame with price data. Must include the price_column. price_column (str): The name of the column containing the price data (e.g., 'close'). Returns: pd.DataFrame: The original DataFrame with added columns: 'SMA' (Simple Moving Average), 'UpperBand', 'LowerBand'. """ # Work on a copy to avoid modifying the original DataFrame passed to the function data_df = data_df.copy() if price_column not in data_df.columns: raise ValueError(f"Price column '{price_column}' not found in DataFrame.") if not squeeze: period = self.config['bb_period'] bb_width_threshold = self.config['bb_width'] trending_std_multiplier = self.config['trending']['bb_std_dev_multiplier'] sideways_std_multiplier = self.config['sideways']['bb_std_dev_multiplier'] # Calculate SMA data_df['SMA'] = data_df[price_column].rolling(window=period).mean() # Calculate Standard Deviation std_dev = data_df[price_column].rolling(window=period).std() # Calculate reference Upper and Lower Bands for BBWidth calculation (e.g., using 2.0 std dev) # This ensures BBWidth is calculated based on a consistent band definition before applying adaptive multipliers. ref_upper_band = data_df['SMA'] + (2.0 * std_dev) ref_lower_band = data_df['SMA'] - (2.0 * std_dev) # Calculate the width of the Bollinger Bands # Avoid division by zero or NaN if SMA is zero or NaN by replacing with np.nan data_df['BBWidth'] = np.where(data_df['SMA'] != 0, (ref_upper_band - ref_lower_band) / data_df['SMA'], np.nan) # Calculate the market regime (1 = sideways, 0 = trending) # Handle NaN in BBWidth: if BBWidth is NaN, MarketRegime should also be NaN or a default (e.g. trending) data_df['MarketRegime'] = np.where(data_df['BBWidth'].isna(), np.nan, (data_df['BBWidth'] < bb_width_threshold).astype(float)) # Use float for NaN compatibility # Determine the std dev multiplier for each row based on its market regime conditions = [ data_df['MarketRegime'] == 1, # Sideways market data_df['MarketRegime'] == 0 # Trending market ] choices = [ sideways_std_multiplier, trending_std_multiplier ] # Default multiplier if MarketRegime is NaN (e.g., use trending or a neutral default like 2.0) # For now, let's use trending_std_multiplier as default if MarketRegime is NaN. # This can be adjusted based on desired behavior for periods where regime is undetermined. row_specific_std_multiplier = np.select(conditions, choices, default=trending_std_multiplier) # Calculate final Upper and Lower Bands using the row-specific multiplier data_df['UpperBand'] = data_df['SMA'] + (row_specific_std_multiplier * std_dev) data_df['LowerBand'] = data_df['SMA'] - (row_specific_std_multiplier * std_dev) else: # squeeze is True price_series = data_df[price_column] # Use the static method for the squeeze case with fixed parameters upper_band, sma, lower_band = self.calculate_custom_bands( price_series, window=14, num_std=1.5, min_periods=14 # Match typical squeeze behavior where bands appear after full period ) data_df['SMA'] = sma data_df['UpperBand'] = upper_band data_df['LowerBand'] = lower_band # BBWidth and MarketRegime are not typically calculated/used in a simple squeeze context by this method # If needed, they could be added, but the current structure implies they are part of the non-squeeze path. data_df['BBWidth'] = np.nan data_df['MarketRegime'] = np.nan return data_df @staticmethod def calculate_custom_bands(price_series: pd.Series, window: int = 20, num_std: float = 2.0, min_periods: int = None) -> tuple[pd.Series, pd.Series, pd.Series]: """ Calculates Bollinger Bands with specified window and standard deviation multiplier. Args: price_series (pd.Series): Series of prices. window (int): The period for the moving average and standard deviation. num_std (float): The number of standard deviations for the upper and lower bands. min_periods (int, optional): Minimum number of observations in window required to have a value. Defaults to `window` if None. Returns: tuple[pd.Series, pd.Series, pd.Series]: Upper band, SMA, Lower band. """ if not isinstance(price_series, pd.Series): raise TypeError("price_series must be a pandas Series.") if not isinstance(window, int) or window <= 0: raise ValueError("window must be a positive integer.") if not isinstance(num_std, (int, float)) or num_std <= 0: raise ValueError("num_std must be a positive number.") if min_periods is not None and (not isinstance(min_periods, int) or min_periods <= 0): raise ValueError("min_periods must be a positive integer if provided.") actual_min_periods = window if min_periods is None else min_periods sma = price_series.rolling(window=window, min_periods=actual_min_periods).mean() std = price_series.rolling(window=window, min_periods=actual_min_periods).std() # Replace NaN std with 0 to avoid issues if sma is present but std is not (e.g. constant price in window) std = std.fillna(0) upper_band = sma + (std * num_std) lower_band = sma - (std * num_std) return upper_band, sma, lower_band