Add BacktestCharts class for visualizing backtest results and update main.py for enhanced data processing

- Introduced BacktestCharts class in charts.py to plot profit ratio vs stop loss and average trade vs stop loss for different timeframes.
- Updated main.py to integrate new charting functionality and streamline data processing without monthly splits.
- Enhanced backtesting logic in TrendDetectorSimple to include transaction costs and improved stop loss handling using 1-minute data for accuracy.
- Added functionality to write results to individual CSV files for better organization and analysis.
This commit is contained in:
Simon Moisy 2025-05-17 13:07:40 +08:00
parent ec8b1a7cf2
commit 125d4f7d52
4 changed files with 488 additions and 134 deletions

86
charts.py Normal file
View File

@ -0,0 +1,86 @@
import os
import matplotlib.pyplot as plt
class BacktestCharts:
def __init__(self, charts_dir="charts"):
self.charts_dir = charts_dir
os.makedirs(self.charts_dir, exist_ok=True)
def plot_profit_ratio_vs_stop_loss(self, results, filename="profit_ratio_vs_stop_loss.png"):
"""
Plots profit ratio vs stop loss percentage for each timeframe.
Parameters:
- results: list of dicts, each with keys: 'timeframe', 'stop_loss_pct', 'profit_ratio'
- filename: output filename (will be saved in charts_dir)
"""
# Organize data by timeframe
from collections import defaultdict
data = defaultdict(lambda: {"stop_loss_pct": [], "profit_ratio": []})
for row in results:
tf = row["timeframe"]
data[tf]["stop_loss_pct"].append(row["stop_loss_pct"])
data[tf]["profit_ratio"].append(row["profit_ratio"])
plt.figure(figsize=(10, 6))
for tf, vals in data.items():
# Sort by stop_loss_pct for smooth lines
sorted_pairs = sorted(zip(vals["stop_loss_pct"], vals["profit_ratio"]))
stop_loss, profit_ratio = zip(*sorted_pairs)
plt.plot(
[s * 100 for s in stop_loss], # Convert to percent
profit_ratio,
marker="o",
label=tf
)
plt.xlabel("Stop Loss (%)")
plt.ylabel("Profit Ratio")
plt.title("Profit Ratio vs Stop Loss (%) per Timeframe")
plt.legend(title="Timeframe")
plt.grid(True, linestyle="--", alpha=0.5)
plt.tight_layout()
output_path = os.path.join(self.charts_dir, filename)
plt.savefig(output_path)
plt.close()
def plot_average_trade_vs_stop_loss(self, results, filename="average_trade_vs_stop_loss.png"):
"""
Plots average trade vs stop loss percentage for each timeframe.
Parameters:
- results: list of dicts, each with keys: 'timeframe', 'stop_loss_pct', 'average_trade'
- filename: output filename (will be saved in charts_dir)
"""
from collections import defaultdict
data = defaultdict(lambda: {"stop_loss_pct": [], "average_trade": []})
for row in results:
tf = row["timeframe"]
if "average_trade" not in row:
continue # Skip rows without average_trade
data[tf]["stop_loss_pct"].append(row["stop_loss_pct"])
data[tf]["average_trade"].append(row["average_trade"])
plt.figure(figsize=(10, 6))
for tf, vals in data.items():
# Sort by stop_loss_pct for smooth lines
sorted_pairs = sorted(zip(vals["stop_loss_pct"], vals["average_trade"]))
stop_loss, average_trade = zip(*sorted_pairs)
plt.plot(
[s * 100 for s in stop_loss], # Convert to percent
average_trade,
marker="o",
label=tf
)
plt.xlabel("Stop Loss (%)")
plt.ylabel("Average Trade")
plt.title("Average Trade vs Stop Loss (%) per Timeframe")
plt.legend(title="Timeframe")
plt.grid(True, linestyle="--", alpha=0.5)
plt.tight_layout()
output_path = os.path.join(self.charts_dir, filename)
plt.savefig(output_path)
plt.close()

220
main.py
View File

@ -8,6 +8,9 @@ import logging
import concurrent.futures import concurrent.futures
import os import os
import psutil import psutil
import datetime
from charts import BacktestCharts
from collections import Counter
# Set up logging # Set up logging
logging.basicConfig( logging.basicConfig(
@ -54,34 +57,28 @@ def load_data(file_path, start_date, stop_date):
return data.set_index('timestamp') return data.set_index('timestamp')
def process_month_timeframe(month_df, stop_loss_pcts, rule_name, initial_usd): def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False):
"""Process a single month for a given timeframe with all stop loss values""" """Process the entire timeframe with all stop loss values (no monthly split)"""
month_df = month_df.copy().reset_index(drop=True) df = df.copy().reset_index(drop=True)
trend_detector = TrendDetectorSimple(df, verbose=False)
# Only calculate trends once per month-timeframe combination
trend_detector = TrendDetectorSimple(month_df, verbose=False)
analysis_results = trend_detector.detect_trends()
# Calculate backtest for each stop_loss_pct
results_rows = [] results_rows = []
trade_rows = []
for stop_loss_pct in stop_loss_pcts: for stop_loss_pct in stop_loss_pcts:
results = trend_detector.backtest_meta_supertrend( results = trend_detector.backtest_meta_supertrend(
min1_df,
initial_usd=initial_usd, initial_usd=initial_usd,
stop_loss_pct=stop_loss_pct stop_loss_pct=stop_loss_pct,
debug=debug
) )
# Process results
n_trades = results["n_trades"] n_trades = results["n_trades"]
trades = results.get('trades', []) trades = results.get('trades', [])
n_winning_trades = sum(1 for trade in trades if trade['profit_pct'] > 0) n_winning_trades = sum(1 for trade in trades if trade['profit_pct'] > 0)
total_profit = sum(trade['profit_pct'] for trade in trades) total_profit = sum(trade['profit_pct'] for trade in trades)
total_loss = sum(-trade['profit_pct'] for trade in trades if trade['profit_pct'] < 0) total_loss = sum(-trade['profit_pct'] for trade in trades if trade['profit_pct'] < 0)
win_rate = n_winning_trades / n_trades if n_trades > 0 else 0 win_rate = n_winning_trades / n_trades if n_trades > 0 else 0
avg_trade = total_profit / n_trades if n_trades > 0 else 0 avg_trade = total_profit / n_trades if n_trades > 0 else 0
profit_ratio = total_profit / total_loss if total_loss > 0 else float('inf') profit_ratio = total_profit / total_loss if total_loss > 0 else float('inf')
# Calculate max drawdown
cumulative_profit = 0 cumulative_profit = 0
max_drawdown = 0 max_drawdown = 0
peak = 0 peak = 0
@ -92,28 +89,46 @@ def process_month_timeframe(month_df, stop_loss_pcts, rule_name, initial_usd):
drawdown = peak - cumulative_profit drawdown = peak - cumulative_profit
if drawdown > max_drawdown: if drawdown > max_drawdown:
max_drawdown = drawdown max_drawdown = drawdown
final_usd = initial_usd
# Create row for trade in trades:
final_usd *= (1 + trade['profit_pct'])
row = { row = {
"timeframe": rule_name, "timeframe": rule_name,
"month": str(month_df['timestamp'].iloc[0].to_period('M')),
"stop_loss_pct": stop_loss_pct, "stop_loss_pct": stop_loss_pct,
"n_trades": n_trades, "n_trades": n_trades,
"n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP'), "n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP'),
"win_rate": win_rate, "win_rate": win_rate,
"max_drawdown": max_drawdown, "max_drawdown": max_drawdown,
"avg_trade": avg_trade, "avg_trade": avg_trade,
"profit_ratio": profit_ratio "profit_ratio": profit_ratio,
"initial_usd": initial_usd,
"final_usd": final_usd,
} }
results_rows.append(row) results_rows.append(row)
for trade in trades:
return results_rows trade_rows.append({
"timeframe": rule_name,
"stop_loss_pct": stop_loss_pct,
"entry_time": trade.get("entry_time"),
"exit_time": trade.get("exit_time"),
"entry_price": trade.get("entry"),
"exit_price": trade.get("exit"),
"profit_pct": trade.get("profit_pct"),
"type": trade.get("type", ""),
})
logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}")
if debug:
for trade in trades:
if trade['type'] == 'STOP':
print(trade)
for trade in trades:
if trade['profit_pct'] < -0.09: # or whatever is close to -0.10
print("Large loss trade:", trade)
return results_rows, trade_rows
def process_timeframe(timeframe_info): def process_timeframe(timeframe_info, debug=False):
"""Process an entire timeframe""" """Process an entire timeframe (no monthly split)"""
rule, rule_name, data_1min, stop_loss_pcts, initial_usd = timeframe_info rule, data_1min, stop_loss_pcts, initial_usd = timeframe_info
# Resample data if needed
if rule == "1T": if rule == "1T":
df = data_1min.copy() df = data_1min.copy()
else: else:
@ -124,29 +139,15 @@ def process_timeframe(timeframe_info):
'close': 'last', 'close': 'last',
'volume': 'sum' 'volume': 'sum'
}).dropna() }).dropna()
df = df.reset_index() df = df.reset_index()
df['month'] = df['timestamp'].dt.to_period('M')
results_rows = [] # --- Add this block to check alignment ---
print("1-min data range:", data_1min.index.min(), "to", data_1min.index.max())
# Process each month print(f"{rule} data range:", df['timestamp'].min(), "to", df['timestamp'].max())
for month, month_df in df.groupby('month'): # -----------------------------------------
if len(month_df) < 10: # Skip very small months
continue results_rows, all_trade_rows = process_timeframe_data(data_1min, df, stop_loss_pcts, rule, initial_usd, debug=debug)
return results_rows, all_trade_rows
logging.info(f"Processing: timeframe={rule_name}, month={month}")
try:
month_results = process_month_timeframe(month_df, stop_loss_pcts, rule_name, initial_usd)
results_rows.extend(month_results)
# Write intermediate results to avoid memory buildup
if len(results_rows) > 100:
return results_rows
except Exception as e:
logging.error(f"Error processing {rule_name}, month={month}: {str(e)}")
return results_rows
def write_results_chunk(filename, fieldnames, rows, write_header=False): def write_results_chunk(filename, fieldnames, rows, write_header=False):
"""Write a chunk of results to a CSV file""" """Write a chunk of results to a CSV file"""
@ -159,7 +160,9 @@ def write_results_chunk(filename, fieldnames, rows, write_header=False):
writer.writeheader() writer.writeheader()
for row in rows: for row in rows:
writer.writerow(row) # Only keep keys that are in fieldnames
filtered_row = {k: v for k, v in row.items() if k in fieldnames}
writer.writerow(filtered_row)
def aggregate_results(all_rows): def aggregate_results(all_rows):
"""Aggregate results per stop_loss_pct and per rule (timeframe)""" """Aggregate results per stop_loss_pct and per rule (timeframe)"""
@ -180,6 +183,9 @@ def aggregate_results(all_rows):
avg_avg_trade = np.mean([r['avg_trade'] for r in rows]) avg_avg_trade = np.mean([r['avg_trade'] for r in rows])
avg_profit_ratio = np.mean([r['profit_ratio'] for r in rows]) avg_profit_ratio = np.mean([r['profit_ratio'] for r in rows])
# Calculate final USD
final_usd = np.mean([r.get('final_usd', initial_usd) for r in rows])
summary_rows.append({ summary_rows.append({
"timeframe": rule, "timeframe": rule,
"stop_loss_pct": stop_loss_pct, "stop_loss_pct": stop_loss_pct,
@ -189,40 +195,63 @@ def aggregate_results(all_rows):
"max_drawdown": avg_max_drawdown, "max_drawdown": avg_max_drawdown,
"avg_trade": avg_avg_trade, "avg_trade": avg_avg_trade,
"profit_ratio": avg_profit_ratio, "profit_ratio": avg_profit_ratio,
"initial_usd": initial_usd,
"final_usd": final_usd,
}) })
return summary_rows return summary_rows
def write_results_per_combination(results_rows, trade_rows, timestamp):
results_dir = "results"
os.makedirs(results_dir, exist_ok=True)
for row in results_rows:
timeframe = row["timeframe"]
stop_loss_pct = row["stop_loss_pct"]
filename = os.path.join(
results_dir,
f"{timestamp}_backtest_{timeframe}_{stop_loss_pct}.csv"
)
fieldnames = ["timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate", "max_drawdown", "avg_trade", "profit_ratio", "initial_usd", "final_usd"]
write_results_chunk(filename, fieldnames, [row], write_header=not os.path.exists(filename))
for trade in trade_rows:
timeframe = trade["timeframe"]
stop_loss_pct = trade["stop_loss_pct"]
trades_filename = os.path.join(
results_dir,
f"{timestamp}_trades_{timeframe}_{stop_loss_pct}.csv"
)
trades_fieldnames = [
"timeframe", "stop_loss_pct", "entry_time", "exit_time",
"entry_price", "exit_price", "profit_pct", "type"
]
write_results_chunk(trades_filename, trades_fieldnames, [trade], write_header=not os.path.exists(trades_filename))
if __name__ == "__main__": if __name__ == "__main__":
# Configuration # Configuration
start_date = '2020-01-01' start_date = '2020-01-01'
stop_date = '2025-05-15' stop_date = '2025-05-15'
initial_usd = 10000 initial_usd = 10000
debug = False # Set to True to enable debug prints
timeframes = { # --- NEW: Prepare results folder and timestamp ---
# "1T": "1min", results_dir = "results"
"15T": "15min", os.makedirs(results_dir, exist_ok=True)
"1H": "1h", timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M")
"6H": "6h", # --- END NEW ---
"1D": "1D",
} # Replace the dictionary with a list of timeframe names
timeframes = ["15min", "1h", "6h", "1D"]
# timeframes = ["6h"]
stop_loss_pcts = [0.01, 0.02, 0.03, 0.05, 0.07, 0.10] stop_loss_pcts = [0.01, 0.02, 0.03, 0.05, 0.07, 0.10]
# stop_loss_pcts = [0.01]
# Load data once # Load data once
data_1min = load_data('./data/btcusd_1-min_data.csv', start_date, stop_date) data_1min = load_data('./data/btcusd_1-min_data.csv', start_date, stop_date)
logging.info(f"1min rows: {len(data_1min)}") logging.info(f"1min rows: {len(data_1min)}")
# Set up result file
filename = f"backtest_results_{start_date}_{stop_date}_multi_timeframe_stoploss.csv"
fieldnames = ["timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate", "max_drawdown", "avg_trade", "profit_ratio"]
# Initialize output file with header
write_results_chunk(filename, fieldnames, [], write_header=True)
# Prepare tasks # Prepare tasks
tasks = [ tasks = [
(rule, name, data_1min, stop_loss_pcts, initial_usd) (name, data_1min, stop_loss_pcts, initial_usd)
for rule, name in timeframes.items() for name in timeframes
] ]
# Determine optimal worker count # Determine optimal worker count
@ -231,23 +260,42 @@ if __name__ == "__main__":
# Process tasks with optimized concurrency # Process tasks with optimized concurrency
with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor: with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
futures = {executor.submit(process_timeframe, task): task[1] for task in tasks} futures = {executor.submit(process_timeframe, task, debug): task[1] for task in tasks}
all_results_rows = []
# Collect all results
all_results = []
for future in concurrent.futures.as_completed(futures): for future in concurrent.futures.as_completed(futures):
timeframe_name = futures[future] #try:
try: results, trades = future.result()
results = future.result() if results or trades:
if results: all_results_rows.extend(results)
# logging.info(f"Writing {len(results)} results for {timeframe_name}") write_results_per_combination(results, trades, timestamp)
# write_results_chunk(filename, fieldnames, results) # <-- REMOVE or COMMENT THIS OUT #except Exception as exc:
all_results.extend(results) # logging.error(f"generated an exception: {exc}")
except Exception as exc:
logging.error(f"{timeframe_name} generated an exception: {exc}")
# Write summary rows
summary_rows = aggregate_results(all_results)
write_results_chunk(filename, fieldnames, summary_rows, write_header=True) # Only write summary
logging.info(f"Results written to {filename}") # Write all results to a single CSV file
combined_filename = os.path.join(results_dir, f"{timestamp}_backtest_combined.csv")
combined_fieldnames = [
"timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate",
"max_drawdown", "avg_trade", "profit_ratio", "final_usd"
]
def format_row(row):
# Format percentages and floats as in your example
return {
"timeframe": row["timeframe"],
"stop_loss_pct": f"{row['stop_loss_pct']*100:.2f}%",
"n_trades": row["n_trades"],
"n_stop_loss": row["n_stop_loss"],
"win_rate": f"{row['win_rate']*100:.2f}%",
"max_drawdown": f"{row['max_drawdown']*100:.2f}%",
"avg_trade": f"{row['avg_trade']*100:.2f}%",
"profit_ratio": f"{row['profit_ratio']*100:.2f}%",
"final_usd": f"{row['final_usd']:.2f}",
}
with open(combined_filename, "w", newline="") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=combined_fieldnames, delimiter='\t')
writer.writeheader()
for row in all_results_rows:
writer.writerow(format_row(row))
logging.info(f"Combined results written to {combined_filename}")

197
main_debug.py Normal file
View File

@ -0,0 +1,197 @@
import pandas as pd
import numpy as np
from trend_detector_simple import TrendDetectorSimple
import os
import datetime
import csv
def load_data(file_path, start_date, stop_date):
"""Load and filter data by date range."""
data = pd.read_csv(file_path)
data['Timestamp'] = pd.to_datetime(data['Timestamp'], unit='s')
data = data[(data['Timestamp'] >= start_date) & (data['Timestamp'] <= stop_date)]
data.columns = data.columns.str.lower()
return data.set_index('timestamp')
def process_month_timeframe(min1_df, month_df, stop_loss_pcts, rule_name, initial_usd):
"""Process a single month for a given timeframe with all stop loss values."""
month_df = month_df.copy().reset_index(drop=True)
trend_detector = TrendDetectorSimple(month_df, verbose=False)
analysis_results = trend_detector.detect_trends()
signal_df = analysis_results.get('signal_df')
results_rows = []
trade_rows = []
for stop_loss_pct in stop_loss_pcts:
results = trend_detector.backtest_meta_supertrend(
min1_df,
initial_usd=initial_usd,
stop_loss_pct=stop_loss_pct
)
trades = results.get('trades', [])
n_trades = results["n_trades"]
n_winning_trades = sum(1 for trade in trades if trade['profit_pct'] > 0)
total_profit = sum(trade['profit_pct'] for trade in trades)
total_loss = sum(-trade['profit_pct'] for trade in trades if trade['profit_pct'] < 0)
win_rate = n_winning_trades / n_trades if n_trades > 0 else 0
avg_trade = total_profit / n_trades if n_trades > 0 else 0
profit_ratio = total_profit / total_loss if total_loss > 0 else float('inf')
# Max drawdown
cumulative_profit = 0
max_drawdown = 0
peak = 0
for trade in trades:
cumulative_profit += trade['profit_pct']
if cumulative_profit > peak:
peak = cumulative_profit
drawdown = peak - cumulative_profit
if drawdown > max_drawdown:
max_drawdown = drawdown
# Final USD
final_usd = initial_usd
for trade in trades:
final_usd *= (1 + trade['profit_pct'])
row = {
"timeframe": rule_name,
"month": str(month_df['timestamp'].iloc[0].to_period('M')),
"stop_loss_pct": stop_loss_pct,
"n_trades": n_trades,
"n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP'),
"win_rate": win_rate,
"max_drawdown": max_drawdown,
"avg_trade": avg_trade,
"profit_ratio": profit_ratio,
"initial_usd": initial_usd,
"final_usd": final_usd,
}
results_rows.append(row)
for trade in trades:
trade_rows.append({
"timeframe": rule_name,
"month": str(month_df['timestamp'].iloc[0].to_period('M')),
"stop_loss_pct": stop_loss_pct,
"entry_time": trade.get("entry_time"),
"exit_time": trade.get("exit_time"),
"entry_price": trade.get("entry_price"),
"exit_price": trade.get("exit_price"),
"profit_pct": trade.get("profit_pct"),
"type": trade.get("type", ""),
})
return results_rows, trade_rows
def process_timeframe(rule, data_1min, stop_loss_pcts, initial_usd):
"""Process an entire timeframe sequentially."""
if rule == "1T":
df = data_1min.copy()
else:
df = data_1min.resample(rule).agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum'
}).dropna()
df = df.reset_index()
df['month'] = df['timestamp'].dt.to_period('M')
results_rows = []
all_trade_rows = []
for month, month_df in df.groupby('month'):
if len(month_df) < 10:
continue
month_results, month_trades = process_month_timeframe(data_1min, month_df, stop_loss_pcts, rule, initial_usd)
results_rows.extend(month_results)
all_trade_rows.extend(month_trades)
return results_rows, all_trade_rows
def aggregate_results(all_rows, initial_usd):
"""Aggregate results per stop_loss_pct and per rule (timeframe)."""
from collections import defaultdict
grouped = defaultdict(list)
for row in all_rows:
key = (row['timeframe'], row['stop_loss_pct'])
grouped[key].append(row)
summary_rows = []
for (rule, stop_loss_pct), rows in grouped.items():
n_months = len(rows)
total_trades = sum(r['n_trades'] for r in rows)
total_stop_loss = sum(r['n_stop_loss'] for r in rows)
avg_win_rate = np.mean([r['win_rate'] for r in rows])
avg_max_drawdown = np.mean([r['max_drawdown'] for r in rows])
avg_avg_trade = np.mean([r['avg_trade'] for r in rows])
avg_profit_ratio = np.mean([r['profit_ratio'] for r in rows])
final_usd = np.mean([r.get('final_usd', initial_usd) for r in rows])
summary_rows.append({
"timeframe": rule,
"stop_loss_pct": stop_loss_pct,
"n_trades": total_trades,
"n_stop_loss": total_stop_loss,
"win_rate": avg_win_rate,
"max_drawdown": avg_max_drawdown,
"avg_trade": avg_avg_trade,
"profit_ratio": avg_profit_ratio,
"initial_usd": initial_usd,
"final_usd": final_usd,
})
return summary_rows
def write_results(filename, fieldnames, rows):
"""Write results to a CSV file."""
with open(filename, 'w', newline="") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for row in rows:
writer.writerow(row)
if __name__ == "__main__":
# Config
start_date = '2020-01-01'
stop_date = '2025-05-15'
initial_usd = 10000
results_dir = "results"
os.makedirs(results_dir, exist_ok=True)
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M")
timeframes = ["6h", "1D"]
stop_loss_pcts = [0.01, 0.02, 0.03, 0.05, 0.07, 0.10]
data_1min = load_data('./data/btcusd_1-min_data.csv', start_date, stop_date)
print(f"1min rows: {len(data_1min)}")
filename = os.path.join(
results_dir,
f"{timestamp}_backtest_results_{start_date}_{stop_date}_multi_timeframe_stoploss.csv"
)
fieldnames = ["timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate", "max_drawdown", "avg_trade", "profit_ratio", "initial_usd", "final_usd"]
all_results = []
all_trades = []
for name in timeframes:
print(f"Processing timeframe: {name}")
results, trades = process_timeframe(name, data_1min, stop_loss_pcts, initial_usd)
all_results.extend(results)
all_trades.extend(trades)
summary_rows = aggregate_results(all_results, initial_usd)
# write_results(filename, fieldnames, summary_rows)
trades_filename = os.path.join(
results_dir,
f"{timestamp}_backtest_trades.csv"
)
trades_fieldnames = [
"timeframe", "month", "stop_loss_pct", "entry_time", "exit_time",
"entry_price", "exit_price", "profit_pct", "type"
]
# write_results(trades_filename, trades_fieldnames, all_trades)

View File

@ -4,10 +4,10 @@ import logging
from scipy.signal import find_peaks from scipy.signal import find_peaks
from matplotlib.patches import Rectangle from matplotlib.patches import Rectangle
from scipy import stats from scipy import stats
from scipy import stats
import concurrent.futures import concurrent.futures
from functools import partial from functools import partial
from functools import lru_cache from functools import lru_cache
import matplotlib.pyplot as plt
# Color configuration # Color configuration
# Plot colors # Plot colors
@ -107,9 +107,9 @@ def cached_supertrend_calculation(period, multiplier, data_tuple):
def calculate_supertrend_external(data, period, multiplier): def calculate_supertrend_external(data, period, multiplier):
# Convert DataFrame columns to hashable tuples # Convert DataFrame columns to hashable tuples
high_tuple = tuple(data['high'].values) high_tuple = tuple(data['high'])
low_tuple = tuple(data['low'].values) low_tuple = tuple(data['low'])
close_tuple = tuple(data['close'].values) close_tuple = tuple(data['close'])
# Call the cached function # Call the cached function
return cached_supertrend_calculation(period, multiplier, (high_tuple, low_tuple, close_tuple)) return cached_supertrend_calculation(period, multiplier, (high_tuple, low_tuple, close_tuple))
@ -350,9 +350,6 @@ class TrendDetectorSimple:
if not self.display: if not self.display:
return # Do nothing if display is False return # Do nothing if display is False
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
plt.style.use(self.plot_style) plt.style.use(self.plot_style)
if view == "both": if view == "both":
@ -641,34 +638,21 @@ class TrendDetectorSimple:
ax.plot([], [], color_down, linewidth=self.line_width, ax.plot([], [], color_down, linewidth=self.line_width,
label=f'ST (P:{period}, M:{multiplier}) Down') label=f'ST (P:{period}, M:{multiplier}) Down')
def backtest_meta_supertrend(self, initial_usd=10000, stop_loss_pct=0.05): def backtest_meta_supertrend(self, min1_df, initial_usd=10000, stop_loss_pct=0.05, transaction_cost=0.001, debug=False):
""" """
Backtest a simple strategy using the meta supertrend (all three supertrends agree). Backtest a simple strategy using the meta supertrend (all three supertrends agree).
Buys when meta supertrend is positive, sells when negative, applies a percentage stop loss. Buys when meta supertrend is positive, sells when negative, applies a percentage stop loss.
Parameters: Parameters:
- min1_df: pandas DataFrame, 1-minute timeframe data for more accurate stop loss checking (optional)
- initial_usd: float, starting USD amount - initial_usd: float, starting USD amount
- stop_loss_pct: float, stop loss as a fraction (e.g. 0.05 for 5%) - stop_loss_pct: float, stop loss as a fraction (e.g. 0.05 for 5%)
- transaction_cost: float, transaction cost as a fraction (e.g. 0.001 for 0.1%)
- debug: bool, whether to print debug info
""" """
import pandas as pd
df = self.data.copy().reset_index(drop=True) df = self.data.copy().reset_index(drop=True)
df['timestamp'] = pd.to_datetime(df['timestamp']) df['timestamp'] = pd.to_datetime(df['timestamp'])
if len(df) == 0:
self.logger.warning("No data available for backtest.")
return {
"initial_usd": initial_usd,
"final_usd": initial_usd,
"n_trades": 0,
"win_rate": 0,
"max_drawdown": 0,
"avg_trade": 0,
"trade_log": [],
"first_trade": {},
"last_trade": {},
"trades": [],
}
# Get meta supertrend (all three agree) # Get meta supertrend (all three agree)
supertrend_results_list = self._calculate_supertrend_indicators() supertrend_results_list = self._calculate_supertrend_indicators()
trends = [st['results']['trend'] for st in supertrend_results_list] trends = [st['results']['trend'] for st in supertrend_results_list]
@ -676,12 +660,6 @@ class TrendDetectorSimple:
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]), meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
trends_arr[:,0], 0) trends_arr[:,0], 0)
# Precompute buy/sell signals
buy_signals = (meta_trend == 1) & (np.roll(meta_trend, 1) != 1)
sell_signals = (meta_trend == -1) & (np.roll(meta_trend, 1) != -1)
buy_signals[0] = False # Ignore first element due to np.roll
sell_signals[0] = False
position = 0 # 0 = no position, 1 = long position = 0 # 0 = no position, 1 = long
entry_price = 0 entry_price = 0
usd = initial_usd usd = initial_usd
@ -690,9 +668,13 @@ class TrendDetectorSimple:
max_balance = initial_usd max_balance = initial_usd
drawdowns = [] drawdowns = []
trades = [] trades = []
entry_time = None
current_trade_min1_start_idx = None
min1_df['timestamp'] = pd.to_datetime(min1_df.index)
for i in range(1, len(df)): for i in range(1, len(df)):
if i % 100 == 0: if i % 100 == 0 and debug:
self.logger.debug(f"Progress: {i}/{len(df)} rows processed.") self.logger.debug(f"Progress: {i}/{len(df)} rows processed.")
price_open = df['open'].iloc[i] price_open = df['open'].iloc[i]
@ -701,36 +683,72 @@ class TrendDetectorSimple:
price_close = df['close'].iloc[i] price_close = df['close'].iloc[i]
date = df['timestamp'].iloc[i] date = df['timestamp'].iloc[i]
mt = meta_trend[i] mt = meta_trend[i]
# Check stop loss if in position # Check stop loss if in position
if position == 1: if position == 1:
stop_price = entry_price * (1 - stop_loss_pct) stop_price = entry_price * (1 - stop_loss_pct)
if price_low <= stop_price:
# Stop loss triggered if current_trade_min1_start_idx is None:
sell_price = stop_price # First check after entry, find the entry point in 1-min data
usd = coin * sell_price current_trade_min1_start_idx = min1_df.index[min1_df.index >= entry_time][0]
trade_log.append({'type': 'STOP', 'entry': entry_price, 'exit': sell_price, 'entry_time': entry_time, 'exit_time': date})
# Get the end index for current check
current_min1_end_idx = min1_df.index[min1_df.index <= date][-1]
# Check all 1-minute candles in between for stop loss
min1_slice = min1_df.loc[current_trade_min1_start_idx:current_min1_end_idx]
if (min1_slice['low'] <= stop_price).any():
# Stop loss triggered, find the exact candle
stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0]
# More realistic fill: if open < stop, fill at open, else at stop
if stop_candle['open'] < stop_price:
sell_price = stop_candle['open']
else:
sell_price = stop_price
if debug:
print(f"STOP LOSS triggered: entry={entry_price}, stop={stop_price}, sell_price={sell_price}, entry_time={entry_time}, stop_time={stop_candle.name}")
usd = coin * sell_price * (1 - transaction_cost) # Apply transaction cost
trade_log.append({
'type': 'STOP',
'entry': entry_price,
'exit': sell_price,
'entry_time': entry_time,
'exit_time': stop_candle.name # Use index name instead of timestamp column
})
coin = 0 coin = 0
position = 0 position = 0
entry_price = 0 entry_price = 0
current_trade_min1_start_idx = None
continue continue
# Update the start index for next check
current_trade_min1_start_idx = current_min1_end_idx
# Entry logic # Entry logic
if position == 0 and mt == 1: if position == 0 and mt == 1:
# Buy at open # Buy at open, apply transaction cost
coin = usd / price_open coin = (usd * (1 - transaction_cost)) / price_open
entry_price = price_open entry_price = price_open
entry_time = date entry_time = date
usd = 0 usd = 0
position = 1 position = 1
current_trade_min1_start_idx = None # Will be set on first stop loss check
# Exit logic # Exit logic
elif position == 1 and mt == -1: elif position == 1 and mt == -1:
# Sell at open # Sell at open, apply transaction cost
usd = coin * price_open usd = coin * price_open * (1 - transaction_cost)
trade_log.append({'type': 'SELL', 'entry': entry_price, 'exit': price_open, 'entry_time': entry_time, 'exit_time': date}) trade_log.append({
'type': 'SELL',
'entry': entry_price,
'exit': price_open,
'entry_time': entry_time,
'exit_time': date
})
coin = 0 coin = 0
position = 0 position = 0
entry_price = 0 entry_price = 0
current_trade_min1_start_idx = None
# Track drawdown # Track drawdown
balance = usd if position == 0 else coin * price_close balance = usd if position == 0 else coin * price_close
@ -738,17 +756,22 @@ class TrendDetectorSimple:
max_balance = balance max_balance = balance
drawdown = (max_balance - balance) / max_balance drawdown = (max_balance - balance) / max_balance
drawdowns.append(drawdown) drawdowns.append(drawdown)
if i % 1000 == 0 or i == len(df) - 1:
self.logger.debug(f"Progress: {i}/{len(df)} rows processed.")
# If still in position at end, sell at last close # If still in position at end, sell at last close
if position == 1: if position == 1:
usd = coin * df['close'].iloc[-1] usd = coin * df['close'].iloc[-1] * (1 - transaction_cost) # Apply transaction cost
trade_log.append({'type': 'EOD', 'entry': entry_price, 'exit': df['close'].iloc[-1], 'entry_time': entry_time, 'exit_time': df['timestamp'].iloc[-1]}) trade_log.append({
'type': 'EOD',
'entry': entry_price,
'exit': df['close'].iloc[-1],
'entry_time': entry_time,
'exit_time': df['timestamp'].iloc[-1]
})
coin = 0 coin = 0
position = 0 position = 0
entry_price = 0 entry_price = 0
# Calculate statistics
final_balance = usd final_balance = usd
n_trades = len(trade_log) n_trades = len(trade_log)
wins = [1 for t in trade_log if t['exit'] > t['entry']] wins = [1 for t in trade_log if t['exit'] > t['entry']]