# lowkey_backtest/main.py
#
# 446 lines
# 18 KiB
# Python
# Raw Normal View History

import pandas as pd
import numpy as np
from ta.volatility import AverageTrueRange
import time
import csv
import math
import os
def load_data(since, until, path='../data/btcusd_1-min_data.csv'):
    """
    Load candles from a CSV file and keep only the rows inside a time window.

    Args:
        since: Inclusive lower bound, anything accepted by ``pd.Timestamp``.
        until: Inclusive upper bound, anything accepted by ``pd.Timestamp``.
            A date-only string such as '2024-10-16' is midnight of that day,
            so later rows of the same day are excluded.
        path (str): CSV file to read. It must contain a 'Timestamp' column
            holding Unix epoch seconds. Defaults to the path this function
            previously had hard-coded.
    Returns:
        pd.DataFrame: Rows with since <= Timestamp <= until, with 'Timestamp'
        converted to datetimes. The original row index is kept.
    """
    df = pd.read_csv(path)
    df['Timestamp'] = pd.to_datetime(df['Timestamp'], unit='s')
    lower, upper = pd.Timestamp(since), pd.Timestamp(until)
    in_window = (df['Timestamp'] >= lower) & (df['Timestamp'] <= upper)
    return df[in_window]
def aggregate_data(df, timeframe):
    """
    Resample candles to a coarser timeframe.

    Args:
        df (pd.DataFrame): Candles with 'Timestamp', 'Open', 'High', 'Low',
            'Close' and 'Volume' columns. The input frame is not modified.
        timeframe (str): Resampling rule passed to ``DataFrame.resample``
            (the script uses values such as "5min", "1h", "1d").
    Returns:
        pd.DataFrame: One row per resampled bar with 'Timestamp' restored as
        a regular column and a fresh 0..n-1 index.
    """
    # How each column of the finer candles collapses into one coarser candle.
    ohlcv_rules = {
        'Open': 'first',
        'High': 'max',
        'Low': 'min',
        'Close': 'last',
        'Volume': 'sum',
    }
    # NOTE(review): bins that contain no source rows are presumably kept with
    # NaN prices; confirm whether downstream indicators should see them.
    resampled = df.set_index('Timestamp').resample(timeframe).agg(ohlcv_rules)
    return resampled.reset_index()
def calculate_okx_taker_maker_fee(amount, is_maker=False):
    """
    Return the trading fee charged on a notional amount.

    Args:
        amount: Notional value of the order (same unit as the returned fee).
        is_maker (bool): True applies the maker rate (0.08%), otherwise the
            taker rate (0.10%) is applied.
    Returns:
        The fee, i.e. ``amount`` multiplied by the selected rate.
    """
    if is_maker:
        return amount * 0.0008
    return amount * 0.0010
def calculate_supertrend(df, period, multiplier):
    """
    Calculate a Supertrend-style line for a given ATR period and multiplier.

    The true range of each bar is smoothed into an ATR (plain mean of the
    first ``period`` true ranges, then the recursive update
    ``(prev * (period - 1) + tr) / period``). Bands are placed at
    ``(High + Low) / 2 +/- multiplier * ATR``. The trend flips up when the
    close is above the previous bar's upper band and down when it is below
    the previous bar's lower band; the line follows the lower band in an
    uptrend (never decreasing) and the upper band in a downtrend (never
    increasing).

    Args:
        df (pd.DataFrame): DataFrame with 'High', 'Low', 'Close' columns.
        period (int): ATR period.
        multiplier (float): Multiplier for ATR.
    Returns:
        pd.Series: Supertrend values indexed like ``df``. All-NaN (after a
        printed warning) when ``df`` has fewer than ``period + 1`` rows.
    """
    n_rows = len(df)
    # Ensure we have enough data for ATR calculation
    if n_rows < period + 1:
        print(f"Warning: Not enough data for ATR period {period}. Need at least {period + 1} rows, got {n_rows}")
        return pd.Series([np.nan] * n_rows, index=df.index)

    # Force float64: integer price columns would otherwise make the work
    # buffers integer too, truncating the ATR and being unable to hold NaN.
    high = np.asarray(df['High'].values, dtype=float)
    low = np.asarray(df['Low'].values, dtype=float)
    close = np.asarray(df['Close'].values, dtype=float)

    # True range; tr[0] stays 0 because the first bar has no previous close.
    tr = np.zeros(n_rows, dtype=float)
    for i in range(1, n_rows):
        tr[i] = max(
            high[i] - low[i],             # Current high - current low
            abs(high[i] - close[i - 1]),  # Current high - previous close
            abs(low[i] - close[i - 1]),   # Current low - previous close
        )

    # ATR: seed with the mean of the first `period` true ranges, then smooth.
    atr = np.zeros(n_rows, dtype=float)
    atr[period] = np.mean(tr[1:period + 1])
    for i in range(period + 1, n_rows):
        atr[i] = (atr[i - 1] * (period - 1) + tr[i]) / period
    # Back-fill the warm-up bars with the first ATR (or a tiny positive
    # floor when that ATR is not positive).
    atr[:period] = atr[period] if atr[period] > 0 else 0.001

    hl2 = (high + low) / 2
    upperband = hl2 + (multiplier * atr)
    lowerband = hl2 - (multiplier * atr)

    supertrend = np.full(n_rows, np.nan, dtype=float)
    in_uptrend = True
    supertrend[0] = upperband[0]
    for i in range(1, n_rows):
        if close[i] > upperband[i - 1]:
            in_uptrend = True
        elif close[i] < lowerband[i - 1]:
            in_uptrend = False
        # else, keep previous trend

        previous = supertrend[i - 1]
        if in_uptrend:
            # Fall back to the raw band when the previous value is NaN.
            supertrend[i] = max(lowerband[i], previous if not np.isnan(previous) else lowerband[i])
        else:
            supertrend[i] = min(upperband[i], previous if not np.isnan(previous) else upperband[i])
    return pd.Series(supertrend, index=df.index)
def add_supertrend_indicators(df):
    """
    Attach the three Supertrend columns used by the strategy to ``df``.

    One column named ``supertrend_<period>_<multiplier>`` is written for each
    of the (period, multiplier) pairs (12, 3.0), (10, 1.0) and (11, 2.0).
    If a calculation raises, the error is printed and that column is filled
    with NaN instead.

    Args:
        df (pd.DataFrame): DataFrame with columns 'High', 'Low', 'Close'.
            It is modified in place.
    Returns:
        pd.DataFrame: The same ``df`` object with the new columns added.
    """
    for period, multiplier in ((12, 3.0), (10, 1.0), (11, 2.0)):
        column = f'supertrend_{period}_{multiplier}'
        try:
            df[column] = calculate_supertrend(df, period, multiplier)
        except Exception as exc:
            # Best effort: keep going with a NaN column rather than aborting.
            print(f"Error calculating Supertrend {period}, {multiplier}: {exc}")
            df[column] = np.nan
    return df
def precompute_1min_slice_indices(df_aggregated, df_1min):
    """
    Precompute positional slice bounds into the time-sorted 1-minute data.

    For every pair of consecutive aggregated timestamps ``(t_prev, t_next)``
    one ``(start_idx, end_idx)`` tuple is produced such that
    ``sorted_rows.iloc[start_idx:end_idx]`` holds the 1-minute rows with
    ``t_prev < Timestamp <= t_next`` (both bounds use ``side='right'``).

    Args:
        df_aggregated (pd.DataFrame): Aggregated bars with a 'Timestamp' column.
        df_1min (pd.DataFrame): 1-minute bars with a 'Timestamp' column; it
            does not need to be sorted.
    Returns:
        tuple: ``(indices, sorted_1min)`` where ``indices`` is a list with
        ``len(df_aggregated) - 1`` tuples (empty for fewer than two bars) and
        ``sorted_1min`` is the positional order that sorts ``df_1min`` by
        timestamp. The indices refer to ``df_1min.iloc[sorted_1min]``.
    """
    bar_timestamps = df_aggregated['Timestamp'].values
    minute_timestamps = df_1min['Timestamp'].values

    # Stable sort so rows sharing a timestamp keep their original order.
    sorted_1min = np.argsort(minute_timestamps, kind='stable')
    minute_timestamps = minute_timestamps[sorted_1min]

    # One vectorised lookup for every bar boundary; consecutive boundaries
    # form the (start, end) pairs.
    boundaries = np.searchsorted(minute_timestamps, bar_timestamps, side='right').tolist()
    indices = list(zip(boundaries[:-1], boundaries[1:]))
    return indices, sorted_1min
def _lagged_meta_trend(df_aggregated, st_cols):
    """Return the meta trend (+1/-1 when all Supertrends agree, else 0) delayed by one bar."""
    close = df_aggregated['Close'].to_numpy()
    # Direction per Supertrend: +1 when the close is above the line, else -1
    # (a NaN line compares False and therefore counts as -1).
    directions = np.stack(
        [np.where(close > df_aggregated[col].to_numpy(), 1, -1) for col in st_cols],
        axis=1,
    )
    unanimous = (directions == directions[:, :1]).all(axis=1)
    meta_trend = np.where(unanimous, directions[:, 0], 0)
    # The meta trend of bar i depends on bar i's close, so it may only drive
    # decisions from bar i+1 onwards; bar 0 has no signal.
    lagged = np.zeros_like(meta_trend)
    lagged[1:] = meta_trend[:-1]
    return lagged


def _close_position(kind, when, price, coin, entry_price, trade_log, trade_results):
    """Sell the whole position at `price`, record the trade, and return the USD balance after fees."""
    gross_usd = coin * price
    fee = calculate_okx_taker_maker_fee(gross_usd, is_maker=False)
    usd = gross_usd - fee
    # PnL is the raw price change relative to entry; fees are not included.
    trade_pnl = (price - entry_price) / entry_price if entry_price else 0
    trade_results.append(trade_pnl)
    trade_log.append({
        'type': kind,
        'time': when,
        'price': price,
        'usd': usd,
        'coin': 0,
        'pnl': trade_pnl,
        'fee': fee
    })
    return usd


def _performance_metrics(equity_arr, trade_results):
    """Return (total_return, max_drawdown, sharpe, win_rate, num_trades) for a non-empty equity curve."""
    returns = np.diff(equity_arr) / equity_arr[:-1]
    # Filter out infinite and NaN returns
    returns = returns[np.isfinite(returns)]
    total_return = (equity_arr[-1] - equity_arr[0]) / equity_arr[0] if equity_arr[0] != 0 else 0
    if equity_arr[-1] <= 0.01:
        # Account effectively wiped out.
        max_drawdown = -1.0
    else:
        running_max = np.maximum.accumulate(equity_arr)
        drawdowns = (equity_arr - running_max) / running_max
        max_drawdown = drawdowns.min() if len(drawdowns) > 0 and np.isfinite(drawdowns).any() else 0
    if len(returns) > 1 and np.std(returns) > 1e-9:
        # NOTE(review): returns are per aggregated bar, yet the factor is
        # sqrt(252) for every timeframe, so Sharpe values are not comparable
        # across timeframes. Left unchanged to keep the reported metric stable.
        sharpe = np.mean(returns) / np.std(returns) * math.sqrt(252)
    else:
        sharpe = 0
    num_trades = len(trade_results)
    win_rate = sum(1 for r in trade_results if r > 0) / num_trades if trade_results else 0
    return total_return, max_drawdown, sharpe, win_rate, num_trades


def _save_trade_log(log_path, summary_values, trade_log):
    """Write the summary header/values, a blank row, then the trade rows to `log_path` as CSV."""
    summary_header = [
        'elapsed_time_sec', 'total_return', 'max_drawdown', 'sharpe_ratio',
        'win_rate', 'num_trades', 'final_equity', 'initial_equity', 'num_stop_losses', 'total_fees'
    ]
    # Fixed column order so the file layout is identical on every run;
    # 'buy' rows have no 'pnl' and get an empty cell.
    trade_fields = ['type', 'time', 'price', 'usd', 'coin', 'pnl', 'fee']
    with open(log_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(summary_header)
        writer.writerow(summary_values)
        writer.writerow([])  # Blank row for separation
        dict_writer = csv.DictWriter(f, fieldnames=trade_fields, restval='')
        dict_writer.writeheader()
        dict_writer.writerows(trade_log)


def backtest(timeframe, df_aggregated, df_1min, stop_loss_pct, progress_step=1000):
    """
    Backtest a long-only strategy driven by the meta Supertrend (all three Supertrends agree).

    The meta trend is lagged by one bar, so the decision taken at the open of
    bar i only uses information up to the close of bar i-1 (no look-ahead).
    Entries happen at the bar open when the signal turns to +1; exits happen
    at the bar open when the signal flips directly from +1 to -1, or earlier
    when a 1-minute low breaches the stop-loss level. Any position still open
    on the last bar is closed at that bar's open. Taker fees are charged on
    every fill. Starting capital is 1000 USD.

    Side effects: prints progress and metrics, creates ``backtest_logs/`` and,
    when at least one trade happened, writes
    ``trade_log_<timeframe>_sl<stop>.csv`` there.

    Args:
        timeframe (str): Label of the aggregated timeframe (used for printing
            and in the log file name).
        df_aggregated (pd.DataFrame): Aggregated bars with 'Timestamp',
            'Open', 'Close' and the columns 'supertrend_12_3.0',
            'supertrend_10_1.0', 'supertrend_11_2.0'.
        df_1min (pd.DataFrame): 1-minute bars with 'Timestamp', 'Open', 'Low'.
        stop_loss_pct (float): Stop distance below the entry price as a
            fraction (0.05 means 5%).
        progress_step (int): Print progress every this many bars.
    Returns:
        dict | None: Summary metrics, or None when there are fewer than two
        aggregated bars (no equity points).
    Raises:
        ValueError: If a required Supertrend column is missing.
    """
    start_time = time.time()
    required_st_cols = ["supertrend_12_3.0", "supertrend_10_1.0", "supertrend_11_2.0"]
    for col in required_st_cols:
        if col not in df_aggregated.columns:
            raise ValueError(f"Missing required Supertrend column: {col}")

    meta_trend_signal = _lagged_meta_trend(df_aggregated, required_st_cols)

    # Precompute 1-min slice indices for each aggregated bar
    slice_indices, sorted_1min = precompute_1min_slice_indices(df_aggregated, df_1min)
    df_1min_sorted = df_1min.iloc[sorted_1min].reset_index(drop=True)

    # Positional views hoisted out of the loop; they do not depend on the
    # frame's index labels. tolist() keeps the timestamps as pd.Timestamp.
    opens = df_aggregated['Open'].to_numpy()
    closes = df_aggregated['Close'].to_numpy()
    timestamps = df_aggregated['Timestamp'].tolist()
    n_bars = len(df_aggregated)

    in_position = False
    init_usd = 1000
    usd = init_usd
    coin = 0
    nb_stop_loss = 0
    trade_log = []
    equity_curve = []
    trade_results = []
    entry_price = None
    total_steps = n_bars - 1

    for i in range(1, n_bars):
        open_price = opens[i]  # Use open price for entry/exit
        close_price = closes[i]
        timestamp = timestamps[i]
        prev_mt = meta_trend_signal[i - 1]
        curr_mt = meta_trend_signal[i]

        # Track equity at each bar (valued at the bar close, before this
        # bar's trades are applied).
        equity = usd + coin * close_price
        equity_curve.append((timestamp, equity))

        # Check stop loss if in position
        if in_position:
            start_idx, end_idx = slice_indices[i - 1]
            df_1min_slice = df_1min_sorted.iloc[start_idx:end_idx]
            if not df_1min_slice.empty:
                stop_loss_threshold = entry_price * (1 - stop_loss_pct)
                below_stop = df_1min_slice['Low'] < stop_loss_threshold
                if below_stop.any():
                    # idxmax on a boolean Series gives the first True label.
                    stop_row = df_1min_slice.loc[below_stop.idxmax()]
                    # A candle opening below the stop fills at its open (gap);
                    # otherwise the stop level itself is assumed to fill.
                    if stop_row['Open'] < stop_loss_threshold:
                        exit_price = stop_row['Open']
                    else:
                        exit_price = stop_loss_threshold
                    usd = _close_position(
                        'stop_loss', stop_row['Timestamp'], exit_price,
                        coin, entry_price, trade_log, trade_results,
                    )
                    coin = 0
                    in_position = False
                    entry_price = None
                    nb_stop_loss += 1
                    # No new entry on the bar where the stop was handled.
                    continue

        # Entry condition: signal changes TO bullish (prev != 1 and curr == 1)
        if not in_position and prev_mt != 1 and curr_mt == 1:
            in_position = True
            fee = calculate_okx_taker_maker_fee(usd, is_maker=False)
            coin = (usd - fee) / open_price  # Use open price
            entry_price = open_price
            usd = 0
            trade_log.append({
                'type': 'buy',
                'time': timestamp,
                'price': open_price,
                'usd': usd,
                'coin': coin,
                'fee': fee
            })
        # Exit condition: signal changes TO bearish (prev == 1 and curr == -1)
        # NOTE(review): a 1 -> 0 -> -1 sequence never satisfies this, so such
        # positions are only closed by the stop loss or the final forced close.
        elif in_position and prev_mt == 1 and curr_mt == -1:
            usd = _close_position(
                'sell', timestamp, open_price,
                coin, entry_price, trade_log, trade_results,
            )
            coin = 0
            in_position = False
            entry_price = None

        if i % progress_step == 0 or i == total_steps:
            percent = (i / total_steps) * 100
            print(f"\rTimeframe: {timeframe},\tProgress: {percent:.1f}%\tCurrent equity: {equity:.2f}\033[K", end='', flush=True)

    # Force close any open position at the end
    if in_position:
        usd = _close_position(
            'forced_close', timestamps[-1], opens[-1],  # Use open price for consistency
            coin, entry_price, trade_log, trade_results,
        )
        coin = 0
        in_position = False
        entry_price = None

    print()
    print(f"Timeframe: {timeframe},\tTotal profit: {usd - init_usd},\tNumber of stop losses: {nb_stop_loss}")

    # --- Performance Metrics ---
    equity_arr = np.array([point[1] for point in equity_curve])
    # Handle edge cases for empty or invalid equity data
    if len(equity_arr) == 0:
        print("Warning: No equity data available")
        return None
    total_return, max_drawdown, sharpe, win_rate, num_trades = _performance_metrics(equity_arr, trade_results)

    print("Performance Metrics:")
    print(f"  Total Return: {total_return*100:.2f}%")
    print(f"  Max Drawdown: {max_drawdown*100:.2f}%")
    print(f"  Sharpe Ratio: {sharpe:.2f}")
    print(f"  Win Rate: {win_rate*100:.2f}%")
    print(f"  Number of Trades: {num_trades}")
    print(f"  Final Equity: ${equity_arr[-1]:.2f}")
    print(f"  Initial Equity: ${equity_arr[0]:.2f}")

    # --- Save Trade Log ---
    log_dir = "backtest_logs"
    os.makedirs(log_dir, exist_ok=True)
    # Format stop_loss_pct for filename (e.g., 0.05 -> 0p05)
    stop_loss_str = f"{stop_loss_pct:.2f}".replace('.', 'p')
    log_path = os.path.join(log_dir, f"trade_log_{timeframe}_sl{stop_loss_str}.csv")
    # Total fees for this backtest (0 when no trade happened).
    total_fees = sum(entry.get('fee', 0) for entry in trade_log)
    if trade_log:
        summary_values = [
            f"{time.time() - start_time:.2f}",
            f"{total_return*100:.2f}%",
            f"{max_drawdown*100:.2f}%",
            f"{sharpe:.2f}",
            f"{win_rate*100:.2f}%",
            str(num_trades),
            f"${equity_arr[-1]:.2f}",
            f"${equity_arr[0]:.2f}",
            str(nb_stop_loss),
            f"${total_fees:.4f}"
        ]
        _save_trade_log(log_path, summary_values, trade_log)
        print(f"Trade log saved to {log_path}")
    else:
        print("No trades to log.")

    # Return summary metrics (excluding elapsed time)
    return {
        'timeframe': timeframe,
        'stop_loss': stop_loss_pct,
        'total_return': total_return,
        'max_drawdown': max_drawdown,
        'sharpe_ratio': sharpe,
        'win_rate': win_rate,
        'num_trades': num_trades,
        'final_equity': equity_arr[-1],
        'initial_equity': equity_arr[0],
        'num_stop_losses': nb_stop_loss,
        'total_fees': total_fees
    }
if __name__ == "__main__":
    # Parameter grid: every timeframe is run with every stop-loss level.
    # (Trim the list, e.g. to ["30min"], for a quicker run.)
    timeframes = ["5min", "15min", "30min", "1h", "4h", "1d"]
    stoplosses = [0.03, 0.05, 0.1]

    # Column order of the cross-run summary file.
    summary_header = [
        'timeframe', 'stop_loss', 'total_return', 'max_drawdown', 'sharpe_ratio',
        'win_rate', 'num_trades', 'final_equity', 'initial_equity', 'num_stop_losses', 'total_fees'
    ]

    # Load the 1-minute candles once; they are reused for every run.
    df_1min = load_data('2021-11-01', '2024-10-16')

    with open("backtest_summary.csv", 'w', newline='') as summary_file:
        writer = csv.DictWriter(summary_file, fieldnames=summary_header)
        writer.writeheader()
        for timeframe in timeframes:
            # Indicators depend only on the timeframe, so compute them once
            # per timeframe and share them across stop-loss levels.
            df_aggregated = add_supertrend_indicators(aggregate_data(df_1min, timeframe))
            for stop_loss_pct in stoplosses:
                summary = backtest(timeframe, df_aggregated, df_1min, stop_loss_pct=stop_loss_pct)
                if summary is None:
                    # backtest() produced no equity data; nothing to record.
                    continue
                # Format values for CSV (ratios as percentages, money with a $ prefix).
                total_return_pct = summary['total_return'] * 100
                max_drawdown_pct = summary['max_drawdown'] * 100
                win_rate_pct = summary['win_rate'] * 100
                writer.writerow({
                    'timeframe': summary['timeframe'],
                    'stop_loss': summary['stop_loss'],
                    'total_return': f"{total_return_pct:.2f}%",
                    'max_drawdown': f"{max_drawdown_pct:.2f}%",
                    'sharpe_ratio': f"{summary['sharpe_ratio']:.2f}",
                    'win_rate': f"{win_rate_pct:.2f}%",
                    'num_trades': summary['num_trades'],
                    'final_equity': f"${summary['final_equity']:.2f}",
                    'initial_equity': f"${summary['initial_equity']:.2f}",
                    'num_stop_losses': summary['num_stop_losses'],
                    'total_fees': f"${summary['total_fees']:.4f}",
                })