# lowkey_backtest/main.py
import csv
import math
import os
import time

import numpy as np
import pandas as pd


def load_data(since, until, csv_file):
    """Load 1-min OHLCV rows from csv_file (unix-seconds 'Timestamp' column)
    and keep only the bars between `since` and `until`, inclusive."""
    df = pd.read_csv(csv_file)
    df['Timestamp'] = pd.to_datetime(df['Timestamp'], unit='s')
    df = df[(df['Timestamp'] >= pd.Timestamp(since)) & (df['Timestamp'] <= pd.Timestamp(until))]
    return df
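

# Illustrative sketch only (not part of the strategy): the minimal CSV schema
# load_data() assumes -- a unix-seconds 'Timestamp' column plus OHLCV columns.
# The sample values below are made up.
def _demo_load_data_schema():
    import io
    csv_text = (
        "Timestamp,Open,High,Low,Close,Volume\n"
        "1635724800,61000,61100,60900,61050,12.5\n"  # 2021-11-01 00:00:00 UTC
        "1635724860,61050,61200,61000,61150,8.2\n"   # 2021-11-01 00:01:00 UTC
    )
    return load_data('2021-11-01', '2021-11-02', io.StringIO(csv_text))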


def aggregate_data(df, timeframe):
    """Resample 1-min bars to `timeframe` (e.g. '5min', '1h') using standard
    OHLCV aggregation. Note: periods with no 1-min bars yield NaN OHLC rows."""
    df = df.set_index('Timestamp')
    df = df.resample(timeframe).agg({
        'Open': 'first',
        'High': 'max',
        'Low': 'min',
        'Close': 'last',
        'Volume': 'sum'
    })
    df = df.reset_index()
    return df
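

# Illustrative sketch only: two adjacent 1-min bars collapse into a single
# 5-min bar (first Open, max High, min Low, last Close, summed Volume).
# Values are made up.
def _demo_aggregate_data():
    df = pd.DataFrame({
        'Timestamp': pd.to_datetime([1635724800, 1635724860], unit='s'),
        'Open': [61000.0, 61050.0],
        'High': [61100.0, 61200.0],
        'Low': [60900.0, 61000.0],
        'Close': [61050.0, 61150.0],
        'Volume': [12.5, 8.2],
    })
    out = aggregate_data(df, '5min')
    # Expected single row: Open=61000, High=61200, Low=60900, Close=61150, Volume=20.7
    return out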


def calculate_okx_taker_maker_fee(amount, is_maker=False):
    """OKX spot fee: 8 bps maker / 10 bps taker, applied to `amount` (USD)."""
    fee_rate = 0.0008 if is_maker else 0.0010
    return amount * fee_rate
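

# Worked example of the fee schedule above: a $1,000 taker order pays $1.00
# (10 bps), while the same notional as a maker order pays $0.80 (8 bps).
def _demo_fee():
    assert math.isclose(calculate_okx_taker_maker_fee(1000.0), 1.0)
    assert math.isclose(calculate_okx_taker_maker_fee(1000.0, is_maker=True), 0.8)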


def calculate_supertrend(df, period, multiplier):
    """
    Calculate the Supertrend indicator for a given period and multiplier.
    Args:
        df (pd.DataFrame): DataFrame with 'High', 'Low', 'Close' columns.
        period (int): ATR period.
        multiplier (float): Multiplier for ATR.
    Returns:
        pd.Series: Supertrend values.
    """
    # Ensure we have enough data for the ATR calculation
    if len(df) < period + 1:
        print(f"Warning: Not enough data for ATR period {period}. Need at least {period + 1} rows, got {len(df)}")
        return pd.Series([np.nan] * len(df), index=df.index)
    high = df['High'].values
    low = df['Low'].values
    close = df['Close'].values
    # True Range: the largest of the three candidate ranges
    tr = np.zeros_like(close)
    for i in range(1, len(close)):
        tr[i] = max(
            high[i] - low[i],             # current high - current low
            abs(high[i] - close[i - 1]),  # current high - previous close
            abs(low[i] - close[i - 1])    # current low - previous close
        )
    # ATR: seed with a simple average of the first `period` true ranges,
    # then apply Wilder's smoothing (an EMA with alpha = 1/period)
    atr = np.zeros_like(close)
    atr[period] = np.mean(tr[1:period + 1])
    for i in range(period + 1, len(close)):
        atr[i] = (atr[i - 1] * (period - 1) + tr[i]) / period
    # Backfill the warm-up region with the first valid ATR value
    atr[:period] = atr[period] if atr[period] > 0 else 0.001
    hl2 = (high + low) / 2
    upperband = hl2 + (multiplier * atr)
    lowerband = hl2 - (multiplier * atr)
    supertrend = np.full_like(close, np.nan)
    in_uptrend = True
    supertrend[0] = lowerband[0]  # consistent with the initial uptrend assumption
    for i in range(1, len(close)):
        prev_uptrend = in_uptrend
        if close[i] > upperband[i - 1]:
            in_uptrend = True
        elif close[i] < lowerband[i - 1]:
            in_uptrend = False
        # otherwise, keep the previous trend
        if in_uptrend:
            if prev_uptrend and not np.isnan(supertrend[i - 1]):
                supertrend[i] = max(lowerband[i], supertrend[i - 1])  # ratchet the support upward
            else:
                supertrend[i] = lowerband[i]  # reset the band on a trend flip
        else:
            if not prev_uptrend and not np.isnan(supertrend[i - 1]):
                supertrend[i] = min(upperband[i], supertrend[i - 1])  # ratchet the resistance downward
            else:
                supertrend[i] = upperband[i]  # reset the band on a trend flip
    return pd.Series(supertrend, index=df.index)
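

# Illustrative self-check on synthetic data (not part of the backtest): on a
# steadily rising series the supertrend line should settle below the closes,
# acting as a trailing support in an uptrend.
def _demo_supertrend():
    n = 50
    close = np.linspace(100.0, 150.0, n)
    df = pd.DataFrame({'High': close + 1.0, 'Low': close - 1.0, 'Close': close})
    st = calculate_supertrend(df, period=10, multiplier=3.0)
    return bool(close[-1] > st.iloc[-1])  # expected: True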


def add_supertrend_indicators(df):
    """
    Adds Supertrend indicators to the dataframe for the specified (period, multiplier) pairs.
    Args:
        df (pd.DataFrame): DataFrame with columns 'High', 'Low', 'Close'.
    Returns:
        pd.DataFrame: DataFrame with new Supertrend columns added.
    """
    supertrend_params = [(12, 3.0), (10, 1.0), (11, 2.0)]
    for period, multiplier in supertrend_params:
        try:
            st_col = f'supertrend_{period}_{multiplier}'
            df[st_col] = calculate_supertrend(df, period, multiplier)
        except Exception as e:
            print(f"Error calculating Supertrend {period}, {multiplier}: {e}")
            df[f'supertrend_{period}_{multiplier}'] = np.nan
    return df


def precompute_1min_slice_indices(df_aggregated, df_1min):
    """
    Precompute start and end indices for each aggregated bar using searchsorted.
    Returns a tuple (indices, sort_order): `indices` is a list of
    (start_idx, end_idx) pairs for fast iloc slicing into the sorted 1-min
    frame, and `sort_order` is the argsort permutation used to sort it.
    Each slice covers the 1-min bars in the interval (bar_start, bar_end].
    """
    timestamps = df_aggregated['Timestamp'].values
    one_min_timestamps = df_1min['Timestamp'].values
    # Ensure the 1-min timestamps are sorted
    sorted_1min = np.argsort(one_min_timestamps)
    one_min_timestamps = one_min_timestamps[sorted_1min]
    indices = []
    for i in range(1, len(timestamps)):
        start, end = timestamps[i - 1], timestamps[i]
        # side='right' on both bounds -> minutes strictly after `start`,
        # up to and including `end`
        start_idx = np.searchsorted(one_min_timestamps, start, side='right')
        end_idx = np.searchsorted(one_min_timestamps, end, side='right')
        indices.append((start_idx, end_idx))
    return indices, sorted_1min
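

# Illustrative sketch of the interval semantics above: for aggregated bars at
# 00:00 and 00:05, the precomputed slice covers the 1-min bars in (00:00, 00:05]
# -- side='right' on both bounds excludes the minute at the bar open itself.
def _demo_slice_indices():
    one_min = pd.DataFrame({'Timestamp': pd.date_range('2021-11-01 00:00', periods=10, freq='1min')})
    agg = pd.DataFrame({'Timestamp': pd.to_datetime(['2021-11-01 00:00', '2021-11-01 00:05'])})
    indices, _order = precompute_1min_slice_indices(agg, one_min)
    return indices  # expected: [(1, 6)] -> minutes 00:01 through 00:05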


def estimate_slippage_rate(trade_usd_size, minute_row, base_slippage_rate=0.0003, impact_threshold_pct=0.10, impact_slope=0.0010):
    """
    Estimate the total slippage rate (decimal) using a hybrid model:
    - Base slippage: fixed base_slippage_rate (e.g., 0.0003 = 3 bps)
    - Extra slippage: if trade size (USD) > impact_threshold_pct * 1-min USD volume,
      add impact_slope * (trade_size/threshold - 1)
    Args:
        trade_usd_size (float): Trade notional in USD before slippage.
        minute_row (pd.Series|None): 1-min bar with 'Volume' and a price ('Close' preferred, fallback 'Open').
        base_slippage_rate (float): Base slippage in decimal.
        impact_threshold_pct (float): Threshold as fraction of 1-min volume (e.g., 0.10 = 10%).
        impact_slope (float): Rate added per 1x over threshold (decimal).
    Returns:
        float: total slippage rate (>= base_slippage_rate).
    """
    if minute_row is None:
        return float(base_slippage_rate)
    try:
        minute_base_vol = float(minute_row.get('Volume', 0.0) or 0.0)
        minute_price = float(minute_row.get('Close', minute_row.get('Open', 0.0)) or 0.0)
        minute_quote_vol = minute_base_vol * minute_price
    except Exception:
        minute_quote_vol = 0.0
    if minute_quote_vol <= 0 or impact_threshold_pct <= 0:
        return float(base_slippage_rate)
    threshold_quote = minute_quote_vol * impact_threshold_pct
    if trade_usd_size <= threshold_quote:
        return float(base_slippage_rate)
    over_ratio = (trade_usd_size / threshold_quote) - 1.0
    extra_slippage = max(0.0, impact_slope * over_ratio)
    return float(base_slippage_rate + extra_slippage)
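

# Worked example of the hybrid slippage model: with the defaults (3 bps base,
# 10% threshold), a 1-min bar trading 10 BTC at $50,000 has $500k quote volume
# and a $50k threshold. A $100k order is 2x the threshold, so the impact term
# adds 0.0010 * (2 - 1) = 10 bps on top of the base, for 13 bps total.
def _demo_slippage():
    minute_row = pd.Series({'Volume': 10.0, 'Close': 50000.0})
    small = estimate_slippage_rate(40000.0, minute_row)   # under threshold -> 0.0003
    large = estimate_slippage_rate(100000.0, minute_row)  # over threshold  -> 0.0013
    return small, large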


def backtest(timeframe, df_aggregated, df_1min, stop_loss_pct, progress_step=1000,
             base_slippage_rate=0.0003, impact_threshold_pct=0.10, impact_slope=0.0010):
    """
    Backtest a trading strategy based on meta supertrend logic (all three supertrends agree).
    Uses signal transitions and open prices for entry/exit to match the original implementation.
    """
    start_time = time.time()
    required_st_cols = ["supertrend_12_3.0", "supertrend_10_1.0", "supertrend_11_2.0"]
    for col in required_st_cols:
        if col not in df_aggregated.columns:
            raise ValueError(f"Missing required Supertrend column: {col}")
    # Trend direction per supertrend: +1 if the close is above the band, else -1
    trends = []
    for col in required_st_cols:
        trend = np.where(df_aggregated['Close'] > df_aggregated[col], 1, -1)
        trends.append(trend)
    # Meta trend: the common direction when all three agree, else 0
    trends_arr = np.stack(trends, axis=1)
    meta_trend = np.where((trends_arr[:, 0] == trends_arr[:, 1]) & (trends_arr[:, 1] == trends_arr[:, 2]),
                          trends_arr[:, 0], 0)
    # NOTE: using the same-bar meta trend as the signal is incorrect -- it
    # introduces lookahead bias; the signal should be lagged by one bar.
    meta_trend_signal = meta_trend
    # Next step: modify the OHLCV predictor so it does not use supertrend (or
    # any other feature that introduces lookahead bias) and predict the next
    # close price.
    #
    # Old, less efficient code that lagged the signal to avoid lookahead bias:
    # meta_trend_signal = np.roll(meta_trend, 1)
    # meta_trend_signal[0] = 0  # No signal for the first bar
    # Precompute 1-min slice indices for each aggregated bar
    slice_indices, sorted_1min = precompute_1min_slice_indices(df_aggregated, df_1min)
    df_1min_sorted = df_1min.iloc[sorted_1min].reset_index(drop=True)
    one_min_timestamps_sorted = df_1min_sorted['Timestamp'].values
    in_position = False
    init_usd = 1000
    usd = init_usd
    coin = 0
    nb_stop_loss = 0
    trade_log = []
    equity_curve = []
    trade_results = []
    entry_price = None
    entry_time = None
    total_slippage_usd = 0.0
    total_traded_usd = 0.0
    total_steps = len(df_aggregated) - 1
    for i in range(1, len(df_aggregated)):
        open_price = df_aggregated['Open'].iloc[i]  # Use the open price for entry/exit
        close_price = df_aggregated['Close'].iloc[i]
        timestamp = df_aggregated['Timestamp'].iloc[i]
        # Previous and current meta trend signals
        prev_mt = meta_trend_signal[i - 1]
        curr_mt = meta_trend_signal[i]
        # Track equity at each bar
        equity = usd + coin * close_price
        equity_curve.append((timestamp, equity))
        # Check the stop loss if in position
        if in_position:
            start_idx, end_idx = slice_indices[i - 1]
            df_1min_slice = df_1min_sorted.iloc[start_idx:end_idx]
            stop_triggered = False
            if not df_1min_slice.empty:
                stop_loss_threshold = entry_price * (1 - stop_loss_pct)
                below_stop = df_1min_slice['Low'] < stop_loss_threshold
                if below_stop.any():
                    first_idx = below_stop.idxmax()  # index label of the first 1-min bar below the stop
                    stop_row = df_1min_slice.loc[first_idx]
                    stop_triggered = True
                    in_position = False
                    # More realistic stop-loss fill: if the bar gapped below the
                    # threshold, fill at its open; otherwise fill at the threshold.
                    if stop_row['Open'] < stop_loss_threshold:
                        base_exit_price = stop_row['Open']
                    else:
                        base_exit_price = stop_loss_threshold
                    trade_usd_size = float(coin * base_exit_price)
                    slip_rate = estimate_slippage_rate(trade_usd_size, stop_row, base_slippage_rate, impact_threshold_pct, impact_slope)
                    exit_price = base_exit_price * (1.0 - slip_rate)  # slippage worsens the sell price
                    exit_time = stop_row['Timestamp']
                    gross_usd = coin * exit_price
                    fee = calculate_okx_taker_maker_fee(gross_usd, is_maker=False)
                    usd = gross_usd - fee
                    trade_pnl = (exit_price - entry_price) / entry_price if entry_price else 0
                    total_slippage_usd += trade_usd_size * slip_rate
                    total_traded_usd += trade_usd_size
                    trade_results.append(trade_pnl)
                    trade_log.append({
                        'type': 'stop_loss',
                        'time': exit_time,
                        'base_price': base_exit_price,
                        'effective_price': exit_price,
                        'slippage_rate': slip_rate,
                        'usd': usd,
                        'coin': 0,
                        'pnl': trade_pnl,
                        'fee': fee
                    })
                    coin = 0
                    nb_stop_loss += 1
                    entry_price = None
                    entry_time = None
            if stop_triggered:
                continue
        # Entry condition: the signal changes TO bullish (prev != 1 and curr == 1)
        if not in_position and prev_mt != 1 and curr_mt == 1:
            in_position = True
            fee = calculate_okx_taker_maker_fee(usd, is_maker=False)
            usd_after_fee = usd - fee
            # Look up the 1-min bar at (or just after) the entry timestamp for the impact model
            try:
                ts64 = np.datetime64(timestamp)
                idx_min = int(np.searchsorted(one_min_timestamps_sorted, ts64, side='left'))
                minute_row = df_1min_sorted.iloc[idx_min] if 0 <= idx_min < len(df_1min_sorted) else None
            except Exception:
                minute_row = None
            trade_usd_size = float(usd_after_fee)
            slip_rate = estimate_slippage_rate(trade_usd_size, minute_row, base_slippage_rate, impact_threshold_pct, impact_slope)
            effective_entry_price = open_price * (1.0 + slip_rate)  # slippage on a buy raises the price
            coin = usd_after_fee / effective_entry_price
            entry_price = effective_entry_price
            entry_time = timestamp
            usd = 0
            total_slippage_usd += trade_usd_size * slip_rate
            total_traded_usd += trade_usd_size
            trade_log.append({
                'type': 'buy',
                'time': timestamp,
                'base_price': open_price,
                'effective_price': effective_entry_price,
                'slippage_rate': slip_rate,
                'usd': usd,
                'coin': coin,
                'fee': fee
            })
        # Exit condition: the signal changes TO bearish (prev == 1 and curr == -1)
        elif in_position and prev_mt == 1 and curr_mt == -1:
            in_position = False
            # Look up the 1-min bar at (or just after) the exit timestamp for the impact model
            try:
                ts64 = np.datetime64(timestamp)
                idx_min = int(np.searchsorted(one_min_timestamps_sorted, ts64, side='left'))
                minute_row = df_1min_sorted.iloc[idx_min] if 0 <= idx_min < len(df_1min_sorted) else None
            except Exception:
                minute_row = None
            base_exit_price = open_price
            trade_usd_size = float(coin * base_exit_price)
            slip_rate = estimate_slippage_rate(trade_usd_size, minute_row, base_slippage_rate, impact_threshold_pct, impact_slope)
            exit_price = base_exit_price * (1.0 - slip_rate)  # slippage on a sell lowers the price
            exit_time = timestamp
            gross_usd = coin * exit_price
            fee = calculate_okx_taker_maker_fee(gross_usd, is_maker=False)
            usd = gross_usd - fee
            trade_pnl = (exit_price - entry_price) / entry_price if entry_price else 0
            total_slippage_usd += trade_usd_size * slip_rate
            total_traded_usd += trade_usd_size
            trade_results.append(trade_pnl)
            trade_log.append({
                'type': 'sell',
                'time': exit_time,
                'base_price': base_exit_price,
                'effective_price': exit_price,
                'slippage_rate': slip_rate,
                'usd': usd,
                'coin': 0,
                'pnl': trade_pnl,
                'fee': fee
            })
            coin = 0
            entry_price = None
            entry_time = None
        if i % progress_step == 0 or i == total_steps:
            percent = (i / total_steps) * 100
            print(f"\rTimeframe: {timeframe},\tProgress: {percent:.1f}%\tCurrent equity: {equity:.2f}\033[K", end='', flush=True)
    # Force-close any open position at the end
    if in_position:
        final_open_price = df_aggregated['Open'].iloc[-1]  # Use the open price for consistency
        final_timestamp = df_aggregated['Timestamp'].iloc[-1]
        try:
            ts64 = np.datetime64(final_timestamp)
            idx_min = int(np.searchsorted(one_min_timestamps_sorted, ts64, side='left'))
            minute_row = df_1min_sorted.iloc[idx_min] if 0 <= idx_min < len(df_1min_sorted) else None
        except Exception:
            minute_row = None
        base_exit_price = final_open_price
        trade_usd_size = float(coin * base_exit_price)
        slip_rate = estimate_slippage_rate(trade_usd_size, minute_row, base_slippage_rate, impact_threshold_pct, impact_slope)
        final_effective_price = base_exit_price * (1.0 - slip_rate)
        gross_usd = coin * final_effective_price
        fee = calculate_okx_taker_maker_fee(gross_usd, is_maker=False)
        usd = gross_usd - fee
        trade_pnl = (final_effective_price - entry_price) / entry_price if entry_price else 0
        total_slippage_usd += trade_usd_size * slip_rate
        total_traded_usd += trade_usd_size
        trade_results.append(trade_pnl)
        trade_log.append({
            'type': 'forced_close',
            'time': final_timestamp,
            'base_price': base_exit_price,
            'effective_price': final_effective_price,
            'slippage_rate': slip_rate,
            'usd': usd,
            'coin': 0,
            'pnl': trade_pnl,
            'fee': fee
        })
        coin = 0
        in_position = False
        entry_price = None
    print()
    print(f"Timeframe: {timeframe},\tTotal profit: {usd - init_usd:.2f},\tNumber of stop losses: {nb_stop_loss}")
    # --- Performance Metrics ---
    equity_arr = np.array([e[1] for e in equity_curve])
    # Handle edge cases for empty or invalid equity data
    if len(equity_arr) == 0:
        print("Warning: No equity data available")
        return None
    returns = np.diff(equity_arr) / equity_arr[:-1]
    # Filter out infinite and NaN returns
    returns = returns[np.isfinite(returns)]
    total_return = (equity_arr[-1] - equity_arr[0]) / equity_arr[0] if equity_arr[0] != 0 else 0
    running_max = np.maximum.accumulate(equity_arr)
    if equity_arr[-1] <= 0.01:
        max_drawdown = -1.0  # treat a blown-up account as a 100% drawdown
    else:
        drawdowns = (equity_arr - running_max) / running_max
        max_drawdown = drawdowns.min() if len(drawdowns) > 0 and np.isfinite(drawdowns).any() else 0
    # NOTE: sqrt(252) annualization assumes daily bars; on intraday timeframes
    # this Sharpe is only comparable across runs of the same timeframe.
    if len(returns) > 1 and np.std(returns) > 1e-9:
        sharpe = np.mean(returns) / np.std(returns) * math.sqrt(252)
    else:
        sharpe = 0
    wins = [1 for r in trade_results if r > 0]
    win_rate = len(wins) / len(trade_results) if trade_results else 0
    num_trades = len(trade_results)
    print("Performance Metrics:")
    print(f"  Total Return: {total_return*100:.2f}%")
    print(f"  Max Drawdown: {max_drawdown*100:.2f}%")
    print(f"  Sharpe Ratio: {sharpe:.2f}")
    print(f"  Win Rate: {win_rate*100:.2f}%")
    print(f"  Number of Trades: {num_trades}")
    print(f"  Final Equity: ${equity_arr[-1]:.2f}")
    print(f"  Initial Equity: ${equity_arr[0]:.2f}")
    # --- Save Trade Log ---
    log_dir = "backtest_logs"
    os.makedirs(log_dir, exist_ok=True)
    # Format stop_loss_pct for the filename (e.g., 0.05 -> 0p05)
    stop_loss_str = f"{stop_loss_pct:.2f}".replace('.', 'p')
    log_path = os.path.join(log_dir, f"trade_log_{timeframe}_sl{stop_loss_str}.csv")
    total_fees = 0
    if trade_log:
        # Collect the union of keys so every row has the same columns
        all_keys = set()
        for entry in trade_log:
            all_keys.update(entry.keys())
        all_keys = sorted(all_keys)  # deterministic column order
        trade_log_filled = []
        for entry in trade_log:
            filled_entry = {k: entry.get(k, None) for k in all_keys}
            trade_log_filled.append(filled_entry)
        # Total fees paid over this backtest
        total_fees = sum(entry.get('fee', 0) for entry in trade_log)
        # Write a summary header row, then the trade log header and rows
        with open(log_path, 'w', newline='') as f:
            writer = csv.writer(f)
            summary_header = [
                'elapsed_time_sec', 'total_return', 'max_drawdown', 'sharpe_ratio',
                'win_rate', 'num_trades', 'final_equity', 'initial_equity', 'num_stop_losses', 'total_fees',
                'total_slippage_usd', 'avg_slippage_bps'
            ]
            summary_values = [
                f"{time.time() - start_time:.2f}",
                f"{total_return*100:.2f}%",
                f"{max_drawdown*100:.2f}%",
                f"{sharpe:.2f}",
                f"{win_rate*100:.2f}%",
                str(num_trades),
                f"${equity_arr[-1]:.2f}",
                f"${equity_arr[0]:.2f}",
                str(nb_stop_loss),
                f"${total_fees:.4f}",
                f"${total_slippage_usd:.4f}",
                f"{(total_slippage_usd / total_traded_usd * 10000.0) if total_traded_usd > 0 else 0:.2f}"
            ]
            writer.writerow(summary_header)
            writer.writerow(summary_values)
            writer.writerow([])  # Blank row for separation
            dict_writer = csv.DictWriter(f, fieldnames=all_keys)
            dict_writer.writeheader()
            dict_writer.writerows(trade_log_filled)
        print(f"Trade log saved to {log_path}")
    else:
        print("No trades to log.")
    # Return summary metrics (excluding elapsed time)
    return {
        'timeframe': timeframe,
        'stop_loss': stop_loss_pct,
        'total_return': total_return,
        'max_drawdown': max_drawdown,
        'sharpe_ratio': sharpe,
        'win_rate': win_rate,
        'num_trades': num_trades,
        'final_equity': equity_arr[-1],
        'initial_equity': equity_arr[0],
        'num_stop_losses': nb_stop_loss,
        'total_fees': total_fees,
        'total_slippage_usd': total_slippage_usd,
        'avg_slippage_bps': (total_slippage_usd / total_traded_usd * 10000.0) if total_traded_usd > 0 else 0.0
    }
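

# Illustrative sketch of the agreement rule used inside backtest(): the meta
# trend is the shared direction when all three per-supertrend trends agree,
# and 0 (no signal) otherwise. The rows below are made-up trend triples.
def _demo_meta_trend():
    trends_arr = np.array([[1, 1, 1], [1, -1, 1], [-1, -1, -1]])
    meta = np.where((trends_arr[:, 0] == trends_arr[:, 1]) & (trends_arr[:, 1] == trends_arr[:, 2]),
                    trends_arr[:, 0], 0)
    return meta  # expected: array([ 1,  0, -1])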


if __name__ == "__main__":
    timeframes = ["5min", "15min", "30min", "1h", "4h", "1d", "2d"]
    # timeframes = ["5min", "15min", "1h", "4h", "1d"]
    # timeframes = ["30min"]
    stoplosses = [0.1, 0.2, 0.3, 0.4, 0.5]
    # Slippage configuration (OKX Spot): base rate in bps, plus a volume-impact model
    slippage_base_bps = 10       # 10 bps base slippage (realistic, conservative)
    impact_threshold_pct = 0.10  # impact starts beyond 10% of 1-min volume
    impact_slope = 0.0010        # incremental slippage per 1x over the threshold
    # df_1min = load_data('2021-11-01', '2024-10-16', '../data/btcusd_1-min_data.csv')
    df_1min = load_data('2021-11-01', '2025-08-19', '../data/btcusd_okx_1-min_data.csv')
    # Prepare the summary CSV
    summary_csv_path = "backtest_summary.csv"
    summary_header = [
        'timeframe', 'stop_loss', 'total_return', 'max_drawdown', 'sharpe_ratio',
        'win_rate', 'num_trades', 'final_equity', 'initial_equity', 'num_stop_losses', 'total_fees',
        'total_slippage_usd', 'avg_slippage_bps'
    ]
    with open(summary_csv_path, 'w', newline='') as summary_file:
        writer = csv.DictWriter(summary_file, fieldnames=summary_header)
        writer.writeheader()
        for timeframe in timeframes:
            df_aggregated = aggregate_data(df_1min, timeframe)
            df_aggregated = add_supertrend_indicators(df_aggregated)
            for stop_loss_pct in stoplosses:
                summary = backtest(
                    timeframe,
                    df_aggregated,
                    df_1min,
                    stop_loss_pct=stop_loss_pct,
                    base_slippage_rate=slippage_base_bps / 10000.0,
                    impact_threshold_pct=impact_threshold_pct,
                    impact_slope=impact_slope
                )
                if summary is not None:
                    # Format values for the CSV (floats as rounded strings)
                    summary_row = {
                        'timeframe': summary['timeframe'],
                        'stop_loss': summary['stop_loss'],
                        'total_return': f"{summary['total_return']*100:.2f}%",
                        'max_drawdown': f"{summary['max_drawdown']*100:.2f}%",
                        'sharpe_ratio': f"{summary['sharpe_ratio']:.2f}",
                        'win_rate': f"{summary['win_rate']*100:.2f}%",
                        'num_trades': summary['num_trades'],
                        'final_equity': f"${summary['final_equity']:.2f}",
                        'initial_equity': f"${summary['initial_equity']:.2f}",
                        'num_stop_losses': summary['num_stop_losses'],
                        'total_fees': f"${summary['total_fees']:.4f}",
                        'total_slippage_usd': f"${summary['total_slippage_usd']:.4f}",
                        'avg_slippage_bps': f"{summary['avg_slippage_bps']:.2f}"
                    }
                    writer.writerow(summary_row)