Boilinger Band and RSI implementation
This commit is contained in:
0
cycles/Analysis/__init__.py
Normal file
0
cycles/Analysis/__init__.py
Normal file
50
cycles/Analysis/boillinger_band.py
Normal file
50
cycles/Analysis/boillinger_band.py
Normal file
@@ -0,0 +1,50 @@
|
||||
import pandas as pd
|
||||
|
||||
class BollingerBands:
|
||||
"""
|
||||
Calculates Bollinger Bands for given financial data.
|
||||
"""
|
||||
def __init__(self, period: int = 20, std_dev_multiplier: float = 2.0):
|
||||
"""
|
||||
Initializes the BollingerBands calculator.
|
||||
|
||||
Args:
|
||||
period (int): The period for the moving average and standard deviation.
|
||||
std_dev_multiplier (float): The number of standard deviations for the upper and lower bands.
|
||||
"""
|
||||
if period <= 0:
|
||||
raise ValueError("Period must be a positive integer.")
|
||||
if std_dev_multiplier <= 0:
|
||||
raise ValueError("Standard deviation multiplier must be positive.")
|
||||
|
||||
self.period = period
|
||||
self.std_dev_multiplier = std_dev_multiplier
|
||||
|
||||
def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame:
|
||||
"""
|
||||
Calculates Bollinger Bands and adds them to the DataFrame.
|
||||
|
||||
Args:
|
||||
data_df (pd.DataFrame): DataFrame with price data. Must include the price_column.
|
||||
price_column (str): The name of the column containing the price data (e.g., 'close').
|
||||
|
||||
Returns:
|
||||
pd.DataFrame: The original DataFrame with added columns:
|
||||
'SMA' (Simple Moving Average),
|
||||
'UpperBand',
|
||||
'LowerBand'.
|
||||
"""
|
||||
if price_column not in data_df.columns:
|
||||
raise ValueError(f"Price column '{price_column}' not found in DataFrame.")
|
||||
|
||||
# Calculate SMA
|
||||
data_df['SMA'] = data_df[price_column].rolling(window=self.period).mean()
|
||||
|
||||
# Calculate Standard Deviation
|
||||
std_dev = data_df[price_column].rolling(window=self.period).std()
|
||||
|
||||
# Calculate Upper and Lower Bands
|
||||
data_df['UpperBand'] = data_df['SMA'] + (self.std_dev_multiplier * std_dev)
|
||||
data_df['LowerBand'] = data_df['SMA'] - (self.std_dev_multiplier * std_dev)
|
||||
|
||||
return data_df
|
||||
109
cycles/Analysis/rsi.py
Normal file
109
cycles/Analysis/rsi.py
Normal file
@@ -0,0 +1,109 @@
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
class RSI:
|
||||
"""
|
||||
A class to calculate the Relative Strength Index (RSI).
|
||||
"""
|
||||
def __init__(self, period: int = 14):
|
||||
"""
|
||||
Initializes the RSI calculator.
|
||||
|
||||
Args:
|
||||
period (int): The period for RSI calculation. Default is 14.
|
||||
Must be a positive integer.
|
||||
"""
|
||||
if not isinstance(period, int) or period <= 0:
|
||||
raise ValueError("Period must be a positive integer.")
|
||||
self.period = period
|
||||
|
||||
def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame:
|
||||
"""
|
||||
Calculates the RSI and adds it as a column to the input DataFrame.
|
||||
|
||||
Args:
|
||||
data_df (pd.DataFrame): DataFrame with historical price data.
|
||||
Must contain the 'price_column'.
|
||||
price_column (str): The name of the column containing price data.
|
||||
Default is 'close'.
|
||||
|
||||
Returns:
|
||||
pd.DataFrame: The input DataFrame with an added 'RSI' column.
|
||||
Returns the original DataFrame with no 'RSI' column
|
||||
if the period is larger than the number of data points.
|
||||
"""
|
||||
if price_column not in data_df.columns:
|
||||
raise ValueError(f"Price column '{price_column}' not found in DataFrame.")
|
||||
|
||||
if len(data_df) < self.period:
|
||||
print(f"Warning: Data length ({len(data_df)}) is less than RSI period ({self.period}). RSI will not be calculated.")
|
||||
return data_df.copy()
|
||||
|
||||
df = data_df.copy()
|
||||
delta = df[price_column].diff(1)
|
||||
|
||||
gain = delta.where(delta > 0, 0)
|
||||
loss = -delta.where(delta < 0, 0) # Ensure loss is positive
|
||||
|
||||
# Calculate initial average gain and loss (SMA)
|
||||
avg_gain = gain.rolling(window=self.period, min_periods=self.period).mean().iloc[self.period -1:self.period]
|
||||
avg_loss = loss.rolling(window=self.period, min_periods=self.period).mean().iloc[self.period -1:self.period]
|
||||
|
||||
|
||||
# Calculate subsequent average gains and losses (EMA-like)
|
||||
# Pre-allocate lists for gains and losses to avoid repeated appending to Series
|
||||
gains = [0.0] * len(df)
|
||||
losses = [0.0] * len(df)
|
||||
|
||||
if not avg_gain.empty:
|
||||
gains[self.period -1] = avg_gain.iloc[0]
|
||||
if not avg_loss.empty:
|
||||
losses[self.period -1] = avg_loss.iloc[0]
|
||||
|
||||
|
||||
for i in range(self.period, len(df)):
|
||||
gains[i] = ((gains[i-1] * (self.period - 1)) + gain.iloc[i]) / self.period
|
||||
losses[i] = ((losses[i-1] * (self.period - 1)) + loss.iloc[i]) / self.period
|
||||
|
||||
df['avg_gain'] = pd.Series(gains, index=df.index)
|
||||
df['avg_loss'] = pd.Series(losses, index=df.index)
|
||||
|
||||
# Calculate RS
|
||||
# Handle division by zero: if avg_loss is 0, RS is undefined or infinite.
|
||||
# If avg_loss is 0 and avg_gain is also 0, RSI is conventionally 50.
|
||||
# If avg_loss is 0 and avg_gain > 0, RSI is conventionally 100.
|
||||
rs = df['avg_gain'] / df['avg_loss']
|
||||
|
||||
# Calculate RSI
|
||||
# RSI = 100 - (100 / (1 + RS))
|
||||
# If avg_loss is 0:
|
||||
# If avg_gain > 0, RS -> inf, RSI -> 100
|
||||
# If avg_gain == 0, RS -> NaN (0/0), RSI -> 50 (conventionally, or could be 0 or 100 depending on interpretation)
|
||||
# We will use a common convention where RSI is 100 if avg_loss is 0 and avg_gain > 0,
|
||||
# and RSI is 0 if avg_loss is 0 and avg_gain is 0 (or 50, let's use 0 to indicate no strength if both are 0).
|
||||
# However, to avoid NaN from 0/0, it's better to calculate RSI directly with conditions.
|
||||
|
||||
rsi_values = []
|
||||
for i in range(len(df)):
|
||||
avg_g = df['avg_gain'].iloc[i]
|
||||
avg_l = df['avg_loss'].iloc[i]
|
||||
|
||||
if i < self.period -1 : # Not enough data for initial SMA
|
||||
rsi_values.append(np.nan)
|
||||
continue
|
||||
|
||||
if avg_l == 0:
|
||||
if avg_g == 0:
|
||||
rsi_values.append(50) # Or 0, or np.nan depending on how you want to treat this. 50 implies neutrality.
|
||||
else:
|
||||
rsi_values.append(100) # Max strength
|
||||
else:
|
||||
rs_val = avg_g / avg_l
|
||||
rsi_values.append(100 - (100 / (1 + rs_val)))
|
||||
|
||||
df['RSI'] = pd.Series(rsi_values, index=df.index)
|
||||
|
||||
# Remove intermediate columns if desired, or keep them for debugging
|
||||
# df.drop(columns=['avg_gain', 'avg_loss'], inplace=True)
|
||||
|
||||
return df
|
||||
Reference in New Issue
Block a user