- Introduced BacktestCharts class in charts.py to plot profit ratio vs stop loss and average trade vs stop loss for different timeframes. - Updated main.py to integrate new charting functionality and streamline data processing without monthly splits. - Enhanced backtesting logic in TrendDetectorSimple to include transaction costs and improved stop loss handling using 1-minute data for accuracy. - Added functionality to write results to individual CSV files for better organization and analysis.
198 lines
7.2 KiB
Python
198 lines
7.2 KiB
Python
import pandas as pd
|
|
import numpy as np
|
|
from trend_detector_simple import TrendDetectorSimple
|
|
import os
|
|
import datetime
|
|
import csv
|
|
|
|
def load_data(file_path, start_date, stop_date):
    """Read a CSV of timestamped rows and keep those inside a date window.

    The CSV must contain a ``Timestamp`` column holding epoch seconds. Rows
    are kept when ``start_date <= Timestamp <= stop_date`` (both bounds
    inclusive). All column names are lower-cased and the converted
    ``timestamp`` column becomes the index of the returned frame.

    Args:
        file_path: Path (or file-like object) accepted by ``pd.read_csv``.
        start_date: Lower bound of the window, comparable to a datetime column.
        stop_date: Upper bound of the window, comparable to a datetime column.

    Returns:
        A DataFrame indexed by ``timestamp`` with lower-cased column names.
    """
    frame = pd.read_csv(file_path)

    # Epoch seconds -> pandas datetimes so the window can be compared by date.
    frame['Timestamp'] = pd.to_datetime(frame['Timestamp'], unit='s')

    on_or_after_start = frame['Timestamp'] >= start_date
    on_or_before_stop = frame['Timestamp'] <= stop_date
    window = frame[on_or_after_start & on_or_before_stop]

    # Lower-casing turns 'Timestamp' into 'timestamp', the index column below.
    window.columns = window.columns.str.lower()
    return window.set_index('timestamp')
|
|
|
|
def _summarize_trades(trades, n_trades, initial_usd):
    """Compute the summary statistics of one backtest run from its trade list.

    Args:
        trades: List of trade dicts; each must carry ``profit_pct`` and may
            carry ``type``.
        n_trades: Trade count reported by the backtest; used as the
            denominator for ``win_rate`` and ``avg_trade``.
        initial_usd: Starting balance used for the compounded ``final_usd``.

    Returns:
        Dict with ``n_stop_loss``, ``win_rate``, ``max_drawdown``,
        ``avg_trade``, ``profit_ratio`` and ``final_usd``.
    """
    profits = [trade['profit_pct'] for trade in trades]

    n_winning_trades = sum(1 for profit in profits if profit > 0)
    # total_profit is the net sum over all trades (winners and losers);
    # total_loss is the summed magnitude of the losing trades only.
    total_profit = sum(profits)
    total_loss = sum(-profit for profit in profits if profit < 0)

    win_rate = n_winning_trades / n_trades if n_trades > 0 else 0
    avg_trade = total_profit / n_trades if n_trades > 0 else 0
    # No losing trades -> ratio is reported as +inf.
    profit_ratio = total_profit / total_loss if total_loss > 0 else float('inf')

    # Max drawdown: largest drop of the running (additive) profit_pct sum
    # below its highest value so far; the peak starts at 0.
    cumulative_profit = 0
    peak = 0
    max_drawdown = 0
    for profit in profits:
        cumulative_profit += profit
        if cumulative_profit > peak:
            peak = cumulative_profit
        drawdown = peak - cumulative_profit
        if drawdown > max_drawdown:
            max_drawdown = drawdown

    # Final balance: compound every trade onto the starting balance.
    # NOTE(review): assumes profit_pct is a fraction (0.01 == 1%) — confirm.
    final_usd = initial_usd
    for profit in profits:
        final_usd *= (1 + profit)

    return {
        "n_stop_loss": sum(1 for trade in trades if trade.get('type') == 'STOP'),
        "win_rate": win_rate,
        "max_drawdown": max_drawdown,
        "avg_trade": avg_trade,
        "profit_ratio": profit_ratio,
        "final_usd": final_usd,
    }


def process_month_timeframe(min1_df, month_df, stop_loss_pcts, rule_name, initial_usd):
    """Backtest one month of bars for every stop-loss value.

    A single ``TrendDetectorSimple`` is built from ``month_df`` and then
    ``backtest_meta_supertrend`` is run once per entry of ``stop_loss_pcts``.

    Args:
        min1_df: Frame handed unchanged to ``backtest_meta_supertrend`` as its
            first positional argument.
        month_df: Bars of one month; must contain a ``timestamp`` column,
            whose first value determines the month label.
        stop_loss_pcts: Iterable of stop-loss values to test.
        rule_name: Timeframe label stored in every output row.
        initial_usd: Starting balance passed to the backtest and used to
            compute ``final_usd``.

    Returns:
        Tuple ``(results_rows, trade_rows)``: one summary dict per stop-loss
        value, and one dict per individual trade.
    """
    month_df = month_df.copy().reset_index(drop=True)

    # The month label is identical for every row produced here, so compute it once.
    month_label = str(month_df['timestamp'].iloc[0].to_period('M'))

    trend_detector = TrendDetectorSimple(month_df, verbose=False)
    # The return value is not used here.
    # NOTE(review): presumably detect_trends() prepares state that
    # backtest_meta_supertrend() relies on — confirm before removing the call.
    trend_detector.detect_trends()

    results_rows = []
    trade_rows = []
    for stop_loss_pct in stop_loss_pcts:
        results = trend_detector.backtest_meta_supertrend(
            min1_df,
            initial_usd=initial_usd,
            stop_loss_pct=stop_loss_pct
        )
        trades = results.get('trades', [])
        n_trades = results["n_trades"]
        stats = _summarize_trades(trades, n_trades, initial_usd)

        results_rows.append({
            "timeframe": rule_name,
            "month": month_label,
            "stop_loss_pct": stop_loss_pct,
            "n_trades": n_trades,
            "n_stop_loss": stats["n_stop_loss"],
            "win_rate": stats["win_rate"],
            "max_drawdown": stats["max_drawdown"],
            "avg_trade": stats["avg_trade"],
            "profit_ratio": stats["profit_ratio"],
            "initial_usd": initial_usd,
            "final_usd": stats["final_usd"],
        })

        trade_rows.extend(
            {
                "timeframe": rule_name,
                "month": month_label,
                "stop_loss_pct": stop_loss_pct,
                "entry_time": trade.get("entry_time"),
                "exit_time": trade.get("exit_time"),
                "entry_price": trade.get("entry_price"),
                "exit_price": trade.get("exit_price"),
                "profit_pct": trade.get("profit_pct"),
                "type": trade.get("type", ""),
            }
            for trade in trades
        )

    return results_rows, trade_rows
|
|
|
|
def process_timeframe(rule, data_1min, stop_loss_pcts, initial_usd):
    """Run the monthly backtests of one timeframe, month after month.

    Unless ``rule`` is ``"1T"``, ``data_1min`` is resampled to ``rule`` as
    OHLCV bars (first open, max high, min low, last close, summed volume) and
    bars containing NaN are dropped. The bars are then grouped by calendar
    month; months with fewer than 10 bars are skipped and every other month is
    handed to ``process_month_timeframe``.

    Args:
        rule: Resample rule for the timeframe; ``"1T"`` uses the data as-is.
        data_1min: Frame indexed by ``timestamp`` with ``open``, ``high``,
            ``low``, ``close`` and ``volume`` columns.
        stop_loss_pcts: Stop-loss values forwarded to each monthly backtest.
        initial_usd: Starting balance forwarded to each monthly backtest.

    Returns:
        Tuple ``(results_rows, all_trade_rows)`` collected over all months.
    """
    if rule != "1T":
        ohlcv_spec = {
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last',
            'volume': 'sum'
        }
        bars = data_1min.resample(rule).agg(ohlcv_spec).dropna()
    else:
        bars = data_1min.copy()

    # Turn the timestamp index into a column so months can be derived from it.
    bars = bars.reset_index()
    bars['month'] = bars['timestamp'].dt.to_period('M')

    collected_results, collected_trades = [], []
    for _, month_bars in bars.groupby('month'):
        if len(month_bars) >= 10:
            # Only months with at least 10 bars are backtested.
            month_results, month_trades = process_month_timeframe(
                data_1min, month_bars, stop_loss_pcts, rule, initial_usd
            )
            collected_results.extend(month_results)
            collected_trades.extend(month_trades)

    return collected_results, collected_trades
|
|
|
|
def aggregate_results(all_rows, initial_usd):
    """Aggregate per-month result rows per (timeframe, stop_loss_pct) pair.

    Within each group ``n_trades`` and ``n_stop_loss`` are summed, while
    ``win_rate``, ``max_drawdown``, ``avg_trade`` and ``final_usd`` are plain
    (unweighted) means over the group's rows. ``final_usd`` is therefore the
    average of the per-row balances, not a balance compounded across rows.

    ``profit_ratio`` is averaged over the finite per-row values only: a row
    may carry ``inf`` (no losing trades), and including it would turn the
    whole group's mean into ``inf``. If no row of a group has a finite ratio
    the aggregate stays ``inf``.

    Args:
        all_rows: Iterable of result dicts carrying ``timeframe``,
            ``stop_loss_pct``, ``n_trades``, ``n_stop_loss``, ``win_rate``,
            ``max_drawdown``, ``avg_trade``, ``profit_ratio`` and optionally
            ``final_usd``.
        initial_usd: Starting balance; copied into every summary row and used
            as the fallback for rows without ``final_usd``.

    Returns:
        List with one summary dict per (timeframe, stop_loss_pct) group, in
        order of first appearance. Empty input yields an empty list.
    """
    from collections import defaultdict

    grouped = defaultdict(list)
    for row in all_rows:
        grouped[(row['timeframe'], row['stop_loss_pct'])].append(row)

    summary_rows = []
    for (rule, stop_loss_pct), rows in grouped.items():
        # Keep non-finite ratios (inf/nan) out of the mean so a single row
        # without losses cannot dominate the aggregate.
        finite_ratios = [
            r['profit_ratio'] for r in rows if np.isfinite(r['profit_ratio'])
        ]
        avg_profit_ratio = np.mean(finite_ratios) if finite_ratios else float('inf')

        summary_rows.append({
            "timeframe": rule,
            "stop_loss_pct": stop_loss_pct,
            "n_trades": sum(r['n_trades'] for r in rows),
            "n_stop_loss": sum(r['n_stop_loss'] for r in rows),
            "win_rate": np.mean([r['win_rate'] for r in rows]),
            "max_drawdown": np.mean([r['max_drawdown'] for r in rows]),
            "avg_trade": np.mean([r['avg_trade'] for r in rows]),
            "profit_ratio": avg_profit_ratio,
            "initial_usd": initial_usd,
            "final_usd": np.mean([r.get('final_usd', initial_usd) for r in rows]),
        })
    return summary_rows
|
|
|
|
def write_results(filename, fieldnames, rows):
    """Dump dict rows to ``filename`` as CSV, header line first.

    Args:
        filename: Destination path; an existing file is overwritten.
        fieldnames: Column names, in output order, for the header and rows.
        rows: Iterable of dicts keyed by ``fieldnames``.
    """
    # newline="" lets the csv module control line endings itself.
    with open(filename, 'w', newline="") as out_file:
        dict_writer = csv.DictWriter(out_file, fieldnames=fieldnames)
        dict_writer.writeheader()
        dict_writer.writerows(rows)
|
|
|
|
if __name__ == "__main__":
    # --- Configuration -----------------------------------------------------
    # Backtest window (both dates are passed to load_data) and starting balance.
    start_date = '2020-01-01'
    stop_date = '2025-05-15'
    initial_usd = 10000

    # Parameter grid: every timeframe is backtested with every stop-loss value.
    timeframes = ["6h", "1D"]
    stop_loss_pcts = [0.01, 0.02, 0.03, 0.05, 0.07, 0.10]

    # --- Output locations ----------------------------------------------------
    results_dir = "results"
    os.makedirs(results_dir, exist_ok=True)

    # Minute-resolution run stamp used as a prefix for both output file names.
    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M")

    filename = os.path.join(
        results_dir,
        f"{timestamp}_backtest_results_{start_date}_{stop_date}_multi_timeframe_stoploss.csv"
    )
    fieldnames = [
        "timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate",
        "max_drawdown", "avg_trade", "profit_ratio", "initial_usd", "final_usd"
    ]

    trades_filename = os.path.join(
        results_dir,
        f"{timestamp}_backtest_trades.csv"
    )
    trades_fieldnames = [
        "timeframe", "month", "stop_loss_pct", "entry_time", "exit_time",
        "entry_price", "exit_price", "profit_pct", "type"
    ]

    # --- Load data -----------------------------------------------------------
    data_1min = load_data('./data/btcusd_1-min_data.csv', start_date, stop_date)
    print(f"1min rows: {len(data_1min)}")

    # --- Run every timeframe sequentially ------------------------------------
    all_results = []
    all_trades = []
    for name in timeframes:
        print(f"Processing timeframe: {name}")
        results, trades = process_timeframe(name, data_1min, stop_loss_pcts, initial_usd)
        all_results += results
        all_trades += trades

    summary_rows = aggregate_results(all_results, initial_usd)

    # NOTE(review): both write_results calls are disabled, so this script
    # currently computes the summary and trade rows without saving them.
    # Confirm whether that is intentional before re-enabling.
    # write_results(filename, fieldnames, summary_rows)
    # write_results(trades_filename, trades_fieldnames, all_trades)
|