3 Commits

Author SHA1 Message Date
Simon Moisy
1284549106 progress print 2025-05-29 11:04:03 +08:00
Simon Moisy
5f03524d6a never fallback to default values for fee_usd 2025-05-28 02:50:40 +08:00
Simon Moisy
74c8048ed5 shifted one day back on the metatrend to avoid lookahead bias, reverted metatrend calculus to use no cpu optimization for readability 2025-05-27 17:49:55 +08:00
11 changed files with 337 additions and 835 deletions

View File

@@ -4,25 +4,23 @@ class BollingerBands:
"""
Calculates Bollinger Bands for given financial data.
"""
def __init__(self, config):
def __init__(self, period: int = 20, std_dev_multiplier: float = 2.0):
"""
Initializes the BollingerBands calculator.
Args:
period (int): The period for the moving average and standard deviation.
std_dev_multiplier (float): The number of standard deviations for the upper and lower bands.
bb_width (float): The width of the Bollinger Bands.
"""
if config['bb_period'] <= 0:
if period <= 0:
raise ValueError("Period must be a positive integer.")
if config['trending']['bb_std_dev_multiplier'] <= 0 or config['sideways']['bb_std_dev_multiplier'] <= 0:
if std_dev_multiplier <= 0:
raise ValueError("Standard deviation multiplier must be positive.")
if config['bb_width'] <= 0:
raise ValueError("BB width must be positive.")
self.config = config
self.period = period
self.std_dev_multiplier = std_dev_multiplier
def calculate(self, data_df: pd.DataFrame, price_column: str = 'close', squeeze = False) -> pd.DataFrame:
def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame:
"""
Calculates Bollinger Bands and adds them to the DataFrame.
@@ -39,37 +37,14 @@ class BollingerBands:
if price_column not in data_df.columns:
raise ValueError(f"Price column '{price_column}' not found in DataFrame.")
if not squeeze:
# Calculate SMA
data_df['SMA'] = data_df[price_column].rolling(window=self.config['bb_period']).mean()
# Calculate SMA
data_df['SMA'] = data_df[price_column].rolling(window=self.period).mean()
# Calculate Standard Deviation
std_dev = data_df[price_column].rolling(window=self.config['bb_period']).std()
# Calculate Standard Deviation
std_dev = data_df[price_column].rolling(window=self.period).std()
# Calculate Upper and Lower Bands
data_df['UpperBand'] = data_df['SMA'] + (2.0* std_dev)
data_df['LowerBand'] = data_df['SMA'] - (2.0* std_dev)
# Calculate the width of the Bollinger Bands
data_df['BBWidth'] = (data_df['UpperBand'] - data_df['LowerBand']) / data_df['SMA']
# Calculate the market regime
# 1 = sideways, 0 = trending
data_df['MarketRegime'] = (data_df['BBWidth'] < self.config['bb_width']).astype(int)
if data_df['MarketRegime'].sum() > 0:
data_df['UpperBand'] = data_df['SMA'] + (self.config['trending']['bb_std_dev_multiplier'] * std_dev)
data_df['LowerBand'] = data_df['SMA'] - (self.config['trending']['bb_std_dev_multiplier'] * std_dev)
else:
data_df['UpperBand'] = data_df['SMA'] + (self.config['sideways']['bb_std_dev_multiplier'] * std_dev)
data_df['LowerBand'] = data_df['SMA'] - (self.config['sideways']['bb_std_dev_multiplier'] * std_dev)
else:
data_df['SMA'] = data_df[price_column].rolling(window=14).mean()
# Calculate Standard Deviation
std_dev = data_df[price_column].rolling(window=14).std()
# Calculate Upper and Lower Bands
data_df['UpperBand'] = data_df['SMA'] + 1.5* std_dev
data_df['LowerBand'] = data_df['SMA'] - 1.5* std_dev
# Calculate Upper and Lower Bands
data_df['UpperBand'] = data_df['SMA'] + (self.std_dev_multiplier * std_dev)
data_df['LowerBand'] = data_df['SMA'] - (self.std_dev_multiplier * std_dev)
return data_df

View File

@@ -5,7 +5,7 @@ class RSI:
"""
A class to calculate the Relative Strength Index (RSI).
"""
def __init__(self, config):
def __init__(self, period: int = 14):
"""
Initializes the RSI calculator.
@@ -13,9 +13,9 @@ class RSI:
period (int): The period for RSI calculation. Default is 14.
Must be a positive integer.
"""
if not isinstance(config['rsi_period'], int) or config['rsi_period'] <= 0:
if not isinstance(period, int) or period <= 0:
raise ValueError("Period must be a positive integer.")
self.period = config['rsi_period']
self.period = period
def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame:
"""

View File

@@ -1,131 +0,0 @@
import pandas as pd
import numpy as np
from cycles.Analysis.boillinger_band import BollingerBands
class Strategy:
def __init__(self, config = None, logging = None):
if config is None:
raise ValueError("Config must be provided.")
self.config = config
self.logging = logging
def run(self, data, strategy_name):
if strategy_name == "MarketRegimeStrategy":
return self.MarketRegimeStrategy(data)
else:
if self.logging is not None:
self.logging.warning(f"Strategy {strategy_name} not found. Using no_strategy instead.")
return self.no_strategy(data)
def no_strategy(self, data):
"""No strategy: returns False for both buy and sell conditions"""
buy_condition = pd.Series([False] * len(data), index=data.index)
sell_condition = pd.Series([False] * len(data), index=data.index)
return buy_condition, sell_condition
def rsi_bollinger_confirmation(self, rsi, window=14, std_mult=1.5):
"""Calculate RSI Bollinger Bands for confirmation
Args:
rsi (Series): RSI values
window (int): Rolling window for SMA
std_mult (float): Standard deviation multiplier
Returns:
tuple: (oversold condition, overbought condition)
"""
valid_rsi = ~rsi.isna()
if not valid_rsi.any():
# Return empty Series if no valid RSI data
return pd.Series(False, index=rsi.index), pd.Series(False, index=rsi.index)
rsi_sma = rsi.rolling(window).mean()
rsi_std = rsi.rolling(window).std()
upper_rsi_band = rsi_sma + std_mult * rsi_std
lower_rsi_band = rsi_sma - std_mult * rsi_std
return (rsi < lower_rsi_band), (rsi > upper_rsi_band)
def MarketRegimeStrategy(self, data):
"""Optimized Bollinger Bands + RSI Strategy for Crypto Trading (Including Sideways Markets)
with adaptive Bollinger Bands
This advanced strategy combines volatility analysis, momentum confirmation, and regime detection
to adapt to Bitcoin's unique market conditions.
Entry Conditions:
- Trending Market (Breakout Mode):
Buy: Price < Lower Band ∧ RSI < 50 ∧ Volume Spike (≥1.5× 20D Avg)
Sell: Price > Upper Band ∧ RSI > 50 ∧ Volume Spike
- Sideways Market (Mean Reversion):
Buy: Price ≤ Lower Band ∧ RSI ≤ 40
Sell: Price ≥ Upper Band ∧ RSI ≥ 60
Enhanced with RSI Bollinger Squeeze for signal confirmation when enabled.
"""
# Initialize conditions as all False
buy_condition = pd.Series(False, index=data.index)
sell_condition = pd.Series(False, index=data.index)
# Create masks for different market regimes
sideways_mask = data['MarketRegime'] > 0
trending_mask = data['MarketRegime'] <= 0
valid_data_mask = ~data['MarketRegime'].isna() # Handle potential NaN values
# Calculate volume spike (≥1.5× 20D Avg)
if 'volume' in data.columns:
volume_20d_avg = data['volume'].rolling(window=20).mean()
volume_spike = data['volume'] >= 1.5 * volume_20d_avg
# Additional volume contraction filter for sideways markets
volume_30d_avg = data['volume'].rolling(window=30).mean()
volume_contraction = data['volume'] < 0.7 * volume_30d_avg
else:
# If volume data is not available, assume no volume spike
volume_spike = pd.Series(False, index=data.index)
volume_contraction = pd.Series(False, index=data.index)
if self.logging is not None:
self.logging.warning("Volume data not available. Volume conditions will not be triggered.")
# Calculate RSI Bollinger Squeeze confirmation
if 'RSI' in data.columns:
oversold_rsi, overbought_rsi = self.rsi_bollinger_confirmation(data['RSI'])
else:
oversold_rsi = pd.Series(False, index=data.index)
overbought_rsi = pd.Series(False, index=data.index)
if self.logging is not None:
self.logging.warning("RSI data not available. RSI Bollinger Squeeze will not be triggered.")
# Calculate conditions for sideways market (Mean Reversion)
if sideways_mask.any():
sideways_buy = (data['close'] <= data['LowerBand']) & (data['RSI'] <= 40)
sideways_sell = (data['close'] >= data['UpperBand']) & (data['RSI'] >= 60)
# Add enhanced confirmation for sideways markets
if self.config.get("SqueezeStrategy", False):
sideways_buy = sideways_buy & oversold_rsi & volume_contraction
sideways_sell = sideways_sell & overbought_rsi & volume_contraction
# Apply only where market is sideways and data is valid
buy_condition = buy_condition | (sideways_buy & sideways_mask & valid_data_mask)
sell_condition = sell_condition | (sideways_sell & sideways_mask & valid_data_mask)
# Calculate conditions for trending market (Breakout Mode)
if trending_mask.any():
trending_buy = (data['close'] < data['LowerBand']) & (data['RSI'] < 50) & volume_spike
trending_sell = (data['close'] > data['UpperBand']) & (data['RSI'] > 50) & volume_spike
# Add enhanced confirmation for trending markets
if self.config.get("SqueezeStrategy", False):
trending_buy = trending_buy & oversold_rsi
trending_sell = trending_sell & overbought_rsi
# Apply only where market is trending and data is valid
buy_condition = buy_condition | (trending_buy & trending_mask & valid_data_mask)
sell_condition = sell_condition | (trending_sell & trending_mask & valid_data_mask)
return buy_condition, sell_condition

View File

@@ -1,31 +1,13 @@
import pandas as pd
import numpy as np
import time
from cycles.supertrend import Supertrends
from cycles.market_fees import MarketFees
class Backtest:
class Data:
def __init__(self, initial_usd, df, min1_df, init_strategy_fields) -> None:
self.initial_usd = initial_usd
self.usd = initial_usd
self.max_balance = initial_usd
self.coin = 0
self.position = 0
self.entry_price = 0
self.entry_time = None
self.current_trade_min1_start_idx = None
self.current_min1_end_idx = None
self.price_open = None
self.price_close = None
self.current_date = None
self.strategies = {}
self.df = df
self.min1_df = min1_df
self = init_strategy_fields(self)
@staticmethod
def run(data, entry_strategy, exit_strategy, debug=False):
def run(min1_df, df, initial_usd, stop_loss_pct, debug=False):
"""
Backtest a simple strategy using the meta supertrend (all three supertrends agree).
Buys when meta supertrend is positive, sells when negative, applies a percentage stop loss.
@@ -36,43 +18,97 @@ class Backtest:
- stop_loss_pct: float, stop loss as a fraction (e.g. 0.05 for 5%)
- debug: bool, whether to print debug info
"""
_df = df.copy().reset_index(drop=True)
_df['timestamp'] = pd.to_datetime(_df['timestamp'])
supertrends = Supertrends(_df, verbose=False)
supertrend_results_list = supertrends.calculate_supertrend_indicators()
trends = [st['results']['trend'] for st in supertrend_results_list]
trends_arr = np.stack(trends, axis=1)
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
trends_arr[:,0], 0)
# Shift meta_trend by one to avoid lookahead bias
meta_trend_signal = np.roll(meta_trend, 1)
meta_trend_signal[0] = 0 # or np.nan, but 0 means 'no signal' for first bar
position = 0 # 0 = no position, 1 = long
entry_price = 0
usd = initial_usd
coin = 0
trade_log = []
max_balance = initial_usd
drawdowns = []
trades = []
entry_time = None
current_trade_min1_start_idx = None
for i in range(1, len(data.df)):
data.price_open = data.df['open'].iloc[i]
data.price_close = data.df['close'].iloc[i]
min1_df.index = pd.to_datetime(min1_df.index)
min1_timestamps = min1_df.index.values
last_print_time = time.time()
for i in range(1, len(_df)):
current_time = time.time()
if current_time - last_print_time >= 5:
progress = (i / len(_df)) * 100
print(f"\rProgress: {progress:.1f}%", end="", flush=True)
last_print_time = current_time
price_open = _df['open'].iloc[i]
price_close = _df['close'].iloc[i]
date = _df['timestamp'].iloc[i]
prev_mt = meta_trend_signal[i-1]
curr_mt = meta_trend_signal[i]
data.current_date = data.df['timestamp'].iloc[i]
# Check stop loss if in position
if position == 1:
stop_loss_result = Backtest.check_stop_loss(
min1_df,
entry_time,
date,
entry_price,
stop_loss_pct,
coin,
usd,
debug,
current_trade_min1_start_idx
)
if stop_loss_result is not None:
trade_log_entry, current_trade_min1_start_idx, position, coin, entry_price = stop_loss_result
trade_log.append(trade_log_entry)
continue
# Update the start index for next check
current_trade_min1_start_idx = min1_df.index[min1_df.index <= date][-1]
if data.position == 0:
if entry_strategy(data, i):
data, entry_log_entry = Backtest.handle_entry(data)
trade_log.append(entry_log_entry)
elif data.position == 1:
exit_test_results, data, sell_price = exit_strategy(data, i)
if exit_test_results is not None:
data, exit_log_entry = Backtest.handle_exit(data, exit_test_results, sell_price)
trade_log.append(exit_log_entry)
# Entry: only if not in position and signal changes to 1
if position == 0 and prev_mt != 1 and curr_mt == 1:
entry_result = Backtest.handle_entry(usd, price_open, date)
coin, entry_price, entry_time, usd, position, trade_log_entry = entry_result
trade_log.append(trade_log_entry)
# Exit: only if in position and signal changes from 1 to -1
elif position == 1 and prev_mt == 1 and curr_mt == -1:
exit_result = Backtest.handle_exit(coin, price_open, entry_price, entry_time, date)
usd, coin, position, entry_price, trade_log_entry = exit_result
trade_log.append(trade_log_entry)
# Track drawdown
balance = data.usd if data.position == 0 else data.coin * data.price_close
if balance > data.max_balance:
data.max_balance = balance
drawdown = (data.max_balance - balance) / data.max_balance
balance = usd if position == 0 else coin * price_close
if balance > max_balance:
max_balance = balance
drawdown = (max_balance - balance) / max_balance
drawdowns.append(drawdown)
print("\rProgress: 100%\r\n", end="", flush=True)
# If still in position at end, sell at last close
if data.position == 1:
data, exit_log_entry = Backtest.handle_exit(data, "EOD", None)
trade_log.append(exit_log_entry)
if position == 1:
exit_result = Backtest.handle_exit(coin, _df['close'].iloc[-1], entry_price, entry_time, _df['timestamp'].iloc[-1])
usd, coin, position, entry_price, trade_log_entry = exit_result
trade_log.append(trade_log_entry)
# Calculate statistics
final_balance = data.usd
final_balance = usd
n_trades = len(trade_log)
wins = [1 for t in trade_log if t['exit'] is not None and t['exit'] > t['entry']]
win_rate = len(wins) / n_trades if n_trades > 0 else 0
@@ -92,14 +128,14 @@ class Backtest:
'entry': trade['entry'],
'exit': trade['exit'],
'profit_pct': profit_pct,
'type': trade['type'],
'fee_usd': trade['fee_usd']
'type': trade.get('type', 'SELL'),
'fee_usd': trade.get('fee_usd')
})
fee_usd = trade.get('fee_usd')
total_fees_usd += fee_usd
results = {
"initial_usd": data.initial_usd,
"initial_usd": initial_usd,
"final_usd": final_balance,
"n_trades": n_trades,
"win_rate": win_rate,
@@ -121,45 +157,74 @@ class Backtest:
return results
@staticmethod
def handle_entry(data):
entry_fee = MarketFees.calculate_okx_taker_maker_fee(data.usd, is_maker=False)
usd_after_fee = data.usd - entry_fee
data.coin = usd_after_fee / data.price_open
data.entry_price = data.price_open
data.entry_time = data.current_date
data.usd = 0
data.position = 1
def check_stop_loss(min1_df, entry_time, date, entry_price, stop_loss_pct, coin, usd, debug, current_trade_min1_start_idx):
stop_price = entry_price * (1 - stop_loss_pct)
if current_trade_min1_start_idx is None:
current_trade_min1_start_idx = min1_df.index[min1_df.index >= entry_time][0]
current_min1_end_idx = min1_df.index[min1_df.index <= date][-1]
# Check all 1-minute candles in between for stop loss
min1_slice = min1_df.loc[current_trade_min1_start_idx:current_min1_end_idx]
if (min1_slice['low'] <= stop_price).any():
# Stop loss triggered, find the exact candle
stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0]
# More realistic fill: if open < stop, fill at open, else at stop
if stop_candle['open'] < stop_price:
sell_price = stop_candle['open']
else:
sell_price = stop_price
if debug:
print(f"STOP LOSS triggered: entry={entry_price}, stop={stop_price}, sell_price={sell_price}, entry_time={entry_time}, stop_time={stop_candle.name}")
btc_to_sell = coin
usd_gross = btc_to_sell * sell_price
exit_fee = MarketFees.calculate_okx_taker_maker_fee(usd_gross, is_maker=False)
trade_log_entry = {
'type': 'STOP',
'entry': entry_price,
'exit': sell_price,
'entry_time': entry_time,
'exit_time': stop_candle.name,
'fee_usd': exit_fee
}
# After stop loss, reset position and entry
return trade_log_entry, None, 0, 0, 0
return None
@staticmethod
def handle_entry(usd, price_open, date):
entry_fee = MarketFees.calculate_okx_taker_maker_fee(usd, is_maker=False)
usd_after_fee = usd - entry_fee
coin = usd_after_fee / price_open
entry_price = price_open
entry_time = date
usd = 0
position = 1
trade_log_entry = {
'type': 'BUY',
'entry': data.entry_price,
'entry': entry_price,
'exit': None,
'entry_time': data.entry_time,
'entry_time': entry_time,
'exit_time': None,
'fee_usd': entry_fee
}
return data, trade_log_entry
return coin, entry_price, entry_time, usd, position, trade_log_entry
@staticmethod
def handle_exit(data, exit_reason, sell_price):
btc_to_sell = data.coin
exit_price = sell_price if sell_price is not None else data.price_open
usd_gross = btc_to_sell * exit_price
def handle_exit(coin, price_open, entry_price, entry_time, date):
btc_to_sell = coin
usd_gross = btc_to_sell * price_open
exit_fee = MarketFees.calculate_okx_taker_maker_fee(usd_gross, is_maker=False)
data.usd = usd_gross - exit_fee
exit_log_entry = {
'type': exit_reason,
'entry': data.entry_price,
'exit': exit_price,
'entry_time': data.entry_time,
'exit_time': data.current_date,
usd = usd_gross - exit_fee
trade_log_entry = {
'type': 'SELL',
'entry': entry_price,
'exit': price_open,
'entry_time': entry_time,
'exit_time': date,
'fee_usd': exit_fee
}
data.coin = 0
data.position = 0
data.entry_price = 0
return data, exit_log_entry
coin = 0
position = 0
entry_price = 0
return usd, coin, position, entry_price, trade_log_entry

View File

@@ -2,6 +2,6 @@ import pandas as pd
class MarketFees:
@staticmethod
def calculate_okx_taker_maker_fee(amount, is_maker=True) -> float:
def calculate_okx_taker_maker_fee(amount, is_maker=True):
fee_rate = 0.0008 if is_maker else 0.0010
return amount * fee_rate

View File

@@ -1,70 +1,30 @@
import pandas as pd
import numpy as np
import logging
from scipy.signal import find_peaks
from matplotlib.patches import Rectangle
from scipy import stats
import concurrent.futures
from functools import partial
from functools import lru_cache
import matplotlib.pyplot as plt
# Color configuration
# Plot colors
DARK_BG_COLOR = '#181C27'
LEGEND_BG_COLOR = '#333333'
TITLE_COLOR = 'white'
AXIS_LABEL_COLOR = 'white'
# Candlestick colors
CANDLE_UP_COLOR = '#089981' # Green
CANDLE_DOWN_COLOR = '#F23645' # Red
# Marker colors
MIN_COLOR = 'red'
MAX_COLOR = 'green'
# Line style colors
MIN_LINE_STYLE = 'g--' # Green dashed
MAX_LINE_STYLE = 'r--' # Red dashed
SMA7_LINE_STYLE = 'y-' # Yellow solid
SMA15_LINE_STYLE = 'm-' # Magenta solid
# SuperTrend colors
ST_COLOR_UP = 'g-'
ST_COLOR_DOWN = 'r-'
# Cache the calculation results by function parameters
@lru_cache(maxsize=32)
def cached_supertrend_calculation(period, multiplier, data_tuple):
# Convert tuple back to numpy arrays
high = np.array(data_tuple[0])
low = np.array(data_tuple[1])
close = np.array(data_tuple[2])
# Calculate TR and ATR using vectorized operations
tr = np.zeros_like(close)
tr[0] = high[0] - low[0]
hc_range = np.abs(high[1:] - close[:-1])
lc_range = np.abs(low[1:] - close[:-1])
hl_range = high[1:] - low[1:]
tr[1:] = np.maximum.reduce([hl_range, hc_range, lc_range])
# Use numpy's exponential moving average
atr = np.zeros_like(tr)
atr[0] = tr[0]
multiplier_ema = 2.0 / (period + 1)
for i in range(1, len(tr)):
atr[i] = (tr[i] * multiplier_ema) + (atr[i-1] * (1 - multiplier_ema))
# Calculate bands
upper_band = np.zeros_like(close)
lower_band = np.zeros_like(close)
for i in range(len(close)):
hl_avg = (high[i] + low[i]) / 2
upper_band[i] = hl_avg + (multiplier * atr[i])
lower_band[i] = hl_avg - (multiplier * atr[i])
final_upper = np.zeros_like(close)
final_lower = np.zeros_like(close)
supertrend = np.zeros_like(close)
@@ -106,76 +66,18 @@ def cached_supertrend_calculation(period, multiplier, data_tuple):
}
def calculate_supertrend_external(data, period, multiplier):
# Convert DataFrame columns to hashable tuples
high_tuple = tuple(data['high'])
low_tuple = tuple(data['low'])
close_tuple = tuple(data['close'])
# Call the cached function
return cached_supertrend_calculation(period, multiplier, (high_tuple, low_tuple, close_tuple))
class Supertrends:
def __init__(self, data, verbose=False, display=False):
"""
Initialize the TrendDetectorSimple class.
Parameters:
- data: pandas DataFrame containing price data
- verbose: boolean, whether to display detailed logging information
- display: boolean, whether to enable display/plotting features
"""
self.data = data
self.verbose = verbose
self.display = display
# Only define display-related variables if display is True
if self.display:
# Plot style configuration
self.plot_style = 'dark_background'
self.bg_color = DARK_BG_COLOR
self.plot_size = (12, 8)
# Candlestick configuration
self.candle_width = 0.6
self.candle_up_color = CANDLE_UP_COLOR
self.candle_down_color = CANDLE_DOWN_COLOR
self.candle_alpha = 0.8
self.wick_width = 1
# Marker configuration
self.min_marker = '^'
self.min_color = MIN_COLOR
self.min_size = 100
self.max_marker = 'v'
self.max_color = MAX_COLOR
self.max_size = 100
self.marker_zorder = 100
# Line configuration
self.line_width = 1
self.min_line_style = MIN_LINE_STYLE
self.max_line_style = MAX_LINE_STYLE
self.sma7_line_style = SMA7_LINE_STYLE
self.sma15_line_style = SMA15_LINE_STYLE
# Text configuration
self.title_size = 14
self.title_color = TITLE_COLOR
self.axis_label_size = 12
self.axis_label_color = AXIS_LABEL_COLOR
# Legend configuration
self.legend_loc = 'best'
self.legend_bg_color = LEGEND_BG_COLOR
# Configure logging
logging.basicConfig(level=logging.INFO if verbose else logging.WARNING,
format='%(asctime)s - %(levelname)s - %(message)s')
self.logger = logging.getLogger('TrendDetectorSimple')
# Convert data to pandas DataFrame if it's not already
if not isinstance(self.data, pd.DataFrame):
if isinstance(self.data, list):
self.data = pd.DataFrame({'close': self.data})
@@ -183,154 +85,101 @@ class Supertrends:
raise ValueError("Data must be a pandas DataFrame or a list")
def calculate_tr(self):
df = self.data.copy()
high = df['high'].values
low = df['low'].values
close = df['close'].values
tr = np.zeros_like(close)
tr[0] = high[0] - low[0]
for i in range(1, len(close)):
hl_range = high[i] - low[i]
hc_range = abs(high[i] - close[i-1])
lc_range = abs(low[i] - close[i-1])
tr[i] = max(hl_range, hc_range, lc_range)
return tr
def calculate_atr(self, period=14):
tr = self.calculate_tr()
atr = np.zeros_like(tr)
atr[0] = tr[0]
multiplier = 2.0 / (period + 1)
for i in range(1, len(tr)):
atr[i] = (tr[i] * multiplier) + (atr[i-1] * (1 - multiplier))
return atr
def calculate_supertrend(self, period=10, multiplier=3.0):
"""
Calculate True Range (TR) for the price data.
True Range is the greatest of:
1. Current high - current low
2. |Current high - previous close|
3. |Current low - previous close|
Calculate SuperTrend indicator for the price data.
SuperTrend is a trend-following indicator that uses ATR to determine the trend direction.
Parameters:
- period: int, the period for the ATR calculation (default: 10)
- multiplier: float, the multiplier for the ATR (default: 3.0)
Returns:
- Numpy array of TR values
- Dictionary containing SuperTrend values, trend direction, and upper/lower bands
"""
df = self.data.copy()
high = df['high'].values
low = df['low'].values
close = df['close'].values
tr = np.zeros_like(close)
tr[0] = high[0] - low[0] # First TR is just the first day's range
atr = self.calculate_atr(period)
upper_band = np.zeros_like(close)
lower_band = np.zeros_like(close)
for i in range(len(close)):
hl_avg = (high[i] + low[i]) / 2
upper_band[i] = hl_avg + (multiplier * atr[i])
lower_band[i] = hl_avg - (multiplier * atr[i])
final_upper = np.zeros_like(close)
final_lower = np.zeros_like(close)
supertrend = np.zeros_like(close)
trend = np.zeros_like(close)
final_upper[0] = upper_band[0]
final_lower[0] = lower_band[0]
if close[0] <= upper_band[0]:
supertrend[0] = upper_band[0]
trend[0] = -1
else:
supertrend[0] = lower_band[0]
trend[0] = 1
for i in range(1, len(close)):
# Current high - current low
hl_range = high[i] - low[i]
# |Current high - previous close|
hc_range = abs(high[i] - close[i-1])
# |Current low - previous close|
lc_range = abs(low[i] - close[i-1])
# TR is the maximum of these three values
tr[i] = max(hl_range, hc_range, lc_range)
return tr
def calculate_atr(self, period=14):
"""
Calculate Average True Range (ATR) for the price data.
ATR is the exponential moving average of the True Range over a specified period.
Parameters:
- period: int, the period for the ATR calculation (default: 14)
Returns:
- Numpy array of ATR values
"""
tr = self.calculate_tr()
atr = np.zeros_like(tr)
# First ATR value is just the first TR
atr[0] = tr[0]
# Calculate exponential moving average (EMA) of TR
multiplier = 2.0 / (period + 1)
for i in range(1, len(tr)):
atr[i] = (tr[i] * multiplier) + (atr[i-1] * (1 - multiplier))
return atr
def detect_trends(self):
"""
Detect trends by identifying local minima and maxima in the price data
using scipy.signal.find_peaks.
Parameters:
- prominence: float, required prominence of peaks (relative to the price range)
- width: int, required width of peaks in data points
Returns:
- DataFrame with columns for timestamps, prices, and trend indicators
- Dictionary containing analysis results including linear regression, SMAs, and SuperTrend indicators
"""
df = self.data
# close_prices = df['close'].values
# max_peaks, _ = find_peaks(close_prices)
# min_peaks, _ = find_peaks(-close_prices)
# df['is_min'] = False
# df['is_max'] = False
# for peak in max_peaks:
# df.at[peak, 'is_max'] = True
# for peak in min_peaks:
# df.at[peak, 'is_min'] = True
# result = df[['timestamp', 'close', 'is_min', 'is_max']].copy()
# Perform linear regression on min_peaks and max_peaks
# min_prices = df['close'].iloc[min_peaks].values
# max_prices = df['close'].iloc[max_peaks].values
# Linear regression for min peaks if we have at least 2 points
# min_slope, min_intercept, min_r_value, _, _ = stats.linregress(min_peaks, min_prices)
# Linear regression for max peaks if we have at least 2 points
# max_slope, max_intercept, max_r_value, _, _ = stats.linregress(max_peaks, max_prices)
if (upper_band[i] < final_upper[i-1]) or (close[i-1] > final_upper[i-1]):
final_upper[i] = upper_band[i]
else:
final_upper[i] = final_upper[i-1]
if (lower_band[i] > final_lower[i-1]) or (close[i-1] < final_lower[i-1]):
final_lower[i] = lower_band[i]
else:
final_lower[i] = final_lower[i-1]
if supertrend[i-1] == final_upper[i-1] and close[i] <= final_upper[i]:
supertrend[i] = final_upper[i]
trend[i] = -1
elif supertrend[i-1] == final_upper[i-1] and close[i] > final_upper[i]:
supertrend[i] = final_lower[i]
trend[i] = 1
elif supertrend[i-1] == final_lower[i-1] and close[i] >= final_lower[i]:
supertrend[i] = final_lower[i]
trend[i] = 1
elif supertrend[i-1] == final_lower[i-1] and close[i] < final_lower[i]:
supertrend[i] = final_upper[i]
trend[i] = -1
supertrend_results = {
'supertrend': supertrend,
'trend': trend,
'upper_band': final_upper,
'lower_band': final_lower
}
return supertrend_results
# Calculate Simple Moving Averages (SMA) for 7 and 15 periods
# sma_7 = pd.Series(close_prices).rolling(window=7, min_periods=1).mean().values
# sma_15 = pd.Series(close_prices).rolling(window=15, min_periods=1).mean().values
analysis_results = {}
# analysis_results['linear_regression'] = {
# 'min': {
# 'slope': min_slope,
# 'intercept': min_intercept,
# 'r_squared': min_r_value ** 2
# },
# 'max': {
# 'slope': max_slope,
# 'intercept': max_intercept,
# 'r_squared': max_r_value ** 2
# }
# }
# analysis_results['sma'] = {
# '7': sma_7,
# '15': sma_15
# }
# Calculate SuperTrend indicators
supertrend_results_list = self._calculate_supertrend_indicators()
analysis_results['supertrend'] = supertrend_results_list
return analysis_results
def calculate_supertrend_indicators(self):
"""
Calculate SuperTrend indicators with different parameter sets in parallel.
Returns:
- list, the SuperTrend results
"""
supertrend_params = [
{"period": 12, "multiplier": 3.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN},
{"period": 10, "multiplier": 1.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN},
{"period": 11, "multiplier": 2.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN}
{"period": 12, "multiplier": 3.0},
{"period": 10, "multiplier": 1.0},
{"period": 11, "multiplier": 2.0}
]
data = self.data.copy()
# For just 3 calculations, direct calculation might be faster than process pool
results = []
for p in supertrend_params:
result = calculate_supertrend_external(data, p["period"], p["multiplier"])
results.append(result)
supertrend_results_list = []
for params, result in zip(supertrend_params, results):
supertrend_results_list.append({
result = self.calculate_supertrend(period=p["period"], multiplier=p["multiplier"])
results.append({
"results": result,
"params": params
"params": p
})
return supertrend_results_list
return results

View File

@@ -1,80 +1,5 @@
import pandas as pd
def check_data(data_df: pd.DataFrame) -> bool:
"""
Checks if the input DataFrame has a DatetimeIndex.
Args:
data_df (pd.DataFrame): DataFrame to check.
Returns:
bool: True if the DataFrame has a DatetimeIndex, False otherwise.
"""
if not isinstance(data_df.index, pd.DatetimeIndex):
print("Warning: Input DataFrame must have a DatetimeIndex.")
return False
agg_rules = {}
# Define aggregation rules based on available columns
if 'open' in data_df.columns:
agg_rules['open'] = 'first'
if 'high' in data_df.columns:
agg_rules['high'] = 'max'
if 'low' in data_df.columns:
agg_rules['low'] = 'min'
if 'close' in data_df.columns:
agg_rules['close'] = 'last'
if 'volume' in data_df.columns:
agg_rules['volume'] = 'sum'
if not agg_rules:
print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for daily aggregation.")
return False
return agg_rules
def aggregate_to_weekly(data_df: pd.DataFrame, weeks: int = 1) -> pd.DataFrame:
"""
Aggregates time-series financial data to weekly OHLCV format.
The input DataFrame is expected to have a DatetimeIndex.
'open' will be the first 'open' price of the week.
'close' will be the last 'close' price of the week.
'high' will be the maximum 'high' price of the week.
'low' will be the minimum 'low' price of the week.
'volume' (if present) will be the sum of volumes for the week.
Args:
data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
like 'open', 'high', 'low', 'close', and optionally 'volume'.
weeks (int): The number of weeks to aggregate to. Default is 1.
Returns:
pd.DataFrame: DataFrame aggregated to weekly OHLCV data.
The index will be a DatetimeIndex with the time set to the start of the week.
Returns an empty DataFrame if no relevant OHLCV columns are found.
"""
agg_rules = check_data(data_df)
if not agg_rules:
print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for weekly aggregation.")
return pd.DataFrame(index=pd.to_datetime([]))
# Resample to weekly frequency and apply aggregation rules
weekly_data = data_df.resample(f'{weeks}W').agg(agg_rules)
weekly_data.dropna(how='all', inplace=True)
# Adjust timestamps to the start of the week
if not weekly_data.empty and isinstance(weekly_data.index, pd.DatetimeIndex):
weekly_data.index = weekly_data.index.floor('W')
return weekly_data
def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
"""
Aggregates time-series financial data to daily OHLCV format.
@@ -99,9 +24,23 @@ def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
Raises:
ValueError: If the input DataFrame does not have a DatetimeIndex.
"""
agg_rules = check_data(data_df)
if not isinstance(data_df.index, pd.DatetimeIndex):
raise ValueError("Input DataFrame must have a DatetimeIndex.")
agg_rules = {}
# Define aggregation rules based on available columns
if 'open' in data_df.columns:
agg_rules['open'] = 'first'
if 'high' in data_df.columns:
agg_rules['high'] = 'max'
if 'low' in data_df.columns:
agg_rules['low'] = 'min'
if 'close' in data_df.columns:
agg_rules['close'] = 'last'
if 'volume' in data_df.columns:
agg_rules['volume'] = 'sum'
if not agg_rules:
# Log a warning or raise an error if no relevant columns are found
# For now, returning an empty DataFrame with a message might be suitable for some cases
@@ -119,43 +58,3 @@ def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
daily_data.dropna(how='all', inplace=True)
return daily_data
def aggregate_to_hourly(data_df: pd.DataFrame, hours: int = 1) -> pd.DataFrame:
"""
Aggregates time-series financial data to hourly OHLCV format.
The input DataFrame is expected to have a DatetimeIndex.
'open' will be the first 'open' price of the hour.
'close' will be the last 'close' price of the hour.
'high' will be the maximum 'high' price of the hour.
'low' will be the minimum 'low' price of the hour.
'volume' (if present) will be the sum of volumes for the hour.
Args:
data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
like 'open', 'high', 'low', 'close', and optionally 'volume'.
hours (int): The number of hours to aggregate to. Default is 1.
Returns:
pd.DataFrame: DataFrame aggregated to hourly OHLCV data.
The index will be a DatetimeIndex with the time set to the start of the hour.
Returns an empty DataFrame if no relevant OHLCV columns are found.
"""
agg_rules = check_data(data_df)
if not agg_rules:
print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for hourly aggregation.")
return pd.DataFrame(index=pd.to_datetime([]))
# Resample to hourly frequency and apply aggregation rules
hourly_data = data_df.resample(f'{hours}H').agg(agg_rules)
hourly_data.dropna(how='all', inplace=True)
# Adjust timestamps to the start of the hour
if not hourly_data.empty and isinstance(hourly_data.index, pd.DatetimeIndex):
hourly_data.index = hourly_data.index.floor('H')
return hourly_data

View File

@@ -8,7 +8,6 @@ The `Analysis` module includes classes for calculating common technical indicato
- **Relative Strength Index (RSI)**: Implemented in `cycles/Analysis/rsi.py`.
- **Bollinger Bands**: Implemented in `cycles/Analysis/boillinger_band.py`.
- **Trading Strategies**: Implemented in `cycles/Analysis/strategies.py`.
## Class: `RSI`
@@ -77,65 +76,3 @@ Found in `cycles/Analysis/boillinger_band.py`.
- `data_df` (pd.DataFrame): DataFrame with price data. Must include the `price_column`.
- `price_column` (str, optional): The name of the column containing the price data (e.g., 'close'). Defaults to 'close'.
- **Returns**: `pd.DataFrame` - The original DataFrame with added columns: 'SMA', 'UpperBand', 'LowerBand'.
## Class: `Strategy`
Found in `cycles/Analysis/strategies.py`.
Implements various trading strategies using technical indicators.
### `__init__(self, config = None, logging = None)`
- **Description**: Initializes the Strategy class with configuration and logging.
- **Parameters**:
- `config` (dict): Configuration dictionary with strategy parameters. Must be provided.
- `logging` (logging object, optional): Logger for output messages. Defaults to None.
### `run(self, data, strategy_name)`
- **Description**: Executes a specified strategy on the provided data.
- **Parameters**:
- `data` (pd.DataFrame): DataFrame with price, indicator data, and market regime information.
- `strategy_name` (str): Name of the strategy to run. Currently supports "MarketRegimeStrategy".
- **Returns**: Tuple of (buy_condition, sell_condition) as pandas Series with boolean values.
### `no_strategy(self, data)`
- **Description**: Returns empty buy/sell conditions (all False).
- **Parameters**:
- `data` (pd.DataFrame): Input data DataFrame.
- **Returns**: Tuple of (buy_condition, sell_condition) as pandas Series with all False values.
### `rsi_bollinger_confirmation(self, rsi, window=14, std_mult=1.5)`
- **Description**: Calculates Bollinger Bands on RSI values for signal confirmation.
- **Parameters**:
- `rsi` (pd.Series): Series containing RSI values.
- `window` (int, optional): The period for the moving average. Defaults to 14.
- `std_mult` (float, optional): Standard deviation multiplier for bands. Defaults to 1.5.
- **Returns**: Tuple of (oversold_condition, overbought_condition) as pandas Series with boolean values.
### `MarketRegimeStrategy(self, data)`
- **Description**: Advanced strategy combining Bollinger Bands, RSI, volume analysis, and market regime detection.
- **Parameters**:
- `data` (pd.DataFrame): DataFrame with price data, technical indicators, and market regime information.
- **Returns**: Tuple of (buy_condition, sell_condition) as pandas Series with boolean values.
#### Strategy Logic
This strategy adapts to different market conditions:
**Trending Market (Breakout Mode):**
- Buy: Price < Lower Band ∧ RSI < 50 ∧ Volume Spike (≥1.5× 20D Avg)
- Sell: Price > Upper Band ∧ RSI > 50 ∧ Volume Spike
**Sideways Market (Mean Reversion):**
- Buy: Price ≤ Lower Band ∧ RSI ≤ 40
- Sell: Price ≥ Upper Band ∧ RSI ≥ 60
When `SqueezeStrategy` is enabled, additional confirmation using RSI Bollinger Bands is required:
- For buy signals: RSI must be below its lower Bollinger Band
- For sell signals: RSI must be above its upper Bollinger Band
For sideways markets, volume contraction (< 0.7× 30D Avg) is also checked to avoid false signals.

View File

@@ -1,43 +0,0 @@
# Optimized Bollinger Bands + RSI Strategy for Crypto Trading (Including Sideways Markets)
This advanced strategy combines volatility analysis, momentum confirmation, and regime detection to adapt to Bitcoin's unique market conditions. Backtested on 2018-2025 BTC data, it achieved 58% annualized returns with 22% max drawdown.
---
## **Adaptive Parameters**
### **Core Configuration**
| Indicator | Trending Market | Sideways Market |
|-----------------|-------------------------|-------------------------|
| **Bollinger** | 20 SMA, 2.5σ | 20 SMA, 1.8σ |
| **RSI** | 14-period, 30/70 | 14-period, 40/60 |
| **Confirmation**| Volume > 20% 30D Avg | Bollinger Band Width <5%|
## Strategy Components
### 1. Market Regime Detection
### 2. Entry Conditions
***Trending Market (Breakout Mode):***
Buy: Price > Upper Band ∧ RSI > 50 ∧ Volume Spike (≥1.5× 20D Avg)
Sell: Price < Lower Band ∧ RSI < 50 ∧ Volume Spike
***Sideways Market (Mean Reversion):***
Buy: Price ≤ Lower Band ∧ RSI ≤ 40
Sell: Price ≥ Upper Band ∧ RSI ≥ 60
### **Enhanced Signals with RSI Bollinger Squeeze**
*Signal Boost*: Requires both price and RSI to breach their respective bands.
---
## **Risk Management System**
### Volatility-Adjusted Position Sizing
$$ \text{Position Size} = \frac{\text{Capital} \times 0.02}{\text{ATR}_{14} \times \text{Price}} $$
**Key Adjustments:**
1. Use narrower Bollinger Bands (1.8σ) to avoid whipsaws
2. Require RSI confirmation within 40-60 range
3. Add volume contraction filter

140
main.py
View File

@@ -10,7 +10,6 @@ import json
from cycles.utils.storage import Storage
from cycles.utils.system import SystemUtils
from cycles.backtest import Backtest
from cycles.supertrend import Supertrends
logging.basicConfig(
level=logging.INFO,
@@ -21,68 +20,6 @@ logging.basicConfig(
]
)
def default_init_strategy(data: Backtest.Data) -> Backtest.Data:
supertrends = Supertrends(data.df, verbose=False)
supertrend_results_list = supertrends.calculate_supertrend_indicators()
trends = [st['results']['trend'] for st in supertrend_results_list]
trends_arr = np.stack(trends, axis=1)
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
trends_arr[:,0], 0)
data.strategies["meta_trend"] = meta_trend
return data
def default_entry_strategy(data, df_index):
return data.strategies["meta_trend"][df_index - 1] != 1 and data.strategies["meta_trend"][df_index] == 1
def stop_loss_strategy(data):
stop_price = data.entry_price * (1 - data.strategies["stop_loss_pct"])
# Ensure index is sorted and is a DatetimeIndex
min1_index = data.min1_df.index
# Find the first index >= entry_time
start_candidates = min1_index[min1_index >= data.entry_time]
data.current_trade_min1_start_idx = start_candidates[0]
# Find the last index <= current_date
end_candidates = min1_index[min1_index <= data.current_date]
if len(end_candidates) == 0:
print("Warning: no end candidate here. Need to be checked")
return False, None
data.current_min1_end_idx = end_candidates[-1]
min1_slice = data.min1_df.loc[data.current_trade_min1_start_idx:data.current_min1_end_idx]
# print(f"lowest low in that range: {min1_slice['low'].min()}, count: {len(min1_slice)}")
# print(f"slice start: {min1_slice.index[0]}, slice end: {min1_slice.index[-1]}")
if (min1_slice['low'] <= stop_price).any():
stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0]
if stop_candle['open'] < stop_price:
sell_price = stop_candle['open']
else:
sell_price = stop_price
return True, sell_price
return False, None
def default_exit_strategy(data: Backtest.Data, df_index):
if data.strategies["meta_trend"][df_index - 1] != 1 and \
data.strategies["meta_trend"][df_index] == -1:
return "META_TREND_EXIT_SIGNAL", data, None
stop_loss_result, sell_price = stop_loss_strategy(data)
if stop_loss_result:
data.strategies["current_trade_min1_start_idx"] = \
data.min1_df.index[data.min1_df.index <= data.current_date][-1]
return "STOP_LOSS", data, sell_price
return None, data, None
def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False):
"""Process the entire timeframe with all stop loss values (no monthly split)"""
df = df.copy().reset_index(drop=True)
@@ -90,17 +27,13 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
results_rows = []
trade_rows = []
min1_df['timestamp'] = pd.to_datetime(min1_df.index) # need ?
for stop_loss_pct in stop_loss_pcts:
data = Backtest.Data(initial_usd, df, min1_df, default_init_strategy)
data.strategies["stop_loss_pct"] = stop_loss_pct
results = Backtest.run(
data,
default_entry_strategy,
default_exit_strategy,
debug
min1_df,
df,
initial_usd=initial_usd,
stop_loss_pct=stop_loss_pct,
debug=debug
)
n_trades = results["n_trades"]
trades = results.get('trades', [])
@@ -117,11 +50,9 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
for trade in trades:
cumulative_profit += trade['profit_pct']
if cumulative_profit > peak:
peak = cumulative_profit
drawdown = peak - cumulative_profit
if drawdown > max_drawdown:
max_drawdown = drawdown
@@ -130,13 +61,13 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
for trade in trades:
final_usd *= (1 + trade['profit_pct'])
total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades)
total_fees_usd = sum(trade['fee_usd'] for trade in trades)
row = {
"timeframe": rule_name,
"stop_loss_pct": stop_loss_pct,
"n_trades": n_trades,
"n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP_LOSS'),
"n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP'),
"win_rate": win_rate,
"max_drawdown": max_drawdown,
"avg_trade": avg_trade,
@@ -161,19 +92,26 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
"type": trade.get("type"),
"fee_usd": trade.get("fee_usd"),
})
logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}")
if debug:
for trade in trades:
print(trade)
if trade['type'] == 'STOP':
print(trade)
for trade in trades:
if trade['profit_pct'] < -0.09: # or whatever is close to -0.10
print("Large loss trade:", trade)
return results_rows, trade_rows
def process(timeframe_info, debug=False):
"""Process a single (timeframe, stop_loss_pct) combination (no monthly split)"""
from cycles.utils.storage import Storage # import inside function for safety
storage = Storage(logging=None) # or pass a logger if you want, but None is safest for multiprocessing
rule, data_1min, stop_loss_pct, initial_usd = timeframe_info
if rule == "1min":
if rule == "1T" or rule == "1min":
df = data_1min.copy()
else:
df = data_1min.resample(rule).agg({
@@ -184,7 +122,33 @@ def process(timeframe_info, debug=False):
'volume': 'sum'
}).dropna()
df = df.reset_index()
results_rows, all_trade_rows = process_timeframe_data(data_1min, df, [stop_loss_pct], rule, initial_usd, debug=debug)
if all_trade_rows:
trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"]
# Prepare header
summary_fields = ["timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate", "max_drawdown", "avg_trade", "profit_ratio", "final_usd"]
summary_row = results_rows[0]
header_line = "\t".join(summary_fields) + "\n"
value_line = "\t".join(str(summary_row.get(f, "")) for f in summary_fields) + "\n"
# File name
tf = summary_row["timeframe"]
sl = summary_row["stop_loss_pct"]
sl_percent = int(round(sl * 100))
trades_filename = os.path.join(storage.results_dir, f"trades_{tf}_ST{sl_percent}pct.csv")
# Write header
with open(trades_filename, "w") as f:
f.write(header_line)
f.write(value_line)
# Now write trades (append mode, skip header)
with open(trades_filename, "a", newline="") as f:
import csv
writer = csv.DictWriter(f, fieldnames=trades_fieldnames)
writer.writeheader()
for trade in all_trade_rows:
writer.writerow({k: trade.get(k, "") for k in trades_fieldnames})
return results_rows, all_trade_rows
def aggregate_results(all_rows):
@@ -198,7 +162,6 @@ def aggregate_results(all_rows):
summary_rows = []
for (rule, stop_loss_pct), rows in grouped.items():
n_months = len(rows)
total_trades = sum(r['n_trades'] for r in rows)
total_stop_loss = sum(r['n_stop_loss'] for r in rows)
avg_win_rate = np.mean([r['win_rate'] for r in rows])
@@ -243,17 +206,17 @@ if __name__ == "__main__":
# Default values (from config.json)
default_config = {
"start_date": "2024-05-15",
"start_date": "2025-05-01",
"stop_date": datetime.datetime.today().strftime('%Y-%m-%d'),
"initial_usd": 10000,
"timeframes": ["15min"],
"stop_loss_pcts": [0.03],
"timeframes": ["1D", "6h", "3h", "1h", "30m", "15m", "5m", "1m"],
"stop_loss_pcts": [0.01, 0.02, 0.03, 0.05],
}
if args.config:
with open(args.config, 'r') as f:
config = json.load(f)
elif not debug:
else:
print("No config file provided. Please enter the following values (press Enter to use default):")
start_date = input(f"Start date [{default_config['start_date']}]: ") or default_config['start_date']
@@ -275,9 +238,8 @@ if __name__ == "__main__":
'timeframes': timeframes,
'stop_loss_pcts': stop_loss_pcts,
}
else:
config = default_config
# Use config values
start_date = config['start_date']
stop_date = config['stop_date']
initial_usd = config['initial_usd']
@@ -311,6 +273,7 @@ if __name__ == "__main__":
if debug:
all_results_rows = []
all_trade_rows = []
for task in tasks:
results, trades = process(task, debug)
if results or trades:
@@ -336,7 +299,4 @@ if __name__ == "__main__":
]
storage.write_backtest_results(backtest_filename, backtest_fieldnames, all_results_rows, metadata_lines)
trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"]
storage.write_trades(all_trade_rows, trades_fieldnames)

View File

@@ -7,7 +7,6 @@ from cycles.utils.storage import Storage
from cycles.utils.data_utils import aggregate_to_daily
from cycles.Analysis.boillinger_band import BollingerBands
from cycles.Analysis.rsi import RSI
from cycles.Analysis.strategies import Strategy
logging.basicConfig(
level=logging.INFO,
@@ -19,34 +18,31 @@ logging.basicConfig(
)
config_minute = {
"start_date": "2023-01-01",
"stop_date": "2024-01-01",
"start_date": "2022-01-01",
"stop_date": "2023-01-01",
"data_file": "btcusd_1-min_data.csv"
}
config_day = {
"start_date": "2023-01-01",
"stop_date": "2024-01-01",
"start_date": "2022-01-01",
"stop_date": "2023-01-01",
"data_file": "btcusd_1-day_data.csv"
}
config_strategy = {
"bb_width": 0.05,
"bb_period": 20,
"rsi_period": 14,
"trending": {
"rsi_threshold": [30, 70],
"bb_std_dev_multiplier": 2.5,
},
"sideways": {
"rsi_threshold": [40, 60],
"bb_std_dev_multiplier": 1.8,
},
"strategy_name": "MarketRegimeStrategy",
"SqueezeStrategy": True
}
IS_DAY = True
def no_strategy(data_bb, data_with_rsi):
buy_condition = pd.Series([False] * len(data_bb), index=data_bb.index)
sell_condition = pd.Series([False] * len(data_bb), index=data_bb.index)
return buy_condition, sell_condition
def strategy_1(data_bb, data_with_rsi):
# Long trade: price move below lower Bollinger band and RSI go below 25
buy_condition = (data_bb['close'] < data_bb['LowerBand']) & (data_bb['RSI'] < 25)
# Short only: price move above top Bollinger band and RSI goes over 75
sell_condition = (data_bb['close'] > data_bb['UpperBand']) & (data_bb['RSI'] > 75)
return buy_condition, sell_condition
IS_DAY = False
if __name__ == "__main__":
@@ -66,10 +62,10 @@ if __name__ == "__main__":
else:
df_to_plot = data
bb = BollingerBands(config=config_strategy)
bb = BollingerBands(period=30, std_dev_multiplier=2.0)
data_bb = bb.calculate(df_to_plot.copy())
rsi_calculator = RSI(config=config_strategy)
rsi_calculator = RSI(period=13)
data_with_rsi = rsi_calculator.calculate(df_to_plot.copy(), price_column='close')
# Combine BB and RSI data into a single DataFrame for signal generation
@@ -82,8 +78,11 @@ if __name__ == "__main__":
data_bb['RSI'] = pd.Series(index=data_bb.index, dtype=float)
logging.warning("RSI column not found or not calculated. Signals relying on RSI may not be generated.")
strategy = Strategy(config=config_strategy)
buy_condition, sell_condition = strategy.run(data_bb, config_strategy["strategy_name"])
strategy = 1
if strategy == 1:
buy_condition, sell_condition = strategy_1(data_bb, data_with_rsi)
else:
buy_condition, sell_condition = no_strategy(data_bb, data_with_rsi)
buy_signals = data_bb[buy_condition]
sell_signals = data_bb[sell_condition]
@@ -91,7 +90,7 @@ if __name__ == "__main__":
# plot the data with seaborn library
if df_to_plot is not None and not df_to_plot.empty:
# Create a figure with two subplots, sharing the x-axis
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(16, 8), sharex=True)
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 8), sharex=True)
# Plot 1: Close Price and Bollinger Bands
sns.lineplot(x=data_bb.index, y='close', data=data_bb, label='Close Price', ax=ax1)
@@ -109,9 +108,9 @@ if __name__ == "__main__":
# Plot 2: RSI
if 'RSI' in data_bb.columns: # Check data_bb now as it should contain RSI
sns.lineplot(x=data_bb.index, y='RSI', data=data_bb, label='RSI (' + str(config_strategy["rsi_period"]) + ')', ax=ax2, color='purple')
ax2.axhline(config_strategy["trending"]["rsi_threshold"][1], color='red', linestyle='--', linewidth=0.8, label='Overbought (' + str(config_strategy["trending"]["rsi_threshold"][1]) + ')')
ax2.axhline(config_strategy['trending']['rsi_threshold'][0], color='green', linestyle='--', linewidth=0.8, label='Oversold (' + str(config_strategy['trending']['rsi_threshold'][0]) + ')')
sns.lineplot(x=data_bb.index, y='RSI', data=data_bb, label='RSI (14)', ax=ax2, color='purple')
ax2.axhline(75, color='red', linestyle='--', linewidth=0.8, label='Overbought (75)')
ax2.axhline(25, color='green', linestyle='--', linewidth=0.8, label='Oversold (25)')
# Plot Buy/Sell signals on RSI chart
if not buy_signals.empty:
ax2.scatter(buy_signals.index, buy_signals['RSI'], color='green', marker='o', s=20, label='Buy Signal (RSI)', zorder=5)
@@ -125,14 +124,6 @@ if __name__ == "__main__":
else:
logging.info("RSI data not available for plotting.")
# Plot 3: BB Width
sns.lineplot(x=data_bb.index, y='BBWidth', data=data_bb, label='BB Width', ax=ax3)
sns.lineplot(x=data_bb.index, y='MarketRegime', data=data_bb, label='Market Regime (Sideways: 1, Trending: 0)', ax=ax3)
ax3.set_title('Bollinger Bands Width')
ax3.set_ylabel('BB Width')
ax3.legend()
ax3.grid(True)
plt.xlabel('Date') # Common X-axis label
fig.tight_layout() # Adjust layout to prevent overlapping titles/labels
plt.show()