557 lines
24 KiB
Python
557 lines
24 KiB
Python
import pandas as pd
|
|
import numpy as np
|
|
from ta.volatility import AverageTrueRange
|
|
import time
|
|
import csv
|
|
import math
|
|
import os
|
|
|
|
|
|
def load_data(since, until, csv_file):
    """Read candles from a CSV file and keep the rows inside a time window.

    Args:
        since: Inclusive lower bound; anything ``pd.Timestamp`` accepts.
        until: Inclusive upper bound; anything ``pd.Timestamp`` accepts.
        csv_file: Path of a CSV file with a ``Timestamp`` column that is
            parsed as Unix epoch seconds.

    Returns:
        pd.DataFrame: Rows with ``since <= Timestamp <= until``. The
        ``Timestamp`` column is converted to datetime and the row labels of
        the original file are kept (the index is not reset).
    """
    frame = pd.read_csv(csv_file)
    frame['Timestamp'] = pd.to_datetime(frame['Timestamp'], unit='s')

    lower_bound = pd.Timestamp(since)
    upper_bound = pd.Timestamp(until)
    stamps = frame['Timestamp']
    inside_window = (stamps >= lower_bound) & (stamps <= upper_bound)
    return frame[inside_window]
|
|
|
|
def aggregate_data(df, timeframe):
    """Resample candles into OHLCV bars of the requested timeframe.

    Args:
        df (pd.DataFrame): Candles with a datetime ``Timestamp`` column and
            ``Open``, ``High``, ``Low``, ``Close``, ``Volume`` columns.
        timeframe (str): Resampling rule passed to ``DataFrame.resample``
            (e.g. ``"5min"``, ``"1h"``, ``"1d"``).

    Returns:
        pd.DataFrame: One row per non-empty bar with ``Timestamp`` (the bar's
        resample label) plus the aggregated OHLCV columns, indexed 0..n-1.
    """
    price_cols = ['Open', 'High', 'Low', 'Close']
    bars = df.set_index('Timestamp').resample(timeframe).agg({
        'Open': 'first',
        'High': 'max',
        'Low': 'min',
        'Close': 'last',
        'Volume': 'sum',
    })
    # resample() emits a row for every bin, including bins with no source
    # candles (data gaps). Those rows carry NaN prices, and a single NaN
    # poisons every later value of the recursively smoothed ATR, so bars
    # without price data are dropped before the index is rebuilt.
    bars = bars.dropna(subset=price_cols)
    return bars.reset_index()
|
|
|
|
def calculate_okx_taker_maker_fee(amount, is_maker=False):
    """Return the trading fee charged on ``amount``.

    Args:
        amount: Notional the fee is charged on.
        is_maker (bool): Selects the 0.08% rate when true, otherwise the
            0.10% rate is used.

    Returns:
        ``amount`` multiplied by the selected fee rate.
    """
    if is_maker:
        return amount * 0.0008
    return amount * 0.0010
|
|
|
|
def calculate_supertrend(df, period, multiplier):
    """Compute a Supertrend-style trailing band for the given ATR settings.

    Steps performed:
      1. True Range per row (row 0 is left at 0 because it has no previous
         close).
      2. ATR: the first value is the plain mean of the first ``period`` true
         ranges, later values use the recursive smoothing
         ``(prev * (period - 1) + tr) / period``. Rows before ``period`` are
         back-filled with the first ATR (or 0.001 if that ATR is not positive).
      3. Bands ``hl2 +/- multiplier * atr`` with ``hl2 = (High + Low) / 2``.
      4. The trend flips up when the close exceeds the previous row's upper
         band and down when it falls below the previous row's lower band.
         While up, the output is the running maximum of the lower band; while
         down, the running minimum of the upper band.

    Args:
        df (pd.DataFrame): DataFrame with 'High', 'Low', 'Close' columns.
        period (int): ATR period.
        multiplier (float): Multiplier applied to the ATR for the bands.

    Returns:
        pd.Series: Supertrend values aligned with ``df.index``. All NaN (and
        a warning is printed) when ``df`` has fewer than ``period + 1`` rows.
    """
    n_rows = len(df)

    # Ensure we have enough data for ATR calculation
    if n_rows < period + 1:
        print(f"Warning: Not enough data for ATR period {period}. Need at least {period + 1} rows, got {len(df)}")
        return pd.Series(np.full(n_rows, np.nan), index=df.index)

    # Work in float explicitly: integer price columns would otherwise give
    # integer work arrays that truncate the ATR and cannot store NaN.
    high = df['High'].to_numpy(dtype=float)
    low = df['Low'].to_numpy(dtype=float)
    close = df['Close'].to_numpy(dtype=float)

    # True Range: widest of high-low, |high - prev close|, |low - prev close|
    prev_close = close[:-1]
    tr = np.zeros(n_rows)
    tr[1:] = np.maximum(
        high[1:] - low[1:],
        np.maximum(np.abs(high[1:] - prev_close), np.abs(low[1:] - prev_close)),
    )

    # ATR: seed with a plain mean, then smooth recursively (order-dependent,
    # so this part stays a loop)
    atr = np.zeros(n_rows)
    atr[period] = np.mean(tr[1:period + 1])
    for i in range(period + 1, n_rows):
        atr[i] = (atr[i - 1] * (period - 1) + tr[i]) / period

    # Fill initial values with the first valid ATR
    atr[:period] = atr[period] if atr[period] > 0 else 0.001

    hl2 = (high + low) / 2
    upperband = hl2 + (multiplier * atr)
    lowerband = hl2 - (multiplier * atr)

    supertrend = np.full(n_rows, np.nan)
    supertrend[0] = upperband[0]
    in_uptrend = True

    for i in range(1, n_rows):
        if close[i] > upperband[i - 1]:
            in_uptrend = True
        elif close[i] < lowerband[i - 1]:
            in_uptrend = False
        # otherwise the previous trend is kept

        previous = supertrend[i - 1]
        if in_uptrend:
            supertrend[i] = max(lowerband[i], previous if not np.isnan(previous) else lowerband[i])
        else:
            supertrend[i] = min(upperband[i], previous if not np.isnan(previous) else upperband[i])

    return pd.Series(supertrend, index=df.index)
|
|
|
|
def add_supertrend_indicators(df):
    """Attach one Supertrend column per configured (period, multiplier) pair.

    The pairs are (12, 3.0), (10, 1.0) and (11, 2.0); each result is stored
    in a column named ``supertrend_<period>_<multiplier>``. If computing or
    storing a column raises, the error is printed and the column is filled
    with NaN instead.

    Args:
        df (pd.DataFrame): DataFrame with columns 'High', 'Low', 'Close'.
            It is modified in place.

    Returns:
        pd.DataFrame: The same ``df`` object with the Supertrend columns added.
    """
    for period, multiplier in ((12, 3.0), (10, 1.0), (11, 2.0)):
        column = f'supertrend_{period}_{multiplier}'
        try:
            df[column] = calculate_supertrend(df, period, multiplier)
        except Exception as e:
            print(f"Error calculating Supertrend {period}, {multiplier}: {e}")
            df[column] = np.nan
    return df
|
|
|
|
def precompute_1min_slice_indices(df_aggregated, df_1min):
    """Precompute, for each pair of consecutive bars, the matching 1-min rows.

    The 1-minute timestamps are sorted first (the aggregated timestamps are
    used as given). For consecutive aggregated timestamps ``(start, end)`` the
    returned slice selects the sorted 1-minute rows with
    ``start < timestamp <= end`` (both boundaries use ``side='right'``).

    Args:
        df_aggregated (pd.DataFrame): Aggregated bars with a 'Timestamp' column.
        df_1min (pd.DataFrame): 1-minute bars with a 'Timestamp' column.

    Returns:
        tuple: ``(indices, sorted_1min)`` where ``indices`` is a list of
        ``(start_idx, end_idx)`` tuples (one per consecutive bar pair, so
        ``len(df_aggregated) - 1`` entries, or none for fewer than two bars)
        valid for ``iloc`` slicing of the *sorted* 1-minute frame, and
        ``sorted_1min`` is the argsort order that produces that sorted frame.
    """
    bar_timestamps = df_aggregated['Timestamp'].values
    one_min_timestamps = df_1min['Timestamp'].values

    # Only the 1-minute side is sorted here; searchsorted requires it.
    sorted_1min = np.argsort(one_min_timestamps)
    one_min_timestamps = one_min_timestamps[sorted_1min]

    # One vectorised lookup for every bar boundary; each boundary is the end
    # of one slice and the start of the next.
    boundaries = np.searchsorted(one_min_timestamps, bar_timestamps, side='right').tolist()
    indices = list(zip(boundaries[:-1], boundaries[1:]))
    return indices, sorted_1min
|
|
|
|
def estimate_slippage_rate(trade_usd_size, minute_row, base_slippage_rate=0.0003, impact_threshold_pct=0.10, impact_slope=0.0010):
    """Estimate the total slippage rate (decimal) with a base + impact model.

    The result is ``base_slippage_rate`` unless the trade is larger than
    ``impact_threshold_pct`` of the minute bar's quote volume
    (``Volume * price``); in that case
    ``impact_slope * (trade_usd_size / threshold - 1)`` is added on top.

    Args:
        trade_usd_size (float): Trade notional in USD before slippage.
        minute_row (pd.Series|None): 1-min bar read through ``.get``; uses
            'Volume' and 'Close' (or 'Open' when there is no 'Close' key).
            ``None`` means "no bar available".
        base_slippage_rate (float): Base slippage in decimal (0.0003 = 3 bps).
        impact_threshold_pct (float): Threshold as a fraction of the 1-min
            quote volume (0.10 = 10%).
        impact_slope (float): Rate added per 1x of trade size over the
            threshold (decimal).

    Returns:
        float: Total slippage rate. Only the base rate is returned when the
        bar is ``None``, cannot be read, has non-positive quote volume, or
        when ``impact_threshold_pct`` is not positive.
    """
    base_rate = float(base_slippage_rate)
    if minute_row is None:
        return base_rate

    try:
        bar_volume = float(minute_row.get('Volume', 0.0) or 0.0)
        bar_price = float(minute_row.get('Close', minute_row.get('Open', 0.0)) or 0.0)
        quote_volume = bar_volume * bar_price
    except Exception:
        # Unreadable bar: behave as if there were no volume information.
        quote_volume = 0.0

    if quote_volume <= 0 or impact_threshold_pct <= 0:
        return base_rate

    impact_start = quote_volume * impact_threshold_pct
    if trade_usd_size <= impact_start:
        return base_rate

    multiples_over = (trade_usd_size / impact_start) - 1.0
    return float(base_slippage_rate + max(0.0, impact_slope * multiples_over))
|
|
|
|
def _minute_row_at(df_1min_sorted, one_min_timestamps_sorted, timestamp):
    """Return the first sorted 1-minute row with a timestamp >= ``timestamp``.

    Returns ``None`` when there is no such row or the lookup fails.
    """
    try:
        ts64 = np.datetime64(timestamp)
        idx_min = int(np.searchsorted(one_min_timestamps_sorted, ts64, side='left'))
        if 0 <= idx_min < len(df_1min_sorted):
            return df_1min_sorted.iloc[idx_min]
    except Exception:
        # Deliberately best-effort: without a minute bar the slippage model
        # simply falls back to its base rate.
        return None
    return None


def _build_exit_record(kind, when, coin, base_exit_price, entry_price, minute_row,
                       base_slippage_rate, impact_threshold_pct, impact_slope):
    """Price the sale of ``coin`` units and return ``(log_record, traded_usd)``.

    Slippage lowers the fill price, then the taker fee is taken from the
    proceeds. ``traded_usd`` is the notional before slippage.
    """
    traded_usd = float(coin * base_exit_price)
    slip_rate = estimate_slippage_rate(traded_usd, minute_row, base_slippage_rate,
                                       impact_threshold_pct, impact_slope)
    exit_price = base_exit_price * (1.0 - slip_rate)
    gross_usd = coin * exit_price
    fee = calculate_okx_taker_maker_fee(gross_usd, is_maker=False)
    record = {
        'type': kind,
        'time': when,
        'base_price': base_exit_price,
        'effective_price': exit_price,
        'slippage_rate': slip_rate,
        'usd': gross_usd - fee,
        'coin': 0,
        'pnl': (exit_price - entry_price) / entry_price if entry_price else 0,
        'fee': fee,
    }
    return record, traded_usd


def backtest(timeframe, df_aggregated, df_1min, stop_loss_pct, progress_step=1000,
             base_slippage_rate=0.0003, impact_threshold_pct=0.10, impact_slope=0.0010,
             lag_signal=False):
    """Backtest a long-only strategy driven by the "meta" Supertrend signal.

    Each of the three Supertrend columns votes +1 when the bar's close is
    above it and -1 otherwise; the meta trend is that vote when all three
    agree and 0 otherwise. Starting from 1000 USD the strategy

    * buys with all cash at a bar's open when the signal becomes +1,
    * sells everything at a bar's open when the signal goes from +1 to -1,
    * sells at the stop level (or at the minute's open if that is already
      below the stop) when a 1-minute low drops below
      ``entry_price * (1 - stop_loss_pct)``,
    * force-closes any open position at the last bar's open.

    Every fill pays the taker fee and the slippage estimated by
    ``estimate_slippage_rate``. A summary plus the trade log are written to
    ``backtest_logs/trade_log_<timeframe>_sl<stop>.csv`` when trades exist.

    WARNING (lookahead bias): with the default ``lag_signal=False`` the signal
    of bar ``i`` is derived from that bar's *close* but acted on at the same
    bar's *open*, i.e. the strategy sees the future. This default is kept so
    existing results stay reproducible; pass ``lag_signal=True`` to shift the
    signal by one bar and remove the bias.

    Args:
        timeframe (str): Label used in console output and the log file name.
        df_aggregated (pd.DataFrame): Bars with 'Timestamp', 'Open', 'Close'
            and the columns 'supertrend_12_3.0', 'supertrend_10_1.0',
            'supertrend_11_2.0'. Rows are addressed by position.
        df_1min (pd.DataFrame): 1-minute bars ('Timestamp', 'Open', 'Low',
            plus whatever ``estimate_slippage_rate`` reads) used for stop-loss
            checks and slippage estimation.
        stop_loss_pct (float): Stop distance below the entry price (0.1 = 10%).
        progress_step (int): Print progress every this many bars.
        base_slippage_rate (float): Forwarded to ``estimate_slippage_rate``.
        impact_threshold_pct (float): Forwarded to ``estimate_slippage_rate``.
        impact_slope (float): Forwarded to ``estimate_slippage_rate``.
        lag_signal (bool): When true, bar ``i`` trades on the signal of bar
            ``i - 1`` (no lookahead). Defaults to False (original behaviour).

    Returns:
        dict | None: Summary metrics, or ``None`` when no equity point was
        recorded (fewer than two bars). 'final_equity' is the mark-to-market
        value recorded at the last bar's close *before* that bar's trades and
        the forced close, so it can differ from the printed total profit.

    Raises:
        ValueError: If a required Supertrend column is missing.
    """
    start_time = time.time()
    required_st_cols = ["supertrend_12_3.0", "supertrend_10_1.0", "supertrend_11_2.0"]
    for col in required_st_cols:
        if col not in df_aggregated.columns:
            raise ValueError(f"Missing required Supertrend column: {col}")

    # Pull the columns out once; positional access also makes the loop
    # independent of the frame's index labels.
    open_arr = df_aggregated['Open'].to_numpy()
    close_arr = df_aggregated['Close'].to_numpy()
    timestamps = df_aggregated['Timestamp'].tolist()
    n_bars = len(df_aggregated)

    # Trend direction per Supertrend (+1 close above, -1 otherwise; NaN -> -1)
    trends_arr = np.stack(
        [np.where(close_arr > df_aggregated[col].to_numpy(), 1, -1) for col in required_st_cols],
        axis=1,
    )
    all_agree = (trends_arr[:, 0] == trends_arr[:, 1]) & (trends_arr[:, 1] == trends_arr[:, 2])
    meta_trend = np.where(all_agree, trends_arr[:, 0], 0)

    if lag_signal:
        # Act on the previous bar's signal; the first bar has no signal.
        meta_trend_signal = np.roll(meta_trend, 1)
        if n_bars:
            meta_trend_signal[0] = 0
    else:
        # Original behaviour: same-bar signal (see lookahead warning above).
        meta_trend_signal = meta_trend

    # Precompute 1-min slice indices for each aggregated bar
    slice_indices, sorted_1min = precompute_1min_slice_indices(df_aggregated, df_1min)
    df_1min_sorted = df_1min.iloc[sorted_1min].reset_index(drop=True)
    one_min_timestamps_sorted = df_1min_sorted['Timestamp'].values

    in_position = False
    init_usd = 1000
    usd = init_usd
    coin = 0
    nb_stop_loss = 0
    trade_log = []
    equity_curve = []
    trade_results = []
    entry_price = None
    total_slippage_usd = 0.0
    total_traded_usd = 0.0

    def exit_position(kind, when, base_exit_price, minute_row):
        """Sell the whole position, book the trade and reset position state."""
        nonlocal usd, coin, in_position, entry_price, total_slippage_usd, total_traded_usd
        record, traded_usd = _build_exit_record(
            kind, when, coin, base_exit_price, entry_price, minute_row,
            base_slippage_rate, impact_threshold_pct, impact_slope,
        )
        usd = record['usd']
        total_slippage_usd += traded_usd * record['slippage_rate']
        total_traded_usd += traded_usd
        trade_results.append(record['pnl'])
        trade_log.append(record)
        coin = 0
        in_position = False
        entry_price = None

    total_steps = n_bars - 1
    for i in range(1, n_bars):
        open_price = open_arr[i]  # Use open price for entry/exit
        timestamp = timestamps[i]
        prev_mt = meta_trend_signal[i - 1]
        curr_mt = meta_trend_signal[i]

        # Track equity at each bar (marked at the close, before this bar's trades)
        equity = usd + coin * close_arr[i]
        equity_curve.append((timestamp, equity))

        # Check stop loss if in position
        if in_position:
            start_idx, end_idx = slice_indices[i - 1]
            df_1min_slice = df_1min_sorted.iloc[start_idx:end_idx]
            if not df_1min_slice.empty:
                stop_loss_threshold = entry_price * (1 - stop_loss_pct)
                below_stop = df_1min_slice['Low'] < stop_loss_threshold
                if below_stop.any():
                    # idxmax() on a boolean Series gives the first True label
                    stop_row = df_1min_slice.loc[below_stop.idxmax()]
                    # A minute that already opens below the stop fills at its
                    # open (gap); otherwise the fill is at the stop level.
                    if stop_row['Open'] < stop_loss_threshold:
                        base_exit_price = stop_row['Open']
                    else:
                        base_exit_price = stop_loss_threshold
                    exit_position('stop_loss', stop_row['Timestamp'], base_exit_price, stop_row)
                    nb_stop_loss += 1
                    continue

        # Entry condition: signal changes TO bullish (prev != 1 and curr == 1)
        if not in_position and prev_mt != 1 and curr_mt == 1:
            in_position = True
            fee = calculate_okx_taker_maker_fee(usd, is_maker=False)
            usd_after_fee = usd - fee
            minute_row = _minute_row_at(df_1min_sorted, one_min_timestamps_sorted, timestamp)
            trade_usd_size = float(usd_after_fee)
            slip_rate = estimate_slippage_rate(trade_usd_size, minute_row, base_slippage_rate,
                                               impact_threshold_pct, impact_slope)
            # Slippage on buy increases price
            effective_entry_price = open_price * (1.0 + slip_rate)
            coin = usd_after_fee / effective_entry_price
            entry_price = effective_entry_price
            usd = 0
            total_slippage_usd += trade_usd_size * slip_rate
            total_traded_usd += trade_usd_size
            trade_log.append({
                'type': 'buy',
                'time': timestamp,
                'base_price': open_price,
                'effective_price': effective_entry_price,
                'slippage_rate': slip_rate,
                'usd': usd,
                'coin': coin,
                'fee': fee,
            })

        # Exit condition: signal changes TO bearish (prev == 1 and curr == -1)
        elif in_position and prev_mt == 1 and curr_mt == -1:
            minute_row = _minute_row_at(df_1min_sorted, one_min_timestamps_sorted, timestamp)
            exit_position('sell', timestamp, open_price, minute_row)

        if i % progress_step == 0 or i == total_steps:
            percent = (i / total_steps) * 100
            print(f"\rTimeframe: {timeframe},\tProgress: {percent:.1f}%\tCurrent equity: {equity:.2f}\033[K", end='', flush=True)

    # Force close any open position at the end
    if in_position:
        final_timestamp = timestamps[-1]
        minute_row = _minute_row_at(df_1min_sorted, one_min_timestamps_sorted, final_timestamp)
        # Use open price for consistency with the other fills
        exit_position('forced_close', final_timestamp, open_arr[-1], minute_row)

    print()
    print(f"Timeframe: {timeframe},\tTotal profit: {usd - init_usd},\tNumber of stop losses: {nb_stop_loss}")

    # --- Performance Metrics ---
    equity_arr = np.array([e[1] for e in equity_curve])
    # Handle edge cases for empty or invalid equity data
    if len(equity_arr) == 0:
        print("Warning: No equity data available")
        return None
    returns = np.diff(equity_arr) / equity_arr[:-1]
    # Filter out infinite and NaN returns
    returns = returns[np.isfinite(returns)]
    total_return = (equity_arr[-1] - equity_arr[0]) / equity_arr[0] if equity_arr[0] != 0 else 0
    running_max = np.maximum.accumulate(equity_arr)
    if equity_arr[-1] <= 0.01:
        max_drawdown = -1.0
    else:
        drawdowns = (equity_arr - running_max) / running_max
        max_drawdown = drawdowns.min() if len(drawdowns) > 0 and np.isfinite(drawdowns).any() else 0
    if len(returns) > 1 and np.std(returns) > 1e-9:
        # NOTE(review): sqrt(252) is applied to per-bar returns whatever the
        # timeframe, so Sharpe values are not comparable across timeframes.
        sharpe = np.mean(returns) / np.std(returns) * math.sqrt(252)
    else:
        sharpe = 0
    num_trades = len(trade_results)
    win_rate = sum(1 for r in trade_results if r > 0) / num_trades if trade_results else 0

    total_fees = sum(entry.get('fee', 0) for entry in trade_log)
    avg_slippage_bps = (total_slippage_usd / total_traded_usd * 10000.0) if total_traded_usd > 0 else 0.0

    print("Performance Metrics:")
    print(f" Total Return: {total_return*100:.2f}%")
    print(f" Max Drawdown: {max_drawdown*100:.2f}%")
    print(f" Sharpe Ratio: {sharpe:.2f}")
    print(f" Win Rate: {win_rate*100:.2f}%")
    print(f" Number of Trades: {num_trades}")
    print(f" Final Equity: ${equity_arr[-1]:.2f}")
    print(f" Initial Equity: ${equity_arr[0]:.2f}")

    # --- Save Trade Log ---
    log_dir = "backtest_logs"
    os.makedirs(log_dir, exist_ok=True)
    # Format stop_loss_pct for filename (e.g., 0.05 -> 0p05)
    stop_loss_str = f"{stop_loss_pct:.2f}".replace('.', 'p')
    log_path = os.path.join(log_dir, f"trade_log_{timeframe}_sl{stop_loss_str}.csv")
    if trade_log:
        # Fixed column order so the file layout is identical on every run;
        # 'buy' rows have no 'pnl' and get an empty cell.
        trade_log_fields = ['type', 'time', 'base_price', 'effective_price',
                            'slippage_rate', 'usd', 'coin', 'pnl', 'fee']
        summary_header = [
            'elapsed_time_sec', 'total_return', 'max_drawdown', 'sharpe_ratio',
            'win_rate', 'num_trades', 'final_equity', 'initial_equity', 'num_stop_losses', 'total_fees',
            'total_slippage_usd', 'avg_slippage_bps'
        ]
        summary_values = [
            f"{time.time() - start_time:.2f}",
            f"{total_return*100:.2f}%",
            f"{max_drawdown*100:.2f}%",
            f"{sharpe:.2f}",
            f"{win_rate*100:.2f}%",
            str(num_trades),
            f"${equity_arr[-1]:.2f}",
            f"${equity_arr[0]:.2f}",
            str(nb_stop_loss),
            f"${total_fees:.4f}",
            f"${total_slippage_usd:.4f}",
            f"{avg_slippage_bps:.2f}",
        ]
        # Write summary header row, then trade log header and rows
        with open(log_path, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(summary_header)
            writer.writerow(summary_values)
            writer.writerow([])  # Blank row for separation
            dict_writer = csv.DictWriter(f, fieldnames=trade_log_fields)
            dict_writer.writeheader()
            dict_writer.writerows(trade_log)
        print(f"Trade log saved to {log_path}")
    else:
        print("No trades to log.")

    # Return summary metrics (excluding elapsed time)
    return {
        'timeframe': timeframe,
        'stop_loss': stop_loss_pct,
        'total_return': total_return,
        'max_drawdown': max_drawdown,
        'sharpe_ratio': sharpe,
        'win_rate': win_rate,
        'num_trades': num_trades,
        'final_equity': equity_arr[-1],
        'initial_equity': equity_arr[0],
        'num_stop_losses': nb_stop_loss,
        'total_fees': total_fees,
        'total_slippage_usd': total_slippage_usd,
        'avg_slippage_bps': avg_slippage_bps,
    }
|
|
|
|
if __name__ == "__main__":
    # Grid to evaluate: every timeframe is combined with every stop loss.
    timeframes = ["5min", "15min", "30min", "1h", "4h", "1d", "2d"]
    # Smaller grids for quick runs:
    # timeframes = ["5min", "15min", "1h", "4h", "1d"]
    # timeframes = ["30min"]
    stoplosses = [0.1, 0.2, 0.3, 0.4, 0.5]

    # Slippage model settings: base slippage in bps plus the volume-impact terms
    slippage_base_bps = 10  # converted to a decimal rate below
    impact_threshold_pct = 0.10  # impact starts beyond 10% of the 1-min volume
    impact_slope = 0.0010  # extra slippage per 1x over the threshold
    base_slippage_rate = slippage_base_bps / 10000.0

    # Alternative dataset:
    # df_1min = load_data('2021-11-01', '2024-10-16', '../data/btcusd_1-min_data.csv')
    df_1min = load_data('2021-11-01', '2025-08-19', '../data/btcusd_okx_1-min_data.csv')

    # One CSV row per (timeframe, stop loss) run
    summary_csv_path = "backtest_summary.csv"
    summary_header = [
        'timeframe', 'stop_loss', 'total_return', 'max_drawdown', 'sharpe_ratio',
        'win_rate', 'num_trades', 'final_equity', 'initial_equity', 'num_stop_losses', 'total_fees',
        'total_slippage_usd', 'avg_slippage_bps'
    ]
    with open(summary_csv_path, 'w', newline='') as summary_file:
        writer = csv.DictWriter(summary_file, fieldnames=summary_header)
        writer.writeheader()

        for timeframe in timeframes:
            # Bars and indicators depend only on the timeframe, so they are
            # built once and reused for every stop-loss level.
            df_aggregated = add_supertrend_indicators(aggregate_data(df_1min, timeframe))

            for stop_loss_pct in stoplosses:
                summary = backtest(
                    timeframe,
                    df_aggregated,
                    df_1min,
                    stop_loss_pct=stop_loss_pct,
                    base_slippage_rate=base_slippage_rate,
                    impact_threshold_pct=impact_threshold_pct,
                    impact_slope=impact_slope,
                )
                if summary is None:
                    continue

                # Format values for CSV (percentages, dollar amounts, rounding)
                writer.writerow({
                    'timeframe': summary['timeframe'],
                    'stop_loss': summary['stop_loss'],
                    'total_return': f"{summary['total_return']*100:.2f}%",
                    'max_drawdown': f"{summary['max_drawdown']*100:.2f}%",
                    'sharpe_ratio': f"{summary['sharpe_ratio']:.2f}",
                    'win_rate': f"{summary['win_rate']*100:.2f}%",
                    'num_trades': summary['num_trades'],
                    'final_equity': f"${summary['final_equity']:.2f}",
                    'initial_equity': f"${summary['initial_equity']:.2f}",
                    'num_stop_losses': summary['num_stop_losses'],
                    'total_fees': f"${summary['total_fees']:.4f}",
                    'total_slippage_usd': f"${summary['total_slippage_usd']:.4f}",
                    'avg_slippage_bps': f"{summary['avg_slippage_bps']:.2f}",
                })
|