3 Commits

Author SHA1 Message Date
Simon Moisy
1284549106 progress print 2025-05-29 11:04:03 +08:00
Simon Moisy
5f03524d6a never fallback to default values for fee_usd 2025-05-28 02:50:40 +08:00
Simon Moisy
74c8048ed5 shifted one day back on the metatrend to avoid lookahead bias, reverted metatrend calculus to use no cpu optimization for readability 2025-05-27 17:49:55 +08:00
10 changed files with 203 additions and 680 deletions

View File

@@ -4,25 +4,23 @@ class BollingerBands:
""" """
Calculates Bollinger Bands for given financial data. Calculates Bollinger Bands for given financial data.
""" """
def __init__(self, config): def __init__(self, period: int = 20, std_dev_multiplier: float = 2.0):
""" """
Initializes the BollingerBands calculator. Initializes the BollingerBands calculator.
Args: Args:
period (int): The period for the moving average and standard deviation. period (int): The period for the moving average and standard deviation.
std_dev_multiplier (float): The number of standard deviations for the upper and lower bands. std_dev_multiplier (float): The number of standard deviations for the upper and lower bands.
bb_width (float): The width of the Bollinger Bands.
""" """
if config['bb_period'] <= 0: if period <= 0:
raise ValueError("Period must be a positive integer.") raise ValueError("Period must be a positive integer.")
if config['trending']['bb_std_dev_multiplier'] <= 0 or config['sideways']['bb_std_dev_multiplier'] <= 0: if std_dev_multiplier <= 0:
raise ValueError("Standard deviation multiplier must be positive.") raise ValueError("Standard deviation multiplier must be positive.")
if config['bb_width'] <= 0:
raise ValueError("BB width must be positive.")
self.config = config self.period = period
self.std_dev_multiplier = std_dev_multiplier
def calculate(self, data_df: pd.DataFrame, price_column: str = 'close', squeeze = False) -> pd.DataFrame: def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame:
""" """
Calculates Bollinger Bands and adds them to the DataFrame. Calculates Bollinger Bands and adds them to the DataFrame.
@@ -39,37 +37,14 @@ class BollingerBands:
if price_column not in data_df.columns: if price_column not in data_df.columns:
raise ValueError(f"Price column '{price_column}' not found in DataFrame.") raise ValueError(f"Price column '{price_column}' not found in DataFrame.")
if not squeeze:
# Calculate SMA # Calculate SMA
data_df['SMA'] = data_df[price_column].rolling(window=self.config['bb_period']).mean() data_df['SMA'] = data_df[price_column].rolling(window=self.period).mean()
# Calculate Standard Deviation # Calculate Standard Deviation
std_dev = data_df[price_column].rolling(window=self.config['bb_period']).std() std_dev = data_df[price_column].rolling(window=self.period).std()
# Calculate Upper and Lower Bands # Calculate Upper and Lower Bands
data_df['UpperBand'] = data_df['SMA'] + (2.0* std_dev) data_df['UpperBand'] = data_df['SMA'] + (self.std_dev_multiplier * std_dev)
data_df['LowerBand'] = data_df['SMA'] - (2.0* std_dev) data_df['LowerBand'] = data_df['SMA'] - (self.std_dev_multiplier * std_dev)
# Calculate the width of the Bollinger Bands
data_df['BBWidth'] = (data_df['UpperBand'] - data_df['LowerBand']) / data_df['SMA']
# Calculate the market regime
# 1 = sideways, 0 = trending
data_df['MarketRegime'] = (data_df['BBWidth'] < self.config['bb_width']).astype(int)
if data_df['MarketRegime'].sum() > 0:
data_df['UpperBand'] = data_df['SMA'] + (self.config['trending']['bb_std_dev_multiplier'] * std_dev)
data_df['LowerBand'] = data_df['SMA'] - (self.config['trending']['bb_std_dev_multiplier'] * std_dev)
else:
data_df['UpperBand'] = data_df['SMA'] + (self.config['sideways']['bb_std_dev_multiplier'] * std_dev)
data_df['LowerBand'] = data_df['SMA'] - (self.config['sideways']['bb_std_dev_multiplier'] * std_dev)
else:
data_df['SMA'] = data_df[price_column].rolling(window=14).mean()
# Calculate Standard Deviation
std_dev = data_df[price_column].rolling(window=14).std()
# Calculate Upper and Lower Bands
data_df['UpperBand'] = data_df['SMA'] + 1.5* std_dev
data_df['LowerBand'] = data_df['SMA'] - 1.5* std_dev
return data_df return data_df

View File

@@ -5,7 +5,7 @@ class RSI:
""" """
A class to calculate the Relative Strength Index (RSI). A class to calculate the Relative Strength Index (RSI).
""" """
def __init__(self, config): def __init__(self, period: int = 14):
""" """
Initializes the RSI calculator. Initializes the RSI calculator.
@@ -13,9 +13,9 @@ class RSI:
period (int): The period for RSI calculation. Default is 14. period (int): The period for RSI calculation. Default is 14.
Must be a positive integer. Must be a positive integer.
""" """
if not isinstance(config['rsi_period'], int) or config['rsi_period'] <= 0: if not isinstance(period, int) or period <= 0:
raise ValueError("Period must be a positive integer.") raise ValueError("Period must be a positive integer.")
self.period = config['rsi_period'] self.period = period
def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame: def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame:
""" """

View File

@@ -1,131 +0,0 @@
import pandas as pd
import numpy as np
from cycles.Analysis.boillinger_band import BollingerBands
class Strategy:
    """Dispatches named trading strategies over indicator-enriched price data."""

    def __init__(self, config=None, logging=None):
        """
        Initializes the Strategy dispatcher.

        Args:
            config (dict): Strategy configuration (e.g. the "SqueezeStrategy"
                flag read by MarketRegimeStrategy). Must be provided.
            logging: Optional logger used for warnings. May be None.

        Raises:
            ValueError: If config is None.
        """
        if config is None:
            raise ValueError("Config must be provided.")
        self.config = config
        self.logging = logging

    def run(self, data, strategy_name):
        """Execute the strategy named `strategy_name` on `data`.

        Args:
            data (pd.DataFrame): Price/indicator data.
            strategy_name (str): Currently only "MarketRegimeStrategy" is
                recognized; anything else falls back to no_strategy.

        Returns:
            tuple: (buy_condition, sell_condition) boolean pd.Series.
        """
        if strategy_name == "MarketRegimeStrategy":
            return self.MarketRegimeStrategy(data)
        else:
            if self.logging is not None:
                self.logging.warning(f"Strategy {strategy_name} not found. Using no_strategy instead.")
            return self.no_strategy(data)

    def no_strategy(self, data):
        """No strategy: returns False for both buy and sell conditions"""
        buy_condition = pd.Series([False] * len(data), index=data.index)
        sell_condition = pd.Series([False] * len(data), index=data.index)
        return buy_condition, sell_condition

    def rsi_bollinger_confirmation(self, rsi, window=14, std_mult=1.5):
        """Calculate RSI Bollinger Bands for confirmation.

        Args:
            rsi (pd.Series): RSI values.
            window (int): Rolling window for SMA.
            std_mult (float): Standard deviation multiplier.

        Returns:
            tuple: (oversold condition, overbought condition) boolean Series.
        """
        valid_rsi = ~rsi.isna()
        if not valid_rsi.any():
            # Return all-False Series if no valid RSI data
            return pd.Series(False, index=rsi.index), pd.Series(False, index=rsi.index)
        rsi_sma = rsi.rolling(window).mean()
        rsi_std = rsi.rolling(window).std()
        upper_rsi_band = rsi_sma + std_mult * rsi_std
        lower_rsi_band = rsi_sma - std_mult * rsi_std
        return (rsi < lower_rsi_band), (rsi > upper_rsi_band)

    def MarketRegimeStrategy(self, data):
        """Bollinger Bands + RSI strategy with market-regime adaptation.

        Entry Conditions:
            - Trending Market (Breakout Mode):
                Buy: Price < Lower Band AND RSI < 50 AND Volume Spike (>=1.5x 20D Avg)
                Sell: Price > Upper Band AND RSI > 50 AND Volume Spike
            - Sideways Market (Mean Reversion):
                Buy: Price <= Lower Band AND RSI <= 40
                Sell: Price >= Upper Band AND RSI >= 60
        Enhanced with RSI Bollinger Squeeze confirmation when the
        "SqueezeStrategy" config flag is set.

        Returns:
            tuple: (buy_condition, sell_condition) boolean pd.Series.
        """
        # Initialize conditions as all False
        buy_condition = pd.Series(False, index=data.index)
        sell_condition = pd.Series(False, index=data.index)

        # Create masks for different market regimes (1 = sideways, 0 = trending)
        sideways_mask = data['MarketRegime'] > 0
        trending_mask = data['MarketRegime'] <= 0
        valid_data_mask = ~data['MarketRegime'].isna()  # Handle potential NaN values

        # BUG FIX: the original indexed data['RSI'] unconditionally below, so
        # the "RSI not available" fallback path raised KeyError instead of
        # producing no signals. Use an all-NaN stand-in: every comparison
        # against NaN evaluates to False, so no RSI-gated signal can fire.
        if 'RSI' in data.columns:
            rsi = data['RSI']
        else:
            rsi = pd.Series(np.nan, index=data.index)

        # Calculate volume spike (>=1.5x 20D Avg)
        if 'volume' in data.columns:
            volume_20d_avg = data['volume'].rolling(window=20).mean()
            volume_spike = data['volume'] >= 1.5 * volume_20d_avg
            # Additional volume contraction filter for sideways markets
            volume_30d_avg = data['volume'].rolling(window=30).mean()
            volume_contraction = data['volume'] < 0.7 * volume_30d_avg
        else:
            # If volume data is not available, assume no volume spike
            volume_spike = pd.Series(False, index=data.index)
            volume_contraction = pd.Series(False, index=data.index)
            if self.logging is not None:
                self.logging.warning("Volume data not available. Volume conditions will not be triggered.")

        # Calculate RSI Bollinger Squeeze confirmation
        if 'RSI' in data.columns:
            oversold_rsi, overbought_rsi = self.rsi_bollinger_confirmation(rsi)
        else:
            oversold_rsi = pd.Series(False, index=data.index)
            overbought_rsi = pd.Series(False, index=data.index)
            if self.logging is not None:
                self.logging.warning("RSI data not available. RSI Bollinger Squeeze will not be triggered.")

        # Calculate conditions for sideways market (Mean Reversion)
        if sideways_mask.any():
            sideways_buy = (data['close'] <= data['LowerBand']) & (rsi <= 40)
            sideways_sell = (data['close'] >= data['UpperBand']) & (rsi >= 60)
            # Add enhanced confirmation for sideways markets
            if self.config.get("SqueezeStrategy", False):
                sideways_buy = sideways_buy & oversold_rsi & volume_contraction
                sideways_sell = sideways_sell & overbought_rsi & volume_contraction
            # Apply only where market is sideways and data is valid
            buy_condition = buy_condition | (sideways_buy & sideways_mask & valid_data_mask)
            sell_condition = sell_condition | (sideways_sell & sideways_mask & valid_data_mask)

        # Calculate conditions for trending market (Breakout Mode)
        if trending_mask.any():
            trending_buy = (data['close'] < data['LowerBand']) & (rsi < 50) & volume_spike
            trending_sell = (data['close'] > data['UpperBand']) & (rsi > 50) & volume_spike
            # Add enhanced confirmation for trending markets
            if self.config.get("SqueezeStrategy", False):
                trending_buy = trending_buy & oversold_rsi
                trending_sell = trending_sell & overbought_rsi
            # Apply only where market is trending and data is valid
            buy_condition = buy_condition | (trending_buy & trending_mask & valid_data_mask)
            sell_condition = sell_condition | (trending_sell & trending_mask & valid_data_mask)

        return buy_condition, sell_condition

View File

@@ -1,5 +1,6 @@
import pandas as pd import pandas as pd
import numpy as np import numpy as np
import time
from cycles.supertrend import Supertrends from cycles.supertrend import Supertrends
from cycles.market_fees import MarketFees from cycles.market_fees import MarketFees
@@ -27,12 +28,14 @@ class Backtest:
trends_arr = np.stack(trends, axis=1) trends_arr = np.stack(trends, axis=1)
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]), meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
trends_arr[:,0], 0) trends_arr[:,0], 0)
# Shift meta_trend by one to avoid lookahead bias
meta_trend_signal = np.roll(meta_trend, 1)
meta_trend_signal[0] = 0 # or np.nan, but 0 means 'no signal' for first bar
position = 0 # 0 = no position, 1 = long position = 0 # 0 = no position, 1 = long
entry_price = 0 entry_price = 0
usd = initial_usd usd = initial_usd
coin = 0 coin = 0
trade_log = [] trade_log = []
max_balance = initial_usd max_balance = initial_usd
drawdowns = [] drawdowns = []
@@ -40,14 +43,22 @@ class Backtest:
entry_time = None entry_time = None
current_trade_min1_start_idx = None current_trade_min1_start_idx = None
min1_df['timestamp'] = pd.to_datetime(min1_df.index) min1_df.index = pd.to_datetime(min1_df.index)
min1_timestamps = min1_df.index.values
last_print_time = time.time()
for i in range(1, len(_df)): for i in range(1, len(_df)):
current_time = time.time()
if current_time - last_print_time >= 5:
progress = (i / len(_df)) * 100
print(f"\rProgress: {progress:.1f}%", end="", flush=True)
last_print_time = current_time
price_open = _df['open'].iloc[i] price_open = _df['open'].iloc[i]
price_close = _df['close'].iloc[i] price_close = _df['close'].iloc[i]
date = _df['timestamp'].iloc[i] date = _df['timestamp'].iloc[i]
prev_mt = meta_trend[i-1] prev_mt = meta_trend_signal[i-1]
curr_mt = meta_trend[i] curr_mt = meta_trend_signal[i]
# Check stop loss if in position # Check stop loss if in position
if position == 1: if position == 1:
@@ -88,6 +99,8 @@ class Backtest:
drawdown = (max_balance - balance) / max_balance drawdown = (max_balance - balance) / max_balance
drawdowns.append(drawdown) drawdowns.append(drawdown)
print("\rProgress: 100%\r\n", end="", flush=True)
# If still in position at end, sell at last close # If still in position at end, sell at last close
if position == 1: if position == 1:
exit_result = Backtest.handle_exit(coin, _df['close'].iloc[-1], entry_price, entry_time, _df['timestamp'].iloc[-1]) exit_result = Backtest.handle_exit(coin, _df['close'].iloc[-1], entry_price, entry_time, _df['timestamp'].iloc[-1])

View File

@@ -1,70 +1,30 @@
import pandas as pd import pandas as pd
import numpy as np import numpy as np
import logging import logging
from scipy.signal import find_peaks
from matplotlib.patches import Rectangle
from scipy import stats
import concurrent.futures
from functools import partial
from functools import lru_cache from functools import lru_cache
import matplotlib.pyplot as plt
# Color configuration
# Plot colors
DARK_BG_COLOR = '#181C27'
LEGEND_BG_COLOR = '#333333'
TITLE_COLOR = 'white'
AXIS_LABEL_COLOR = 'white'
# Candlestick colors
CANDLE_UP_COLOR = '#089981' # Green
CANDLE_DOWN_COLOR = '#F23645' # Red
# Marker colors
MIN_COLOR = 'red'
MAX_COLOR = 'green'
# Line style colors
MIN_LINE_STYLE = 'g--' # Green dashed
MAX_LINE_STYLE = 'r--' # Red dashed
SMA7_LINE_STYLE = 'y-' # Yellow solid
SMA15_LINE_STYLE = 'm-' # Magenta solid
# SuperTrend colors
ST_COLOR_UP = 'g-'
ST_COLOR_DOWN = 'r-'
# Cache the calculation results by function parameters
@lru_cache(maxsize=32) @lru_cache(maxsize=32)
def cached_supertrend_calculation(period, multiplier, data_tuple): def cached_supertrend_calculation(period, multiplier, data_tuple):
# Convert tuple back to numpy arrays
high = np.array(data_tuple[0]) high = np.array(data_tuple[0])
low = np.array(data_tuple[1]) low = np.array(data_tuple[1])
close = np.array(data_tuple[2]) close = np.array(data_tuple[2])
# Calculate TR and ATR using vectorized operations
tr = np.zeros_like(close) tr = np.zeros_like(close)
tr[0] = high[0] - low[0] tr[0] = high[0] - low[0]
hc_range = np.abs(high[1:] - close[:-1]) hc_range = np.abs(high[1:] - close[:-1])
lc_range = np.abs(low[1:] - close[:-1]) lc_range = np.abs(low[1:] - close[:-1])
hl_range = high[1:] - low[1:] hl_range = high[1:] - low[1:]
tr[1:] = np.maximum.reduce([hl_range, hc_range, lc_range]) tr[1:] = np.maximum.reduce([hl_range, hc_range, lc_range])
# Use numpy's exponential moving average
atr = np.zeros_like(tr) atr = np.zeros_like(tr)
atr[0] = tr[0] atr[0] = tr[0]
multiplier_ema = 2.0 / (period + 1) multiplier_ema = 2.0 / (period + 1)
for i in range(1, len(tr)): for i in range(1, len(tr)):
atr[i] = (tr[i] * multiplier_ema) + (atr[i-1] * (1 - multiplier_ema)) atr[i] = (tr[i] * multiplier_ema) + (atr[i-1] * (1 - multiplier_ema))
# Calculate bands
upper_band = np.zeros_like(close) upper_band = np.zeros_like(close)
lower_band = np.zeros_like(close) lower_band = np.zeros_like(close)
for i in range(len(close)): for i in range(len(close)):
hl_avg = (high[i] + low[i]) / 2 hl_avg = (high[i] + low[i]) / 2
upper_band[i] = hl_avg + (multiplier * atr[i]) upper_band[i] = hl_avg + (multiplier * atr[i])
lower_band[i] = hl_avg - (multiplier * atr[i]) lower_band[i] = hl_avg - (multiplier * atr[i])
final_upper = np.zeros_like(close) final_upper = np.zeros_like(close)
final_lower = np.zeros_like(close) final_lower = np.zeros_like(close)
supertrend = np.zeros_like(close) supertrend = np.zeros_like(close)
@@ -106,76 +66,18 @@ def cached_supertrend_calculation(period, multiplier, data_tuple):
} }
def calculate_supertrend_external(data, period, multiplier): def calculate_supertrend_external(data, period, multiplier):
# Convert DataFrame columns to hashable tuples
high_tuple = tuple(data['high']) high_tuple = tuple(data['high'])
low_tuple = tuple(data['low']) low_tuple = tuple(data['low'])
close_tuple = tuple(data['close']) close_tuple = tuple(data['close'])
# Call the cached function
return cached_supertrend_calculation(period, multiplier, (high_tuple, low_tuple, close_tuple)) return cached_supertrend_calculation(period, multiplier, (high_tuple, low_tuple, close_tuple))
class Supertrends: class Supertrends:
def __init__(self, data, verbose=False, display=False): def __init__(self, data, verbose=False, display=False):
"""
Initialize the TrendDetectorSimple class.
Parameters:
- data: pandas DataFrame containing price data
- verbose: boolean, whether to display detailed logging information
- display: boolean, whether to enable display/plotting features
"""
self.data = data self.data = data
self.verbose = verbose self.verbose = verbose
self.display = display
# Only define display-related variables if display is True
if self.display:
# Plot style configuration
self.plot_style = 'dark_background'
self.bg_color = DARK_BG_COLOR
self.plot_size = (12, 8)
# Candlestick configuration
self.candle_width = 0.6
self.candle_up_color = CANDLE_UP_COLOR
self.candle_down_color = CANDLE_DOWN_COLOR
self.candle_alpha = 0.8
self.wick_width = 1
# Marker configuration
self.min_marker = '^'
self.min_color = MIN_COLOR
self.min_size = 100
self.max_marker = 'v'
self.max_color = MAX_COLOR
self.max_size = 100
self.marker_zorder = 100
# Line configuration
self.line_width = 1
self.min_line_style = MIN_LINE_STYLE
self.max_line_style = MAX_LINE_STYLE
self.sma7_line_style = SMA7_LINE_STYLE
self.sma15_line_style = SMA15_LINE_STYLE
# Text configuration
self.title_size = 14
self.title_color = TITLE_COLOR
self.axis_label_size = 12
self.axis_label_color = AXIS_LABEL_COLOR
# Legend configuration
self.legend_loc = 'best'
self.legend_bg_color = LEGEND_BG_COLOR
# Configure logging
logging.basicConfig(level=logging.INFO if verbose else logging.WARNING, logging.basicConfig(level=logging.INFO if verbose else logging.WARNING,
format='%(asctime)s - %(levelname)s - %(message)s') format='%(asctime)s - %(levelname)s - %(message)s')
self.logger = logging.getLogger('TrendDetectorSimple') self.logger = logging.getLogger('TrendDetectorSimple')
# Convert data to pandas DataFrame if it's not already
if not isinstance(self.data, pd.DataFrame): if not isinstance(self.data, pd.DataFrame):
if isinstance(self.data, list): if isinstance(self.data, list):
self.data = pd.DataFrame({'close': self.data}) self.data = pd.DataFrame({'close': self.data})
@@ -183,154 +85,101 @@ class Supertrends:
raise ValueError("Data must be a pandas DataFrame or a list") raise ValueError("Data must be a pandas DataFrame or a list")
def calculate_tr(self): def calculate_tr(self):
df = self.data.copy()
high = df['high'].values
low = df['low'].values
close = df['close'].values
tr = np.zeros_like(close)
tr[0] = high[0] - low[0]
for i in range(1, len(close)):
hl_range = high[i] - low[i]
hc_range = abs(high[i] - close[i-1])
lc_range = abs(low[i] - close[i-1])
tr[i] = max(hl_range, hc_range, lc_range)
return tr
def calculate_atr(self, period=14):
tr = self.calculate_tr()
atr = np.zeros_like(tr)
atr[0] = tr[0]
multiplier = 2.0 / (period + 1)
for i in range(1, len(tr)):
atr[i] = (tr[i] * multiplier) + (atr[i-1] * (1 - multiplier))
return atr
def calculate_supertrend(self, period=10, multiplier=3.0):
""" """
Calculate True Range (TR) for the price data. Calculate SuperTrend indicator for the price data.
SuperTrend is a trend-following indicator that uses ATR to determine the trend direction.
True Range is the greatest of: Parameters:
1. Current high - current low - period: int, the period for the ATR calculation (default: 10)
2. |Current high - previous close| - multiplier: float, the multiplier for the ATR (default: 3.0)
3. |Current low - previous close|
Returns: Returns:
- Numpy array of TR values - Dictionary containing SuperTrend values, trend direction, and upper/lower bands
""" """
df = self.data.copy() df = self.data.copy()
high = df['high'].values high = df['high'].values
low = df['low'].values low = df['low'].values
close = df['close'].values close = df['close'].values
atr = self.calculate_atr(period)
tr = np.zeros_like(close) upper_band = np.zeros_like(close)
tr[0] = high[0] - low[0] # First TR is just the first day's range lower_band = np.zeros_like(close)
for i in range(len(close)):
hl_avg = (high[i] + low[i]) / 2
upper_band[i] = hl_avg + (multiplier * atr[i])
lower_band[i] = hl_avg - (multiplier * atr[i])
final_upper = np.zeros_like(close)
final_lower = np.zeros_like(close)
supertrend = np.zeros_like(close)
trend = np.zeros_like(close)
final_upper[0] = upper_band[0]
final_lower[0] = lower_band[0]
if close[0] <= upper_band[0]:
supertrend[0] = upper_band[0]
trend[0] = -1
else:
supertrend[0] = lower_band[0]
trend[0] = 1
for i in range(1, len(close)): for i in range(1, len(close)):
# Current high - current low if (upper_band[i] < final_upper[i-1]) or (close[i-1] > final_upper[i-1]):
hl_range = high[i] - low[i] final_upper[i] = upper_band[i]
# |Current high - previous close| else:
hc_range = abs(high[i] - close[i-1]) final_upper[i] = final_upper[i-1]
# |Current low - previous close| if (lower_band[i] > final_lower[i-1]) or (close[i-1] < final_lower[i-1]):
lc_range = abs(low[i] - close[i-1]) final_lower[i] = lower_band[i]
else:
# TR is the maximum of these three values final_lower[i] = final_lower[i-1]
tr[i] = max(hl_range, hc_range, lc_range) if supertrend[i-1] == final_upper[i-1] and close[i] <= final_upper[i]:
supertrend[i] = final_upper[i]
return tr trend[i] = -1
elif supertrend[i-1] == final_upper[i-1] and close[i] > final_upper[i]:
def calculate_atr(self, period=14): supertrend[i] = final_lower[i]
""" trend[i] = 1
Calculate Average True Range (ATR) for the price data. elif supertrend[i-1] == final_lower[i-1] and close[i] >= final_lower[i]:
supertrend[i] = final_lower[i]
ATR is the exponential moving average of the True Range over a specified period. trend[i] = 1
elif supertrend[i-1] == final_lower[i-1] and close[i] < final_lower[i]:
Parameters: supertrend[i] = final_upper[i]
- period: int, the period for the ATR calculation (default: 14) trend[i] = -1
supertrend_results = {
Returns: 'supertrend': supertrend,
- Numpy array of ATR values 'trend': trend,
""" 'upper_band': final_upper,
'lower_band': final_lower
tr = self.calculate_tr() }
atr = np.zeros_like(tr) return supertrend_results
# First ATR value is just the first TR
atr[0] = tr[0]
# Calculate exponential moving average (EMA) of TR
multiplier = 2.0 / (period + 1)
for i in range(1, len(tr)):
atr[i] = (tr[i] * multiplier) + (atr[i-1] * (1 - multiplier))
return atr
def detect_trends(self):
"""
Detect trends by identifying local minima and maxima in the price data
using scipy.signal.find_peaks.
Parameters:
- prominence: float, required prominence of peaks (relative to the price range)
- width: int, required width of peaks in data points
Returns:
- DataFrame with columns for timestamps, prices, and trend indicators
- Dictionary containing analysis results including linear regression, SMAs, and SuperTrend indicators
"""
df = self.data
# close_prices = df['close'].values
# max_peaks, _ = find_peaks(close_prices)
# min_peaks, _ = find_peaks(-close_prices)
# df['is_min'] = False
# df['is_max'] = False
# for peak in max_peaks:
# df.at[peak, 'is_max'] = True
# for peak in min_peaks:
# df.at[peak, 'is_min'] = True
# result = df[['timestamp', 'close', 'is_min', 'is_max']].copy()
# Perform linear regression on min_peaks and max_peaks
# min_prices = df['close'].iloc[min_peaks].values
# max_prices = df['close'].iloc[max_peaks].values
# Linear regression for min peaks if we have at least 2 points
# min_slope, min_intercept, min_r_value, _, _ = stats.linregress(min_peaks, min_prices)
# Linear regression for max peaks if we have at least 2 points
# max_slope, max_intercept, max_r_value, _, _ = stats.linregress(max_peaks, max_prices)
# Calculate Simple Moving Averages (SMA) for 7 and 15 periods
# sma_7 = pd.Series(close_prices).rolling(window=7, min_periods=1).mean().values
# sma_15 = pd.Series(close_prices).rolling(window=15, min_periods=1).mean().values
analysis_results = {}
# analysis_results['linear_regression'] = {
# 'min': {
# 'slope': min_slope,
# 'intercept': min_intercept,
# 'r_squared': min_r_value ** 2
# },
# 'max': {
# 'slope': max_slope,
# 'intercept': max_intercept,
# 'r_squared': max_r_value ** 2
# }
# }
# analysis_results['sma'] = {
# '7': sma_7,
# '15': sma_15
# }
# Calculate SuperTrend indicators
supertrend_results_list = self._calculate_supertrend_indicators()
analysis_results['supertrend'] = supertrend_results_list
return analysis_results
def calculate_supertrend_indicators(self): def calculate_supertrend_indicators(self):
"""
Calculate SuperTrend indicators with different parameter sets in parallel.
Returns:
- list, the SuperTrend results
"""
supertrend_params = [ supertrend_params = [
{"period": 12, "multiplier": 3.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN}, {"period": 12, "multiplier": 3.0},
{"period": 10, "multiplier": 1.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN}, {"period": 10, "multiplier": 1.0},
{"period": 11, "multiplier": 2.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN} {"period": 11, "multiplier": 2.0}
] ]
data = self.data.copy()
# For just 3 calculations, direct calculation might be faster than process pool
results = [] results = []
for p in supertrend_params: for p in supertrend_params:
result = calculate_supertrend_external(data, p["period"], p["multiplier"]) result = self.calculate_supertrend(period=p["period"], multiplier=p["multiplier"])
results.append(result) results.append({
supertrend_results_list = []
for params, result in zip(supertrend_params, results):
supertrend_results_list.append({
"results": result, "results": result,
"params": params "params": p
}) })
return supertrend_results_list return results

View File

@@ -1,80 +1,5 @@
import pandas as pd import pandas as pd
def check_data(data_df: pd.DataFrame):
    """
    Validates the index and builds OHLCV aggregation rules for resampling.

    Args:
        data_df (pd.DataFrame): DataFrame to check.

    Returns:
        dict: Aggregation rules keyed by column name (e.g. {'open': 'first',
            'close': 'last'}) when the DataFrame has a DatetimeIndex and at
            least one standard OHLCV column is present.
        bool: False when the index is not a DatetimeIndex or no standard
            OHLCV column exists. Callers test the result's truthiness.

    Note:
        The original signature was annotated `-> bool`, but the success path
        has always returned the rules dict; the documentation now matches
        the actual behavior (which is unchanged).
    """
    if not isinstance(data_df.index, pd.DatetimeIndex):
        print("Warning: Input DataFrame must have a DatetimeIndex.")
        return False

    # Define aggregation rules based on available columns only, so partial
    # OHLCV frames (e.g. close-only) still aggregate.
    agg_rules = {}
    if 'open' in data_df.columns:
        agg_rules['open'] = 'first'
    if 'high' in data_df.columns:
        agg_rules['high'] = 'max'
    if 'low' in data_df.columns:
        agg_rules['low'] = 'min'
    if 'close' in data_df.columns:
        agg_rules['close'] = 'last'
    if 'volume' in data_df.columns:
        agg_rules['volume'] = 'sum'

    if not agg_rules:
        print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for daily aggregation.")
        return False
    return agg_rules
def aggregate_to_weekly(data_df: pd.DataFrame, weeks: int = 1) -> pd.DataFrame:
    """
    Aggregates time-series financial data to weekly OHLCV format.

    The input DataFrame is expected to have a DatetimeIndex.
    'open' will be the first 'open' price of the week.
    'close' will be the last 'close' price of the week.
    'high' will be the maximum 'high' price of the week.
    'low' will be the minimum 'low' price of the week.
    'volume' (if present) will be the sum of volumes for the week.

    Args:
        data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
            like 'open', 'high', 'low', 'close', and optionally 'volume'.
        weeks (int): The number of weeks to aggregate to. Default is 1.

    Returns:
        pd.DataFrame: DataFrame aggregated to weekly OHLCV data. The index
            timestamps are shifted to the Monday that starts the labeled
            week. Returns an empty DataFrame if no relevant OHLCV columns
            are found.
    """
    agg_rules = check_data(data_df)
    if not agg_rules:
        print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for weekly aggregation.")
        return pd.DataFrame(index=pd.to_datetime([]))

    # Resample to weekly frequency and apply aggregation rules
    weekly_data = data_df.resample(f'{weeks}W').agg(agg_rules)
    weekly_data.dropna(how='all', inplace=True)

    # BUG FIX: the original called weekly_data.index.floor('W'), which raises
    # ValueError in pandas because a week is a non-fixed frequency. Resample
    # with 'W' labels bins on the period-end Sunday, so shift the labels back
    # 6 days to the Monday that starts that labeled week instead.
    if not weekly_data.empty and isinstance(weekly_data.index, pd.DatetimeIndex):
        weekly_data.index = weekly_data.index - pd.Timedelta(days=6)
    return weekly_data
def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame: def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
""" """
Aggregates time-series financial data to daily OHLCV format. Aggregates time-series financial data to daily OHLCV format.
@@ -99,8 +24,22 @@ def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
Raises: Raises:
ValueError: If the input DataFrame does not have a DatetimeIndex. ValueError: If the input DataFrame does not have a DatetimeIndex.
""" """
if not isinstance(data_df.index, pd.DatetimeIndex):
raise ValueError("Input DataFrame must have a DatetimeIndex.")
agg_rules = check_data(data_df) agg_rules = {}
# Define aggregation rules based on available columns
if 'open' in data_df.columns:
agg_rules['open'] = 'first'
if 'high' in data_df.columns:
agg_rules['high'] = 'max'
if 'low' in data_df.columns:
agg_rules['low'] = 'min'
if 'close' in data_df.columns:
agg_rules['close'] = 'last'
if 'volume' in data_df.columns:
agg_rules['volume'] = 'sum'
if not agg_rules: if not agg_rules:
# Log a warning or raise an error if no relevant columns are found # Log a warning or raise an error if no relevant columns are found
@@ -119,43 +58,3 @@ def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
daily_data.dropna(how='all', inplace=True) daily_data.dropna(how='all', inplace=True)
return daily_data return daily_data
def aggregate_to_hourly(data_df: pd.DataFrame, hours: int = 1) -> pd.DataFrame:
    """
    Aggregates time-series financial data to hourly OHLCV format.

    Each bucket takes the first 'open', last 'close', max 'high', min 'low',
    and (if present) summed 'volume' over the hour window. The input is
    expected to carry a DatetimeIndex; column availability is validated by
    check_data.

    Args:
        data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
            like 'open', 'high', 'low', 'close', and optionally 'volume'.
        hours (int): The number of hours per aggregation bucket. Default is 1.

    Returns:
        pd.DataFrame: Hourly-aggregated OHLCV data whose index is floored to
            the start of the hour, or an empty DataFrame when no standard
            OHLCV columns are available.
    """
    rules = check_data(data_df)
    if not rules:
        print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for hourly aggregation.")
        return pd.DataFrame(index=pd.to_datetime([]))

    # Bucket into N-hour windows, aggregate, and drop fully-empty rows.
    resampled = data_df.resample(f'{hours}H').agg(rules)
    resampled.dropna(how='all', inplace=True)

    # Snap the bucket labels to the start of the hour.
    if not resampled.empty and isinstance(resampled.index, pd.DatetimeIndex):
        resampled.index = resampled.index.floor('H')
    return resampled

View File

@@ -8,7 +8,6 @@ The `Analysis` module includes classes for calculating common technical indicato
- **Relative Strength Index (RSI)**: Implemented in `cycles/Analysis/rsi.py`. - **Relative Strength Index (RSI)**: Implemented in `cycles/Analysis/rsi.py`.
- **Bollinger Bands**: Implemented in `cycles/Analysis/boillinger_band.py`. - **Bollinger Bands**: Implemented in `cycles/Analysis/boillinger_band.py`.
- **Trading Strategies**: Implemented in `cycles/Analysis/strategies.py`.
## Class: `RSI` ## Class: `RSI`
@@ -77,65 +76,3 @@ Found in `cycles/Analysis/boillinger_band.py`.
- `data_df` (pd.DataFrame): DataFrame with price data. Must include the `price_column`. - `data_df` (pd.DataFrame): DataFrame with price data. Must include the `price_column`.
- `price_column` (str, optional): The name of the column containing the price data (e.g., 'close'). Defaults to 'close'. - `price_column` (str, optional): The name of the column containing the price data (e.g., 'close'). Defaults to 'close'.
- **Returns**: `pd.DataFrame` - The original DataFrame with added columns: 'SMA', 'UpperBand', 'LowerBand'. - **Returns**: `pd.DataFrame` - The original DataFrame with added columns: 'SMA', 'UpperBand', 'LowerBand'.
## Class: `Strategy`
Found in `cycles/Analysis/strategies.py`.
Implements various trading strategies using technical indicators.
### `__init__(self, config = None, logging = None)`
- **Description**: Initializes the Strategy class with configuration and logging.
- **Parameters**:
- `config` (dict): Configuration dictionary with strategy parameters. Must be provided.
- `logging` (logging object, optional): Logger for output messages. Defaults to None.
### `run(self, data, strategy_name)`
- **Description**: Executes a specified strategy on the provided data.
- **Parameters**:
- `data` (pd.DataFrame): DataFrame with price, indicator data, and market regime information.
- `strategy_name` (str): Name of the strategy to run. Currently supports "MarketRegimeStrategy".
- **Returns**: Tuple of (buy_condition, sell_condition) as pandas Series with boolean values.
### `no_strategy(self, data)`
- **Description**: Returns empty buy/sell conditions (all False).
- **Parameters**:
- `data` (pd.DataFrame): Input data DataFrame.
- **Returns**: Tuple of (buy_condition, sell_condition) as pandas Series with all False values.
### `rsi_bollinger_confirmation(self, rsi, window=14, std_mult=1.5)`
- **Description**: Calculates Bollinger Bands on RSI values for signal confirmation.
- **Parameters**:
- `rsi` (pd.Series): Series containing RSI values.
- `window` (int, optional): The period for the moving average. Defaults to 14.
- `std_mult` (float, optional): Standard deviation multiplier for bands. Defaults to 1.5.
- **Returns**: Tuple of (oversold_condition, overbought_condition) as pandas Series with boolean values.
### `MarketRegimeStrategy(self, data)`
- **Description**: Advanced strategy combining Bollinger Bands, RSI, volume analysis, and market regime detection.
- **Parameters**:
- `data` (pd.DataFrame): DataFrame with price data, technical indicators, and market regime information.
- **Returns**: Tuple of (buy_condition, sell_condition) as pandas Series with boolean values.
#### Strategy Logic
This strategy adapts to different market conditions:
**Trending Market (Breakout Mode):**
- Buy: Price < Lower Band ∧ RSI < 50 ∧ Volume Spike (≥1.5× 20D Avg)
- Sell: Price > Upper Band ∧ RSI > 50 ∧ Volume Spike
**Sideways Market (Mean Reversion):**
- Buy: Price ≤ Lower Band ∧ RSI ≤ 40
- Sell: Price ≥ Upper Band ∧ RSI ≥ 60
When `SqueezeStrategy` is enabled, additional confirmation using RSI Bollinger Bands is required:
- For buy signals: RSI must be below its lower Bollinger Band
- For sell signals: RSI must be above its upper Bollinger Band
For sideways markets, volume contraction (< 0.7× 30D Avg) is also checked to avoid false signals.

View File

@@ -1,43 +0,0 @@
# Optimized Bollinger Bands + RSI Strategy for Crypto Trading (Including Sideways Markets)
This advanced strategy combines volatility analysis, momentum confirmation, and regime detection to adapt to Bitcoin's unique market conditions. Backtested on 2018-2025 BTC data, it achieved 58% annualized returns with 22% max drawdown.
---
## **Adaptive Parameters**
### **Core Configuration**
| Indicator | Trending Market | Sideways Market |
|-----------------|-------------------------|-------------------------|
| **Bollinger** | 20 SMA, 2.5σ | 20 SMA, 1.8σ |
| **RSI** | 14-period, 30/70 | 14-period, 40/60 |
| **Confirmation**| Volume > 20% 30D Avg | Bollinger Band Width <5%|
## Strategy Components
### 1. Market Regime Detection
### 2. Entry Conditions
***Trending Market (Breakout Mode):***
Buy: Price > Upper Band ∧ RSI > 50 ∧ Volume Spike (≥1.5× 20D Avg)
Sell: Price < Lower Band ∧ RSI < 50 ∧ Volume Spike
***Sideways Market (Mean Reversion):***
Buy: Price ≤ Lower Band ∧ RSI ≤ 40
Sell: Price ≥ Upper Band ∧ RSI ≥ 60
### **Enhanced Signals with RSI Bollinger Squeeze**
*Signal Boost*: Requires both price and RSI to breach their respective bands.
---
## **Risk Management System**
### Volatility-Adjusted Position Sizing
$$ \text{Position Size} = \frac{\text{Capital} \times 0.02}{\text{ATR}_{14} \times \text{Price}} $$
**Key Adjustments:**
1. Use narrower Bollinger Bands (1.8σ) to avoid whipsaws
2. Require RSI confirmation within 40-60 range
3. Add volume contraction filter

55
main.py
View File

@@ -6,7 +6,6 @@ import os
import datetime import datetime
import argparse import argparse
import json import json
import ast
from cycles.utils.storage import Storage from cycles.utils.storage import Storage
from cycles.utils.system import SystemUtils from cycles.utils.system import SystemUtils
@@ -48,6 +47,7 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
cumulative_profit = 0 cumulative_profit = 0
max_drawdown = 0 max_drawdown = 0
peak = 0 peak = 0
for trade in trades: for trade in trades:
cumulative_profit += trade['profit_pct'] cumulative_profit += trade['profit_pct']
if cumulative_profit > peak: if cumulative_profit > peak:
@@ -55,10 +55,14 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
drawdown = peak - cumulative_profit drawdown = peak - cumulative_profit
if drawdown > max_drawdown: if drawdown > max_drawdown:
max_drawdown = drawdown max_drawdown = drawdown
final_usd = initial_usd final_usd = initial_usd
for trade in trades: for trade in trades:
final_usd *= (1 + trade['profit_pct']) final_usd *= (1 + trade['profit_pct'])
total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades)
total_fees_usd = sum(trade['fee_usd'] for trade in trades)
row = { row = {
"timeframe": rule_name, "timeframe": rule_name,
"stop_loss_pct": stop_loss_pct, "stop_loss_pct": stop_loss_pct,
@@ -75,6 +79,7 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
"total_fees_usd": total_fees_usd, "total_fees_usd": total_fees_usd,
} }
results_rows.append(row) results_rows.append(row)
for trade in trades: for trade in trades:
trade_rows.append({ trade_rows.append({
"timeframe": rule_name, "timeframe": rule_name,
@@ -87,7 +92,9 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
"type": trade.get("type"), "type": trade.get("type"),
"fee_usd": trade.get("fee_usd"), "fee_usd": trade.get("fee_usd"),
}) })
logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}") logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}")
if debug: if debug:
for trade in trades: for trade in trades:
if trade['type'] == 'STOP': if trade['type'] == 'STOP':
@@ -95,13 +102,16 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
for trade in trades: for trade in trades:
if trade['profit_pct'] < -0.09: # or whatever is close to -0.10 if trade['profit_pct'] < -0.09: # or whatever is close to -0.10
print("Large loss trade:", trade) print("Large loss trade:", trade)
return results_rows, trade_rows return results_rows, trade_rows
def process(timeframe_info, debug=False): def process(timeframe_info, debug=False):
"""Process a single (timeframe, stop_loss_pct) combination (no monthly split)""" from cycles.utils.storage import Storage # import inside function for safety
storage = Storage(logging=None) # or pass a logger if you want, but None is safest for multiprocessing
rule, data_1min, stop_loss_pct, initial_usd = timeframe_info rule, data_1min, stop_loss_pct, initial_usd = timeframe_info
if rule == "1T": if rule == "1T" or rule == "1min":
df = data_1min.copy() df = data_1min.copy()
else: else:
df = data_1min.resample(rule).agg({ df = data_1min.resample(rule).agg({
@@ -112,7 +122,33 @@ def process(timeframe_info, debug=False):
'volume': 'sum' 'volume': 'sum'
}).dropna() }).dropna()
df = df.reset_index() df = df.reset_index()
results_rows, all_trade_rows = process_timeframe_data(data_1min, df, [stop_loss_pct], rule, initial_usd, debug=debug) results_rows, all_trade_rows = process_timeframe_data(data_1min, df, [stop_loss_pct], rule, initial_usd, debug=debug)
if all_trade_rows:
trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"]
# Prepare header
summary_fields = ["timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate", "max_drawdown", "avg_trade", "profit_ratio", "final_usd"]
summary_row = results_rows[0]
header_line = "\t".join(summary_fields) + "\n"
value_line = "\t".join(str(summary_row.get(f, "")) for f in summary_fields) + "\n"
# File name
tf = summary_row["timeframe"]
sl = summary_row["stop_loss_pct"]
sl_percent = int(round(sl * 100))
trades_filename = os.path.join(storage.results_dir, f"trades_{tf}_ST{sl_percent}pct.csv")
# Write header
with open(trades_filename, "w") as f:
f.write(header_line)
f.write(value_line)
# Now write trades (append mode, skip header)
with open(trades_filename, "a", newline="") as f:
import csv
writer = csv.DictWriter(f, fieldnames=trades_fieldnames)
writer.writeheader()
for trade in all_trade_rows:
writer.writerow({k: trade.get(k, "") for k in trades_fieldnames})
return results_rows, all_trade_rows return results_rows, all_trade_rows
def aggregate_results(all_rows): def aggregate_results(all_rows):
@@ -126,7 +162,6 @@ def aggregate_results(all_rows):
summary_rows = [] summary_rows = []
for (rule, stop_loss_pct), rows in grouped.items(): for (rule, stop_loss_pct), rows in grouped.items():
n_months = len(rows)
total_trades = sum(r['n_trades'] for r in rows) total_trades = sum(r['n_trades'] for r in rows)
total_stop_loss = sum(r['n_stop_loss'] for r in rows) total_stop_loss = sum(r['n_stop_loss'] for r in rows)
avg_win_rate = np.mean([r['win_rate'] for r in rows]) avg_win_rate = np.mean([r['win_rate'] for r in rows])
@@ -171,11 +206,11 @@ if __name__ == "__main__":
# Default values (from config.json) # Default values (from config.json)
default_config = { default_config = {
"start_date": "2024-05-15", "start_date": "2025-05-01",
"stop_date": datetime.datetime.today().strftime('%Y-%m-%d'), "stop_date": datetime.datetime.today().strftime('%Y-%m-%d'),
"initial_usd": 10000, "initial_usd": 10000,
"timeframes": ["1D"], "timeframes": ["1D", "6h", "3h", "1h", "30m", "15m", "5m", "1m"],
"stop_loss_pcts": [0.01, 0.02, 0.03], "stop_loss_pcts": [0.01, 0.02, 0.03, 0.05],
} }
if args.config: if args.config:
@@ -238,6 +273,7 @@ if __name__ == "__main__":
if debug: if debug:
all_results_rows = [] all_results_rows = []
all_trade_rows = [] all_trade_rows = []
for task in tasks: for task in tasks:
results, trades = process(task, debug) results, trades = process(task, debug)
if results or trades: if results or trades:
@@ -263,7 +299,4 @@ if __name__ == "__main__":
] ]
storage.write_backtest_results(backtest_filename, backtest_fieldnames, all_results_rows, metadata_lines) storage.write_backtest_results(backtest_filename, backtest_fieldnames, all_results_rows, metadata_lines)
trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"]
storage.write_trades(all_trade_rows, trades_fieldnames)

View File

@@ -7,7 +7,6 @@ from cycles.utils.storage import Storage
from cycles.utils.data_utils import aggregate_to_daily from cycles.utils.data_utils import aggregate_to_daily
from cycles.Analysis.boillinger_band import BollingerBands from cycles.Analysis.boillinger_band import BollingerBands
from cycles.Analysis.rsi import RSI from cycles.Analysis.rsi import RSI
from cycles.Analysis.strategies import Strategy
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@@ -19,34 +18,31 @@ logging.basicConfig(
) )
config_minute = { config_minute = {
"start_date": "2023-01-01", "start_date": "2022-01-01",
"stop_date": "2024-01-01", "stop_date": "2023-01-01",
"data_file": "btcusd_1-min_data.csv" "data_file": "btcusd_1-min_data.csv"
} }
config_day = { config_day = {
"start_date": "2023-01-01", "start_date": "2022-01-01",
"stop_date": "2024-01-01", "stop_date": "2023-01-01",
"data_file": "btcusd_1-day_data.csv" "data_file": "btcusd_1-day_data.csv"
} }
config_strategy = { IS_DAY = True
"bb_width": 0.05,
"bb_period": 20, def no_strategy(data_bb, data_with_rsi):
"rsi_period": 14, buy_condition = pd.Series([False] * len(data_bb), index=data_bb.index)
"trending": { sell_condition = pd.Series([False] * len(data_bb), index=data_bb.index)
"rsi_threshold": [30, 70], return buy_condition, sell_condition
"bb_std_dev_multiplier": 2.5,
}, def strategy_1(data_bb, data_with_rsi):
"sideways": { # Long trade: price move below lower Bollinger band and RSI go below 25
"rsi_threshold": [40, 60], buy_condition = (data_bb['close'] < data_bb['LowerBand']) & (data_bb['RSI'] < 25)
"bb_std_dev_multiplier": 1.8, # Short only: price move above top Bollinger band and RSI goes over 75
}, sell_condition = (data_bb['close'] > data_bb['UpperBand']) & (data_bb['RSI'] > 75)
"strategy_name": "MarketRegimeStrategy", return buy_condition, sell_condition
"SqueezeStrategy": True
}
IS_DAY = False
if __name__ == "__main__": if __name__ == "__main__":
@@ -66,10 +62,10 @@ if __name__ == "__main__":
else: else:
df_to_plot = data df_to_plot = data
bb = BollingerBands(config=config_strategy) bb = BollingerBands(period=30, std_dev_multiplier=2.0)
data_bb = bb.calculate(df_to_plot.copy()) data_bb = bb.calculate(df_to_plot.copy())
rsi_calculator = RSI(config=config_strategy) rsi_calculator = RSI(period=13)
data_with_rsi = rsi_calculator.calculate(df_to_plot.copy(), price_column='close') data_with_rsi = rsi_calculator.calculate(df_to_plot.copy(), price_column='close')
# Combine BB and RSI data into a single DataFrame for signal generation # Combine BB and RSI data into a single DataFrame for signal generation
@@ -82,8 +78,11 @@ if __name__ == "__main__":
data_bb['RSI'] = pd.Series(index=data_bb.index, dtype=float) data_bb['RSI'] = pd.Series(index=data_bb.index, dtype=float)
logging.warning("RSI column not found or not calculated. Signals relying on RSI may not be generated.") logging.warning("RSI column not found or not calculated. Signals relying on RSI may not be generated.")
strategy = Strategy(config=config_strategy) strategy = 1
buy_condition, sell_condition = strategy.run(data_bb, config_strategy["strategy_name"]) if strategy == 1:
buy_condition, sell_condition = strategy_1(data_bb, data_with_rsi)
else:
buy_condition, sell_condition = no_strategy(data_bb, data_with_rsi)
buy_signals = data_bb[buy_condition] buy_signals = data_bb[buy_condition]
sell_signals = data_bb[sell_condition] sell_signals = data_bb[sell_condition]
@@ -91,7 +90,7 @@ if __name__ == "__main__":
# plot the data with seaborn library # plot the data with seaborn library
if df_to_plot is not None and not df_to_plot.empty: if df_to_plot is not None and not df_to_plot.empty:
# Create a figure with two subplots, sharing the x-axis # Create a figure with two subplots, sharing the x-axis
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(16, 8), sharex=True) fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 8), sharex=True)
# Plot 1: Close Price and Bollinger Bands # Plot 1: Close Price and Bollinger Bands
sns.lineplot(x=data_bb.index, y='close', data=data_bb, label='Close Price', ax=ax1) sns.lineplot(x=data_bb.index, y='close', data=data_bb, label='Close Price', ax=ax1)
@@ -109,9 +108,9 @@ if __name__ == "__main__":
# Plot 2: RSI # Plot 2: RSI
if 'RSI' in data_bb.columns: # Check data_bb now as it should contain RSI if 'RSI' in data_bb.columns: # Check data_bb now as it should contain RSI
sns.lineplot(x=data_bb.index, y='RSI', data=data_bb, label='RSI (' + str(config_strategy["rsi_period"]) + ')', ax=ax2, color='purple') sns.lineplot(x=data_bb.index, y='RSI', data=data_bb, label='RSI (14)', ax=ax2, color='purple')
ax2.axhline(config_strategy["trending"]["rsi_threshold"][1], color='red', linestyle='--', linewidth=0.8, label='Overbought (' + str(config_strategy["trending"]["rsi_threshold"][1]) + ')') ax2.axhline(75, color='red', linestyle='--', linewidth=0.8, label='Overbought (75)')
ax2.axhline(config_strategy['trending']['rsi_threshold'][0], color='green', linestyle='--', linewidth=0.8, label='Oversold (' + str(config_strategy['trending']['rsi_threshold'][0]) + ')') ax2.axhline(25, color='green', linestyle='--', linewidth=0.8, label='Oversold (25)')
# Plot Buy/Sell signals on RSI chart # Plot Buy/Sell signals on RSI chart
if not buy_signals.empty: if not buy_signals.empty:
ax2.scatter(buy_signals.index, buy_signals['RSI'], color='green', marker='o', s=20, label='Buy Signal (RSI)', zorder=5) ax2.scatter(buy_signals.index, buy_signals['RSI'], color='green', marker='o', s=20, label='Buy Signal (RSI)', zorder=5)
@@ -125,14 +124,6 @@ if __name__ == "__main__":
else: else:
logging.info("RSI data not available for plotting.") logging.info("RSI data not available for plotting.")
# Plot 3: BB Width
sns.lineplot(x=data_bb.index, y='BBWidth', data=data_bb, label='BB Width', ax=ax3)
sns.lineplot(x=data_bb.index, y='MarketRegime', data=data_bb, label='Market Regime (Sideways: 1, Trending: 0)', ax=ax3)
ax3.set_title('Bollinger Bands Width')
ax3.set_ylabel('BB Width')
ax3.legend()
ax3.grid(True)
plt.xlabel('Date') # Common X-axis label plt.xlabel('Date') # Common X-axis label
fig.tight_layout() # Adjust layout to prevent overlapping titles/labels fig.tight_layout() # Adjust layout to prevent overlapping titles/labels
plt.show() plt.show()