- Refactored the Backtest class for strategy modularity

- Updated entry and exit strategy functions
This commit is contained in:
Simon Moisy 2025-05-22 17:05:19 +08:00
parent f4873c56ff
commit e286dd881a
3 changed files with 170 additions and 149 deletions

View File

@ -1,12 +1,31 @@
import pandas as pd import pandas as pd
import numpy as np import numpy as np
from cycles.supertrend import Supertrends
from cycles.market_fees import MarketFees from cycles.market_fees import MarketFees
class Backtest: class Backtest:
class Data:
def __init__(self, initial_usd, df, min1_df, init_strategy_fields) -> None:
self.initial_usd = initial_usd
self.usd = initial_usd
self.max_balance = initial_usd
self.coin = 0
self.position = 0
self.entry_price = 0
self.entry_time = None
self.current_trade_min1_start_idx = None
self.current_min1_end_idx = None
self.price_open = None
self.price_close = None
self.current_date = None
self.strategies = {}
self.df = df
self.min1_df = min1_df
self = init_strategy_fields(self)
@staticmethod @staticmethod
def run(min1_df, df, initial_usd, stop_loss_pct, debug=False): def run(data, entry_strategy, exit_strategy, debug=False):
""" """
Backtest a simple strategy using the meta supertrend (all three supertrends agree). Backtest a simple strategy using the meta supertrend (all three supertrends agree).
Buys when meta supertrend is positive, sells when negative, applies a percentage stop loss. Buys when meta supertrend is positive, sells when negative, applies a percentage stop loss.
@ -17,85 +36,43 @@ class Backtest:
- stop_loss_pct: float, stop loss as a fraction (e.g. 0.05 for 5%) - stop_loss_pct: float, stop loss as a fraction (e.g. 0.05 for 5%)
- debug: bool, whether to print debug info - debug: bool, whether to print debug info
""" """
_df = df.copy().reset_index(drop=True)
_df['timestamp'] = pd.to_datetime(_df['timestamp'])
supertrends = Supertrends(_df, verbose=False)
supertrend_results_list = supertrends.calculate_supertrend_indicators()
trends = [st['results']['trend'] for st in supertrend_results_list]
trends_arr = np.stack(trends, axis=1)
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
trends_arr[:,0], 0)
position = 0 # 0 = no position, 1 = long
entry_price = 0
usd = initial_usd
coin = 0
trade_log = [] trade_log = []
max_balance = initial_usd
drawdowns = [] drawdowns = []
trades = [] trades = []
entry_time = None
current_trade_min1_start_idx = None
min1_df['timestamp'] = pd.to_datetime(min1_df.index) for i in range(1, len(data.df)):
data.price_open = data.df['open'].iloc[i]
for i in range(1, len(_df)): data.price_close = data.df['close'].iloc[i]
price_open = _df['open'].iloc[i]
price_close = _df['close'].iloc[i]
date = _df['timestamp'].iloc[i]
prev_mt = meta_trend[i-1]
curr_mt = meta_trend[i]
# Check stop loss if in position data.current_date = data.df['timestamp'].iloc[i]
if position == 1:
stop_loss_result = Backtest.check_stop_loss(
min1_df,
entry_time,
date,
entry_price,
stop_loss_pct,
coin,
usd,
debug,
current_trade_min1_start_idx
)
if stop_loss_result is not None:
trade_log_entry, current_trade_min1_start_idx, position, coin, entry_price = stop_loss_result
trade_log.append(trade_log_entry)
continue
# Update the start index for next check
current_trade_min1_start_idx = min1_df.index[min1_df.index <= date][-1]
# Entry: only if not in position and signal changes to 1 if data.position == 0:
if position == 0 and prev_mt != 1 and curr_mt == 1: if entry_strategy(data, i):
entry_result = Backtest.handle_entry(usd, price_open, date) data, entry_log_entry = Backtest.handle_entry(data)
coin, entry_price, entry_time, usd, position, trade_log_entry = entry_result trade_log.append(entry_log_entry)
trade_log.append(trade_log_entry) elif data.position == 1:
exit_test_results, data, sell_price = exit_strategy(data, i)
# Exit: only if in position and signal changes from 1 to -1
elif position == 1 and prev_mt == 1 and curr_mt == -1: if exit_test_results is not None:
exit_result = Backtest.handle_exit(coin, price_open, entry_price, entry_time, date) data, exit_log_entry = Backtest.handle_exit(data, exit_test_results, sell_price)
usd, coin, position, entry_price, trade_log_entry = exit_result trade_log.append(exit_log_entry)
trade_log.append(trade_log_entry)
# Track drawdown # Track drawdown
balance = usd if position == 0 else coin * price_close balance = data.usd if data.position == 0 else data.coin * data.price_close
if balance > max_balance:
max_balance = balance if balance > data.max_balance:
drawdown = (max_balance - balance) / max_balance data.max_balance = balance
drawdown = (data.max_balance - balance) / data.max_balance
drawdowns.append(drawdown) drawdowns.append(drawdown)
# If still in position at end, sell at last close # If still in position at end, sell at last close
if position == 1: if data.position == 1:
exit_result = Backtest.handle_exit(coin, _df['close'].iloc[-1], entry_price, entry_time, _df['timestamp'].iloc[-1]) data, exit_log_entry = Backtest.handle_exit(data, "EOD", None)
usd, coin, position, entry_price, trade_log_entry = exit_result trade_log.append(exit_log_entry)
trade_log.append(trade_log_entry)
# Calculate statistics # Calculate statistics
final_balance = usd final_balance = data.usd
n_trades = len(trade_log) n_trades = len(trade_log)
wins = [1 for t in trade_log if t['exit'] is not None and t['exit'] > t['entry']] wins = [1 for t in trade_log if t['exit'] is not None and t['exit'] > t['entry']]
win_rate = len(wins) / n_trades if n_trades > 0 else 0 win_rate = len(wins) / n_trades if n_trades > 0 else 0
@ -115,14 +92,14 @@ class Backtest:
'entry': trade['entry'], 'entry': trade['entry'],
'exit': trade['exit'], 'exit': trade['exit'],
'profit_pct': profit_pct, 'profit_pct': profit_pct,
'type': trade.get('type', 'SELL'), 'type': trade['type'],
'fee_usd': trade.get('fee_usd') 'fee_usd': trade['fee_usd']
}) })
fee_usd = trade.get('fee_usd') fee_usd = trade.get('fee_usd')
total_fees_usd += fee_usd total_fees_usd += fee_usd
results = { results = {
"initial_usd": initial_usd, "initial_usd": data.initial_usd,
"final_usd": final_balance, "final_usd": final_balance,
"n_trades": n_trades, "n_trades": n_trades,
"win_rate": win_rate, "win_rate": win_rate,
@ -144,74 +121,45 @@ class Backtest:
return results return results
@staticmethod @staticmethod
def check_stop_loss(min1_df, entry_time, date, entry_price, stop_loss_pct, coin, usd, debug, current_trade_min1_start_idx): def handle_entry(data):
stop_price = entry_price * (1 - stop_loss_pct) entry_fee = MarketFees.calculate_okx_taker_maker_fee(data.usd, is_maker=False)
usd_after_fee = data.usd - entry_fee
if current_trade_min1_start_idx is None:
current_trade_min1_start_idx = min1_df.index[min1_df.index >= entry_time][0] data.coin = usd_after_fee / data.price_open
current_min1_end_idx = min1_df.index[min1_df.index <= date][-1] data.entry_price = data.price_open
data.entry_time = data.current_date
# Check all 1-minute candles in between for stop loss data.usd = 0
min1_slice = min1_df.loc[current_trade_min1_start_idx:current_min1_end_idx] data.position = 1
if (min1_slice['low'] <= stop_price).any():
# Stop loss triggered, find the exact candle
stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0]
# More realistic fill: if open < stop, fill at open, else at stop
if stop_candle['open'] < stop_price:
sell_price = stop_candle['open']
else:
sell_price = stop_price
if debug:
print(f"STOP LOSS triggered: entry={entry_price}, stop={stop_price}, sell_price={sell_price}, entry_time={entry_time}, stop_time={stop_candle.name}")
btc_to_sell = coin
usd_gross = btc_to_sell * sell_price
exit_fee = MarketFees.calculate_okx_taker_maker_fee(usd_gross, is_maker=False)
trade_log_entry = {
'type': 'STOP',
'entry': entry_price,
'exit': sell_price,
'entry_time': entry_time,
'exit_time': stop_candle.name,
'fee_usd': exit_fee
}
# After stop loss, reset position and entry
return trade_log_entry, None, 0, 0, 0
return None
@staticmethod
def handle_entry(usd, price_open, date):
entry_fee = MarketFees.calculate_okx_taker_maker_fee(usd, is_maker=False)
usd_after_fee = usd - entry_fee
coin = usd_after_fee / price_open
entry_price = price_open
entry_time = date
usd = 0
position = 1
trade_log_entry = { trade_log_entry = {
'type': 'BUY', 'type': 'BUY',
'entry': entry_price, 'entry': data.entry_price,
'exit': None, 'exit': None,
'entry_time': entry_time, 'entry_time': data.entry_time,
'exit_time': None, 'exit_time': None,
'fee_usd': entry_fee 'fee_usd': entry_fee
} }
return coin, entry_price, entry_time, usd, position, trade_log_entry return data, trade_log_entry
@staticmethod @staticmethod
def handle_exit(coin, price_open, entry_price, entry_time, date): def handle_exit(data, exit_reason, sell_price):
btc_to_sell = coin btc_to_sell = data.coin
usd_gross = btc_to_sell * price_open exit_price = sell_price if sell_price is not None else data.price_open
usd_gross = btc_to_sell * exit_price
exit_fee = MarketFees.calculate_okx_taker_maker_fee(usd_gross, is_maker=False) exit_fee = MarketFees.calculate_okx_taker_maker_fee(usd_gross, is_maker=False)
usd = usd_gross - exit_fee
trade_log_entry = { data.usd = usd_gross - exit_fee
'type': 'SELL',
'entry': entry_price, exit_log_entry = {
'exit': price_open, 'type': exit_reason,
'entry_time': entry_time, 'entry': data.entry_price,
'exit_time': date, 'exit': exit_price,
'entry_time': data.entry_time,
'exit_time': data.current_date,
'fee_usd': exit_fee 'fee_usd': exit_fee
} }
coin = 0 data.coin = 0
position = 0 data.position = 0
entry_price = 0 data.entry_price = 0
return usd, coin, position, entry_price, trade_log_entry
return data, exit_log_entry

View File

@ -2,6 +2,6 @@ import pandas as pd
class MarketFees: class MarketFees:
@staticmethod @staticmethod
def calculate_okx_taker_maker_fee(amount, is_maker=True): def calculate_okx_taker_maker_fee(amount, is_maker=True) -> float:
fee_rate = 0.0008 if is_maker else 0.0010 fee_rate = 0.0008 if is_maker else 0.0010
return amount * fee_rate return amount * fee_rate

107
main.py
View File

@ -6,11 +6,11 @@ import os
import datetime import datetime
import argparse import argparse
import json import json
import ast
from cycles.utils.storage import Storage from cycles.utils.storage import Storage
from cycles.utils.system import SystemUtils from cycles.utils.system import SystemUtils
from cycles.backtest import Backtest from cycles.backtest import Backtest
from cycles.supertrend import Supertrends
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@ -21,6 +21,68 @@ logging.basicConfig(
] ]
) )
def default_init_strategy(data: Backtest.Data) -> Backtest.Data:
supertrends = Supertrends(data.df, verbose=False)
supertrend_results_list = supertrends.calculate_supertrend_indicators()
trends = [st['results']['trend'] for st in supertrend_results_list]
trends_arr = np.stack(trends, axis=1)
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
trends_arr[:,0], 0)
data.strategies["meta_trend"] = meta_trend
return data
def default_entry_strategy(data, df_index):
return data.strategies["meta_trend"][df_index - 1] != 1 and data.strategies["meta_trend"][df_index] == 1
def stop_loss_strategy(data):
stop_price = data.entry_price * (1 - data.strategies["stop_loss_pct"])
# Ensure index is sorted and is a DatetimeIndex
min1_index = data.min1_df.index
# Find the first index >= entry_time
start_candidates = min1_index[min1_index >= data.entry_time]
data.current_trade_min1_start_idx = start_candidates[0]
# Find the last index <= current_date
end_candidates = min1_index[min1_index <= data.current_date]
if len(end_candidates) == 0:
print("Warning: no end candidate here. Need to be checked")
return False, None
data.current_min1_end_idx = end_candidates[-1]
min1_slice = data.min1_df.loc[data.current_trade_min1_start_idx:data.current_min1_end_idx]
# print(f"lowest low in that range: {min1_slice['low'].min()}, count: {len(min1_slice)}")
# print(f"slice start: {min1_slice.index[0]}, slice end: {min1_slice.index[-1]}")
if (min1_slice['low'] <= stop_price).any():
stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0]
if stop_candle['open'] < stop_price:
sell_price = stop_candle['open']
else:
sell_price = stop_price
return True, sell_price
return False, None
def default_exit_strategy(data: Backtest.Data, df_index):
if data.strategies["meta_trend"][df_index - 1] != 1 and \
data.strategies["meta_trend"][df_index] == -1:
return "META_TREND_EXIT_SIGNAL", data, None
stop_loss_result, sell_price = stop_loss_strategy(data)
if stop_loss_result:
data.strategies["current_trade_min1_start_idx"] = \
data.min1_df.index[data.min1_df.index <= data.current_date][-1]
return "STOP_LOSS", data, sell_price
return None, data, None
def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False): def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False):
"""Process the entire timeframe with all stop loss values (no monthly split)""" """Process the entire timeframe with all stop loss values (no monthly split)"""
df = df.copy().reset_index(drop=True) df = df.copy().reset_index(drop=True)
@ -28,13 +90,17 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
results_rows = [] results_rows = []
trade_rows = [] trade_rows = []
min1_df['timestamp'] = pd.to_datetime(min1_df.index) # need ?
for stop_loss_pct in stop_loss_pcts: for stop_loss_pct in stop_loss_pcts:
data = Backtest.Data(initial_usd, df, min1_df, default_init_strategy)
data.strategies["stop_loss_pct"] = stop_loss_pct
results = Backtest.run( results = Backtest.run(
min1_df, data,
df, default_entry_strategy,
initial_usd=initial_usd, default_exit_strategy,
stop_loss_pct=stop_loss_pct, debug
debug=debug
) )
n_trades = results["n_trades"] n_trades = results["n_trades"]
trades = results.get('trades', []) trades = results.get('trades', [])
@ -48,22 +114,29 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
cumulative_profit = 0 cumulative_profit = 0
max_drawdown = 0 max_drawdown = 0
peak = 0 peak = 0
for trade in trades: for trade in trades:
cumulative_profit += trade['profit_pct'] cumulative_profit += trade['profit_pct']
if cumulative_profit > peak: if cumulative_profit > peak:
peak = cumulative_profit peak = cumulative_profit
drawdown = peak - cumulative_profit drawdown = peak - cumulative_profit
if drawdown > max_drawdown: if drawdown > max_drawdown:
max_drawdown = drawdown max_drawdown = drawdown
final_usd = initial_usd final_usd = initial_usd
for trade in trades: for trade in trades:
final_usd *= (1 + trade['profit_pct']) final_usd *= (1 + trade['profit_pct'])
total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades) total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades)
row = { row = {
"timeframe": rule_name, "timeframe": rule_name,
"stop_loss_pct": stop_loss_pct, "stop_loss_pct": stop_loss_pct,
"n_trades": n_trades, "n_trades": n_trades,
"n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP'), "n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP_LOSS'),
"win_rate": win_rate, "win_rate": win_rate,
"max_drawdown": max_drawdown, "max_drawdown": max_drawdown,
"avg_trade": avg_trade, "avg_trade": avg_trade,
@ -75,6 +148,7 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
"total_fees_usd": total_fees_usd, "total_fees_usd": total_fees_usd,
} }
results_rows.append(row) results_rows.append(row)
for trade in trades: for trade in trades:
trade_rows.append({ trade_rows.append({
"timeframe": rule_name, "timeframe": rule_name,
@ -88,20 +162,18 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
"fee_usd": trade.get("fee_usd"), "fee_usd": trade.get("fee_usd"),
}) })
logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}") logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}")
if debug: if debug:
for trade in trades: for trade in trades:
if trade['type'] == 'STOP': print(trade)
print(trade)
for trade in trades:
if trade['profit_pct'] < -0.09: # or whatever is close to -0.10
print("Large loss trade:", trade)
return results_rows, trade_rows return results_rows, trade_rows
def process(timeframe_info, debug=False): def process(timeframe_info, debug=False):
"""Process a single (timeframe, stop_loss_pct) combination (no monthly split)""" """Process a single (timeframe, stop_loss_pct) combination (no monthly split)"""
rule, data_1min, stop_loss_pct, initial_usd = timeframe_info rule, data_1min, stop_loss_pct, initial_usd = timeframe_info
if rule == "1T": if rule == "1min":
df = data_1min.copy() df = data_1min.copy()
else: else:
df = data_1min.resample(rule).agg({ df = data_1min.resample(rule).agg({
@ -174,14 +246,14 @@ if __name__ == "__main__":
"start_date": "2024-05-15", "start_date": "2024-05-15",
"stop_date": datetime.datetime.today().strftime('%Y-%m-%d'), "stop_date": datetime.datetime.today().strftime('%Y-%m-%d'),
"initial_usd": 10000, "initial_usd": 10000,
"timeframes": ["1D"], "timeframes": ["15min"],
"stop_loss_pcts": [0.01, 0.02, 0.03], "stop_loss_pcts": [0.03],
} }
if args.config: if args.config:
with open(args.config, 'r') as f: with open(args.config, 'r') as f:
config = json.load(f) config = json.load(f)
else: elif not debug:
print("No config file provided. Please enter the following values (press Enter to use default):") print("No config file provided. Please enter the following values (press Enter to use default):")
start_date = input(f"Start date [{default_config['start_date']}]: ") or default_config['start_date'] start_date = input(f"Start date [{default_config['start_date']}]: ") or default_config['start_date']
@ -203,8 +275,9 @@ if __name__ == "__main__":
'timeframes': timeframes, 'timeframes': timeframes,
'stop_loss_pcts': stop_loss_pcts, 'stop_loss_pcts': stop_loss_pcts,
} }
else:
config = default_config
# Use config values
start_date = config['start_date'] start_date = config['start_date']
stop_date = config['stop_date'] stop_date = config['stop_date']
initial_usd = config['initial_usd'] initial_usd = config['initial_usd']