lowkey_backtest/main.py

203 lines
7.6 KiB
Python

import pandas as pd
import numpy as np
from ta.volatility import AverageTrueRange
def load_data(since, path='../data/btcusd_1-min_data.csv'):
    """
    Load the price CSV and keep only rows at or after `since`.
    Args:
        since (str or datetime-like): Earliest timestamp to keep (inclusive);
            converted with pd.Timestamp.
        path (str or file-like): CSV location, or any object pd.read_csv accepts.
            Defaults to the file this script was originally hard-wired to.
    Returns:
        pd.DataFrame: Rows with 'Timestamp' >= since. The 'Timestamp' column is
        converted to datetimes; the row index from the CSV is preserved.
    """
    df = pd.read_csv(path)
    # The 'Timestamp' column is interpreted as seconds since the Unix epoch.
    df['Timestamp'] = pd.to_datetime(df['Timestamp'], unit='s')
    return df[df['Timestamp'] >= pd.Timestamp(since)]
def aggregate_data(df, timeframe):
    """
    Resample timestamped OHLCV rows into bars of the requested timeframe.
    Args:
        df (pd.DataFrame): Data with 'Timestamp', 'Open', 'High', 'Low',
            'Close' and 'Volume' columns.
        timeframe (str): Pandas resample rule, e.g. '5min'.
    Returns:
        pd.DataFrame: One row per bar, with 'Timestamp' restored as a column.
    """
    # How each column is collapsed within a bar.
    ohlcv_rules = {
        'Open': 'first',
        'High': 'max',
        'Low': 'min',
        'Close': 'last',
        'Volume': 'sum',
    }
    bars = df.set_index('Timestamp').resample(timeframe).agg(ohlcv_rules)
    return bars.reset_index()
def calculate_okx_taker_maker_fee(amount, is_maker=False):
    """
    Return the fee charged on a traded amount.
    Args:
        amount (float): Amount the fee rate is applied to.
        is_maker (bool): True selects the maker rate (0.0008),
            False selects the taker rate (0.0010).
    Returns:
        float: amount multiplied by the selected rate.
    """
    if is_maker:
        rate = 0.0008
    else:
        rate = 0.0010
    return amount * rate
def calculate_supertrend(df, period, multiplier):
    """
    Calculate a Supertrend-style trailing band for a given period and multiplier.
    The returned values are band price levels (a ratcheting lower band while in
    an uptrend, a ratcheting upper band while in a downtrend), not a signed
    trend direction.
    Args:
        df (pd.DataFrame): DataFrame with 'High', 'Low', 'Close' columns.
        period (int): ATR period.
        multiplier (float): Multiplier for ATR.
    Returns:
        pd.Series: Supertrend values aligned to df.index (empty if df is empty).
    """
    if len(df) == 0:
        # Nothing to compute; also avoids indexing element 0 of empty arrays below.
        return pd.Series(np.array([], dtype=float), index=df.index)
    high = df['High'].values
    low = df['Low'].values
    close = df['Close'].values
    atr = AverageTrueRange(df['High'], df['Low'], df['Close'], window=period).average_true_range().values
    hl2 = (high + low) / 2
    upperband = hl2 + (multiplier * atr)
    lowerband = hl2 - (multiplier * atr)
    # Always use a float buffer: np.full_like would inherit an integer dtype
    # from integer closes, which cannot hold NaN or fractional band values.
    supertrend = np.full(len(close), np.nan, dtype=float)
    in_uptrend = True
    supertrend[0] = upperband[0]
    for i in range(1, len(close)):
        # Flip the trend only when the close breaks the previous bar's band;
        # otherwise keep the previous trend.
        if close[i] > upperband[i - 1]:
            in_uptrend = True
        elif close[i] < lowerband[i - 1]:
            in_uptrend = False
        previous = supertrend[i - 1]
        if in_uptrend:
            # Lower band may only move up; fall back to the current band
            # when the previous value is NaN.
            anchor = lowerband[i] if np.isnan(previous) else previous
            supertrend[i] = max(lowerband[i], anchor)
        else:
            # Upper band may only move down; same NaN fallback.
            anchor = upperband[i] if np.isnan(previous) else previous
            supertrend[i] = min(upperband[i], anchor)
    return pd.Series(supertrend, index=df.index)
def add_supertrend_indicators(df, supertrend_params=((12, 3.0), (10, 1.0), (11, 2.0))):
    """
    Adds Supertrend indicator columns to the dataframe, one per (period, multiplier) pair.
    Each column is named 'supertrend_{period}_{multiplier}'. The dataframe is
    modified in place and also returned.
    Args:
        df (pd.DataFrame): DataFrame with columns 'High', 'Low', 'Close'.
        supertrend_params (iterable of (int, float)): (ATR period, multiplier)
            pairs to compute. Defaults to the three pairs this script has
            always used.
    Returns:
        pd.DataFrame: The same DataFrame with the new Supertrend columns added.
    """
    for period, multiplier in supertrend_params:
        st_col = f'supertrend_{period}_{multiplier}'
        try:
            df[st_col] = calculate_supertrend(df, period, multiplier)
        except Exception as e:
            # Deliberately best-effort: report the failure and leave a NaN
            # column so the remaining indicators are still computed.
            print(f"Error calculating Supertrend {period}, {multiplier}: {e}")
            df[st_col] = np.nan
    return df
def precompute_1min_slice_indices(df_aggregated, df_1min):
    """
    Precompute positional slice bounds into the time-sorted 1-minute data for
    each pair of consecutive aggregated timestamps.
    Entry k covers the 1-minute rows whose timestamp t satisfies
    aggregated[k] < t <= aggregated[k + 1] (both bounds use side='right').
    Args:
        df_aggregated (pd.DataFrame): Aggregated bars with a 'Timestamp' column,
            used in their existing order (they are not re-sorted here).
        df_1min (pd.DataFrame): 1-minute data with a 'Timestamp' column.
    Returns:
        tuple: (indices, sorted_1min) where indices is a list of
        (start_idx, end_idx) pairs, one per consecutive timestamp pair, valid
        for iloc slicing of df_1min.iloc[sorted_1min]; sorted_1min is the
        argsort order of the 1-minute timestamps.
    """
    timestamps = df_aggregated['Timestamp'].values
    one_min_timestamps = df_1min['Timestamp'].values
    # searchsorted needs the 1-minute side in ascending order.
    sorted_1min = np.argsort(one_min_timestamps)
    one_min_timestamps = one_min_timestamps[sorted_1min]
    # Locate every bar boundary once; consecutive boundaries form the pairs.
    boundaries = np.searchsorted(one_min_timestamps, timestamps, side='right')
    indices = list(zip(boundaries[:-1], boundaries[1:]))
    return indices, sorted_1min
def backtest(df_aggregated, df_1min, stop_loss_pct, progress_step=1000, init_usd=1000):
    """
    Backtest trading strategy based on Supertrend indicators with trailing stop loss.
    Buys (all-in, at the aggregated close) when all three Supertrend columns are
    positive (>0), sells when any is negative (<0), or when the trailing stop
    loss is hit on a 1-minute close. No trading fees are deducted. A position
    still open at the end is valued at the last aggregated close.
    Args:
        df_aggregated (pd.DataFrame): Aggregated OHLCV data with Supertrend columns.
        df_1min (pd.DataFrame): 1-minute OHLCV data.
        stop_loss_pct (float): Trailing stop loss percentage (e.g., 0.02 for 2%).
        progress_step (int): Step interval for progress display; 0 disables it.
        init_usd (float): Starting cash balance.
    Returns:
        dict: 'final_value' (ending equity), 'profit' (final_value - init_usd)
        and 'nb_stop_loss' (number of stop-loss exits).
    Raises:
        ValueError: If a required Supertrend column is missing.
    """
    required_st_cols = ["supertrend_12_3.0", "supertrend_10_1.0", "supertrend_11_2.0"]
    for col in required_st_cols:
        if col not in df_aggregated.columns:
            raise ValueError(f"Missing required Supertrend column: {col}")
    # Precompute 1-min slice indices for each aggregated bar
    slice_indices, sorted_1min = precompute_1min_slice_indices(df_aggregated, df_1min)
    # Work on positional numpy arrays: avoids label-based lookups (which depend
    # on the frames' index) and the per-row overhead of iterrows().
    one_min_close = df_1min['Close'].to_numpy()[sorted_1min]
    bar_close = df_aggregated['Close'].to_numpy()
    st_matrix = np.column_stack([df_aggregated[col].to_numpy() for col in required_st_cols])
    # NOTE(review): calculate_supertrend in this file returns band price levels,
    # which are positive for positive prices, so the "any negative" exit
    # presumably never fires with those columns -- confirm the intended signal.
    all_positive = (st_matrix > 0).all(axis=1)
    any_negative = (st_matrix < 0).any(axis=1)
    in_position = False
    usd = init_usd
    coin = 0
    highest_price = None
    nb_stop_loss = 0
    n_bars = len(bar_close)
    total_steps = n_bars - 1
    for i in range(1, n_bars):
        close_price = bar_close[i]
        if not in_position:
            # Buy condition: all Supertrend values positive
            if all_positive[i]:
                in_position = True
                coin = usd / close_price
                usd = 0
                highest_price = close_price
        else:
            # Update highest price if new high on aggregated bar
            if close_price > highest_price:
                highest_price = close_price
            # NOTE(review): slice i-1 holds 1-min rows with timestamps in
            # (Timestamp[i-1], Timestamp[i]], i.e. at or before bar i's own
            # timestamp, while close_price is bar i's close. Confirm this
            # alignment is intended and not a look-ahead / off-by-one.
            start_idx, end_idx = slice_indices[i - 1]
            stop_triggered = False
            for minute_close in one_min_close[start_idx:end_idx]:
                # Update highest price if new high in 1-min bar
                if minute_close > highest_price:
                    highest_price = minute_close
                # Trailing stop loss condition on 1-min close
                if minute_close < highest_price * (1 - stop_loss_pct):
                    in_position = False
                    usd = coin * minute_close
                    coin = 0
                    nb_stop_loss += 1
                    highest_price = None
                    stop_triggered = True
                    break
            # Sell condition: any Supertrend value negative (on aggregated bar
            # close); skipped when the stop loss already closed the position.
            if not stop_triggered and any_negative[i]:
                in_position = False
                usd = coin * close_price
                coin = 0
                highest_price = None
        if progress_step and (i % progress_step == 0 or i == total_steps):
            percent = (i / total_steps) * 100
            print(f"Progress: {percent:.1f}% ({i}/{total_steps})")
    # Mark an open position to market; otherwise usd is 0 while holding coin
    # and the reported profit would always be -init_usd.
    final_value = coin * bar_close[-1] if in_position else usd
    profit = final_value - init_usd
    print(f"Total profit: {profit}")
    print(f"Number of stop losses: {nb_stop_loss}")
    return {
        'final_value': final_value,
        'profit': profit,
        'nb_stop_loss': nb_stop_loss,
    }
if __name__ == "__main__":
    # Load the 1-minute data from 2020 onward, build 5-minute bars and attach
    # the Supertrend columns the backtest requires.
    df_1min = load_data('2020-01-01')
    df_aggregated = add_supertrend_indicators(aggregate_data(df_1min, '5min'))
    # Bar-over-bar log return of the close.
    closes = df_aggregated['Close']
    df_aggregated['log_return'] = np.log(closes / closes.shift(1))
    # Run with a 2% trailing stop loss.
    backtest(df_aggregated, df_1min, stop_loss_pct=0.02)