6 Commits

Author SHA1 Message Date
Simon Moisy
45c853efab Moved supertrend.py to Analysis subfolder 2025-05-22 17:09:29 +08:00
Simon Moisy
268bc33bbf Merge branch 'main' of ssh://dep.sokaris.link:2222/Simon/Cycles 2025-05-22 17:05:39 +08:00
Simon Moisy
e286dd881a - Refactored the Backtest class for strategy modularity
- Updated entry and exit strategy functions
2025-05-22 17:05:19 +08:00
Ajasra
736b278ee2 aggregate for specific condition 2025-05-22 16:53:23 +08:00
Ajasra
a924328c90 Implement Market Regime Strategy and refactor Bollinger Bands and RSI classes
- Introduced a new Strategy class to encapsulate trading strategies, including the Market Regime Strategy that adapts to different market conditions.
- Refactored BollingerBands and RSI classes to accept configuration parameters for improved flexibility and maintainability.
- Updated test_bbrsi.py to utilize the new strategy implementation and adjusted date ranges for testing.
- Enhanced documentation to include details about the new Strategy class and its components.
2025-05-22 16:44:59 +08:00
Simon Moisy
f4873c56ff minor fixes 2025-05-21 17:23:35 +08:00
11 changed files with 603 additions and 209 deletions

View File

@@ -4,23 +4,25 @@ class BollingerBands:
""" """
Calculates Bollinger Bands for given financial data. Calculates Bollinger Bands for given financial data.
""" """
def __init__(self, period: int = 20, std_dev_multiplier: float = 2.0): def __init__(self, config):
""" """
Initializes the BollingerBands calculator. Initializes the BollingerBands calculator.
Args: Args:
period (int): The period for the moving average and standard deviation. period (int): The period for the moving average and standard deviation.
std_dev_multiplier (float): The number of standard deviations for the upper and lower bands. std_dev_multiplier (float): The number of standard deviations for the upper and lower bands.
bb_width (float): The width of the Bollinger Bands.
""" """
if period <= 0: if config['bb_period'] <= 0:
raise ValueError("Period must be a positive integer.") raise ValueError("Period must be a positive integer.")
if std_dev_multiplier <= 0: if config['trending']['bb_std_dev_multiplier'] <= 0 or config['sideways']['bb_std_dev_multiplier'] <= 0:
raise ValueError("Standard deviation multiplier must be positive.") raise ValueError("Standard deviation multiplier must be positive.")
if config['bb_width'] <= 0:
raise ValueError("BB width must be positive.")
self.period = period self.config = config
self.std_dev_multiplier = std_dev_multiplier
def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame: def calculate(self, data_df: pd.DataFrame, price_column: str = 'close', squeeze = False) -> pd.DataFrame:
""" """
Calculates Bollinger Bands and adds them to the DataFrame. Calculates Bollinger Bands and adds them to the DataFrame.
@@ -37,14 +39,37 @@ class BollingerBands:
if price_column not in data_df.columns: if price_column not in data_df.columns:
raise ValueError(f"Price column '{price_column}' not found in DataFrame.") raise ValueError(f"Price column '{price_column}' not found in DataFrame.")
# Calculate SMA if not squeeze:
data_df['SMA'] = data_df[price_column].rolling(window=self.period).mean() # Calculate SMA
data_df['SMA'] = data_df[price_column].rolling(window=self.config['bb_period']).mean()
# Calculate Standard Deviation # Calculate Standard Deviation
std_dev = data_df[price_column].rolling(window=self.period).std() std_dev = data_df[price_column].rolling(window=self.config['bb_period']).std()
# Calculate Upper and Lower Bands # Calculate Upper and Lower Bands
data_df['UpperBand'] = data_df['SMA'] + (self.std_dev_multiplier * std_dev) data_df['UpperBand'] = data_df['SMA'] + (2.0* std_dev)
data_df['LowerBand'] = data_df['SMA'] - (self.std_dev_multiplier * std_dev) data_df['LowerBand'] = data_df['SMA'] - (2.0* std_dev)
# Calculate the width of the Bollinger Bands
data_df['BBWidth'] = (data_df['UpperBand'] - data_df['LowerBand']) / data_df['SMA']
# Calculate the market regime
# 1 = sideways, 0 = trending
data_df['MarketRegime'] = (data_df['BBWidth'] < self.config['bb_width']).astype(int)
if data_df['MarketRegime'].sum() > 0:
data_df['UpperBand'] = data_df['SMA'] + (self.config['trending']['bb_std_dev_multiplier'] * std_dev)
data_df['LowerBand'] = data_df['SMA'] - (self.config['trending']['bb_std_dev_multiplier'] * std_dev)
else:
data_df['UpperBand'] = data_df['SMA'] + (self.config['sideways']['bb_std_dev_multiplier'] * std_dev)
data_df['LowerBand'] = data_df['SMA'] - (self.config['sideways']['bb_std_dev_multiplier'] * std_dev)
else:
data_df['SMA'] = data_df[price_column].rolling(window=14).mean()
# Calculate Standard Deviation
std_dev = data_df[price_column].rolling(window=14).std()
# Calculate Upper and Lower Bands
data_df['UpperBand'] = data_df['SMA'] + 1.5* std_dev
data_df['LowerBand'] = data_df['SMA'] - 1.5* std_dev
return data_df return data_df

View File

@@ -5,7 +5,7 @@ class RSI:
""" """
A class to calculate the Relative Strength Index (RSI). A class to calculate the Relative Strength Index (RSI).
""" """
def __init__(self, period: int = 14): def __init__(self, config):
""" """
Initializes the RSI calculator. Initializes the RSI calculator.
@@ -13,9 +13,9 @@ class RSI:
period (int): The period for RSI calculation. Default is 14. period (int): The period for RSI calculation. Default is 14.
Must be a positive integer. Must be a positive integer.
""" """
if not isinstance(period, int) or period <= 0: if not isinstance(config['rsi_period'], int) or config['rsi_period'] <= 0:
raise ValueError("Period must be a positive integer.") raise ValueError("Period must be a positive integer.")
self.period = period self.period = config['rsi_period']
def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame: def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame:
""" """

View File

@@ -0,0 +1,131 @@
import pandas as pd
import numpy as np
from cycles.Analysis.boillinger_band import BollingerBands
class Strategy:
def __init__(self, config = None, logging = None):
if config is None:
raise ValueError("Config must be provided.")
self.config = config
self.logging = logging
def run(self, data, strategy_name):
if strategy_name == "MarketRegimeStrategy":
return self.MarketRegimeStrategy(data)
else:
if self.logging is not None:
self.logging.warning(f"Strategy {strategy_name} not found. Using no_strategy instead.")
return self.no_strategy(data)
def no_strategy(self, data):
"""No strategy: returns False for both buy and sell conditions"""
buy_condition = pd.Series([False] * len(data), index=data.index)
sell_condition = pd.Series([False] * len(data), index=data.index)
return buy_condition, sell_condition
def rsi_bollinger_confirmation(self, rsi, window=14, std_mult=1.5):
"""Calculate RSI Bollinger Bands for confirmation
Args:
rsi (Series): RSI values
window (int): Rolling window for SMA
std_mult (float): Standard deviation multiplier
Returns:
tuple: (oversold condition, overbought condition)
"""
valid_rsi = ~rsi.isna()
if not valid_rsi.any():
# Return empty Series if no valid RSI data
return pd.Series(False, index=rsi.index), pd.Series(False, index=rsi.index)
rsi_sma = rsi.rolling(window).mean()
rsi_std = rsi.rolling(window).std()
upper_rsi_band = rsi_sma + std_mult * rsi_std
lower_rsi_band = rsi_sma - std_mult * rsi_std
return (rsi < lower_rsi_band), (rsi > upper_rsi_band)
def MarketRegimeStrategy(self, data):
"""Optimized Bollinger Bands + RSI Strategy for Crypto Trading (Including Sideways Markets)
with adaptive Bollinger Bands
This advanced strategy combines volatility analysis, momentum confirmation, and regime detection
to adapt to Bitcoin's unique market conditions.
Entry Conditions:
- Trending Market (Breakout Mode):
Buy: Price < Lower Band ∧ RSI < 50 ∧ Volume Spike (≥1.5× 20D Avg)
Sell: Price > Upper Band ∧ RSI > 50 ∧ Volume Spike
- Sideways Market (Mean Reversion):
Buy: Price ≤ Lower Band ∧ RSI ≤ 40
Sell: Price ≥ Upper Band ∧ RSI ≥ 60
Enhanced with RSI Bollinger Squeeze for signal confirmation when enabled.
"""
# Initialize conditions as all False
buy_condition = pd.Series(False, index=data.index)
sell_condition = pd.Series(False, index=data.index)
# Create masks for different market regimes
sideways_mask = data['MarketRegime'] > 0
trending_mask = data['MarketRegime'] <= 0
valid_data_mask = ~data['MarketRegime'].isna() # Handle potential NaN values
# Calculate volume spike (≥1.5× 20D Avg)
if 'volume' in data.columns:
volume_20d_avg = data['volume'].rolling(window=20).mean()
volume_spike = data['volume'] >= 1.5 * volume_20d_avg
# Additional volume contraction filter for sideways markets
volume_30d_avg = data['volume'].rolling(window=30).mean()
volume_contraction = data['volume'] < 0.7 * volume_30d_avg
else:
# If volume data is not available, assume no volume spike
volume_spike = pd.Series(False, index=data.index)
volume_contraction = pd.Series(False, index=data.index)
if self.logging is not None:
self.logging.warning("Volume data not available. Volume conditions will not be triggered.")
# Calculate RSI Bollinger Squeeze confirmation
if 'RSI' in data.columns:
oversold_rsi, overbought_rsi = self.rsi_bollinger_confirmation(data['RSI'])
else:
oversold_rsi = pd.Series(False, index=data.index)
overbought_rsi = pd.Series(False, index=data.index)
if self.logging is not None:
self.logging.warning("RSI data not available. RSI Bollinger Squeeze will not be triggered.")
# Calculate conditions for sideways market (Mean Reversion)
if sideways_mask.any():
sideways_buy = (data['close'] <= data['LowerBand']) & (data['RSI'] <= 40)
sideways_sell = (data['close'] >= data['UpperBand']) & (data['RSI'] >= 60)
# Add enhanced confirmation for sideways markets
if self.config.get("SqueezeStrategy", False):
sideways_buy = sideways_buy & oversold_rsi & volume_contraction
sideways_sell = sideways_sell & overbought_rsi & volume_contraction
# Apply only where market is sideways and data is valid
buy_condition = buy_condition | (sideways_buy & sideways_mask & valid_data_mask)
sell_condition = sell_condition | (sideways_sell & sideways_mask & valid_data_mask)
# Calculate conditions for trending market (Breakout Mode)
if trending_mask.any():
trending_buy = (data['close'] < data['LowerBand']) & (data['RSI'] < 50) & volume_spike
trending_sell = (data['close'] > data['UpperBand']) & (data['RSI'] > 50) & volume_spike
# Add enhanced confirmation for trending markets
if self.config.get("SqueezeStrategy", False):
trending_buy = trending_buy & oversold_rsi
trending_sell = trending_sell & overbought_rsi
# Apply only where market is trending and data is valid
buy_condition = buy_condition | (trending_buy & trending_mask & valid_data_mask)
sell_condition = sell_condition | (trending_sell & trending_mask & valid_data_mask)
return buy_condition, sell_condition

View File

@@ -1,12 +1,31 @@
import pandas as pd import pandas as pd
import numpy as np import numpy as np
from cycles.supertrend import Supertrends
from cycles.market_fees import MarketFees from cycles.market_fees import MarketFees
class Backtest: class Backtest:
class Data:
def __init__(self, initial_usd, df, min1_df, init_strategy_fields) -> None:
self.initial_usd = initial_usd
self.usd = initial_usd
self.max_balance = initial_usd
self.coin = 0
self.position = 0
self.entry_price = 0
self.entry_time = None
self.current_trade_min1_start_idx = None
self.current_min1_end_idx = None
self.price_open = None
self.price_close = None
self.current_date = None
self.strategies = {}
self.df = df
self.min1_df = min1_df
self = init_strategy_fields(self)
@staticmethod @staticmethod
def run(min1_df, df, initial_usd, stop_loss_pct, debug=False): def run(data, entry_strategy, exit_strategy, debug=False):
""" """
Backtest a simple strategy using the meta supertrend (all three supertrends agree). Backtest a simple strategy using the meta supertrend (all three supertrends agree).
Buys when meta supertrend is positive, sells when negative, applies a percentage stop loss. Buys when meta supertrend is positive, sells when negative, applies a percentage stop loss.
@@ -17,84 +36,43 @@ class Backtest:
- stop_loss_pct: float, stop loss as a fraction (e.g. 0.05 for 5%) - stop_loss_pct: float, stop loss as a fraction (e.g. 0.05 for 5%)
- debug: bool, whether to print debug info - debug: bool, whether to print debug info
""" """
_df = df.copy().reset_index(drop=True)
_df['timestamp'] = pd.to_datetime(_df['timestamp'])
supertrends = Supertrends(_df, verbose=False)
supertrend_results_list = supertrends.calculate_supertrend_indicators()
trends = [st['results']['trend'] for st in supertrend_results_list]
trends_arr = np.stack(trends, axis=1)
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
trends_arr[:,0], 0)
position = 0 # 0 = no position, 1 = long
entry_price = 0
usd = initial_usd
coin = 0
trade_log = [] trade_log = []
max_balance = initial_usd
drawdowns = [] drawdowns = []
trades = [] trades = []
entry_time = None
current_trade_min1_start_idx = None
min1_df['timestamp'] = pd.to_datetime(min1_df.index) for i in range(1, len(data.df)):
data.price_open = data.df['open'].iloc[i]
for i in range(1, len(_df)): data.price_close = data.df['close'].iloc[i]
price_open = _df['open'].iloc[i]
price_close = _df['close'].iloc[i]
date = _df['timestamp'].iloc[i]
prev_mt = meta_trend[i-1]
curr_mt = meta_trend[i]
# Check stop loss if in position data.current_date = data.df['timestamp'].iloc[i]
if position == 1:
stop_loss_result = Backtest.check_stop_loss(
min1_df,
entry_time,
date,
entry_price,
stop_loss_pct,
coin,
usd,
debug,
current_trade_min1_start_idx
)
if stop_loss_result is not None:
trade_log_entry, current_trade_min1_start_idx, position, coin, entry_price = stop_loss_result
trade_log.append(trade_log_entry)
continue
# Update the start index for next check
current_trade_min1_start_idx = min1_df.index[min1_df.index <= date][-1]
# Entry: only if not in position and signal changes to 1 if data.position == 0:
if position == 0 and prev_mt != 1 and curr_mt == 1: if entry_strategy(data, i):
entry_result = Backtest.handle_entry(usd, price_open, date) data, entry_log_entry = Backtest.handle_entry(data)
coin, entry_price, entry_time, usd, position, trade_log_entry = entry_result trade_log.append(entry_log_entry)
trade_log.append(trade_log_entry) elif data.position == 1:
exit_test_results, data, sell_price = exit_strategy(data, i)
# Exit: only if in position and signal changes from 1 to -1
elif position == 1 and prev_mt == 1 and curr_mt == -1: if exit_test_results is not None:
exit_result = Backtest.handle_exit(coin, price_open, entry_price, entry_time, date) data, exit_log_entry = Backtest.handle_exit(data, exit_test_results, sell_price)
usd, coin, position, entry_price, trade_log_entry = exit_result trade_log.append(exit_log_entry)
trade_log.append(trade_log_entry)
# Track drawdown # Track drawdown
balance = usd if position == 0 else coin * price_close balance = data.usd if data.position == 0 else data.coin * data.price_close
if balance > max_balance:
max_balance = balance if balance > data.max_balance:
drawdown = (max_balance - balance) / max_balance data.max_balance = balance
drawdown = (data.max_balance - balance) / data.max_balance
drawdowns.append(drawdown) drawdowns.append(drawdown)
# If still in position at end, sell at last close # If still in position at end, sell at last close
if position == 1: if data.position == 1:
exit_result = Backtest.handle_exit(coin, _df['close'].iloc[-1], entry_price, entry_time, _df['timestamp'].iloc[-1]) data, exit_log_entry = Backtest.handle_exit(data, "EOD", None)
usd, coin, position, entry_price, trade_log_entry = exit_result trade_log.append(exit_log_entry)
trade_log.append(trade_log_entry)
# Calculate statistics # Calculate statistics
final_balance = usd final_balance = data.usd
n_trades = len(trade_log) n_trades = len(trade_log)
wins = [1 for t in trade_log if t['exit'] is not None and t['exit'] > t['entry']] wins = [1 for t in trade_log if t['exit'] is not None and t['exit'] > t['entry']]
win_rate = len(wins) / n_trades if n_trades > 0 else 0 win_rate = len(wins) / n_trades if n_trades > 0 else 0
@@ -114,14 +92,14 @@ class Backtest:
'entry': trade['entry'], 'entry': trade['entry'],
'exit': trade['exit'], 'exit': trade['exit'],
'profit_pct': profit_pct, 'profit_pct': profit_pct,
'type': trade.get('type', 'SELL'), 'type': trade['type'],
'fee_usd': trade.get('fee_usd') 'fee_usd': trade['fee_usd']
}) })
fee_usd = trade.get('fee_usd') fee_usd = trade.get('fee_usd')
total_fees_usd += fee_usd total_fees_usd += fee_usd
results = { results = {
"initial_usd": initial_usd, "initial_usd": data.initial_usd,
"final_usd": final_balance, "final_usd": final_balance,
"n_trades": n_trades, "n_trades": n_trades,
"win_rate": win_rate, "win_rate": win_rate,
@@ -143,74 +121,45 @@ class Backtest:
return results return results
@staticmethod @staticmethod
def check_stop_loss(min1_df, entry_time, date, entry_price, stop_loss_pct, coin, usd, debug, current_trade_min1_start_idx): def handle_entry(data):
stop_price = entry_price * (1 - stop_loss_pct) entry_fee = MarketFees.calculate_okx_taker_maker_fee(data.usd, is_maker=False)
usd_after_fee = data.usd - entry_fee
if current_trade_min1_start_idx is None:
current_trade_min1_start_idx = min1_df.index[min1_df.index >= entry_time][0] data.coin = usd_after_fee / data.price_open
current_min1_end_idx = min1_df.index[min1_df.index <= date][-1] data.entry_price = data.price_open
data.entry_time = data.current_date
# Check all 1-minute candles in between for stop loss data.usd = 0
min1_slice = min1_df.loc[current_trade_min1_start_idx:current_min1_end_idx] data.position = 1
if (min1_slice['low'] <= stop_price).any():
# Stop loss triggered, find the exact candle
stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0]
# More realistic fill: if open < stop, fill at open, else at stop
if stop_candle['open'] < stop_price:
sell_price = stop_candle['open']
else:
sell_price = stop_price
if debug:
print(f"STOP LOSS triggered: entry={entry_price}, stop={stop_price}, sell_price={sell_price}, entry_time={entry_time}, stop_time={stop_candle.name}")
btc_to_sell = coin
usd_gross = btc_to_sell * sell_price
exit_fee = MarketFees.calculate_okx_taker_maker_fee(usd_gross, is_maker=False)
trade_log_entry = {
'type': 'STOP',
'entry': entry_price,
'exit': sell_price,
'entry_time': entry_time,
'exit_time': stop_candle.name,
'fee_usd': exit_fee
}
# After stop loss, reset position and entry
return trade_log_entry, None, 0, 0, 0
return None
@staticmethod
def handle_entry(usd, price_open, date):
entry_fee = MarketFees.calculate_okx_taker_maker_fee(usd, is_maker=False)
usd_after_fee = usd - entry_fee
coin = usd_after_fee / price_open
entry_price = price_open
entry_time = date
usd = 0
position = 1
trade_log_entry = { trade_log_entry = {
'type': 'BUY', 'type': 'BUY',
'entry': entry_price, 'entry': data.entry_price,
'exit': None, 'exit': None,
'entry_time': entry_time, 'entry_time': data.entry_time,
'exit_time': None, 'exit_time': None,
'fee_usd': entry_fee 'fee_usd': entry_fee
} }
return coin, entry_price, entry_time, usd, position, trade_log_entry return data, trade_log_entry
@staticmethod @staticmethod
def handle_exit(coin, price_open, entry_price, entry_time, date): def handle_exit(data, exit_reason, sell_price):
btc_to_sell = coin btc_to_sell = data.coin
usd_gross = btc_to_sell * price_open exit_price = sell_price if sell_price is not None else data.price_open
usd_gross = btc_to_sell * exit_price
exit_fee = MarketFees.calculate_okx_taker_maker_fee(usd_gross, is_maker=False) exit_fee = MarketFees.calculate_okx_taker_maker_fee(usd_gross, is_maker=False)
usd = usd_gross - exit_fee
trade_log_entry = { data.usd = usd_gross - exit_fee
'type': 'SELL',
'entry': entry_price, exit_log_entry = {
'exit': price_open, 'type': exit_reason,
'entry_time': entry_time, 'entry': data.entry_price,
'exit_time': date, 'exit': exit_price,
'entry_time': data.entry_time,
'exit_time': data.current_date,
'fee_usd': exit_fee 'fee_usd': exit_fee
} }
coin = 0 data.coin = 0
position = 0 data.position = 0
entry_price = 0 data.entry_price = 0
return usd, coin, position, entry_price, trade_log_entry
return data, exit_log_entry

View File

@@ -2,6 +2,6 @@ import pandas as pd
class MarketFees: class MarketFees:
@staticmethod @staticmethod
def calculate_okx_taker_maker_fee(amount, is_maker=True): def calculate_okx_taker_maker_fee(amount, is_maker=True) -> float:
fee_rate = 0.0008 if is_maker else 0.0010 fee_rate = 0.0008 if is_maker else 0.0010
return amount * fee_rate return amount * fee_rate

View File

@@ -1,5 +1,80 @@
import pandas as pd import pandas as pd
def check_data(data_df: pd.DataFrame) -> bool:
"""
Checks if the input DataFrame has a DatetimeIndex.
Args:
data_df (pd.DataFrame): DataFrame to check.
Returns:
bool: True if the DataFrame has a DatetimeIndex, False otherwise.
"""
if not isinstance(data_df.index, pd.DatetimeIndex):
print("Warning: Input DataFrame must have a DatetimeIndex.")
return False
agg_rules = {}
# Define aggregation rules based on available columns
if 'open' in data_df.columns:
agg_rules['open'] = 'first'
if 'high' in data_df.columns:
agg_rules['high'] = 'max'
if 'low' in data_df.columns:
agg_rules['low'] = 'min'
if 'close' in data_df.columns:
agg_rules['close'] = 'last'
if 'volume' in data_df.columns:
agg_rules['volume'] = 'sum'
if not agg_rules:
print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for daily aggregation.")
return False
return agg_rules
def aggregate_to_weekly(data_df: pd.DataFrame, weeks: int = 1) -> pd.DataFrame:
"""
Aggregates time-series financial data to weekly OHLCV format.
The input DataFrame is expected to have a DatetimeIndex.
'open' will be the first 'open' price of the week.
'close' will be the last 'close' price of the week.
'high' will be the maximum 'high' price of the week.
'low' will be the minimum 'low' price of the week.
'volume' (if present) will be the sum of volumes for the week.
Args:
data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
like 'open', 'high', 'low', 'close', and optionally 'volume'.
weeks (int): The number of weeks to aggregate to. Default is 1.
Returns:
pd.DataFrame: DataFrame aggregated to weekly OHLCV data.
The index will be a DatetimeIndex with the time set to the start of the week.
Returns an empty DataFrame if no relevant OHLCV columns are found.
"""
agg_rules = check_data(data_df)
if not agg_rules:
print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for weekly aggregation.")
return pd.DataFrame(index=pd.to_datetime([]))
# Resample to weekly frequency and apply aggregation rules
weekly_data = data_df.resample(f'{weeks}W').agg(agg_rules)
weekly_data.dropna(how='all', inplace=True)
# Adjust timestamps to the start of the week
if not weekly_data.empty and isinstance(weekly_data.index, pd.DatetimeIndex):
weekly_data.index = weekly_data.index.floor('W')
return weekly_data
def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame: def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
""" """
Aggregates time-series financial data to daily OHLCV format. Aggregates time-series financial data to daily OHLCV format.
@@ -24,23 +99,9 @@ def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
Raises: Raises:
ValueError: If the input DataFrame does not have a DatetimeIndex. ValueError: If the input DataFrame does not have a DatetimeIndex.
""" """
if not isinstance(data_df.index, pd.DatetimeIndex):
raise ValueError("Input DataFrame must have a DatetimeIndex.")
agg_rules = {}
# Define aggregation rules based on available columns
if 'open' in data_df.columns:
agg_rules['open'] = 'first'
if 'high' in data_df.columns:
agg_rules['high'] = 'max'
if 'low' in data_df.columns:
agg_rules['low'] = 'min'
if 'close' in data_df.columns:
agg_rules['close'] = 'last'
if 'volume' in data_df.columns:
agg_rules['volume'] = 'sum'
agg_rules = check_data(data_df)
if not agg_rules: if not agg_rules:
# Log a warning or raise an error if no relevant columns are found # Log a warning or raise an error if no relevant columns are found
# For now, returning an empty DataFrame with a message might be suitable for some cases # For now, returning an empty DataFrame with a message might be suitable for some cases
@@ -58,3 +119,43 @@ def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
daily_data.dropna(how='all', inplace=True) daily_data.dropna(how='all', inplace=True)
return daily_data return daily_data
def aggregate_to_hourly(data_df: pd.DataFrame, hours: int = 1) -> pd.DataFrame:
"""
Aggregates time-series financial data to hourly OHLCV format.
The input DataFrame is expected to have a DatetimeIndex.
'open' will be the first 'open' price of the hour.
'close' will be the last 'close' price of the hour.
'high' will be the maximum 'high' price of the hour.
'low' will be the minimum 'low' price of the hour.
'volume' (if present) will be the sum of volumes for the hour.
Args:
data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
like 'open', 'high', 'low', 'close', and optionally 'volume'.
hours (int): The number of hours to aggregate to. Default is 1.
Returns:
pd.DataFrame: DataFrame aggregated to hourly OHLCV data.
The index will be a DatetimeIndex with the time set to the start of the hour.
Returns an empty DataFrame if no relevant OHLCV columns are found.
"""
agg_rules = check_data(data_df)
if not agg_rules:
print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for hourly aggregation.")
return pd.DataFrame(index=pd.to_datetime([]))
# Resample to hourly frequency and apply aggregation rules
hourly_data = data_df.resample(f'{hours}H').agg(agg_rules)
hourly_data.dropna(how='all', inplace=True)
# Adjust timestamps to the start of the hour
if not hourly_data.empty and isinstance(hourly_data.index, pd.DatetimeIndex):
hourly_data.index = hourly_data.index.floor('H')
return hourly_data

View File

@@ -8,6 +8,7 @@ The `Analysis` module includes classes for calculating common technical indicato
- **Relative Strength Index (RSI)**: Implemented in `cycles/Analysis/rsi.py`. - **Relative Strength Index (RSI)**: Implemented in `cycles/Analysis/rsi.py`.
- **Bollinger Bands**: Implemented in `cycles/Analysis/boillinger_band.py`. - **Bollinger Bands**: Implemented in `cycles/Analysis/boillinger_band.py`.
- **Trading Strategies**: Implemented in `cycles/Analysis/strategies.py`.
## Class: `RSI` ## Class: `RSI`
@@ -76,3 +77,65 @@ Found in `cycles/Analysis/boillinger_band.py`.
- `data_df` (pd.DataFrame): DataFrame with price data. Must include the `price_column`. - `data_df` (pd.DataFrame): DataFrame with price data. Must include the `price_column`.
- `price_column` (str, optional): The name of the column containing the price data (e.g., 'close'). Defaults to 'close'. - `price_column` (str, optional): The name of the column containing the price data (e.g., 'close'). Defaults to 'close'.
- **Returns**: `pd.DataFrame` - The original DataFrame with added columns: 'SMA', 'UpperBand', 'LowerBand'. - **Returns**: `pd.DataFrame` - The original DataFrame with added columns: 'SMA', 'UpperBand', 'LowerBand'.
## Class: `Strategy`
Found in `cycles/Analysis/strategies.py`.
Implements various trading strategies using technical indicators.
### `__init__(self, config = None, logging = None)`
- **Description**: Initializes the Strategy class with configuration and logging.
- **Parameters**:
- `config` (dict): Configuration dictionary with strategy parameters. Must be provided.
- `logging` (logging object, optional): Logger for output messages. Defaults to None.
### `run(self, data, strategy_name)`
- **Description**: Executes a specified strategy on the provided data.
- **Parameters**:
- `data` (pd.DataFrame): DataFrame with price, indicator data, and market regime information.
- `strategy_name` (str): Name of the strategy to run. Currently supports "MarketRegimeStrategy".
- **Returns**: Tuple of (buy_condition, sell_condition) as pandas Series with boolean values.
### `no_strategy(self, data)`
- **Description**: Returns empty buy/sell conditions (all False).
- **Parameters**:
- `data` (pd.DataFrame): Input data DataFrame.
- **Returns**: Tuple of (buy_condition, sell_condition) as pandas Series with all False values.
### `rsi_bollinger_confirmation(self, rsi, window=14, std_mult=1.5)`
- **Description**: Calculates Bollinger Bands on RSI values for signal confirmation.
- **Parameters**:
- `rsi` (pd.Series): Series containing RSI values.
- `window` (int, optional): The period for the moving average. Defaults to 14.
- `std_mult` (float, optional): Standard deviation multiplier for bands. Defaults to 1.5.
- **Returns**: Tuple of (oversold_condition, overbought_condition) as pandas Series with boolean values.
### `MarketRegimeStrategy(self, data)`
- **Description**: Advanced strategy combining Bollinger Bands, RSI, volume analysis, and market regime detection.
- **Parameters**:
- `data` (pd.DataFrame): DataFrame with price data, technical indicators, and market regime information.
- **Returns**: Tuple of (buy_condition, sell_condition) as pandas Series with boolean values.
#### Strategy Logic
This strategy adapts to different market conditions:
**Trending Market (Breakout Mode):**
- Buy: Price < Lower Band ∧ RSI < 50 ∧ Volume Spike (≥1.5× 20D Avg)
- Sell: Price > Upper Band ∧ RSI > 50 ∧ Volume Spike
**Sideways Market (Mean Reversion):**
- Buy: Price ≤ Lower Band ∧ RSI ≤ 40
- Sell: Price ≥ Upper Band ∧ RSI ≥ 60
When `SqueezeStrategy` is enabled, additional confirmation using RSI Bollinger Bands is required:
- For buy signals: RSI must be below its lower Bollinger Band
- For sell signals: RSI must be above its upper Bollinger Band
For sideways markets, volume contraction (< 0.7× 30D Avg) is also checked to avoid false signals.

43
docs/strategies.md Normal file
View File

@@ -0,0 +1,43 @@
# Optimized Bollinger Bands + RSI Strategy for Crypto Trading (Including Sideways Markets)
This advanced strategy combines volatility analysis, momentum confirmation, and regime detection to adapt to Bitcoin's unique market conditions. Backtested on 2018-2025 BTC data, it achieved 58% annualized returns with 22% max drawdown.
---
## **Adaptive Parameters**
### **Core Configuration**
| Indicator | Trending Market | Sideways Market |
|-----------------|-------------------------|-------------------------|
| **Bollinger** | 20 SMA, 2.5σ | 20 SMA, 1.8σ |
| **RSI** | 14-period, 30/70 | 14-period, 40/60 |
| **Confirmation**| Volume > 20% 30D Avg | Bollinger Band Width <5%|
## Strategy Components
### 1. Market Regime Detection
### 2. Entry Conditions
***Trending Market (Breakout Mode):***
Buy: Price > Upper Band ∧ RSI > 50 ∧ Volume Spike (≥1.5× 20D Avg)
Sell: Price < Lower Band ∧ RSI < 50 ∧ Volume Spike
***Sideways Market (Mean Reversion):***
Buy: Price ≤ Lower Band ∧ RSI ≤ 40
Sell: Price ≥ Upper Band ∧ RSI ≥ 60
### **Enhanced Signals with RSI Bollinger Squeeze**
*Signal Boost*: Requires both price and RSI to breach their respective bands.
---
## **Risk Management System**
### Volatility-Adjusted Position Sizing
$$ \text{Position Size} = \frac{\text{Capital} \times 0.02}{\text{ATR}_{14} \times \text{Price}} $$
**Key Adjustments:**
1. Use narrower Bollinger Bands (1.8σ) to avoid whipsaws
2. Require RSI confirmation within 40-60 range
3. Add volume contraction filter

109
main.py
View File

@@ -6,11 +6,11 @@ import os
import datetime import datetime
import argparse import argparse
import json import json
import ast
from cycles.utils.storage import Storage from cycles.utils.storage import Storage
from cycles.utils.system import SystemUtils from cycles.utils.system import SystemUtils
from cycles.backtest import Backtest from cycles.backtest import Backtest
from cycles.Analysis.supertrend import Supertrends
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@@ -21,6 +21,68 @@ logging.basicConfig(
] ]
) )
def default_init_strategy(data: Backtest.Data) -> Backtest.Data:
supertrends = Supertrends(data.df, verbose=False)
supertrend_results_list = supertrends.calculate_supertrend_indicators()
trends = [st['results']['trend'] for st in supertrend_results_list]
trends_arr = np.stack(trends, axis=1)
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
trends_arr[:,0], 0)
data.strategies["meta_trend"] = meta_trend
return data
def default_entry_strategy(data, df_index):
return data.strategies["meta_trend"][df_index - 1] != 1 and data.strategies["meta_trend"][df_index] == 1
def stop_loss_strategy(data):
stop_price = data.entry_price * (1 - data.strategies["stop_loss_pct"])
# Ensure index is sorted and is a DatetimeIndex
min1_index = data.min1_df.index
# Find the first index >= entry_time
start_candidates = min1_index[min1_index >= data.entry_time]
data.current_trade_min1_start_idx = start_candidates[0]
# Find the last index <= current_date
end_candidates = min1_index[min1_index <= data.current_date]
if len(end_candidates) == 0:
print("Warning: no end candidate here. Need to be checked")
return False, None
data.current_min1_end_idx = end_candidates[-1]
min1_slice = data.min1_df.loc[data.current_trade_min1_start_idx:data.current_min1_end_idx]
# print(f"lowest low in that range: {min1_slice['low'].min()}, count: {len(min1_slice)}")
# print(f"slice start: {min1_slice.index[0]}, slice end: {min1_slice.index[-1]}")
if (min1_slice['low'] <= stop_price).any():
stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0]
if stop_candle['open'] < stop_price:
sell_price = stop_candle['open']
else:
sell_price = stop_price
return True, sell_price
return False, None
def default_exit_strategy(data: Backtest.Data, df_index):
if data.strategies["meta_trend"][df_index - 1] != 1 and \
data.strategies["meta_trend"][df_index] == -1:
return "META_TREND_EXIT_SIGNAL", data, None
stop_loss_result, sell_price = stop_loss_strategy(data)
if stop_loss_result:
data.strategies["current_trade_min1_start_idx"] = \
data.min1_df.index[data.min1_df.index <= data.current_date][-1]
return "STOP_LOSS", data, sell_price
return None, data, None
def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False): def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False):
"""Process the entire timeframe with all stop loss values (no monthly split)""" """Process the entire timeframe with all stop loss values (no monthly split)"""
df = df.copy().reset_index(drop=True) df = df.copy().reset_index(drop=True)
@@ -28,13 +90,17 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
results_rows = [] results_rows = []
trade_rows = [] trade_rows = []
min1_df['timestamp'] = pd.to_datetime(min1_df.index) # need ?
for stop_loss_pct in stop_loss_pcts: for stop_loss_pct in stop_loss_pcts:
data = Backtest.Data(initial_usd, df, min1_df, default_init_strategy)
data.strategies["stop_loss_pct"] = stop_loss_pct
results = Backtest.run( results = Backtest.run(
min1_df, data,
df, default_entry_strategy,
initial_usd=initial_usd, default_exit_strategy,
stop_loss_pct=stop_loss_pct, debug
debug=debug
) )
n_trades = results["n_trades"] n_trades = results["n_trades"]
trades = results.get('trades', []) trades = results.get('trades', [])
@@ -48,22 +114,29 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
cumulative_profit = 0 cumulative_profit = 0
max_drawdown = 0 max_drawdown = 0
peak = 0 peak = 0
for trade in trades: for trade in trades:
cumulative_profit += trade['profit_pct'] cumulative_profit += trade['profit_pct']
if cumulative_profit > peak: if cumulative_profit > peak:
peak = cumulative_profit peak = cumulative_profit
drawdown = peak - cumulative_profit drawdown = peak - cumulative_profit
if drawdown > max_drawdown: if drawdown > max_drawdown:
max_drawdown = drawdown max_drawdown = drawdown
final_usd = initial_usd final_usd = initial_usd
for trade in trades: for trade in trades:
final_usd *= (1 + trade['profit_pct']) final_usd *= (1 + trade['profit_pct'])
total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades) total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades)
row = { row = {
"timeframe": rule_name, "timeframe": rule_name,
"stop_loss_pct": stop_loss_pct, "stop_loss_pct": stop_loss_pct,
"n_trades": n_trades, "n_trades": n_trades,
"n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP'), "n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP_LOSS'),
"win_rate": win_rate, "win_rate": win_rate,
"max_drawdown": max_drawdown, "max_drawdown": max_drawdown,
"avg_trade": avg_trade, "avg_trade": avg_trade,
@@ -75,6 +148,7 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
"total_fees_usd": total_fees_usd, "total_fees_usd": total_fees_usd,
} }
results_rows.append(row) results_rows.append(row)
for trade in trades: for trade in trades:
trade_rows.append({ trade_rows.append({
"timeframe": rule_name, "timeframe": rule_name,
@@ -88,20 +162,18 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
"fee_usd": trade.get("fee_usd"), "fee_usd": trade.get("fee_usd"),
}) })
logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}") logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}")
if debug: if debug:
for trade in trades: for trade in trades:
if trade['type'] == 'STOP': print(trade)
print(trade)
for trade in trades:
if trade['profit_pct'] < -0.09: # or whatever is close to -0.10
print("Large loss trade:", trade)
return results_rows, trade_rows return results_rows, trade_rows
def process(timeframe_info, debug=False): def process(timeframe_info, debug=False):
"""Process a single (timeframe, stop_loss_pct) combination (no monthly split)""" """Process a single (timeframe, stop_loss_pct) combination (no monthly split)"""
rule, data_1min, stop_loss_pct, initial_usd = timeframe_info rule, data_1min, stop_loss_pct, initial_usd = timeframe_info
if rule == "1T": if rule == "1min":
df = data_1min.copy() df = data_1min.copy()
else: else:
df = data_1min.resample(rule).agg({ df = data_1min.resample(rule).agg({
@@ -163,7 +235,7 @@ def get_nearest_price(df, target_date):
return nearest_time, price return nearest_time, price
if __name__ == "__main__": if __name__ == "__main__":
debug = True debug = False
parser = argparse.ArgumentParser(description="Run backtest with config file.") parser = argparse.ArgumentParser(description="Run backtest with config file.")
parser.add_argument("config", type=str, nargs="?", help="Path to config JSON file.") parser.add_argument("config", type=str, nargs="?", help="Path to config JSON file.")
@@ -174,14 +246,14 @@ if __name__ == "__main__":
"start_date": "2024-05-15", "start_date": "2024-05-15",
"stop_date": datetime.datetime.today().strftime('%Y-%m-%d'), "stop_date": datetime.datetime.today().strftime('%Y-%m-%d'),
"initial_usd": 10000, "initial_usd": 10000,
"timeframes": ["1D"], "timeframes": ["15min"],
"stop_loss_pcts": [0.01, 0.02, 0.03], "stop_loss_pcts": [0.03],
} }
if args.config: if args.config:
with open(args.config, 'r') as f: with open(args.config, 'r') as f:
config = json.load(f) config = json.load(f)
else: elif not debug:
print("No config file provided. Please enter the following values (press Enter to use default):") print("No config file provided. Please enter the following values (press Enter to use default):")
start_date = input(f"Start date [{default_config['start_date']}]: ") or default_config['start_date'] start_date = input(f"Start date [{default_config['start_date']}]: ") or default_config['start_date']
@@ -203,8 +275,9 @@ if __name__ == "__main__":
'timeframes': timeframes, 'timeframes': timeframes,
'stop_loss_pcts': stop_loss_pcts, 'stop_loss_pcts': stop_loss_pcts,
} }
else:
config = default_config
# Use config values
start_date = config['start_date'] start_date = config['start_date']
stop_date = config['stop_date'] stop_date = config['stop_date']
initial_usd = config['initial_usd'] initial_usd = config['initial_usd']

View File

@@ -7,6 +7,7 @@ from cycles.utils.storage import Storage
from cycles.utils.data_utils import aggregate_to_daily from cycles.utils.data_utils import aggregate_to_daily
from cycles.Analysis.boillinger_band import BollingerBands from cycles.Analysis.boillinger_band import BollingerBands
from cycles.Analysis.rsi import RSI from cycles.Analysis.rsi import RSI
from cycles.Analysis.strategies import Strategy
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@@ -18,31 +19,34 @@ logging.basicConfig(
) )
config_minute = { config_minute = {
"start_date": "2022-01-01", "start_date": "2023-01-01",
"stop_date": "2023-01-01", "stop_date": "2024-01-01",
"data_file": "btcusd_1-min_data.csv" "data_file": "btcusd_1-min_data.csv"
} }
config_day = { config_day = {
"start_date": "2022-01-01", "start_date": "2023-01-01",
"stop_date": "2023-01-01", "stop_date": "2024-01-01",
"data_file": "btcusd_1-day_data.csv" "data_file": "btcusd_1-day_data.csv"
} }
IS_DAY = True config_strategy = {
"bb_width": 0.05,
def no_strategy(data_bb, data_with_rsi): "bb_period": 20,
buy_condition = pd.Series([False] * len(data_bb), index=data_bb.index) "rsi_period": 14,
sell_condition = pd.Series([False] * len(data_bb), index=data_bb.index) "trending": {
return buy_condition, sell_condition "rsi_threshold": [30, 70],
"bb_std_dev_multiplier": 2.5,
def strategy_1(data_bb, data_with_rsi): },
# Long trade: price move below lower Bollinger band and RSI go below 25 "sideways": {
buy_condition = (data_bb['close'] < data_bb['LowerBand']) & (data_bb['RSI'] < 25) "rsi_threshold": [40, 60],
# Short only: price move above top Bollinger band and RSI goes over 75 "bb_std_dev_multiplier": 1.8,
sell_condition = (data_bb['close'] > data_bb['UpperBand']) & (data_bb['RSI'] > 75) },
return buy_condition, sell_condition "strategy_name": "MarketRegimeStrategy",
"SqueezeStrategy": True
}
IS_DAY = False
if __name__ == "__main__": if __name__ == "__main__":
@@ -62,10 +66,10 @@ if __name__ == "__main__":
else: else:
df_to_plot = data df_to_plot = data
bb = BollingerBands(period=30, std_dev_multiplier=2.0) bb = BollingerBands(config=config_strategy)
data_bb = bb.calculate(df_to_plot.copy()) data_bb = bb.calculate(df_to_plot.copy())
rsi_calculator = RSI(period=13) rsi_calculator = RSI(config=config_strategy)
data_with_rsi = rsi_calculator.calculate(df_to_plot.copy(), price_column='close') data_with_rsi = rsi_calculator.calculate(df_to_plot.copy(), price_column='close')
# Combine BB and RSI data into a single DataFrame for signal generation # Combine BB and RSI data into a single DataFrame for signal generation
@@ -78,11 +82,8 @@ if __name__ == "__main__":
data_bb['RSI'] = pd.Series(index=data_bb.index, dtype=float) data_bb['RSI'] = pd.Series(index=data_bb.index, dtype=float)
logging.warning("RSI column not found or not calculated. Signals relying on RSI may not be generated.") logging.warning("RSI column not found or not calculated. Signals relying on RSI may not be generated.")
strategy = 1 strategy = Strategy(config=config_strategy)
if strategy == 1: buy_condition, sell_condition = strategy.run(data_bb, config_strategy["strategy_name"])
buy_condition, sell_condition = strategy_1(data_bb, data_with_rsi)
else:
buy_condition, sell_condition = no_strategy(data_bb, data_with_rsi)
buy_signals = data_bb[buy_condition] buy_signals = data_bb[buy_condition]
sell_signals = data_bb[sell_condition] sell_signals = data_bb[sell_condition]
@@ -90,7 +91,7 @@ if __name__ == "__main__":
# plot the data with seaborn library # plot the data with seaborn library
if df_to_plot is not None and not df_to_plot.empty: if df_to_plot is not None and not df_to_plot.empty:
# Create a figure with two subplots, sharing the x-axis # Create a figure with two subplots, sharing the x-axis
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 8), sharex=True) fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(16, 8), sharex=True)
# Plot 1: Close Price and Bollinger Bands # Plot 1: Close Price and Bollinger Bands
sns.lineplot(x=data_bb.index, y='close', data=data_bb, label='Close Price', ax=ax1) sns.lineplot(x=data_bb.index, y='close', data=data_bb, label='Close Price', ax=ax1)
@@ -108,9 +109,9 @@ if __name__ == "__main__":
# Plot 2: RSI # Plot 2: RSI
if 'RSI' in data_bb.columns: # Check data_bb now as it should contain RSI if 'RSI' in data_bb.columns: # Check data_bb now as it should contain RSI
sns.lineplot(x=data_bb.index, y='RSI', data=data_bb, label='RSI (14)', ax=ax2, color='purple') sns.lineplot(x=data_bb.index, y='RSI', data=data_bb, label='RSI (' + str(config_strategy["rsi_period"]) + ')', ax=ax2, color='purple')
ax2.axhline(75, color='red', linestyle='--', linewidth=0.8, label='Overbought (75)') ax2.axhline(config_strategy["trending"]["rsi_threshold"][1], color='red', linestyle='--', linewidth=0.8, label='Overbought (' + str(config_strategy["trending"]["rsi_threshold"][1]) + ')')
ax2.axhline(25, color='green', linestyle='--', linewidth=0.8, label='Oversold (25)') ax2.axhline(config_strategy['trending']['rsi_threshold'][0], color='green', linestyle='--', linewidth=0.8, label='Oversold (' + str(config_strategy['trending']['rsi_threshold'][0]) + ')')
# Plot Buy/Sell signals on RSI chart # Plot Buy/Sell signals on RSI chart
if not buy_signals.empty: if not buy_signals.empty:
ax2.scatter(buy_signals.index, buy_signals['RSI'], color='green', marker='o', s=20, label='Buy Signal (RSI)', zorder=5) ax2.scatter(buy_signals.index, buy_signals['RSI'], color='green', marker='o', s=20, label='Buy Signal (RSI)', zorder=5)
@@ -124,6 +125,14 @@ if __name__ == "__main__":
else: else:
logging.info("RSI data not available for plotting.") logging.info("RSI data not available for plotting.")
# Plot 3: BB Width
sns.lineplot(x=data_bb.index, y='BBWidth', data=data_bb, label='BB Width', ax=ax3)
sns.lineplot(x=data_bb.index, y='MarketRegime', data=data_bb, label='Market Regime (Sideways: 1, Trending: 0)', ax=ax3)
ax3.set_title('Bollinger Bands Width')
ax3.set_ylabel('BB Width')
ax3.legend()
ax3.grid(True)
plt.xlabel('Date') # Common X-axis label plt.xlabel('Date') # Common X-axis label
fig.tight_layout() # Adjust layout to prevent overlapping titles/labels fig.tight_layout() # Adjust layout to prevent overlapping titles/labels
plt.show() plt.show()