Compare commits
2 Commits
736b278ee2
...
268bc33bbf
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
268bc33bbf | ||
|
|
e286dd881a |
@ -1,12 +1,31 @@
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
from cycles.supertrend import Supertrends
|
||||
from cycles.market_fees import MarketFees
|
||||
|
||||
class Backtest:
|
||||
class Data:
|
||||
def __init__(self, initial_usd, df, min1_df, init_strategy_fields) -> None:
|
||||
self.initial_usd = initial_usd
|
||||
self.usd = initial_usd
|
||||
self.max_balance = initial_usd
|
||||
self.coin = 0
|
||||
self.position = 0
|
||||
self.entry_price = 0
|
||||
self.entry_time = None
|
||||
self.current_trade_min1_start_idx = None
|
||||
self.current_min1_end_idx = None
|
||||
self.price_open = None
|
||||
self.price_close = None
|
||||
self.current_date = None
|
||||
self.strategies = {}
|
||||
self.df = df
|
||||
self.min1_df = min1_df
|
||||
|
||||
self = init_strategy_fields(self)
|
||||
|
||||
@staticmethod
|
||||
def run(min1_df, df, initial_usd, stop_loss_pct, debug=False):
|
||||
def run(data, entry_strategy, exit_strategy, debug=False):
|
||||
"""
|
||||
Backtest a simple strategy using the meta supertrend (all three supertrends agree).
|
||||
Buys when meta supertrend is positive, sells when negative, applies a percentage stop loss.
|
||||
@ -17,85 +36,43 @@ class Backtest:
|
||||
- stop_loss_pct: float, stop loss as a fraction (e.g. 0.05 for 5%)
|
||||
- debug: bool, whether to print debug info
|
||||
"""
|
||||
_df = df.copy().reset_index(drop=True)
|
||||
_df['timestamp'] = pd.to_datetime(_df['timestamp'])
|
||||
|
||||
supertrends = Supertrends(_df, verbose=False)
|
||||
|
||||
supertrend_results_list = supertrends.calculate_supertrend_indicators()
|
||||
trends = [st['results']['trend'] for st in supertrend_results_list]
|
||||
trends_arr = np.stack(trends, axis=1)
|
||||
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
|
||||
trends_arr[:,0], 0)
|
||||
|
||||
position = 0 # 0 = no position, 1 = long
|
||||
entry_price = 0
|
||||
usd = initial_usd
|
||||
coin = 0
|
||||
|
||||
trade_log = []
|
||||
max_balance = initial_usd
|
||||
drawdowns = []
|
||||
trades = []
|
||||
entry_time = None
|
||||
current_trade_min1_start_idx = None
|
||||
|
||||
min1_df['timestamp'] = pd.to_datetime(min1_df.index)
|
||||
|
||||
for i in range(1, len(_df)):
|
||||
price_open = _df['open'].iloc[i]
|
||||
price_close = _df['close'].iloc[i]
|
||||
date = _df['timestamp'].iloc[i]
|
||||
prev_mt = meta_trend[i-1]
|
||||
curr_mt = meta_trend[i]
|
||||
for i in range(1, len(data.df)):
|
||||
data.price_open = data.df['open'].iloc[i]
|
||||
data.price_close = data.df['close'].iloc[i]
|
||||
|
||||
# Check stop loss if in position
|
||||
if position == 1:
|
||||
stop_loss_result = Backtest.check_stop_loss(
|
||||
min1_df,
|
||||
entry_time,
|
||||
date,
|
||||
entry_price,
|
||||
stop_loss_pct,
|
||||
coin,
|
||||
usd,
|
||||
debug,
|
||||
current_trade_min1_start_idx
|
||||
)
|
||||
if stop_loss_result is not None:
|
||||
trade_log_entry, current_trade_min1_start_idx, position, coin, entry_price = stop_loss_result
|
||||
trade_log.append(trade_log_entry)
|
||||
continue
|
||||
# Update the start index for next check
|
||||
current_trade_min1_start_idx = min1_df.index[min1_df.index <= date][-1]
|
||||
data.current_date = data.df['timestamp'].iloc[i]
|
||||
|
||||
# Entry: only if not in position and signal changes to 1
|
||||
if position == 0 and prev_mt != 1 and curr_mt == 1:
|
||||
entry_result = Backtest.handle_entry(usd, price_open, date)
|
||||
coin, entry_price, entry_time, usd, position, trade_log_entry = entry_result
|
||||
trade_log.append(trade_log_entry)
|
||||
|
||||
# Exit: only if in position and signal changes from 1 to -1
|
||||
elif position == 1 and prev_mt == 1 and curr_mt == -1:
|
||||
exit_result = Backtest.handle_exit(coin, price_open, entry_price, entry_time, date)
|
||||
usd, coin, position, entry_price, trade_log_entry = exit_result
|
||||
trade_log.append(trade_log_entry)
|
||||
if data.position == 0:
|
||||
if entry_strategy(data, i):
|
||||
data, entry_log_entry = Backtest.handle_entry(data)
|
||||
trade_log.append(entry_log_entry)
|
||||
elif data.position == 1:
|
||||
exit_test_results, data, sell_price = exit_strategy(data, i)
|
||||
|
||||
if exit_test_results is not None:
|
||||
data, exit_log_entry = Backtest.handle_exit(data, exit_test_results, sell_price)
|
||||
trade_log.append(exit_log_entry)
|
||||
|
||||
# Track drawdown
|
||||
balance = usd if position == 0 else coin * price_close
|
||||
if balance > max_balance:
|
||||
max_balance = balance
|
||||
drawdown = (max_balance - balance) / max_balance
|
||||
balance = data.usd if data.position == 0 else data.coin * data.price_close
|
||||
|
||||
if balance > data.max_balance:
|
||||
data.max_balance = balance
|
||||
|
||||
drawdown = (data.max_balance - balance) / data.max_balance
|
||||
drawdowns.append(drawdown)
|
||||
|
||||
# If still in position at end, sell at last close
|
||||
if position == 1:
|
||||
exit_result = Backtest.handle_exit(coin, _df['close'].iloc[-1], entry_price, entry_time, _df['timestamp'].iloc[-1])
|
||||
usd, coin, position, entry_price, trade_log_entry = exit_result
|
||||
trade_log.append(trade_log_entry)
|
||||
if data.position == 1:
|
||||
data, exit_log_entry = Backtest.handle_exit(data, "EOD", None)
|
||||
trade_log.append(exit_log_entry)
|
||||
|
||||
# Calculate statistics
|
||||
final_balance = usd
|
||||
final_balance = data.usd
|
||||
n_trades = len(trade_log)
|
||||
wins = [1 for t in trade_log if t['exit'] is not None and t['exit'] > t['entry']]
|
||||
win_rate = len(wins) / n_trades if n_trades > 0 else 0
|
||||
@ -115,14 +92,14 @@ class Backtest:
|
||||
'entry': trade['entry'],
|
||||
'exit': trade['exit'],
|
||||
'profit_pct': profit_pct,
|
||||
'type': trade.get('type', 'SELL'),
|
||||
'fee_usd': trade.get('fee_usd')
|
||||
'type': trade['type'],
|
||||
'fee_usd': trade['fee_usd']
|
||||
})
|
||||
fee_usd = trade.get('fee_usd')
|
||||
total_fees_usd += fee_usd
|
||||
|
||||
results = {
|
||||
"initial_usd": initial_usd,
|
||||
"initial_usd": data.initial_usd,
|
||||
"final_usd": final_balance,
|
||||
"n_trades": n_trades,
|
||||
"win_rate": win_rate,
|
||||
@ -144,74 +121,45 @@ class Backtest:
|
||||
return results
|
||||
|
||||
@staticmethod
|
||||
def check_stop_loss(min1_df, entry_time, date, entry_price, stop_loss_pct, coin, usd, debug, current_trade_min1_start_idx):
|
||||
stop_price = entry_price * (1 - stop_loss_pct)
|
||||
|
||||
if current_trade_min1_start_idx is None:
|
||||
current_trade_min1_start_idx = min1_df.index[min1_df.index >= entry_time][0]
|
||||
current_min1_end_idx = min1_df.index[min1_df.index <= date][-1]
|
||||
|
||||
# Check all 1-minute candles in between for stop loss
|
||||
min1_slice = min1_df.loc[current_trade_min1_start_idx:current_min1_end_idx]
|
||||
if (min1_slice['low'] <= stop_price).any():
|
||||
# Stop loss triggered, find the exact candle
|
||||
stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0]
|
||||
# More realistic fill: if open < stop, fill at open, else at stop
|
||||
if stop_candle['open'] < stop_price:
|
||||
sell_price = stop_candle['open']
|
||||
else:
|
||||
sell_price = stop_price
|
||||
if debug:
|
||||
print(f"STOP LOSS triggered: entry={entry_price}, stop={stop_price}, sell_price={sell_price}, entry_time={entry_time}, stop_time={stop_candle.name}")
|
||||
btc_to_sell = coin
|
||||
usd_gross = btc_to_sell * sell_price
|
||||
exit_fee = MarketFees.calculate_okx_taker_maker_fee(usd_gross, is_maker=False)
|
||||
trade_log_entry = {
|
||||
'type': 'STOP',
|
||||
'entry': entry_price,
|
||||
'exit': sell_price,
|
||||
'entry_time': entry_time,
|
||||
'exit_time': stop_candle.name,
|
||||
'fee_usd': exit_fee
|
||||
}
|
||||
# After stop loss, reset position and entry
|
||||
return trade_log_entry, None, 0, 0, 0
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def handle_entry(usd, price_open, date):
|
||||
entry_fee = MarketFees.calculate_okx_taker_maker_fee(usd, is_maker=False)
|
||||
usd_after_fee = usd - entry_fee
|
||||
coin = usd_after_fee / price_open
|
||||
entry_price = price_open
|
||||
entry_time = date
|
||||
usd = 0
|
||||
position = 1
|
||||
def handle_entry(data):
|
||||
entry_fee = MarketFees.calculate_okx_taker_maker_fee(data.usd, is_maker=False)
|
||||
usd_after_fee = data.usd - entry_fee
|
||||
|
||||
data.coin = usd_after_fee / data.price_open
|
||||
data.entry_price = data.price_open
|
||||
data.entry_time = data.current_date
|
||||
data.usd = 0
|
||||
data.position = 1
|
||||
|
||||
trade_log_entry = {
|
||||
'type': 'BUY',
|
||||
'entry': entry_price,
|
||||
'entry': data.entry_price,
|
||||
'exit': None,
|
||||
'entry_time': entry_time,
|
||||
'entry_time': data.entry_time,
|
||||
'exit_time': None,
|
||||
'fee_usd': entry_fee
|
||||
}
|
||||
return coin, entry_price, entry_time, usd, position, trade_log_entry
|
||||
return data, trade_log_entry
|
||||
|
||||
@staticmethod
|
||||
def handle_exit(coin, price_open, entry_price, entry_time, date):
|
||||
btc_to_sell = coin
|
||||
usd_gross = btc_to_sell * price_open
|
||||
def handle_exit(data, exit_reason, sell_price):
|
||||
btc_to_sell = data.coin
|
||||
exit_price = sell_price if sell_price is not None else data.price_open
|
||||
usd_gross = btc_to_sell * exit_price
|
||||
exit_fee = MarketFees.calculate_okx_taker_maker_fee(usd_gross, is_maker=False)
|
||||
usd = usd_gross - exit_fee
|
||||
trade_log_entry = {
|
||||
'type': 'SELL',
|
||||
'entry': entry_price,
|
||||
'exit': price_open,
|
||||
'entry_time': entry_time,
|
||||
'exit_time': date,
|
||||
|
||||
data.usd = usd_gross - exit_fee
|
||||
|
||||
exit_log_entry = {
|
||||
'type': exit_reason,
|
||||
'entry': data.entry_price,
|
||||
'exit': exit_price,
|
||||
'entry_time': data.entry_time,
|
||||
'exit_time': data.current_date,
|
||||
'fee_usd': exit_fee
|
||||
}
|
||||
coin = 0
|
||||
position = 0
|
||||
entry_price = 0
|
||||
return usd, coin, position, entry_price, trade_log_entry
|
||||
data.coin = 0
|
||||
data.position = 0
|
||||
data.entry_price = 0
|
||||
|
||||
return data, exit_log_entry
|
||||
@ -2,6 +2,6 @@ import pandas as pd
|
||||
|
||||
class MarketFees:
|
||||
@staticmethod
|
||||
def calculate_okx_taker_maker_fee(amount, is_maker=True):
|
||||
def calculate_okx_taker_maker_fee(amount, is_maker=True) -> float:
|
||||
fee_rate = 0.0008 if is_maker else 0.0010
|
||||
return amount * fee_rate
|
||||
|
||||
107
main.py
107
main.py
@ -6,11 +6,11 @@ import os
|
||||
import datetime
|
||||
import argparse
|
||||
import json
|
||||
import ast
|
||||
|
||||
from cycles.utils.storage import Storage
|
||||
from cycles.utils.system import SystemUtils
|
||||
from cycles.backtest import Backtest
|
||||
from cycles.supertrend import Supertrends
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
@ -21,6 +21,68 @@ logging.basicConfig(
|
||||
]
|
||||
)
|
||||
|
||||
def default_init_strategy(data: Backtest.Data) -> Backtest.Data:
|
||||
supertrends = Supertrends(data.df, verbose=False)
|
||||
|
||||
supertrend_results_list = supertrends.calculate_supertrend_indicators()
|
||||
trends = [st['results']['trend'] for st in supertrend_results_list]
|
||||
trends_arr = np.stack(trends, axis=1)
|
||||
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
|
||||
trends_arr[:,0], 0)
|
||||
|
||||
data.strategies["meta_trend"] = meta_trend
|
||||
|
||||
return data
|
||||
|
||||
def default_entry_strategy(data, df_index):
|
||||
return data.strategies["meta_trend"][df_index - 1] != 1 and data.strategies["meta_trend"][df_index] == 1
|
||||
|
||||
def stop_loss_strategy(data):
|
||||
stop_price = data.entry_price * (1 - data.strategies["stop_loss_pct"])
|
||||
|
||||
# Ensure index is sorted and is a DatetimeIndex
|
||||
min1_index = data.min1_df.index
|
||||
|
||||
# Find the first index >= entry_time
|
||||
start_candidates = min1_index[min1_index >= data.entry_time]
|
||||
data.current_trade_min1_start_idx = start_candidates[0]
|
||||
|
||||
# Find the last index <= current_date
|
||||
end_candidates = min1_index[min1_index <= data.current_date]
|
||||
if len(end_candidates) == 0:
|
||||
print("Warning: no end candidate here. Need to be checked")
|
||||
return False, None
|
||||
data.current_min1_end_idx = end_candidates[-1]
|
||||
|
||||
min1_slice = data.min1_df.loc[data.current_trade_min1_start_idx:data.current_min1_end_idx]
|
||||
|
||||
# print(f"lowest low in that range: {min1_slice['low'].min()}, count: {len(min1_slice)}")
|
||||
# print(f"slice start: {min1_slice.index[0]}, slice end: {min1_slice.index[-1]}")
|
||||
|
||||
if (min1_slice['low'] <= stop_price).any():
|
||||
stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0]
|
||||
|
||||
if stop_candle['open'] < stop_price:
|
||||
sell_price = stop_candle['open']
|
||||
else:
|
||||
sell_price = stop_price
|
||||
return True, sell_price
|
||||
|
||||
return False, None
|
||||
|
||||
def default_exit_strategy(data: Backtest.Data, df_index):
|
||||
if data.strategies["meta_trend"][df_index - 1] != 1 and \
|
||||
data.strategies["meta_trend"][df_index] == -1:
|
||||
return "META_TREND_EXIT_SIGNAL", data, None
|
||||
|
||||
stop_loss_result, sell_price = stop_loss_strategy(data)
|
||||
if stop_loss_result:
|
||||
data.strategies["current_trade_min1_start_idx"] = \
|
||||
data.min1_df.index[data.min1_df.index <= data.current_date][-1]
|
||||
return "STOP_LOSS", data, sell_price
|
||||
|
||||
return None, data, None
|
||||
|
||||
def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False):
|
||||
"""Process the entire timeframe with all stop loss values (no monthly split)"""
|
||||
df = df.copy().reset_index(drop=True)
|
||||
@ -28,13 +90,17 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
|
||||
results_rows = []
|
||||
trade_rows = []
|
||||
|
||||
min1_df['timestamp'] = pd.to_datetime(min1_df.index) # need ?
|
||||
|
||||
for stop_loss_pct in stop_loss_pcts:
|
||||
data = Backtest.Data(initial_usd, df, min1_df, default_init_strategy)
|
||||
data.strategies["stop_loss_pct"] = stop_loss_pct
|
||||
|
||||
results = Backtest.run(
|
||||
min1_df,
|
||||
df,
|
||||
initial_usd=initial_usd,
|
||||
stop_loss_pct=stop_loss_pct,
|
||||
debug=debug
|
||||
data,
|
||||
default_entry_strategy,
|
||||
default_exit_strategy,
|
||||
debug
|
||||
)
|
||||
n_trades = results["n_trades"]
|
||||
trades = results.get('trades', [])
|
||||
@ -48,22 +114,29 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
|
||||
cumulative_profit = 0
|
||||
max_drawdown = 0
|
||||
peak = 0
|
||||
|
||||
for trade in trades:
|
||||
cumulative_profit += trade['profit_pct']
|
||||
|
||||
if cumulative_profit > peak:
|
||||
peak = cumulative_profit
|
||||
drawdown = peak - cumulative_profit
|
||||
|
||||
if drawdown > max_drawdown:
|
||||
max_drawdown = drawdown
|
||||
|
||||
final_usd = initial_usd
|
||||
|
||||
for trade in trades:
|
||||
final_usd *= (1 + trade['profit_pct'])
|
||||
|
||||
total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades)
|
||||
|
||||
row = {
|
||||
"timeframe": rule_name,
|
||||
"stop_loss_pct": stop_loss_pct,
|
||||
"n_trades": n_trades,
|
||||
"n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP'),
|
||||
"n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP_LOSS'),
|
||||
"win_rate": win_rate,
|
||||
"max_drawdown": max_drawdown,
|
||||
"avg_trade": avg_trade,
|
||||
@ -75,6 +148,7 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
|
||||
"total_fees_usd": total_fees_usd,
|
||||
}
|
||||
results_rows.append(row)
|
||||
|
||||
for trade in trades:
|
||||
trade_rows.append({
|
||||
"timeframe": rule_name,
|
||||
@ -88,20 +162,18 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
|
||||
"fee_usd": trade.get("fee_usd"),
|
||||
})
|
||||
logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}")
|
||||
|
||||
if debug:
|
||||
for trade in trades:
|
||||
if trade['type'] == 'STOP':
|
||||
print(trade)
|
||||
for trade in trades:
|
||||
if trade['profit_pct'] < -0.09: # or whatever is close to -0.10
|
||||
print("Large loss trade:", trade)
|
||||
print(trade)
|
||||
|
||||
return results_rows, trade_rows
|
||||
|
||||
def process(timeframe_info, debug=False):
|
||||
"""Process a single (timeframe, stop_loss_pct) combination (no monthly split)"""
|
||||
rule, data_1min, stop_loss_pct, initial_usd = timeframe_info
|
||||
|
||||
if rule == "1T":
|
||||
if rule == "1min":
|
||||
df = data_1min.copy()
|
||||
else:
|
||||
df = data_1min.resample(rule).agg({
|
||||
@ -174,14 +246,14 @@ if __name__ == "__main__":
|
||||
"start_date": "2024-05-15",
|
||||
"stop_date": datetime.datetime.today().strftime('%Y-%m-%d'),
|
||||
"initial_usd": 10000,
|
||||
"timeframes": ["1D"],
|
||||
"stop_loss_pcts": [0.01, 0.02, 0.03],
|
||||
"timeframes": ["15min"],
|
||||
"stop_loss_pcts": [0.03],
|
||||
}
|
||||
|
||||
if args.config:
|
||||
with open(args.config, 'r') as f:
|
||||
config = json.load(f)
|
||||
else:
|
||||
elif not debug:
|
||||
print("No config file provided. Please enter the following values (press Enter to use default):")
|
||||
|
||||
start_date = input(f"Start date [{default_config['start_date']}]: ") or default_config['start_date']
|
||||
@ -203,8 +275,9 @@ if __name__ == "__main__":
|
||||
'timeframes': timeframes,
|
||||
'stop_loss_pcts': stop_loss_pcts,
|
||||
}
|
||||
else:
|
||||
config = default_config
|
||||
|
||||
# Use config values
|
||||
start_date = config['start_date']
|
||||
stop_date = config['stop_date']
|
||||
initial_usd = config['initial_usd']
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user