From e286dd881a45cf87ec9e9892566282b4f6cbdebc Mon Sep 17 00:00:00 2001 From: Simon Moisy Date: Thu, 22 May 2025 17:05:19 +0800 Subject: [PATCH] - Refactored the Backtest class for strategy modularity - Updated entry and exit strategy functions --- cycles/backtest.py | 210 ++++++++++++++++-------------------------- cycles/market_fees.py | 2 +- main.py | 107 +++++++++++++++++---- 3 files changed, 170 insertions(+), 149 deletions(-) diff --git a/cycles/backtest.py b/cycles/backtest.py index 756a699..ec27abc 100644 --- a/cycles/backtest.py +++ b/cycles/backtest.py @@ -1,12 +1,31 @@ import pandas as pd import numpy as np -from cycles.supertrend import Supertrends from cycles.market_fees import MarketFees class Backtest: + class Data: + def __init__(self, initial_usd, df, min1_df, init_strategy_fields) -> None: + self.initial_usd = initial_usd + self.usd = initial_usd + self.max_balance = initial_usd + self.coin = 0 + self.position = 0 + self.entry_price = 0 + self.entry_time = None + self.current_trade_min1_start_idx = None + self.current_min1_end_idx = None + self.price_open = None + self.price_close = None + self.current_date = None + self.strategies = {} + self.df = df + self.min1_df = min1_df + + self = init_strategy_fields(self) + @staticmethod - def run(min1_df, df, initial_usd, stop_loss_pct, debug=False): + def run(data, entry_strategy, exit_strategy, debug=False): """ Backtest a simple strategy using the meta supertrend (all three supertrends agree). Buys when meta supertrend is positive, sells when negative, applies a percentage stop loss. @@ -17,85 +36,43 @@ class Backtest: - stop_loss_pct: float, stop loss as a fraction (e.g. 0.05 for 5%) - debug: bool, whether to print debug info """ - _df = df.copy().reset_index(drop=True) - _df['timestamp'] = pd.to_datetime(_df['timestamp']) - - supertrends = Supertrends(_df, verbose=False) - - supertrend_results_list = supertrends.calculate_supertrend_indicators() - trends = [st['results']['trend'] for st in supertrend_results_list] - trends_arr = np.stack(trends, axis=1) - meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]), - trends_arr[:,0], 0) - - position = 0 # 0 = no position, 1 = long - entry_price = 0 - usd = initial_usd - coin = 0 - trade_log = [] - max_balance = initial_usd drawdowns = [] trades = [] - entry_time = None - current_trade_min1_start_idx = None - min1_df['timestamp'] = pd.to_datetime(min1_df.index) - - for i in range(1, len(_df)): - price_open = _df['open'].iloc[i] - price_close = _df['close'].iloc[i] - date = _df['timestamp'].iloc[i] - prev_mt = meta_trend[i-1] - curr_mt = meta_trend[i] + for i in range(1, len(data.df)): + data.price_open = data.df['open'].iloc[i] + data.price_close = data.df['close'].iloc[i] - # Check stop loss if in position - if position == 1: - stop_loss_result = Backtest.check_stop_loss( - min1_df, - entry_time, - date, - entry_price, - stop_loss_pct, - coin, - usd, - debug, - current_trade_min1_start_idx - ) - if stop_loss_result is not None: - trade_log_entry, current_trade_min1_start_idx, position, coin, entry_price = stop_loss_result - trade_log.append(trade_log_entry) - continue - # Update the start index for next check - current_trade_min1_start_idx = min1_df.index[min1_df.index <= date][-1] + data.current_date = data.df['timestamp'].iloc[i] - # Entry: only if not in position and signal changes to 1 - if position == 0 and prev_mt != 1 and curr_mt == 1: - entry_result = Backtest.handle_entry(usd, price_open, date) - coin, entry_price, entry_time, usd, position, trade_log_entry = entry_result - trade_log.append(trade_log_entry) - - # Exit: only if in position and signal changes from 1 to -1 - elif position == 1 and prev_mt == 1 and curr_mt == -1: - exit_result = Backtest.handle_exit(coin, price_open, entry_price, entry_time, date) - usd, coin, position, entry_price, trade_log_entry = exit_result - trade_log.append(trade_log_entry) + if data.position == 0: + if entry_strategy(data, i): + data, entry_log_entry = Backtest.handle_entry(data) + trade_log.append(entry_log_entry) + elif data.position == 1: + exit_test_results, data, sell_price = exit_strategy(data, i) + + if exit_test_results is not None: + data, exit_log_entry = Backtest.handle_exit(data, exit_test_results, sell_price) + trade_log.append(exit_log_entry) # Track drawdown - balance = usd if position == 0 else coin * price_close - if balance > max_balance: - max_balance = balance - drawdown = (max_balance - balance) / max_balance + balance = data.usd if data.position == 0 else data.coin * data.price_close + + if balance > data.max_balance: + data.max_balance = balance + + drawdown = (data.max_balance - balance) / data.max_balance drawdowns.append(drawdown) # If still in position at end, sell at last close - if position == 1: - exit_result = Backtest.handle_exit(coin, _df['close'].iloc[-1], entry_price, entry_time, _df['timestamp'].iloc[-1]) - usd, coin, position, entry_price, trade_log_entry = exit_result - trade_log.append(trade_log_entry) + if data.position == 1: + data, exit_log_entry = Backtest.handle_exit(data, "EOD", None) + trade_log.append(exit_log_entry) # Calculate statistics - final_balance = usd + final_balance = data.usd n_trades = len(trade_log) wins = [1 for t in trade_log if t['exit'] is not None and t['exit'] > t['entry']] win_rate = len(wins) / n_trades if n_trades > 0 else 0 @@ -115,14 +92,14 @@ class Backtest: 'entry': trade['entry'], 'exit': trade['exit'], 'profit_pct': profit_pct, - 'type': trade.get('type', 'SELL'), - 'fee_usd': trade.get('fee_usd') + 'type': trade['type'], + 'fee_usd': trade['fee_usd'] }) fee_usd = trade.get('fee_usd') total_fees_usd += fee_usd results = { - "initial_usd": initial_usd, + "initial_usd": data.initial_usd, "final_usd": final_balance, "n_trades": n_trades, "win_rate": win_rate, @@ -144,74 +121,45 @@ class Backtest: return results @staticmethod - def check_stop_loss(min1_df, entry_time, date, entry_price, stop_loss_pct, coin, usd, debug, current_trade_min1_start_idx): - stop_price = entry_price * (1 - stop_loss_pct) - - if current_trade_min1_start_idx is None: - current_trade_min1_start_idx = min1_df.index[min1_df.index >= entry_time][0] - current_min1_end_idx = min1_df.index[min1_df.index <= date][-1] - - # Check all 1-minute candles in between for stop loss - min1_slice = min1_df.loc[current_trade_min1_start_idx:current_min1_end_idx] - if (min1_slice['low'] <= stop_price).any(): - # Stop loss triggered, find the exact candle - stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0] - # More realistic fill: if open < stop, fill at open, else at stop - if stop_candle['open'] < stop_price: - sell_price = stop_candle['open'] - else: - sell_price = stop_price - if debug: - print(f"STOP LOSS triggered: entry={entry_price}, stop={stop_price}, sell_price={sell_price}, entry_time={entry_time}, stop_time={stop_candle.name}") - btc_to_sell = coin - usd_gross = btc_to_sell * sell_price - exit_fee = MarketFees.calculate_okx_taker_maker_fee(usd_gross, is_maker=False) - trade_log_entry = { - 'type': 'STOP', - 'entry': entry_price, - 'exit': sell_price, - 'entry_time': entry_time, - 'exit_time': stop_candle.name, - 'fee_usd': exit_fee - } - # After stop loss, reset position and entry - return trade_log_entry, None, 0, 0, 0 - return None - - @staticmethod - def handle_entry(usd, price_open, date): - entry_fee = MarketFees.calculate_okx_taker_maker_fee(usd, is_maker=False) - usd_after_fee = usd - entry_fee - coin = usd_after_fee / price_open - entry_price = price_open - entry_time = date - usd = 0 - position = 1 + def handle_entry(data): + entry_fee = MarketFees.calculate_okx_taker_maker_fee(data.usd, is_maker=False) + usd_after_fee = data.usd - entry_fee + + data.coin = usd_after_fee / data.price_open + data.entry_price = data.price_open + data.entry_time = data.current_date + data.usd = 0 + data.position = 1 + trade_log_entry = { 'type': 'BUY', - 'entry': entry_price, + 'entry': data.entry_price, 'exit': None, - 'entry_time': entry_time, + 'entry_time': data.entry_time, 'exit_time': None, 'fee_usd': entry_fee } - return coin, entry_price, entry_time, usd, position, trade_log_entry + return data, trade_log_entry @staticmethod - def handle_exit(coin, price_open, entry_price, entry_time, date): - btc_to_sell = coin - usd_gross = btc_to_sell * price_open + def handle_exit(data, exit_reason, sell_price): + btc_to_sell = data.coin + exit_price = sell_price if sell_price is not None else data.price_open + usd_gross = btc_to_sell * exit_price exit_fee = MarketFees.calculate_okx_taker_maker_fee(usd_gross, is_maker=False) - usd = usd_gross - exit_fee - trade_log_entry = { - 'type': 'SELL', - 'entry': entry_price, - 'exit': price_open, - 'entry_time': entry_time, - 'exit_time': date, + + data.usd = usd_gross - exit_fee + + exit_log_entry = { + 'type': exit_reason, + 'entry': data.entry_price, + 'exit': exit_price, + 'entry_time': data.entry_time, + 'exit_time': data.current_date, 'fee_usd': exit_fee } - coin = 0 - position = 0 - entry_price = 0 - return usd, coin, position, entry_price, trade_log_entry \ No newline at end of file + data.coin = 0 + data.position = 0 + data.entry_price = 0 + + return data, exit_log_entry \ No newline at end of file diff --git a/cycles/market_fees.py b/cycles/market_fees.py index f1d6053..79c99d3 100644 --- a/cycles/market_fees.py +++ b/cycles/market_fees.py @@ -2,6 +2,6 @@ import pandas as pd class MarketFees: @staticmethod - def calculate_okx_taker_maker_fee(amount, is_maker=True): + def calculate_okx_taker_maker_fee(amount, is_maker=True) -> float: fee_rate = 0.0008 if is_maker else 0.0010 return amount * fee_rate diff --git a/main.py b/main.py index b80ec37..2704440 100644 --- a/main.py +++ b/main.py @@ -6,11 +6,11 @@ import os import datetime import argparse import json -import ast from cycles.utils.storage import Storage from cycles.utils.system import SystemUtils from cycles.backtest import Backtest +from cycles.supertrend import Supertrends logging.basicConfig( level=logging.INFO, @@ -21,6 +21,68 @@ logging.basicConfig( ] ) +def default_init_strategy(data: Backtest.Data) -> Backtest.Data: + supertrends = Supertrends(data.df, verbose=False) + + supertrend_results_list = supertrends.calculate_supertrend_indicators() + trends = [st['results']['trend'] for st in supertrend_results_list] + trends_arr = np.stack(trends, axis=1) + meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]), + trends_arr[:,0], 0) + + data.strategies["meta_trend"] = meta_trend + + return data + +def default_entry_strategy(data, df_index): + return data.strategies["meta_trend"][df_index - 1] != 1 and data.strategies["meta_trend"][df_index] == 1 + +def stop_loss_strategy(data): + stop_price = data.entry_price * (1 - data.strategies["stop_loss_pct"]) + + # Ensure index is sorted and is a DatetimeIndex + min1_index = data.min1_df.index + + # Find the first index >= entry_time + start_candidates = min1_index[min1_index >= data.entry_time] + data.current_trade_min1_start_idx = start_candidates[0] + + # Find the last index <= current_date + end_candidates = min1_index[min1_index <= data.current_date] + if len(end_candidates) == 0: + print("Warning: no end candidate here. Need to be checked") + return False, None + data.current_min1_end_idx = end_candidates[-1] + + min1_slice = data.min1_df.loc[data.current_trade_min1_start_idx:data.current_min1_end_idx] + + # print(f"lowest low in that range: {min1_slice['low'].min()}, count: {len(min1_slice)}") + # print(f"slice start: {min1_slice.index[0]}, slice end: {min1_slice.index[-1]}") + + if (min1_slice['low'] <= stop_price).any(): + stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0] + + if stop_candle['open'] < stop_price: + sell_price = stop_candle['open'] + else: + sell_price = stop_price + return True, sell_price + + return False, None + +def default_exit_strategy(data: Backtest.Data, df_index): + if data.strategies["meta_trend"][df_index - 1] != 1 and \ + data.strategies["meta_trend"][df_index] == -1: + return "META_TREND_EXIT_SIGNAL", data, None + + stop_loss_result, sell_price = stop_loss_strategy(data) + if stop_loss_result: + data.strategies["current_trade_min1_start_idx"] = \ + data.min1_df.index[data.min1_df.index <= data.current_date][-1] + return "STOP_LOSS", data, sell_price + + return None, data, None + def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False): """Process the entire timeframe with all stop loss values (no monthly split)""" df = df.copy().reset_index(drop=True) @@ -28,13 +90,17 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, results_rows = [] trade_rows = [] + min1_df['timestamp'] = pd.to_datetime(min1_df.index) # need ? + for stop_loss_pct in stop_loss_pcts: + data = Backtest.Data(initial_usd, df, min1_df, default_init_strategy) + data.strategies["stop_loss_pct"] = stop_loss_pct + results = Backtest.run( - min1_df, - df, - initial_usd=initial_usd, - stop_loss_pct=stop_loss_pct, - debug=debug + data, + default_entry_strategy, + default_exit_strategy, + debug ) n_trades = results["n_trades"] trades = results.get('trades', []) @@ -48,22 +114,29 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, cumulative_profit = 0 max_drawdown = 0 peak = 0 + for trade in trades: cumulative_profit += trade['profit_pct'] + if cumulative_profit > peak: peak = cumulative_profit drawdown = peak - cumulative_profit + if drawdown > max_drawdown: max_drawdown = drawdown + final_usd = initial_usd + for trade in trades: final_usd *= (1 + trade['profit_pct']) + total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades) + row = { "timeframe": rule_name, "stop_loss_pct": stop_loss_pct, "n_trades": n_trades, - "n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP'), + "n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP_LOSS'), "win_rate": win_rate, "max_drawdown": max_drawdown, "avg_trade": avg_trade, @@ -75,6 +148,7 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, "total_fees_usd": total_fees_usd, } results_rows.append(row) + for trade in trades: trade_rows.append({ "timeframe": rule_name, @@ -88,20 +162,18 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, "fee_usd": trade.get("fee_usd"), }) logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}") + if debug: for trade in trades: - if trade['type'] == 'STOP': - print(trade) - for trade in trades: - if trade['profit_pct'] < -0.09: # or whatever is close to -0.10 - print("Large loss trade:", trade) + print(trade) + return results_rows, trade_rows def process(timeframe_info, debug=False): """Process a single (timeframe, stop_loss_pct) combination (no monthly split)""" rule, data_1min, stop_loss_pct, initial_usd = timeframe_info - if rule == "1T": + if rule == "1min": df = data_1min.copy() else: df = data_1min.resample(rule).agg({ @@ -174,14 +246,14 @@ if __name__ == "__main__": "start_date": "2024-05-15", "stop_date": datetime.datetime.today().strftime('%Y-%m-%d'), "initial_usd": 10000, - "timeframes": ["1D"], - "stop_loss_pcts": [0.01, 0.02, 0.03], + "timeframes": ["15min"], + "stop_loss_pcts": [0.03], } if args.config: with open(args.config, 'r') as f: config = json.load(f) - else: + elif not debug: print("No config file provided. Please enter the following values (press Enter to use default):") start_date = input(f"Start date [{default_config['start_date']}]: ") or default_config['start_date'] @@ -203,8 +275,9 @@ if __name__ == "__main__": 'timeframes': timeframes, 'stop_loss_pcts': stop_loss_pcts, } + else: + config = default_config - # Use config values start_date = config['start_date'] stop_date = config['stop_date'] initial_usd = config['initial_usd']