import pandas as pd
import numpy as np
import logging
import concurrent.futures
import datetime
import argparse
import json

from cycles.utils.storage import Storage
from cycles.utils.system import SystemUtils
from cycles.backtest import Backtest
from cycles.Analysis.supertrend import Supertrends

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("backtest.log"),
        logging.StreamHandler()
    ]
)

def default_init_strategy(data: Backtest.Data) -> Backtest.Data:
    """Compute a meta-trend from three Supertrend indicators and store it on the data object."""
    supertrends = Supertrends(data.df, verbose=False)

    supertrend_results_list = supertrends.calculate_supertrend_indicators()
    trends = [st['results']['trend'] for st in supertrend_results_list]
    trends_arr = np.stack(trends, axis=1)
    # The meta-trend takes the shared direction when all three Supertrends agree,
    # and is 0 (neutral) on any disagreement.
    meta_trend = np.where(
        (trends_arr[:, 0] == trends_arr[:, 1]) & (trends_arr[:, 1] == trends_arr[:, 2]),
        trends_arr[:, 0], 0)

    data.strategies["meta_trend"] = meta_trend

    return data

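# Illustration of the agreement rule above (hypothetical values), assuming each
# Supertrend reports +1 (uptrend) or -1 (downtrend) per bar:
#
#   bar   st1  st2  st3   meta_trend
#    0     +1   +1   +1       +1     all agree up
#    1     +1   -1   +1        0     disagreement -> neutral
#    2     -1   -1   -1       -1     all agree down
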
def default_entry_strategy(data, df_index):
    # Enter when the meta-trend flips to +1 on this bar (it was not +1 on the previous bar).
    return data.strategies["meta_trend"][df_index - 1] != 1 and data.strategies["meta_trend"][df_index] == 1

def stop_loss_strategy(data):
    """Check the 1-minute candles between entry and the current bar for a stop-loss hit.

    Returns (triggered, sell_price); sell_price is None when the stop was not hit.
    """
    stop_price = data.entry_price * (1 - data.strategies["stop_loss_pct"])

    # Assumes min1_df has a sorted DatetimeIndex
    min1_index = data.min1_df.index

    # Find the first index >= entry_time
    start_candidates = min1_index[min1_index >= data.entry_time]
    data.current_trade_min1_start_idx = start_candidates[0]

    # Find the last index <= current_date
    end_candidates = min1_index[min1_index <= data.current_date]
    if len(end_candidates) == 0:
        logging.warning("No 1-minute candle at or before current_date; skipping stop-loss check.")
        return False, None
    data.current_min1_end_idx = end_candidates[-1]

    min1_slice = data.min1_df.loc[data.current_trade_min1_start_idx:data.current_min1_end_idx]

    if (min1_slice['low'] <= stop_price).any():
        stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0]
        # If the candle gapped below the stop, assume the fill happens at the open;
        # otherwise assume a fill at the stop price itself.
        if stop_candle['open'] < stop_price:
            sell_price = stop_candle['open']
        else:
            sell_price = stop_price
        return True, sell_price

    return False, None

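# Worked example (hypothetical numbers): with entry_price = 100.0 and
# stop_loss_pct = 0.03, stop_price = 97.0. If the first 1-minute candle whose
# low reaches 97.0 opened at 96.5 (a gap through the stop), the trade is
# assumed to fill at 96.5; if it opened at 98.2, the fill is taken at 97.0.
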
def default_exit_strategy(data: Backtest.Data, df_index):
    """Return (exit_reason, data, sell_price); exit_reason is None while the trade stays open."""
    # Exit when the meta-trend turns to -1.
    if data.strategies["meta_trend"][df_index - 1] != 1 and \
            data.strategies["meta_trend"][df_index] == -1:
        return "META_TREND_EXIT_SIGNAL", data, None

    stop_loss_result, sell_price = stop_loss_strategy(data)
    if stop_loss_result:
        data.strategies["current_trade_min1_start_idx"] = \
            data.min1_df.index[data.min1_df.index <= data.current_date][-1]
        return "STOP_LOSS", data, sell_price

    return None, data, None

def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False):
    """Process the entire timeframe with all stop-loss values (no monthly split)."""
    df = df.copy().reset_index(drop=True)

    results_rows = []
    trade_rows = []

    min1_df['timestamp'] = pd.to_datetime(min1_df.index)  # TODO: verify this column is still needed

    for stop_loss_pct in stop_loss_pcts:
        data = Backtest.Data(initial_usd, df, min1_df, default_init_strategy)
        data.strategies["stop_loss_pct"] = stop_loss_pct

        results = Backtest.run(
            data,
            default_entry_strategy,
            default_exit_strategy,
            debug
        )
        n_trades = results["n_trades"]
        trades = results.get('trades', [])
        wins = [1 for t in trades if t['exit'] is not None and t['exit'] > t['entry']]
        n_winning_trades = len(wins)
        total_profit = sum(trade['profit_pct'] for trade in trades)
        total_loss = sum(-trade['profit_pct'] for trade in trades if trade['profit_pct'] < 0)
        win_rate = n_winning_trades / n_trades if n_trades > 0 else 0
        avg_trade = total_profit / n_trades if n_trades > 0 else 0
        profit_ratio = total_profit / total_loss if total_loss > 0 else float('inf')

        # Max drawdown: largest peak-to-trough fall of the cumulative profit curve.
        cumulative_profit = 0
        max_drawdown = 0
        peak = 0

        for trade in trades:
            cumulative_profit += trade['profit_pct']
            if cumulative_profit > peak:
                peak = cumulative_profit
            drawdown = peak - cumulative_profit
            if drawdown > max_drawdown:
                max_drawdown = drawdown

        # Compound each trade's return to get the final account value.
        final_usd = initial_usd
        for trade in trades:
            final_usd *= (1 + trade['profit_pct'])

        total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades)

        row = {
            "timeframe": rule_name,
            "stop_loss_pct": stop_loss_pct,
            "n_trades": n_trades,
            "n_stop_loss": sum(1 for trade in trades if trade.get('type') == 'STOP_LOSS'),
            "win_rate": win_rate,
            "max_drawdown": max_drawdown,
            "avg_trade": avg_trade,
            "total_profit": total_profit,
            "total_loss": total_loss,
            "profit_ratio": profit_ratio,
            "initial_usd": initial_usd,
            "final_usd": final_usd,
            "total_fees_usd": total_fees_usd,
        }
        results_rows.append(row)

        for trade in trades:
            trade_rows.append({
                "timeframe": rule_name,
                "stop_loss_pct": stop_loss_pct,
                "entry_time": trade.get("entry_time"),
                "exit_time": trade.get("exit_time"),
                "entry_price": trade.get("entry"),
                "exit_price": trade.get("exit"),
                "profit_pct": trade.get("profit_pct"),
                "type": trade.get("type"),
                "fee_usd": trade.get("fee_usd"),
            })
        logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}")

        if debug:
            for trade in trades:
                print(trade)

    return results_rows, trade_rows

def process(timeframe_info, debug=False):
    """Process a single (timeframe, stop_loss_pct) combination (no monthly split)."""
    rule, data_1min, stop_loss_pct, initial_usd = timeframe_info

    if rule == "1min":
        df = data_1min.copy()
    else:
        # Resample the 1-minute candles to the target timeframe with standard OHLCV aggregation.
        df = data_1min.resample(rule).agg({
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last',
            'volume': 'sum'
        }).dropna()
    df = df.reset_index()
    results_rows, all_trade_rows = process_timeframe_data(data_1min, df, [stop_loss_pct], rule, initial_usd, debug=debug)
    return results_rows, all_trade_rows

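# Sketch of the resampling step above with made-up values: three 1-minute
# candles aggregated into one "3min" bar take the first open, the max high,
# the min low, the last close, and the summed volume:
#
#   00:00  open 100  high 101  low  99  close 100  volume 5
#   00:01  open 100  high 103  low 100  close 102  volume 7
#   00:02  open 102  high 102  low  98  close  99  volume 4
#   -> 3min bar: open 100, high 103, low 98, close 99, volume 16
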
def aggregate_results(all_rows):
    """Aggregate results per stop_loss_pct and per rule (timeframe)."""
    from collections import defaultdict

    grouped = defaultdict(list)
    for row in all_rows:
        key = (row['timeframe'], row['stop_loss_pct'])
        grouped[key].append(row)

    summary_rows = []
    for (rule, stop_loss_pct), rows in grouped.items():
        total_trades = sum(r['n_trades'] for r in rows)
        total_stop_loss = sum(r['n_stop_loss'] for r in rows)
        avg_win_rate = np.mean([r['win_rate'] for r in rows])
        avg_max_drawdown = np.mean([r['max_drawdown'] for r in rows])
        avg_avg_trade = np.mean([r['avg_trade'] for r in rows])
        avg_profit_ratio = np.mean([r['profit_ratio'] for r in rows])

        # Take initial_usd from the rows themselves rather than relying on a
        # module-level global, then average the final USD and fees.
        initial_usd = rows[0].get('initial_usd', 0.0)
        final_usd = np.mean([r.get('final_usd', initial_usd) for r in rows])
        total_fees_usd = np.mean([r.get('total_fees_usd', 0.0) for r in rows])

        summary_rows.append({
            "timeframe": rule,
            "stop_loss_pct": stop_loss_pct,
            "n_trades": total_trades,
            "n_stop_loss": total_stop_loss,
            "win_rate": avg_win_rate,
            "max_drawdown": avg_max_drawdown,
            "avg_trade": avg_avg_trade,
            "profit_ratio": avg_profit_ratio,
            "initial_usd": initial_usd,
            "final_usd": final_usd,
            "total_fees_usd": total_fees_usd,
        })
    return summary_rows

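# Hypothetical usage (aggregate_results is not called in the main block below):
#   summary = aggregate_results(all_results_rows)
# collapses rows that share (timeframe, stop_loss_pct) into one averaged row each.
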
def get_nearest_price(df, target_date):
    """Return the (timestamp, close price) of the candle nearest to target_date."""
    if len(df) == 0:
        return None, None
    target_ts = pd.to_datetime(target_date)
    nearest_idx = df.index.get_indexer([target_ts], method='nearest')[0]
    nearest_time = df.index[nearest_idx]
    price = df.iloc[nearest_idx]['close']
    return nearest_time, price

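# Example (illustrative): get_nearest_price(data_1min, "2024-05-15") returns the
# index timestamp closest to 2024-05-15 00:00 and that candle's close price.
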
if __name__ == "__main__":
    debug = False

    parser = argparse.ArgumentParser(description="Run backtest with config file.")
    parser.add_argument("config", type=str, nargs="?", help="Path to config JSON file.")
    args = parser.parse_args()

    # Default values, used when no config file is provided
    default_config = {
        "start_date": "2024-05-15",
        "stop_date": datetime.datetime.today().strftime('%Y-%m-%d'),
        "initial_usd": 10000,
        "timeframes": ["15min"],
        "stop_loss_pcts": [0.03],
    }

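    # A config file, if given, is expected to mirror these keys. An illustrative
    # config.json (values are examples, not recommendations):
    #   {
    #     "start_date": "2024-05-15",
    #     "stop_date": "2024-12-31",
    #     "initial_usd": 10000,
    #     "timeframes": ["15min", "1h"],
    #     "stop_loss_pcts": [0.02, 0.03]
    #   }
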
    if args.config:
        with open(args.config, 'r') as f:
            config = json.load(f)
    elif not debug:
        print("No config file provided. Please enter the following values (press Enter to use the default):")

        start_date = input(f"Start date [{default_config['start_date']}]: ") or default_config['start_date']
        stop_date = input(f"Stop date [{default_config['stop_date']}]: ") or default_config['stop_date']

        initial_usd_str = input(f"Initial USD [{default_config['initial_usd']}]: ") or str(default_config['initial_usd'])
        initial_usd = float(initial_usd_str)

        timeframes_str = input(f"Timeframes (comma separated) [{', '.join(default_config['timeframes'])}]: ") or ','.join(default_config['timeframes'])
        timeframes = [tf.strip() for tf in timeframes_str.split(',') if tf.strip()]

        stop_loss_pcts_str = input(f"Stop loss pcts (comma separated) [{', '.join(str(x) for x in default_config['stop_loss_pcts'])}]: ") or ','.join(str(x) for x in default_config['stop_loss_pcts'])
        stop_loss_pcts = [float(x.strip()) for x in stop_loss_pcts_str.split(',') if x.strip()]

        config = {
            'start_date': start_date,
            'stop_date': stop_date,
            'initial_usd': initial_usd,
            'timeframes': timeframes,
            'stop_loss_pcts': stop_loss_pcts,
        }
    else:
        config = default_config

    start_date = config['start_date']
    stop_date = config['stop_date']
    initial_usd = config['initial_usd']
    timeframes = config['timeframes']
    stop_loss_pcts = config['stop_loss_pcts']

    timestamp = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M")

    storage = Storage(logging=logging)
    system_utils = SystemUtils(logging=logging)

    data_1min = storage.load_data('btcusd_1-min_data.csv', start_date, stop_date)

    nearest_start_time, start_price = get_nearest_price(data_1min, start_date)
    nearest_stop_time, stop_price = get_nearest_price(data_1min, stop_date)

    metadata_lines = [
        f"Start date\t{start_date}\tPrice\t{start_price}",
        f"Stop date\t{stop_date}\tPrice\t{stop_price}",
        f"Initial USD\t{initial_usd}"
    ]

    # One task per (timeframe, stop_loss_pct) combination.
    tasks = [
        (name, data_1min, stop_loss_pct, initial_usd)
        for name in timeframes
        for stop_loss_pct in stop_loss_pcts
    ]

    workers = system_utils.get_optimal_workers()

    if debug:
        # Run sequentially so breakpoints and prints behave predictably.
        all_results_rows = []
        all_trade_rows = []
        for task in tasks:
            results, trades = process(task, debug)
            if results or trades:
                all_results_rows.extend(results)
                all_trade_rows.extend(trades)
    else:
        with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
            futures = {executor.submit(process, task, debug): task for task in tasks}
            all_results_rows = []
            all_trade_rows = []

            for future in concurrent.futures.as_completed(futures):
                results, trades = future.result()

                if results or trades:
                    all_results_rows.extend(results)
                    all_trade_rows.extend(trades)

    backtest_filename = f"{timestamp}_backtest.csv"
    backtest_fieldnames = [
        "timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate",
        "max_drawdown", "avg_trade", "profit_ratio", "final_usd", "total_fees_usd"
    ]
    storage.write_backtest_results(backtest_filename, backtest_fieldnames, all_results_rows, metadata_lines)

    trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"]
    storage.write_trades(all_trade_rows, trades_fieldnames)