- Introduced a new Strategy class to encapsulate trading strategies, including the Market Regime Strategy that adapts to different market conditions. - Refactored BollingerBands and RSI classes to accept configuration parameters for improved flexibility and maintainability. - Updated test_bbrsi.py to utilize the new strategy implementation and adjusted date ranges for testing. - Enhanced documentation to include details about the new Strategy class and its components.
110 lines
4.5 KiB
Python
110 lines
4.5 KiB
Python
import pandas as pd
|
|
import numpy as np
|
|
|
|
class RSI:
    """
    Calculate the Relative Strength Index (RSI) for a price series.

    The look-back period is read from a configuration mapping under the
    key ``'rsi_period'``.
    """

    def __init__(self, config):
        """
        Initializes the RSI calculator.

        Args:
            config: Mapping-like object that is indexed with
                ``config['rsi_period']``. The value is the RSI look-back
                period and must be a positive ``int``.

        Raises:
            ValueError: If ``config['rsi_period']`` is not an ``int`` or is
                not greater than zero.
            KeyError: If ``config`` is a dict without an ``'rsi_period'`` key
                (raised by the lookup itself).
        """
        period = config['rsi_period']
        if not isinstance(period, int) or period <= 0:
            raise ValueError("Period must be a positive integer.")
        self.period = period

    def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame:
        """
        Calculates the RSI and adds it as a column to a copy of the input.

        The first average gain/loss is the simple mean of the first
        ``period`` rows; every later row uses the recursive smoothing
        ``avg[i] = (avg[i-1] * (period - 1) + value[i]) / period``.

        Args:
            data_df (pd.DataFrame): DataFrame with historical price data.
                Must contain ``price_column``. It is never modified.
            price_column (str): Name of the column holding the prices.
                Default is 'close'.

        Returns:
            pd.DataFrame: A copy of ``data_df`` with three added columns:
                ``'avg_gain'``, ``'avg_loss'`` (the smoothed averages, 0.0
                before the first complete window) and ``'RSI'`` (NaN before
                the first complete window). If ``data_df`` has fewer rows
                than the period, a warning is printed and an unmodified copy
                (without the extra columns) is returned.

        Raises:
            ValueError: If ``price_column`` is not a column of ``data_df``.
        """
        if price_column not in data_df.columns:
            raise ValueError(f"Price column '{price_column}' not found in DataFrame.")

        if len(data_df) < self.period:
            print(f"Warning: Data length ({len(data_df)}) is less than RSI period ({self.period}). RSI will not be calculated.")
            return data_df.copy()

        df = data_df.copy()
        period = self.period
        n_rows = len(df)

        delta = df[price_column].diff(1)

        # The first diff is NaN; both comparisons are False for NaN, so the
        # first row contributes a gain of 0 and a loss of 0.
        gain = delta.where(delta > 0, 0).to_numpy(dtype=float)
        loss = (-delta.where(delta < 0, 0)).to_numpy(dtype=float)  # losses are positive

        avg_gain = np.zeros(n_rows)
        avg_loss = np.zeros(n_rows)

        # Seed value: simple mean over the first `period` rows. The length
        # check above guarantees this window exists.
        # NOTE(review): this window includes the first row, whose change is
        # unknown and counted as zero. Classic Wilder seeding uses the first
        # `period` actual changes instead - confirm which is intended before
        # changing it, since it shifts every RSI value.
        seed_idx = period - 1
        avg_gain[seed_idx] = gain[:period].mean()
        avg_loss[seed_idx] = loss[:period].mean()

        # Recursive smoothing; each value depends on the previous one, so
        # this stays an explicit loop (over plain arrays, not Series).
        for i in range(period, n_rows):
            avg_gain[i] = (avg_gain[i - 1] * (period - 1) + gain[i]) / period
            avg_loss[i] = (avg_loss[i - 1] * (period - 1) + loss[i]) / period

        df['avg_gain'] = avg_gain
        df['avg_loss'] = avg_loss

        # RSI = 100 - 100 / (1 + RS), with RS = avg_gain / avg_loss.
        # When avg_loss is 0 the ratio is undefined, so those rows are set
        # explicitly: 100 if there were gains, 50 (neutral) if the price
        # did not move at all.
        rsi = np.full(n_rows, np.nan)
        has_window = np.arange(n_rows) >= seed_idx
        zero_loss = avg_loss == 0

        regular = has_window & ~zero_loss
        rsi[regular] = 100 - (100 / (1 + avg_gain[regular] / avg_loss[regular]))

        degenerate = has_window & zero_loss
        rsi[degenerate] = np.where(avg_gain[degenerate] == 0, 50.0, 100.0)

        df['RSI'] = rsi

        # 'avg_gain' and 'avg_loss' are intentionally left in the result,
        # matching the existing output that callers may rely on.
        return df
|