import pandas as pd
import numpy as np


class RSI:
    """
    Relative Strength Index (RSI) calculator using Wilder's smoothing.
    """

    def __init__(self, config):
        """
        Initializes the RSI calculator.

        Args:
            config (dict): Configuration mapping. Must contain the key
                'rsi_period' holding the look-back period as a positive
                integer.

        Raises:
            KeyError: If 'rsi_period' is missing from the config.
            ValueError: If 'rsi_period' is not a positive integer.
        """
        period = config['rsi_period']
        if not isinstance(period, int) or period <= 0:
            raise ValueError("Period must be a positive integer.")
        self.period = period

    def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame:
        """
        Calculates the RSI and adds it as a column to a copy of the input.

        The first average gain/loss is the simple mean of the first
        ``period`` price changes (rows 1..period); later rows use Wilder's
        recursive smoothing ``avg = (prev_avg * (period - 1) + current) / period``.
        Because ``period`` changes need ``period + 1`` prices, the first
        defined RSI value sits at positional row ``period``.

        Args:
            data_df (pd.DataFrame): DataFrame with historical price data.
                Must contain the 'price_column'. It is not modified.
            price_column (str): The name of the column containing price data.
                Default is 'close'.

        Returns:
            pd.DataFrame: A copy of the input with three added columns:
            'avg_gain', 'avg_loss' (the smoothed averages, kept for
            debugging) and 'RSI'. All three are NaN on the warm-up rows
            (positions 0..period-1). If the DataFrame has fewer rows than
            the period, a warning is printed and a plain copy without the
            added columns is returned.

        Raises:
            ValueError: If 'price_column' is not a column of the DataFrame.
        """
        if price_column not in data_df.columns:
            raise ValueError(f"Price column '{price_column}' not found in DataFrame.")

        if len(data_df) < self.period:
            print(f"Warning: Data length ({len(data_df)}) is less than RSI period ({self.period}). RSI will not be calculated.")
            return data_df.copy()

        df = data_df.copy()
        period = self.period
        n_rows = len(df)

        # Row 0 has no previous price, so its delta is NaN; NaN deltas fail
        # both comparisons below and therefore count as "no change".
        delta = df[price_column].diff(1)
        gain = delta.where(delta > 0, 0.0).to_numpy(dtype=float)
        loss = (-delta).where(delta < 0, 0.0).to_numpy(dtype=float)

        avg_gain = np.full(n_rows, np.nan)
        avg_loss = np.full(n_rows, np.nan)
        rsi = np.full(n_rows, np.nan)

        if n_rows > period:
            # Seed with the SMA of the first `period` real changes. Row 0 is
            # skipped on purpose: it is not a price change, and including it
            # would dilute the seed with a spurious zero.
            avg_gain[period] = gain[1:period + 1].mean()
            avg_loss[period] = loss[1:period + 1].mean()

            # Wilder's smoothing is recursive, so it stays a loop, but over
            # plain NumPy arrays rather than per-element Series lookups.
            for i in range(period + 1, n_rows):
                avg_gain[i] = (avg_gain[i - 1] * (period - 1) + gain[i]) / period
                avg_loss[i] = (avg_loss[i - 1] * (period - 1) + loss[i]) / period

            smoothed_gain = avg_gain[period:]
            smoothed_loss = avg_loss[period:]

            # Zero average loss is handled by convention instead of dividing:
            # RSI is 100 when there were only gains, 50 when nothing moved.
            no_loss = smoothed_loss == 0
            safe_loss = np.where(no_loss, 1.0, smoothed_loss)
            rs = smoothed_gain / safe_loss
            regular_rsi = 100 - (100 / (1 + rs))
            zero_loss_rsi = np.where(smoothed_gain == 0, 50.0, 100.0)
            rsi[period:] = np.where(no_loss, zero_loss_rsi, regular_rsi)

        df['avg_gain'] = pd.Series(avg_gain, index=df.index)
        df['avg_loss'] = pd.Series(avg_loss, index=df.index)
        df['RSI'] = pd.Series(rsi, index=df.index)
        return df