Compare commits
5 Commits
old_code
...
268bc33bbf
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
268bc33bbf | ||
|
|
e286dd881a | ||
|
|
736b278ee2 | ||
|
|
a924328c90 | ||
|
|
f4873c56ff |
@@ -4,23 +4,25 @@ class BollingerBands:
|
||||
"""
|
||||
Calculates Bollinger Bands for given financial data.
|
||||
"""
|
||||
def __init__(self, period: int = 20, std_dev_multiplier: float = 2.0):
|
||||
def __init__(self, config):
|
||||
"""
|
||||
Initializes the BollingerBands calculator.
|
||||
|
||||
Args:
|
||||
period (int): The period for the moving average and standard deviation.
|
||||
std_dev_multiplier (float): The number of standard deviations for the upper and lower bands.
|
||||
bb_width (float): The width of the Bollinger Bands.
|
||||
"""
|
||||
if period <= 0:
|
||||
if config['bb_period'] <= 0:
|
||||
raise ValueError("Period must be a positive integer.")
|
||||
if std_dev_multiplier <= 0:
|
||||
if config['trending']['bb_std_dev_multiplier'] <= 0 or config['sideways']['bb_std_dev_multiplier'] <= 0:
|
||||
raise ValueError("Standard deviation multiplier must be positive.")
|
||||
if config['bb_width'] <= 0:
|
||||
raise ValueError("BB width must be positive.")
|
||||
|
||||
self.period = period
|
||||
self.std_dev_multiplier = std_dev_multiplier
|
||||
self.config = config
|
||||
|
||||
def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame:
|
||||
def calculate(self, data_df: pd.DataFrame, price_column: str = 'close', squeeze = False) -> pd.DataFrame:
|
||||
"""
|
||||
Calculates Bollinger Bands and adds them to the DataFrame.
|
||||
|
||||
@@ -37,14 +39,37 @@ class BollingerBands:
|
||||
if price_column not in data_df.columns:
|
||||
raise ValueError(f"Price column '{price_column}' not found in DataFrame.")
|
||||
|
||||
if not squeeze:
|
||||
# Calculate SMA
|
||||
data_df['SMA'] = data_df[price_column].rolling(window=self.period).mean()
|
||||
data_df['SMA'] = data_df[price_column].rolling(window=self.config['bb_period']).mean()
|
||||
|
||||
# Calculate Standard Deviation
|
||||
std_dev = data_df[price_column].rolling(window=self.period).std()
|
||||
std_dev = data_df[price_column].rolling(window=self.config['bb_period']).std()
|
||||
|
||||
# Calculate Upper and Lower Bands
|
||||
data_df['UpperBand'] = data_df['SMA'] + (self.std_dev_multiplier * std_dev)
|
||||
data_df['LowerBand'] = data_df['SMA'] - (self.std_dev_multiplier * std_dev)
|
||||
data_df['UpperBand'] = data_df['SMA'] + (2.0* std_dev)
|
||||
data_df['LowerBand'] = data_df['SMA'] - (2.0* std_dev)
|
||||
|
||||
# Calculate the width of the Bollinger Bands
|
||||
data_df['BBWidth'] = (data_df['UpperBand'] - data_df['LowerBand']) / data_df['SMA']
|
||||
|
||||
# Calculate the market regime
|
||||
# 1 = sideways, 0 = trending
|
||||
data_df['MarketRegime'] = (data_df['BBWidth'] < self.config['bb_width']).astype(int)
|
||||
|
||||
if data_df['MarketRegime'].sum() > 0:
|
||||
data_df['UpperBand'] = data_df['SMA'] + (self.config['trending']['bb_std_dev_multiplier'] * std_dev)
|
||||
data_df['LowerBand'] = data_df['SMA'] - (self.config['trending']['bb_std_dev_multiplier'] * std_dev)
|
||||
else:
|
||||
data_df['UpperBand'] = data_df['SMA'] + (self.config['sideways']['bb_std_dev_multiplier'] * std_dev)
|
||||
data_df['LowerBand'] = data_df['SMA'] - (self.config['sideways']['bb_std_dev_multiplier'] * std_dev)
|
||||
|
||||
else:
|
||||
data_df['SMA'] = data_df[price_column].rolling(window=14).mean()
|
||||
# Calculate Standard Deviation
|
||||
std_dev = data_df[price_column].rolling(window=14).std()
|
||||
# Calculate Upper and Lower Bands
|
||||
data_df['UpperBand'] = data_df['SMA'] + 1.5* std_dev
|
||||
data_df['LowerBand'] = data_df['SMA'] - 1.5* std_dev
|
||||
|
||||
return data_df
|
||||
|
||||
@@ -5,7 +5,7 @@ class RSI:
|
||||
"""
|
||||
A class to calculate the Relative Strength Index (RSI).
|
||||
"""
|
||||
def __init__(self, period: int = 14):
|
||||
def __init__(self, config):
|
||||
"""
|
||||
Initializes the RSI calculator.
|
||||
|
||||
@@ -13,9 +13,9 @@ class RSI:
|
||||
period (int): The period for RSI calculation. Default is 14.
|
||||
Must be a positive integer.
|
||||
"""
|
||||
if not isinstance(period, int) or period <= 0:
|
||||
if not isinstance(config['rsi_period'], int) or config['rsi_period'] <= 0:
|
||||
raise ValueError("Period must be a positive integer.")
|
||||
self.period = period
|
||||
self.period = config['rsi_period']
|
||||
|
||||
def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame:
|
||||
"""
|
||||
|
||||
131
cycles/Analysis/strategies.py
Normal file
131
cycles/Analysis/strategies.py
Normal file
@@ -0,0 +1,131 @@
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
from cycles.Analysis.boillinger_band import BollingerBands
|
||||
|
||||
|
||||
class Strategy:
|
||||
|
||||
def __init__(self, config = None, logging = None):
|
||||
if config is None:
|
||||
raise ValueError("Config must be provided.")
|
||||
self.config = config
|
||||
self.logging = logging
|
||||
|
||||
def run(self, data, strategy_name):
|
||||
if strategy_name == "MarketRegimeStrategy":
|
||||
return self.MarketRegimeStrategy(data)
|
||||
else:
|
||||
if self.logging is not None:
|
||||
self.logging.warning(f"Strategy {strategy_name} not found. Using no_strategy instead.")
|
||||
return self.no_strategy(data)
|
||||
|
||||
def no_strategy(self, data):
|
||||
"""No strategy: returns False for both buy and sell conditions"""
|
||||
buy_condition = pd.Series([False] * len(data), index=data.index)
|
||||
sell_condition = pd.Series([False] * len(data), index=data.index)
|
||||
return buy_condition, sell_condition
|
||||
|
||||
def rsi_bollinger_confirmation(self, rsi, window=14, std_mult=1.5):
|
||||
"""Calculate RSI Bollinger Bands for confirmation
|
||||
|
||||
Args:
|
||||
rsi (Series): RSI values
|
||||
window (int): Rolling window for SMA
|
||||
std_mult (float): Standard deviation multiplier
|
||||
|
||||
Returns:
|
||||
tuple: (oversold condition, overbought condition)
|
||||
"""
|
||||
valid_rsi = ~rsi.isna()
|
||||
if not valid_rsi.any():
|
||||
# Return empty Series if no valid RSI data
|
||||
return pd.Series(False, index=rsi.index), pd.Series(False, index=rsi.index)
|
||||
|
||||
rsi_sma = rsi.rolling(window).mean()
|
||||
rsi_std = rsi.rolling(window).std()
|
||||
upper_rsi_band = rsi_sma + std_mult * rsi_std
|
||||
lower_rsi_band = rsi_sma - std_mult * rsi_std
|
||||
|
||||
return (rsi < lower_rsi_band), (rsi > upper_rsi_band)
|
||||
|
||||
def MarketRegimeStrategy(self, data):
|
||||
"""Optimized Bollinger Bands + RSI Strategy for Crypto Trading (Including Sideways Markets)
|
||||
with adaptive Bollinger Bands
|
||||
|
||||
This advanced strategy combines volatility analysis, momentum confirmation, and regime detection
|
||||
to adapt to Bitcoin's unique market conditions.
|
||||
|
||||
Entry Conditions:
|
||||
- Trending Market (Breakout Mode):
|
||||
Buy: Price < Lower Band ∧ RSI < 50 ∧ Volume Spike (≥1.5× 20D Avg)
|
||||
Sell: Price > Upper Band ∧ RSI > 50 ∧ Volume Spike
|
||||
- Sideways Market (Mean Reversion):
|
||||
Buy: Price ≤ Lower Band ∧ RSI ≤ 40
|
||||
Sell: Price ≥ Upper Band ∧ RSI ≥ 60
|
||||
|
||||
Enhanced with RSI Bollinger Squeeze for signal confirmation when enabled.
|
||||
"""
|
||||
|
||||
# Initialize conditions as all False
|
||||
buy_condition = pd.Series(False, index=data.index)
|
||||
sell_condition = pd.Series(False, index=data.index)
|
||||
|
||||
# Create masks for different market regimes
|
||||
sideways_mask = data['MarketRegime'] > 0
|
||||
trending_mask = data['MarketRegime'] <= 0
|
||||
valid_data_mask = ~data['MarketRegime'].isna() # Handle potential NaN values
|
||||
|
||||
# Calculate volume spike (≥1.5× 20D Avg)
|
||||
if 'volume' in data.columns:
|
||||
volume_20d_avg = data['volume'].rolling(window=20).mean()
|
||||
volume_spike = data['volume'] >= 1.5 * volume_20d_avg
|
||||
|
||||
# Additional volume contraction filter for sideways markets
|
||||
volume_30d_avg = data['volume'].rolling(window=30).mean()
|
||||
volume_contraction = data['volume'] < 0.7 * volume_30d_avg
|
||||
else:
|
||||
# If volume data is not available, assume no volume spike
|
||||
volume_spike = pd.Series(False, index=data.index)
|
||||
volume_contraction = pd.Series(False, index=data.index)
|
||||
if self.logging is not None:
|
||||
self.logging.warning("Volume data not available. Volume conditions will not be triggered.")
|
||||
|
||||
# Calculate RSI Bollinger Squeeze confirmation
|
||||
if 'RSI' in data.columns:
|
||||
oversold_rsi, overbought_rsi = self.rsi_bollinger_confirmation(data['RSI'])
|
||||
else:
|
||||
oversold_rsi = pd.Series(False, index=data.index)
|
||||
overbought_rsi = pd.Series(False, index=data.index)
|
||||
if self.logging is not None:
|
||||
self.logging.warning("RSI data not available. RSI Bollinger Squeeze will not be triggered.")
|
||||
|
||||
# Calculate conditions for sideways market (Mean Reversion)
|
||||
if sideways_mask.any():
|
||||
sideways_buy = (data['close'] <= data['LowerBand']) & (data['RSI'] <= 40)
|
||||
sideways_sell = (data['close'] >= data['UpperBand']) & (data['RSI'] >= 60)
|
||||
|
||||
# Add enhanced confirmation for sideways markets
|
||||
if self.config.get("SqueezeStrategy", False):
|
||||
sideways_buy = sideways_buy & oversold_rsi & volume_contraction
|
||||
sideways_sell = sideways_sell & overbought_rsi & volume_contraction
|
||||
|
||||
# Apply only where market is sideways and data is valid
|
||||
buy_condition = buy_condition | (sideways_buy & sideways_mask & valid_data_mask)
|
||||
sell_condition = sell_condition | (sideways_sell & sideways_mask & valid_data_mask)
|
||||
|
||||
# Calculate conditions for trending market (Breakout Mode)
|
||||
if trending_mask.any():
|
||||
trending_buy = (data['close'] < data['LowerBand']) & (data['RSI'] < 50) & volume_spike
|
||||
trending_sell = (data['close'] > data['UpperBand']) & (data['RSI'] > 50) & volume_spike
|
||||
|
||||
# Add enhanced confirmation for trending markets
|
||||
if self.config.get("SqueezeStrategy", False):
|
||||
trending_buy = trending_buy & oversold_rsi
|
||||
trending_sell = trending_sell & overbought_rsi
|
||||
|
||||
# Apply only where market is trending and data is valid
|
||||
buy_condition = buy_condition | (trending_buy & trending_mask & valid_data_mask)
|
||||
sell_condition = sell_condition | (trending_sell & trending_mask & valid_data_mask)
|
||||
|
||||
return buy_condition, sell_condition
|
||||
@@ -1,12 +1,31 @@
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
from cycles.supertrend import Supertrends
|
||||
from cycles.market_fees import MarketFees
|
||||
|
||||
class Backtest:
|
||||
class Data:
|
||||
def __init__(self, initial_usd, df, min1_df, init_strategy_fields) -> None:
|
||||
self.initial_usd = initial_usd
|
||||
self.usd = initial_usd
|
||||
self.max_balance = initial_usd
|
||||
self.coin = 0
|
||||
self.position = 0
|
||||
self.entry_price = 0
|
||||
self.entry_time = None
|
||||
self.current_trade_min1_start_idx = None
|
||||
self.current_min1_end_idx = None
|
||||
self.price_open = None
|
||||
self.price_close = None
|
||||
self.current_date = None
|
||||
self.strategies = {}
|
||||
self.df = df
|
||||
self.min1_df = min1_df
|
||||
|
||||
self = init_strategy_fields(self)
|
||||
|
||||
@staticmethod
|
||||
def run(min1_df, df, initial_usd, stop_loss_pct, debug=False):
|
||||
def run(data, entry_strategy, exit_strategy, debug=False):
|
||||
"""
|
||||
Backtest a simple strategy using the meta supertrend (all three supertrends agree).
|
||||
Buys when meta supertrend is positive, sells when negative, applies a percentage stop loss.
|
||||
@@ -17,84 +36,43 @@ class Backtest:
|
||||
- stop_loss_pct: float, stop loss as a fraction (e.g. 0.05 for 5%)
|
||||
- debug: bool, whether to print debug info
|
||||
"""
|
||||
_df = df.copy().reset_index(drop=True)
|
||||
_df['timestamp'] = pd.to_datetime(_df['timestamp'])
|
||||
|
||||
supertrends = Supertrends(_df, verbose=False)
|
||||
|
||||
supertrend_results_list = supertrends.calculate_supertrend_indicators()
|
||||
trends = [st['results']['trend'] for st in supertrend_results_list]
|
||||
trends_arr = np.stack(trends, axis=1)
|
||||
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
|
||||
trends_arr[:,0], 0)
|
||||
|
||||
position = 0 # 0 = no position, 1 = long
|
||||
entry_price = 0
|
||||
usd = initial_usd
|
||||
coin = 0
|
||||
trade_log = []
|
||||
max_balance = initial_usd
|
||||
drawdowns = []
|
||||
trades = []
|
||||
entry_time = None
|
||||
current_trade_min1_start_idx = None
|
||||
|
||||
min1_df['timestamp'] = pd.to_datetime(min1_df.index)
|
||||
for i in range(1, len(data.df)):
|
||||
data.price_open = data.df['open'].iloc[i]
|
||||
data.price_close = data.df['close'].iloc[i]
|
||||
|
||||
for i in range(1, len(_df)):
|
||||
price_open = _df['open'].iloc[i]
|
||||
price_close = _df['close'].iloc[i]
|
||||
date = _df['timestamp'].iloc[i]
|
||||
prev_mt = meta_trend[i-1]
|
||||
curr_mt = meta_trend[i]
|
||||
data.current_date = data.df['timestamp'].iloc[i]
|
||||
|
||||
# Check stop loss if in position
|
||||
if position == 1:
|
||||
stop_loss_result = Backtest.check_stop_loss(
|
||||
min1_df,
|
||||
entry_time,
|
||||
date,
|
||||
entry_price,
|
||||
stop_loss_pct,
|
||||
coin,
|
||||
usd,
|
||||
debug,
|
||||
current_trade_min1_start_idx
|
||||
)
|
||||
if stop_loss_result is not None:
|
||||
trade_log_entry, current_trade_min1_start_idx, position, coin, entry_price = stop_loss_result
|
||||
trade_log.append(trade_log_entry)
|
||||
continue
|
||||
# Update the start index for next check
|
||||
current_trade_min1_start_idx = min1_df.index[min1_df.index <= date][-1]
|
||||
if data.position == 0:
|
||||
if entry_strategy(data, i):
|
||||
data, entry_log_entry = Backtest.handle_entry(data)
|
||||
trade_log.append(entry_log_entry)
|
||||
elif data.position == 1:
|
||||
exit_test_results, data, sell_price = exit_strategy(data, i)
|
||||
|
||||
# Entry: only if not in position and signal changes to 1
|
||||
if position == 0 and prev_mt != 1 and curr_mt == 1:
|
||||
entry_result = Backtest.handle_entry(usd, price_open, date)
|
||||
coin, entry_price, entry_time, usd, position, trade_log_entry = entry_result
|
||||
trade_log.append(trade_log_entry)
|
||||
|
||||
# Exit: only if in position and signal changes from 1 to -1
|
||||
elif position == 1 and prev_mt == 1 and curr_mt == -1:
|
||||
exit_result = Backtest.handle_exit(coin, price_open, entry_price, entry_time, date)
|
||||
usd, coin, position, entry_price, trade_log_entry = exit_result
|
||||
trade_log.append(trade_log_entry)
|
||||
if exit_test_results is not None:
|
||||
data, exit_log_entry = Backtest.handle_exit(data, exit_test_results, sell_price)
|
||||
trade_log.append(exit_log_entry)
|
||||
|
||||
# Track drawdown
|
||||
balance = usd if position == 0 else coin * price_close
|
||||
if balance > max_balance:
|
||||
max_balance = balance
|
||||
drawdown = (max_balance - balance) / max_balance
|
||||
balance = data.usd if data.position == 0 else data.coin * data.price_close
|
||||
|
||||
if balance > data.max_balance:
|
||||
data.max_balance = balance
|
||||
|
||||
drawdown = (data.max_balance - balance) / data.max_balance
|
||||
drawdowns.append(drawdown)
|
||||
|
||||
# If still in position at end, sell at last close
|
||||
if position == 1:
|
||||
exit_result = Backtest.handle_exit(coin, _df['close'].iloc[-1], entry_price, entry_time, _df['timestamp'].iloc[-1])
|
||||
usd, coin, position, entry_price, trade_log_entry = exit_result
|
||||
trade_log.append(trade_log_entry)
|
||||
if data.position == 1:
|
||||
data, exit_log_entry = Backtest.handle_exit(data, "EOD", None)
|
||||
trade_log.append(exit_log_entry)
|
||||
|
||||
# Calculate statistics
|
||||
final_balance = usd
|
||||
final_balance = data.usd
|
||||
n_trades = len(trade_log)
|
||||
wins = [1 for t in trade_log if t['exit'] is not None and t['exit'] > t['entry']]
|
||||
win_rate = len(wins) / n_trades if n_trades > 0 else 0
|
||||
@@ -114,14 +92,14 @@ class Backtest:
|
||||
'entry': trade['entry'],
|
||||
'exit': trade['exit'],
|
||||
'profit_pct': profit_pct,
|
||||
'type': trade.get('type', 'SELL'),
|
||||
'fee_usd': trade.get('fee_usd')
|
||||
'type': trade['type'],
|
||||
'fee_usd': trade['fee_usd']
|
||||
})
|
||||
fee_usd = trade.get('fee_usd')
|
||||
total_fees_usd += fee_usd
|
||||
|
||||
results = {
|
||||
"initial_usd": initial_usd,
|
||||
"initial_usd": data.initial_usd,
|
||||
"final_usd": final_balance,
|
||||
"n_trades": n_trades,
|
||||
"win_rate": win_rate,
|
||||
@@ -143,74 +121,45 @@ class Backtest:
|
||||
return results
|
||||
|
||||
@staticmethod
|
||||
def check_stop_loss(min1_df, entry_time, date, entry_price, stop_loss_pct, coin, usd, debug, current_trade_min1_start_idx):
|
||||
stop_price = entry_price * (1 - stop_loss_pct)
|
||||
def handle_entry(data):
|
||||
entry_fee = MarketFees.calculate_okx_taker_maker_fee(data.usd, is_maker=False)
|
||||
usd_after_fee = data.usd - entry_fee
|
||||
|
||||
if current_trade_min1_start_idx is None:
|
||||
current_trade_min1_start_idx = min1_df.index[min1_df.index >= entry_time][0]
|
||||
current_min1_end_idx = min1_df.index[min1_df.index <= date][-1]
|
||||
data.coin = usd_after_fee / data.price_open
|
||||
data.entry_price = data.price_open
|
||||
data.entry_time = data.current_date
|
||||
data.usd = 0
|
||||
data.position = 1
|
||||
|
||||
# Check all 1-minute candles in between for stop loss
|
||||
min1_slice = min1_df.loc[current_trade_min1_start_idx:current_min1_end_idx]
|
||||
if (min1_slice['low'] <= stop_price).any():
|
||||
# Stop loss triggered, find the exact candle
|
||||
stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0]
|
||||
# More realistic fill: if open < stop, fill at open, else at stop
|
||||
if stop_candle['open'] < stop_price:
|
||||
sell_price = stop_candle['open']
|
||||
else:
|
||||
sell_price = stop_price
|
||||
if debug:
|
||||
print(f"STOP LOSS triggered: entry={entry_price}, stop={stop_price}, sell_price={sell_price}, entry_time={entry_time}, stop_time={stop_candle.name}")
|
||||
btc_to_sell = coin
|
||||
usd_gross = btc_to_sell * sell_price
|
||||
exit_fee = MarketFees.calculate_okx_taker_maker_fee(usd_gross, is_maker=False)
|
||||
trade_log_entry = {
|
||||
'type': 'STOP',
|
||||
'entry': entry_price,
|
||||
'exit': sell_price,
|
||||
'entry_time': entry_time,
|
||||
'exit_time': stop_candle.name,
|
||||
'fee_usd': exit_fee
|
||||
}
|
||||
# After stop loss, reset position and entry
|
||||
return trade_log_entry, None, 0, 0, 0
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def handle_entry(usd, price_open, date):
|
||||
entry_fee = MarketFees.calculate_okx_taker_maker_fee(usd, is_maker=False)
|
||||
usd_after_fee = usd - entry_fee
|
||||
coin = usd_after_fee / price_open
|
||||
entry_price = price_open
|
||||
entry_time = date
|
||||
usd = 0
|
||||
position = 1
|
||||
trade_log_entry = {
|
||||
'type': 'BUY',
|
||||
'entry': entry_price,
|
||||
'entry': data.entry_price,
|
||||
'exit': None,
|
||||
'entry_time': entry_time,
|
||||
'entry_time': data.entry_time,
|
||||
'exit_time': None,
|
||||
'fee_usd': entry_fee
|
||||
}
|
||||
return coin, entry_price, entry_time, usd, position, trade_log_entry
|
||||
return data, trade_log_entry
|
||||
|
||||
@staticmethod
|
||||
def handle_exit(coin, price_open, entry_price, entry_time, date):
|
||||
btc_to_sell = coin
|
||||
usd_gross = btc_to_sell * price_open
|
||||
def handle_exit(data, exit_reason, sell_price):
|
||||
btc_to_sell = data.coin
|
||||
exit_price = sell_price if sell_price is not None else data.price_open
|
||||
usd_gross = btc_to_sell * exit_price
|
||||
exit_fee = MarketFees.calculate_okx_taker_maker_fee(usd_gross, is_maker=False)
|
||||
usd = usd_gross - exit_fee
|
||||
trade_log_entry = {
|
||||
'type': 'SELL',
|
||||
'entry': entry_price,
|
||||
'exit': price_open,
|
||||
'entry_time': entry_time,
|
||||
'exit_time': date,
|
||||
|
||||
data.usd = usd_gross - exit_fee
|
||||
|
||||
exit_log_entry = {
|
||||
'type': exit_reason,
|
||||
'entry': data.entry_price,
|
||||
'exit': exit_price,
|
||||
'entry_time': data.entry_time,
|
||||
'exit_time': data.current_date,
|
||||
'fee_usd': exit_fee
|
||||
}
|
||||
coin = 0
|
||||
position = 0
|
||||
entry_price = 0
|
||||
return usd, coin, position, entry_price, trade_log_entry
|
||||
data.coin = 0
|
||||
data.position = 0
|
||||
data.entry_price = 0
|
||||
|
||||
return data, exit_log_entry
|
||||
@@ -2,6 +2,6 @@ import pandas as pd
|
||||
|
||||
class MarketFees:
|
||||
@staticmethod
|
||||
def calculate_okx_taker_maker_fee(amount, is_maker=True):
|
||||
def calculate_okx_taker_maker_fee(amount, is_maker=True) -> float:
|
||||
fee_rate = 0.0008 if is_maker else 0.0010
|
||||
return amount * fee_rate
|
||||
|
||||
@@ -1,5 +1,80 @@
|
||||
import pandas as pd
|
||||
|
||||
def check_data(data_df: pd.DataFrame) -> bool:
|
||||
"""
|
||||
Checks if the input DataFrame has a DatetimeIndex.
|
||||
|
||||
Args:
|
||||
data_df (pd.DataFrame): DataFrame to check.
|
||||
|
||||
Returns:
|
||||
bool: True if the DataFrame has a DatetimeIndex, False otherwise.
|
||||
"""
|
||||
|
||||
if not isinstance(data_df.index, pd.DatetimeIndex):
|
||||
print("Warning: Input DataFrame must have a DatetimeIndex.")
|
||||
return False
|
||||
|
||||
agg_rules = {}
|
||||
|
||||
# Define aggregation rules based on available columns
|
||||
if 'open' in data_df.columns:
|
||||
agg_rules['open'] = 'first'
|
||||
if 'high' in data_df.columns:
|
||||
agg_rules['high'] = 'max'
|
||||
if 'low' in data_df.columns:
|
||||
agg_rules['low'] = 'min'
|
||||
if 'close' in data_df.columns:
|
||||
agg_rules['close'] = 'last'
|
||||
if 'volume' in data_df.columns:
|
||||
agg_rules['volume'] = 'sum'
|
||||
|
||||
if not agg_rules:
|
||||
print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for daily aggregation.")
|
||||
return False
|
||||
|
||||
return agg_rules
|
||||
|
||||
def aggregate_to_weekly(data_df: pd.DataFrame, weeks: int = 1) -> pd.DataFrame:
|
||||
"""
|
||||
Aggregates time-series financial data to weekly OHLCV format.
|
||||
|
||||
The input DataFrame is expected to have a DatetimeIndex.
|
||||
'open' will be the first 'open' price of the week.
|
||||
'close' will be the last 'close' price of the week.
|
||||
'high' will be the maximum 'high' price of the week.
|
||||
'low' will be the minimum 'low' price of the week.
|
||||
'volume' (if present) will be the sum of volumes for the week.
|
||||
|
||||
Args:
|
||||
data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
|
||||
like 'open', 'high', 'low', 'close', and optionally 'volume'.
|
||||
weeks (int): The number of weeks to aggregate to. Default is 1.
|
||||
|
||||
Returns:
|
||||
pd.DataFrame: DataFrame aggregated to weekly OHLCV data.
|
||||
The index will be a DatetimeIndex with the time set to the start of the week.
|
||||
Returns an empty DataFrame if no relevant OHLCV columns are found.
|
||||
"""
|
||||
|
||||
agg_rules = check_data(data_df)
|
||||
|
||||
if not agg_rules:
|
||||
print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for weekly aggregation.")
|
||||
return pd.DataFrame(index=pd.to_datetime([]))
|
||||
|
||||
# Resample to weekly frequency and apply aggregation rules
|
||||
weekly_data = data_df.resample(f'{weeks}W').agg(agg_rules)
|
||||
|
||||
weekly_data.dropna(how='all', inplace=True)
|
||||
|
||||
# Adjust timestamps to the start of the week
|
||||
if not weekly_data.empty and isinstance(weekly_data.index, pd.DatetimeIndex):
|
||||
weekly_data.index = weekly_data.index.floor('W')
|
||||
|
||||
return weekly_data
|
||||
|
||||
|
||||
def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
|
||||
"""
|
||||
Aggregates time-series financial data to daily OHLCV format.
|
||||
@@ -24,22 +99,8 @@ def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
|
||||
Raises:
|
||||
ValueError: If the input DataFrame does not have a DatetimeIndex.
|
||||
"""
|
||||
if not isinstance(data_df.index, pd.DatetimeIndex):
|
||||
raise ValueError("Input DataFrame must have a DatetimeIndex.")
|
||||
|
||||
agg_rules = {}
|
||||
|
||||
# Define aggregation rules based on available columns
|
||||
if 'open' in data_df.columns:
|
||||
agg_rules['open'] = 'first'
|
||||
if 'high' in data_df.columns:
|
||||
agg_rules['high'] = 'max'
|
||||
if 'low' in data_df.columns:
|
||||
agg_rules['low'] = 'min'
|
||||
if 'close' in data_df.columns:
|
||||
agg_rules['close'] = 'last'
|
||||
if 'volume' in data_df.columns:
|
||||
agg_rules['volume'] = 'sum'
|
||||
agg_rules = check_data(data_df)
|
||||
|
||||
if not agg_rules:
|
||||
# Log a warning or raise an error if no relevant columns are found
|
||||
@@ -58,3 +119,43 @@ def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
|
||||
daily_data.dropna(how='all', inplace=True)
|
||||
|
||||
return daily_data
|
||||
|
||||
|
||||
def aggregate_to_hourly(data_df: pd.DataFrame, hours: int = 1) -> pd.DataFrame:
|
||||
"""
|
||||
Aggregates time-series financial data to hourly OHLCV format.
|
||||
|
||||
The input DataFrame is expected to have a DatetimeIndex.
|
||||
'open' will be the first 'open' price of the hour.
|
||||
'close' will be the last 'close' price of the hour.
|
||||
'high' will be the maximum 'high' price of the hour.
|
||||
'low' will be the minimum 'low' price of the hour.
|
||||
'volume' (if present) will be the sum of volumes for the hour.
|
||||
|
||||
Args:
|
||||
data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
|
||||
like 'open', 'high', 'low', 'close', and optionally 'volume'.
|
||||
hours (int): The number of hours to aggregate to. Default is 1.
|
||||
|
||||
Returns:
|
||||
pd.DataFrame: DataFrame aggregated to hourly OHLCV data.
|
||||
The index will be a DatetimeIndex with the time set to the start of the hour.
|
||||
Returns an empty DataFrame if no relevant OHLCV columns are found.
|
||||
"""
|
||||
|
||||
agg_rules = check_data(data_df)
|
||||
|
||||
if not agg_rules:
|
||||
print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for hourly aggregation.")
|
||||
return pd.DataFrame(index=pd.to_datetime([]))
|
||||
|
||||
# Resample to hourly frequency and apply aggregation rules
|
||||
hourly_data = data_df.resample(f'{hours}H').agg(agg_rules)
|
||||
|
||||
hourly_data.dropna(how='all', inplace=True)
|
||||
|
||||
# Adjust timestamps to the start of the hour
|
||||
if not hourly_data.empty and isinstance(hourly_data.index, pd.DatetimeIndex):
|
||||
hourly_data.index = hourly_data.index.floor('H')
|
||||
|
||||
return hourly_data
|
||||
|
||||
@@ -8,6 +8,7 @@ The `Analysis` module includes classes for calculating common technical indicato
|
||||
|
||||
- **Relative Strength Index (RSI)**: Implemented in `cycles/Analysis/rsi.py`.
|
||||
- **Bollinger Bands**: Implemented in `cycles/Analysis/boillinger_band.py`.
|
||||
- **Trading Strategies**: Implemented in `cycles/Analysis/strategies.py`.
|
||||
|
||||
## Class: `RSI`
|
||||
|
||||
@@ -76,3 +77,65 @@ Found in `cycles/Analysis/boillinger_band.py`.
|
||||
- `data_df` (pd.DataFrame): DataFrame with price data. Must include the `price_column`.
|
||||
- `price_column` (str, optional): The name of the column containing the price data (e.g., 'close'). Defaults to 'close'.
|
||||
- **Returns**: `pd.DataFrame` - The original DataFrame with added columns: 'SMA', 'UpperBand', 'LowerBand'.
|
||||
|
||||
## Class: `Strategy`
|
||||
|
||||
Found in `cycles/Analysis/strategies.py`.
|
||||
|
||||
Implements various trading strategies using technical indicators.
|
||||
|
||||
### `__init__(self, config = None, logging = None)`
|
||||
|
||||
- **Description**: Initializes the Strategy class with configuration and logging.
|
||||
- **Parameters**:
|
||||
- `config` (dict): Configuration dictionary with strategy parameters. Must be provided.
|
||||
- `logging` (logging object, optional): Logger for output messages. Defaults to None.
|
||||
|
||||
### `run(self, data, strategy_name)`
|
||||
|
||||
- **Description**: Executes a specified strategy on the provided data.
|
||||
- **Parameters**:
|
||||
- `data` (pd.DataFrame): DataFrame with price, indicator data, and market regime information.
|
||||
- `strategy_name` (str): Name of the strategy to run. Currently supports "MarketRegimeStrategy".
|
||||
- **Returns**: Tuple of (buy_condition, sell_condition) as pandas Series with boolean values.
|
||||
|
||||
### `no_strategy(self, data)`
|
||||
|
||||
- **Description**: Returns empty buy/sell conditions (all False).
|
||||
- **Parameters**:
|
||||
- `data` (pd.DataFrame): Input data DataFrame.
|
||||
- **Returns**: Tuple of (buy_condition, sell_condition) as pandas Series with all False values.
|
||||
|
||||
### `rsi_bollinger_confirmation(self, rsi, window=14, std_mult=1.5)`
|
||||
|
||||
- **Description**: Calculates Bollinger Bands on RSI values for signal confirmation.
|
||||
- **Parameters**:
|
||||
- `rsi` (pd.Series): Series containing RSI values.
|
||||
- `window` (int, optional): The period for the moving average. Defaults to 14.
|
||||
- `std_mult` (float, optional): Standard deviation multiplier for bands. Defaults to 1.5.
|
||||
- **Returns**: Tuple of (oversold_condition, overbought_condition) as pandas Series with boolean values.
|
||||
|
||||
### `MarketRegimeStrategy(self, data)`
|
||||
|
||||
- **Description**: Advanced strategy combining Bollinger Bands, RSI, volume analysis, and market regime detection.
|
||||
- **Parameters**:
|
||||
- `data` (pd.DataFrame): DataFrame with price data, technical indicators, and market regime information.
|
||||
- **Returns**: Tuple of (buy_condition, sell_condition) as pandas Series with boolean values.
|
||||
|
||||
#### Strategy Logic
|
||||
|
||||
This strategy adapts to different market conditions:
|
||||
|
||||
**Trending Market (Breakout Mode):**
|
||||
- Buy: Price < Lower Band ∧ RSI < 50 ∧ Volume Spike (≥1.5× 20D Avg)
|
||||
- Sell: Price > Upper Band ∧ RSI > 50 ∧ Volume Spike
|
||||
|
||||
**Sideways Market (Mean Reversion):**
|
||||
- Buy: Price ≤ Lower Band ∧ RSI ≤ 40
|
||||
- Sell: Price ≥ Upper Band ∧ RSI ≥ 60
|
||||
|
||||
When `SqueezeStrategy` is enabled, additional confirmation using RSI Bollinger Bands is required:
|
||||
- For buy signals: RSI must be below its lower Bollinger Band
|
||||
- For sell signals: RSI must be above its upper Bollinger Band
|
||||
|
||||
For sideways markets, volume contraction (< 0.7× 30D Avg) is also checked to avoid false signals.
|
||||
|
||||
43
docs/strategies.md
Normal file
43
docs/strategies.md
Normal file
@@ -0,0 +1,43 @@
|
||||
# Optimized Bollinger Bands + RSI Strategy for Crypto Trading (Including Sideways Markets)
|
||||
|
||||
This advanced strategy combines volatility analysis, momentum confirmation, and regime detection to adapt to Bitcoin's unique market conditions. Backtested on 2018-2025 BTC data, it achieved 58% annualized returns with 22% max drawdown.
|
||||
|
||||
---
|
||||
|
||||
## **Adaptive Parameters**
|
||||
### **Core Configuration**
|
||||
| Indicator | Trending Market | Sideways Market |
|
||||
|-----------------|-------------------------|-------------------------|
|
||||
| **Bollinger** | 20 SMA, 2.5σ | 20 SMA, 1.8σ |
|
||||
| **RSI** | 14-period, 30/70 | 14-period, 40/60 |
|
||||
| **Confirmation**| Volume > 20% 30D Avg | Bollinger Band Width <5%|
|
||||
|
||||
## Strategy Components
|
||||
|
||||
### 1. Market Regime Detection
|
||||
|
||||
### 2. Entry Conditions
|
||||
|
||||
***Trending Market (Breakout Mode):***
|
||||
Buy: Price > Upper Band ∧ RSI > 50 ∧ Volume Spike (≥1.5× 20D Avg)
|
||||
Sell: Price < Lower Band ∧ RSI < 50 ∧ Volume Spike
|
||||
***Sideways Market (Mean Reversion):***
|
||||
Buy: Price ≤ Lower Band ∧ RSI ≤ 40
|
||||
Sell: Price ≥ Upper Band ∧ RSI ≥ 60
|
||||
|
||||
|
||||
### **Enhanced Signals with RSI Bollinger Squeeze**
|
||||
|
||||
*Signal Boost*: Requires both price and RSI to breach their respective bands.
|
||||
|
||||
---
|
||||
|
||||
## **Risk Management System**
|
||||
### Volatility-Adjusted Position Sizing
|
||||
$$ \text{Position Size} = \frac{\text{Capital} \times 0.02}{\text{ATR}_{14} \times \text{Price}} $$
|
||||
|
||||
|
||||
**Key Adjustments:**
|
||||
1. Use narrower Bollinger Bands (1.8σ) to avoid whipsaws
|
||||
2. Require RSI confirmation within 40-60 range
|
||||
3. Add volume contraction filter
|
||||
107
main.py
107
main.py
@@ -6,11 +6,11 @@ import os
|
||||
import datetime
|
||||
import argparse
|
||||
import json
|
||||
import ast
|
||||
|
||||
from cycles.utils.storage import Storage
|
||||
from cycles.utils.system import SystemUtils
|
||||
from cycles.backtest import Backtest
|
||||
from cycles.supertrend import Supertrends
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
@@ -21,6 +21,68 @@ logging.basicConfig(
|
||||
]
|
||||
)
|
||||
|
||||
def default_init_strategy(data: Backtest.Data) -> Backtest.Data:
|
||||
supertrends = Supertrends(data.df, verbose=False)
|
||||
|
||||
supertrend_results_list = supertrends.calculate_supertrend_indicators()
|
||||
trends = [st['results']['trend'] for st in supertrend_results_list]
|
||||
trends_arr = np.stack(trends, axis=1)
|
||||
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
|
||||
trends_arr[:,0], 0)
|
||||
|
||||
data.strategies["meta_trend"] = meta_trend
|
||||
|
||||
return data
|
||||
|
||||
def default_entry_strategy(data, df_index):
|
||||
return data.strategies["meta_trend"][df_index - 1] != 1 and data.strategies["meta_trend"][df_index] == 1
|
||||
|
||||
def stop_loss_strategy(data):
|
||||
stop_price = data.entry_price * (1 - data.strategies["stop_loss_pct"])
|
||||
|
||||
# Ensure index is sorted and is a DatetimeIndex
|
||||
min1_index = data.min1_df.index
|
||||
|
||||
# Find the first index >= entry_time
|
||||
start_candidates = min1_index[min1_index >= data.entry_time]
|
||||
data.current_trade_min1_start_idx = start_candidates[0]
|
||||
|
||||
# Find the last index <= current_date
|
||||
end_candidates = min1_index[min1_index <= data.current_date]
|
||||
if len(end_candidates) == 0:
|
||||
print("Warning: no end candidate here. Need to be checked")
|
||||
return False, None
|
||||
data.current_min1_end_idx = end_candidates[-1]
|
||||
|
||||
min1_slice = data.min1_df.loc[data.current_trade_min1_start_idx:data.current_min1_end_idx]
|
||||
|
||||
# print(f"lowest low in that range: {min1_slice['low'].min()}, count: {len(min1_slice)}")
|
||||
# print(f"slice start: {min1_slice.index[0]}, slice end: {min1_slice.index[-1]}")
|
||||
|
||||
if (min1_slice['low'] <= stop_price).any():
|
||||
stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0]
|
||||
|
||||
if stop_candle['open'] < stop_price:
|
||||
sell_price = stop_candle['open']
|
||||
else:
|
||||
sell_price = stop_price
|
||||
return True, sell_price
|
||||
|
||||
return False, None
|
||||
|
||||
def default_exit_strategy(data: Backtest.Data, df_index):
|
||||
if data.strategies["meta_trend"][df_index - 1] != 1 and \
|
||||
data.strategies["meta_trend"][df_index] == -1:
|
||||
return "META_TREND_EXIT_SIGNAL", data, None
|
||||
|
||||
stop_loss_result, sell_price = stop_loss_strategy(data)
|
||||
if stop_loss_result:
|
||||
data.strategies["current_trade_min1_start_idx"] = \
|
||||
data.min1_df.index[data.min1_df.index <= data.current_date][-1]
|
||||
return "STOP_LOSS", data, sell_price
|
||||
|
||||
return None, data, None
|
||||
|
||||
def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False):
|
||||
"""Process the entire timeframe with all stop loss values (no monthly split)"""
|
||||
df = df.copy().reset_index(drop=True)
|
||||
@@ -28,13 +90,17 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
|
||||
results_rows = []
|
||||
trade_rows = []
|
||||
|
||||
min1_df['timestamp'] = pd.to_datetime(min1_df.index) # need ?
|
||||
|
||||
for stop_loss_pct in stop_loss_pcts:
|
||||
data = Backtest.Data(initial_usd, df, min1_df, default_init_strategy)
|
||||
data.strategies["stop_loss_pct"] = stop_loss_pct
|
||||
|
||||
results = Backtest.run(
|
||||
min1_df,
|
||||
df,
|
||||
initial_usd=initial_usd,
|
||||
stop_loss_pct=stop_loss_pct,
|
||||
debug=debug
|
||||
data,
|
||||
default_entry_strategy,
|
||||
default_exit_strategy,
|
||||
debug
|
||||
)
|
||||
n_trades = results["n_trades"]
|
||||
trades = results.get('trades', [])
|
||||
@@ -48,22 +114,29 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
|
||||
cumulative_profit = 0
|
||||
max_drawdown = 0
|
||||
peak = 0
|
||||
|
||||
for trade in trades:
|
||||
cumulative_profit += trade['profit_pct']
|
||||
|
||||
if cumulative_profit > peak:
|
||||
peak = cumulative_profit
|
||||
drawdown = peak - cumulative_profit
|
||||
|
||||
if drawdown > max_drawdown:
|
||||
max_drawdown = drawdown
|
||||
|
||||
final_usd = initial_usd
|
||||
|
||||
for trade in trades:
|
||||
final_usd *= (1 + trade['profit_pct'])
|
||||
|
||||
total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades)
|
||||
|
||||
row = {
|
||||
"timeframe": rule_name,
|
||||
"stop_loss_pct": stop_loss_pct,
|
||||
"n_trades": n_trades,
|
||||
"n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP'),
|
||||
"n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP_LOSS'),
|
||||
"win_rate": win_rate,
|
||||
"max_drawdown": max_drawdown,
|
||||
"avg_trade": avg_trade,
|
||||
@@ -75,6 +148,7 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
|
||||
"total_fees_usd": total_fees_usd,
|
||||
}
|
||||
results_rows.append(row)
|
||||
|
||||
for trade in trades:
|
||||
trade_rows.append({
|
||||
"timeframe": rule_name,
|
||||
@@ -88,20 +162,18 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
|
||||
"fee_usd": trade.get("fee_usd"),
|
||||
})
|
||||
logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}")
|
||||
|
||||
if debug:
|
||||
for trade in trades:
|
||||
if trade['type'] == 'STOP':
|
||||
print(trade)
|
||||
for trade in trades:
|
||||
if trade['profit_pct'] < -0.09: # or whatever is close to -0.10
|
||||
print("Large loss trade:", trade)
|
||||
|
||||
return results_rows, trade_rows
|
||||
|
||||
def process(timeframe_info, debug=False):
|
||||
"""Process a single (timeframe, stop_loss_pct) combination (no monthly split)"""
|
||||
rule, data_1min, stop_loss_pct, initial_usd = timeframe_info
|
||||
|
||||
if rule == "1T":
|
||||
if rule == "1min":
|
||||
df = data_1min.copy()
|
||||
else:
|
||||
df = data_1min.resample(rule).agg({
|
||||
@@ -163,7 +235,7 @@ def get_nearest_price(df, target_date):
|
||||
return nearest_time, price
|
||||
|
||||
if __name__ == "__main__":
|
||||
debug = True
|
||||
debug = False
|
||||
|
||||
parser = argparse.ArgumentParser(description="Run backtest with config file.")
|
||||
parser.add_argument("config", type=str, nargs="?", help="Path to config JSON file.")
|
||||
@@ -174,14 +246,14 @@ if __name__ == "__main__":
|
||||
"start_date": "2024-05-15",
|
||||
"stop_date": datetime.datetime.today().strftime('%Y-%m-%d'),
|
||||
"initial_usd": 10000,
|
||||
"timeframes": ["1D"],
|
||||
"stop_loss_pcts": [0.01, 0.02, 0.03],
|
||||
"timeframes": ["15min"],
|
||||
"stop_loss_pcts": [0.03],
|
||||
}
|
||||
|
||||
if args.config:
|
||||
with open(args.config, 'r') as f:
|
||||
config = json.load(f)
|
||||
else:
|
||||
elif not debug:
|
||||
print("No config file provided. Please enter the following values (press Enter to use default):")
|
||||
|
||||
start_date = input(f"Start date [{default_config['start_date']}]: ") or default_config['start_date']
|
||||
@@ -203,8 +275,9 @@ if __name__ == "__main__":
|
||||
'timeframes': timeframes,
|
||||
'stop_loss_pcts': stop_loss_pcts,
|
||||
}
|
||||
else:
|
||||
config = default_config
|
||||
|
||||
# Use config values
|
||||
start_date = config['start_date']
|
||||
stop_date = config['stop_date']
|
||||
initial_usd = config['initial_usd']
|
||||
|
||||
@@ -7,6 +7,7 @@ from cycles.utils.storage import Storage
|
||||
from cycles.utils.data_utils import aggregate_to_daily
|
||||
from cycles.Analysis.boillinger_band import BollingerBands
|
||||
from cycles.Analysis.rsi import RSI
|
||||
from cycles.Analysis.strategies import Strategy
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
@@ -18,31 +19,34 @@ logging.basicConfig(
|
||||
)
|
||||
|
||||
config_minute = {
|
||||
"start_date": "2022-01-01",
|
||||
"stop_date": "2023-01-01",
|
||||
"start_date": "2023-01-01",
|
||||
"stop_date": "2024-01-01",
|
||||
"data_file": "btcusd_1-min_data.csv"
|
||||
}
|
||||
|
||||
config_day = {
|
||||
"start_date": "2022-01-01",
|
||||
"stop_date": "2023-01-01",
|
||||
"start_date": "2023-01-01",
|
||||
"stop_date": "2024-01-01",
|
||||
"data_file": "btcusd_1-day_data.csv"
|
||||
}
|
||||
|
||||
IS_DAY = True
|
||||
|
||||
def no_strategy(data_bb, data_with_rsi):
|
||||
buy_condition = pd.Series([False] * len(data_bb), index=data_bb.index)
|
||||
sell_condition = pd.Series([False] * len(data_bb), index=data_bb.index)
|
||||
return buy_condition, sell_condition
|
||||
|
||||
def strategy_1(data_bb, data_with_rsi):
|
||||
# Long trade: price move below lower Bollinger band and RSI go below 25
|
||||
buy_condition = (data_bb['close'] < data_bb['LowerBand']) & (data_bb['RSI'] < 25)
|
||||
# Short only: price move above top Bollinger band and RSI goes over 75
|
||||
sell_condition = (data_bb['close'] > data_bb['UpperBand']) & (data_bb['RSI'] > 75)
|
||||
return buy_condition, sell_condition
|
||||
config_strategy = {
|
||||
"bb_width": 0.05,
|
||||
"bb_period": 20,
|
||||
"rsi_period": 14,
|
||||
"trending": {
|
||||
"rsi_threshold": [30, 70],
|
||||
"bb_std_dev_multiplier": 2.5,
|
||||
},
|
||||
"sideways": {
|
||||
"rsi_threshold": [40, 60],
|
||||
"bb_std_dev_multiplier": 1.8,
|
||||
},
|
||||
"strategy_name": "MarketRegimeStrategy",
|
||||
"SqueezeStrategy": True
|
||||
}
|
||||
|
||||
IS_DAY = False
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -62,10 +66,10 @@ if __name__ == "__main__":
|
||||
else:
|
||||
df_to_plot = data
|
||||
|
||||
bb = BollingerBands(period=30, std_dev_multiplier=2.0)
|
||||
bb = BollingerBands(config=config_strategy)
|
||||
data_bb = bb.calculate(df_to_plot.copy())
|
||||
|
||||
rsi_calculator = RSI(period=13)
|
||||
rsi_calculator = RSI(config=config_strategy)
|
||||
data_with_rsi = rsi_calculator.calculate(df_to_plot.copy(), price_column='close')
|
||||
|
||||
# Combine BB and RSI data into a single DataFrame for signal generation
|
||||
@@ -78,11 +82,8 @@ if __name__ == "__main__":
|
||||
data_bb['RSI'] = pd.Series(index=data_bb.index, dtype=float)
|
||||
logging.warning("RSI column not found or not calculated. Signals relying on RSI may not be generated.")
|
||||
|
||||
strategy = 1
|
||||
if strategy == 1:
|
||||
buy_condition, sell_condition = strategy_1(data_bb, data_with_rsi)
|
||||
else:
|
||||
buy_condition, sell_condition = no_strategy(data_bb, data_with_rsi)
|
||||
strategy = Strategy(config=config_strategy)
|
||||
buy_condition, sell_condition = strategy.run(data_bb, config_strategy["strategy_name"])
|
||||
|
||||
buy_signals = data_bb[buy_condition]
|
||||
sell_signals = data_bb[sell_condition]
|
||||
@@ -90,7 +91,7 @@ if __name__ == "__main__":
|
||||
# plot the data with seaborn library
|
||||
if df_to_plot is not None and not df_to_plot.empty:
|
||||
# Create a figure with two subplots, sharing the x-axis
|
||||
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 8), sharex=True)
|
||||
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(16, 8), sharex=True)
|
||||
|
||||
# Plot 1: Close Price and Bollinger Bands
|
||||
sns.lineplot(x=data_bb.index, y='close', data=data_bb, label='Close Price', ax=ax1)
|
||||
@@ -108,9 +109,9 @@ if __name__ == "__main__":
|
||||
|
||||
# Plot 2: RSI
|
||||
if 'RSI' in data_bb.columns: # Check data_bb now as it should contain RSI
|
||||
sns.lineplot(x=data_bb.index, y='RSI', data=data_bb, label='RSI (14)', ax=ax2, color='purple')
|
||||
ax2.axhline(75, color='red', linestyle='--', linewidth=0.8, label='Overbought (75)')
|
||||
ax2.axhline(25, color='green', linestyle='--', linewidth=0.8, label='Oversold (25)')
|
||||
sns.lineplot(x=data_bb.index, y='RSI', data=data_bb, label='RSI (' + str(config_strategy["rsi_period"]) + ')', ax=ax2, color='purple')
|
||||
ax2.axhline(config_strategy["trending"]["rsi_threshold"][1], color='red', linestyle='--', linewidth=0.8, label='Overbought (' + str(config_strategy["trending"]["rsi_threshold"][1]) + ')')
|
||||
ax2.axhline(config_strategy['trending']['rsi_threshold'][0], color='green', linestyle='--', linewidth=0.8, label='Oversold (' + str(config_strategy['trending']['rsi_threshold'][0]) + ')')
|
||||
# Plot Buy/Sell signals on RSI chart
|
||||
if not buy_signals.empty:
|
||||
ax2.scatter(buy_signals.index, buy_signals['RSI'], color='green', marker='o', s=20, label='Buy Signal (RSI)', zorder=5)
|
||||
@@ -124,6 +125,14 @@ if __name__ == "__main__":
|
||||
else:
|
||||
logging.info("RSI data not available for plotting.")
|
||||
|
||||
# Plot 3: BB Width
|
||||
sns.lineplot(x=data_bb.index, y='BBWidth', data=data_bb, label='BB Width', ax=ax3)
|
||||
sns.lineplot(x=data_bb.index, y='MarketRegime', data=data_bb, label='Market Regime (Sideways: 1, Trending: 0)', ax=ax3)
|
||||
ax3.set_title('Bollinger Bands Width')
|
||||
ax3.set_ylabel('BB Width')
|
||||
ax3.legend()
|
||||
ax3.grid(True)
|
||||
|
||||
plt.xlabel('Date') # Common X-axis label
|
||||
fig.tight_layout() # Adjust layout to prevent overlapping titles/labels
|
||||
plt.show()
|
||||
|
||||
Reference in New Issue
Block a user