Cycles/main.py
Simon Moisy e5c2988d71 Refactor Backtest class and update strategy functions for improved modularity
- Refactored the Backtest class to encapsulate state and behavior, enhancing clarity and maintainability.
- Updated strategy functions to accept the Backtest instance, streamlining data access and manipulation.
- Introduced a new plotting method in BacktestCharts for visualizing close prices with trend indicators.
- Improved handling of meta_trend data to ensure proper visualization and trend representation.
- Adjusted main execution logic to support the new Backtest structure and enhanced debugging capabilities.
2025-05-22 20:02:14 +08:00

340 lines
13 KiB
Python

import pandas as pd
import numpy as np
import logging
import concurrent.futures
import os
import datetime
import argparse
import json
from cycles.utils.storage import Storage
from cycles.utils.system import SystemUtils
from cycles.backtest import Backtest
from cycles.Analysis.supertrend import Supertrends
from cycles.charts import BacktestCharts
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("backtest.log"),
logging.StreamHandler()
]
)
def default_init_strategy(backtester: Backtest):
supertrends = Supertrends(backtester.df, verbose=False)
supertrend_results_list = supertrends.calculate_supertrend_indicators()
trends = [st['results']['trend'] for st in supertrend_results_list]
trends_arr = np.stack(trends, axis=1)
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
trends_arr[:,0], 0)
backtester.strategies["meta_trend"] = meta_trend
def default_entry_strategy(backtester: Backtest, df_index):
return backtester.strategies["meta_trend"][df_index - 1] != 1 and backtester.strategies["meta_trend"][df_index] == 1
def stop_loss_strategy(backtester: Backtest):
stop_price = backtester.entry_price * (1 - backtester.strategies["stop_loss_pct"])
min1_index = backtester.min1_df.index
start_candidates = min1_index[min1_index >= backtester.entry_time]
backtester.current_trade_min1_start_idx = start_candidates[0]
end_candidates = min1_index[min1_index <= backtester.current_date]
if len(end_candidates) == 0:
print("Warning: no end candidate here. Need to be checked")
return False, None
backtester.current_min1_end_idx = end_candidates[-1]
min1_slice = backtester.min1_df.loc[backtester.current_trade_min1_start_idx:backtester.current_min1_end_idx]
# print(f"lowest low in that range: {min1_slice['low'].min()}, count: {len(min1_slice)}")
# print(f"slice start: {min1_slice.index[0]}, slice end: {min1_slice.index[-1]}")
if (min1_slice['low'] <= stop_price).any():
stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0]
if stop_candle['open'] < stop_price:
sell_price = stop_candle['open']
else:
sell_price = stop_price
return True, sell_price
return False, None
def default_exit_strategy(backtester: Backtest, df_index):
if backtester.strategies["meta_trend"][df_index - 1] != 1 and \
backtester.strategies["meta_trend"][df_index] == -1:
return "META_TREND_EXIT_SIGNAL", None
stop_loss_result, sell_price = stop_loss_strategy(backtester)
if stop_loss_result:
backtester.strategies["current_trade_min1_start_idx"] = \
backtester.min1_df.index[backtester.min1_df.index <= backtester.current_date][-1]
return "STOP_LOSS", sell_price
return None, None
def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False):
"""Process the entire timeframe with all stop loss values (no monthly split)"""
df = df.copy().reset_index(drop=True)
results_rows = []
trade_rows = []
min1_df['timestamp'] = pd.to_datetime(min1_df.index) # need ?
for stop_loss_pct in stop_loss_pcts:
backtester = Backtest(initial_usd, df, min1_df, default_init_strategy)
backtester.strategies["stop_loss_pct"] = stop_loss_pct
results = backtester.run(
default_entry_strategy,
default_exit_strategy,
debug
)
n_trades = results["n_trades"]
trades = results.get('trades', [])
wins = [1 for t in trades if t['exit'] is not None and t['exit'] > t['entry']]
n_winning_trades = len(wins)
total_profit = sum(trade['profit_pct'] for trade in trades)
total_loss = sum(-trade['profit_pct'] for trade in trades if trade['profit_pct'] < 0)
win_rate = n_winning_trades / n_trades if n_trades > 0 else 0
avg_trade = total_profit / n_trades if n_trades > 0 else 0
profit_ratio = total_profit / total_loss if total_loss > 0 else float('inf')
cumulative_profit = 0
max_drawdown = 0
peak = 0
for trade in trades:
cumulative_profit += trade['profit_pct']
if cumulative_profit > peak:
peak = cumulative_profit
drawdown = peak - cumulative_profit
if drawdown > max_drawdown:
max_drawdown = drawdown
final_usd = initial_usd
for trade in trades:
final_usd *= (1 + trade['profit_pct'])
total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades)
row = {
"timeframe": rule_name,
"stop_loss_pct": stop_loss_pct,
"n_trades": n_trades,
"n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP_LOSS'),
"win_rate": win_rate,
"max_drawdown": max_drawdown,
"avg_trade": avg_trade,
"total_profit": total_profit,
"total_loss": total_loss,
"profit_ratio": profit_ratio,
"initial_usd": initial_usd,
"final_usd": final_usd,
"total_fees_usd": total_fees_usd,
}
results_rows.append(row)
for trade in trades:
trade_rows.append({
"timeframe": rule_name,
"stop_loss_pct": stop_loss_pct,
"entry_time": trade.get("entry_time"),
"exit_time": trade.get("exit_time"),
"entry_price": trade.get("entry"),
"exit_price": trade.get("exit"),
"profit_pct": trade.get("profit_pct"),
"type": trade.get("type"),
"fee_usd": trade.get("fee_usd"),
})
logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}")
if debug:
# Plot after each backtest run
try:
meta_trend = backtester.strategies["meta_trend"]
BacktestCharts.plot(df, meta_trend)
except Exception as e:
print(f"Plotting failed: {e}")
return results_rows, trade_rows
def process(timeframe_info, debug=False):
"""Process a single (timeframe, stop_loss_pct) combination (no monthly split)"""
rule, data_1min, stop_loss_pct, initial_usd = timeframe_info
if rule == "1min":
df = data_1min.copy()
else:
df = data_1min.resample(rule).agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum'
}).dropna()
df = df.reset_index()
results_rows, all_trade_rows = process_timeframe_data(data_1min, df, [stop_loss_pct], rule, initial_usd, debug=debug)
return results_rows, all_trade_rows
def aggregate_results(all_rows):
"""Aggregate results per stop_loss_pct and per rule (timeframe)"""
from collections import defaultdict
grouped = defaultdict(list)
for row in all_rows:
key = (row['timeframe'], row['stop_loss_pct'])
grouped[key].append(row)
summary_rows = []
for (rule, stop_loss_pct), rows in grouped.items():
n_months = len(rows)
total_trades = sum(r['n_trades'] for r in rows)
total_stop_loss = sum(r['n_stop_loss'] for r in rows)
avg_win_rate = np.mean([r['win_rate'] for r in rows])
avg_max_drawdown = np.mean([r['max_drawdown'] for r in rows])
avg_avg_trade = np.mean([r['avg_trade'] for r in rows])
avg_profit_ratio = np.mean([r['profit_ratio'] for r in rows])
# Calculate final USD
final_usd = np.mean([r.get('final_usd', initial_usd) for r in rows])
total_fees_usd = np.mean([r.get('total_fees_usd') for r in rows])
summary_rows.append({
"timeframe": rule,
"stop_loss_pct": stop_loss_pct,
"n_trades": total_trades,
"n_stop_loss": total_stop_loss,
"win_rate": avg_win_rate,
"max_drawdown": avg_max_drawdown,
"avg_trade": avg_avg_trade,
"profit_ratio": avg_profit_ratio,
"initial_usd": initial_usd,
"final_usd": final_usd,
"total_fees_usd": total_fees_usd,
})
return summary_rows
def get_nearest_price(df, target_date):
if len(df) == 0:
return None, None
target_ts = pd.to_datetime(target_date)
nearest_idx = df.index.get_indexer([target_ts], method='nearest')[0]
nearest_time = df.index[nearest_idx]
price = df.iloc[nearest_idx]['close']
return nearest_time, price
if __name__ == "__main__":
debug = True
parser = argparse.ArgumentParser(description="Run backtest with config file.")
parser.add_argument("config", type=str, nargs="?", help="Path to config JSON file.")
args = parser.parse_args()
# Default values (from config.json)
default_config = {
"start_date": "2025-05-01",
"stop_date": datetime.datetime.today().strftime('%Y-%m-%d'),
"initial_usd": 10000,
"timeframes": ["15min"],
"stop_loss_pcts": [0.03],
}
if args.config:
with open(args.config, 'r') as f:
config = json.load(f)
elif not debug:
print("No config file provided. Please enter the following values (press Enter to use default):")
start_date = input(f"Start date [{default_config['start_date']}]: ") or default_config['start_date']
stop_date = input(f"Stop date [{default_config['stop_date']}]: ") or default_config['stop_date']
initial_usd_str = input(f"Initial USD [{default_config['initial_usd']}]: ") or str(default_config['initial_usd'])
initial_usd = float(initial_usd_str)
timeframes_str = input(f"Timeframes (comma separated) [{', '.join(default_config['timeframes'])}]: ") or ','.join(default_config['timeframes'])
timeframes = [tf.strip() for tf in timeframes_str.split(',') if tf.strip()]
stop_loss_pcts_str = input(f"Stop loss pcts (comma separated) [{', '.join(str(x) for x in default_config['stop_loss_pcts'])}]: ") or ','.join(str(x) for x in default_config['stop_loss_pcts'])
stop_loss_pcts = [float(x.strip()) for x in stop_loss_pcts_str.split(',') if x.strip()]
config = {
'start_date': start_date,
'stop_date': stop_date,
'initial_usd': initial_usd,
'timeframes': timeframes,
'stop_loss_pcts': stop_loss_pcts,
}
else:
config = default_config
start_date = config['start_date']
stop_date = config['stop_date']
initial_usd = config['initial_usd']
timeframes = config['timeframes']
stop_loss_pcts = config['stop_loss_pcts']
timestamp = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M")
storage = Storage(logging=logging)
system_utils = SystemUtils(logging=logging)
data_1min = storage.load_data('btcusd_1-min_data.csv', start_date, stop_date)
nearest_start_time, start_price = get_nearest_price(data_1min, start_date)
nearest_stop_time, stop_price = get_nearest_price(data_1min, stop_date)
metadata_lines = [
f"Start date\t{start_date}\tPrice\t{start_price}",
f"Stop date\t{stop_date}\tPrice\t{stop_price}",
f"Initial USD\t{initial_usd}"
]
tasks = [
(name, data_1min, stop_loss_pct, initial_usd)
for name in timeframes
for stop_loss_pct in stop_loss_pcts
]
if debug:
all_results_rows = []
all_trade_rows = []
for task in tasks:
results, trades = process(task, debug)
if results or trades:
all_results_rows.extend(results)
all_trade_rows.extend(trades)
else:
workers = system_utils.get_optimal_workers()
with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
futures = {executor.submit(process, task, debug): task for task in tasks}
all_results_rows = []
all_trade_rows = []
for future in concurrent.futures.as_completed(futures):
results, trades = future.result()
if results or trades:
all_results_rows.extend(results)
all_trade_rows.extend(trades)
backtest_filename = os.path.join(f"{timestamp}_backtest.csv")
backtest_fieldnames = [
"timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate",
"max_drawdown", "avg_trade", "profit_ratio", "final_usd", "total_fees_usd"
]
storage.write_backtest_results(backtest_filename, backtest_fieldnames, all_results_rows, metadata_lines)
trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"]
storage.write_trades(all_trade_rows, trades_fieldnames)