Add an initial implementation of the backtesting framework with a CLI interface, replacing the monolithic main.py script. Introduce core modules for data loading, trade management, performance metrics, and logging. Include Supertrend indicator calculations and slippage estimation. Update .gitignore to exclude logs and CSV files.

Simon Moisy 2026-01-09 19:53:01 +08:00
parent a25499e016
commit c4aa965a98
15 changed files with 424 additions and 568 deletions

2
.gitignore vendored

@@ -168,3 +168,5 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
logs/
*.csv

30
.vscode/launch.json vendored

@@ -1,16 +1,22 @@
 {
     // Use IntelliSense to learn about possible attributes.
     // Hover to view descriptions of existing attributes.
     // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
     "version": "0.2.0",
     "configurations": [
-        {
-            "name": "Python Debugger: main.py",
-            "type": "debugpy",
-            "request": "launch",
-            "program": "main.py",
-            "console": "integratedTerminal"
-        }
+        {
+            "name": "Python Debugger: cli.py",
+            "type": "debugpy",
+            "request": "launch",
+            "program": "${workspaceFolder}/cli.py",
+            "console": "integratedTerminal",
+            "cwd": "${workspaceFolder}",
+            "args": [
+                "2025-03-01", "2025-09-22",
+                "--timeframes-minutes", "60", "240", "480",
+                "--stop-loss", "0.02", "0.05",
+                "--exit-on-bearish-flip",
+                "--csv", "../data/btcusd_1-min_data.csv"
+            ],
+            "env": { "PYTHONPATH": "${workspaceFolder}" }
+        }
     ]
 }

13
__init__.py Normal file

@@ -0,0 +1,13 @@
from __future__ import annotations

__all__ = [
    "config",
    "data",
    "indicators",
    "market_costs",
    "intrabar",
    "trade",
    "metrics",
    "logging_utils",
    "backtest",
]

70
backtest.py Normal file

@@ -0,0 +1,70 @@
from __future__ import annotations

from pathlib import Path

import pandas as pd

from trade import TradeState, enter_long, exit_long, maybe_trailing_stop
from indicators import add_supertrends, compute_meta_trend
from metrics import compute_metrics
from logging_utils import write_trade_log

DEFAULT_ST_SETTINGS = [(12, 3.0), (10, 1.0), (11, 2.0)]


def backtest(
    df: pd.DataFrame,
    df_1min: pd.DataFrame,
    timeframe_minutes: int,
    stop_loss: float,
    exit_on_bearish_flip: bool,
    fee_bps: float,
    slippage_bps: float,
    log_path: Path | None = None,
):
    df = add_supertrends(df, DEFAULT_ST_SETTINGS)
    df["meta_bull"] = compute_meta_trend(df, DEFAULT_ST_SETTINGS)
    state = TradeState(stop_loss_frac=stop_loss, fee_bps=fee_bps, slippage_bps=slippage_bps)
    equity, trades = [], []
    for i, row in df.iterrows():
        price = float(row["Close"])
        ts = pd.Timestamp(row["Timestamp"])
        # Enter when flat and the meta trend is bullish.
        if state.qty <= 0 and row["meta_bull"] == 1:
            evt = enter_long(state, price)
            if evt:
                evt.update({"t": ts.isoformat(), "reason": "bull_flip"})
                trades.append(evt)
        # Walk the 1-minute bars inside this aggregated bar to check the trailing stop.
        start = ts
        end = df["Timestamp"].iat[i + 1] if i + 1 < len(df) else ts + pd.Timedelta(minutes=timeframe_minutes)
        if state.qty > 0:
            win = df_1min[(df_1min["Timestamp"] >= start) & (df_1min["Timestamp"] < end)]
            for _, m in win.iterrows():
                hi = float(m["High"])
                lo = float(m["Low"])
                trail = maybe_trailing_stop(state, hi)  # updates state.max_px and returns the stop level
                if lo <= trail:
                    evt = exit_long(state, trail)
                    if evt:
                        prev = trades[-1]
                        pnl = (evt["price"] - (prev.get("price") or evt["price"])) * (prev.get("qty") or 0.0)
                        evt.update({"t": pd.Timestamp(m["Timestamp"]).isoformat(), "reason": "stop", "pnl": pnl})
                        trades.append(evt)
                    break
        # Optional exit when the meta trend flips bearish.
        if state.qty > 0 and exit_on_bearish_flip and row["meta_bull"] == 0:
            evt = exit_long(state, price)
            if evt:
                prev = trades[-1]
                pnl = (evt["price"] - (prev.get("price") or evt["price"])) * (prev.get("qty") or 0.0)
                evt.update({"t": ts.isoformat(), "reason": "bearish_flip", "pnl": pnl})
                trades.append(evt)
        equity.append(state.cash + state.qty * price)
    equity_curve = pd.Series(equity, index=df["Timestamp"])
    if log_path:
        write_trade_log(trades, log_path)
    perf = compute_metrics(equity_curve, trades)
    return perf, equity_curve, trades
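
For reference, a minimal end-to-end sketch of how these pieces compose (not part of the commit; the path and dates are illustrative):

# Hypothetical one-off run mirroring what cli.py does for a single combination.
from pathlib import Path
from data import load_data
from backtest import backtest

df_1min, df = load_data("2025-03-01", "2025-09-22", 60, Path("../data/btcusd_1-min_data.csv"))
perf, equity_curve, trades = backtest(
    df=df,
    df_1min=df_1min,
    timeframe_minutes=60,
    stop_loss=0.02,
    exit_on_bearish_flip=True,
    fee_bps=10.0,
    slippage_bps=2.0,
    log_path=Path("logs/60m_example.csv"),
)
print(perf.total_return, perf.max_drawdown, perf.num_trades)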

80
cli.py Normal file

@@ -0,0 +1,80 @@
from __future__ import annotations

import argparse
from pathlib import Path

import pandas as pd

from config import CLIConfig
from data import load_data
from backtest import backtest


def parse_args() -> CLIConfig:
    p = argparse.ArgumentParser(prog="bt", description="Simple supertrend backtester")
    p.add_argument("start")
    p.add_argument("end")
    p.add_argument("--timeframe-minutes", type=int, default=15)   # single timeframe
    p.add_argument("--timeframes-minutes", nargs="+", type=int)   # multiple timeframes, e.g. 5 15 60 240
    p.add_argument("--stop-loss", dest="stop_losses", type=float, nargs="+", default=[0.02, 0.05])
    p.add_argument("--exit-on-bearish-flip", action="store_true")
    p.add_argument("--csv", dest="csv_path", type=Path, required=True)
    p.add_argument("--out-csv", type=Path, default=Path("summary.csv"))
    p.add_argument("--log-dir", type=Path, default=Path("./logs"))
    p.add_argument("--fee-bps", type=float, default=10.0)
    p.add_argument("--slippage-bps", type=float, default=2.0)
    a = p.parse_args()
    return CLIConfig(
        start=a.start,
        end=a.end,
        timeframe_minutes=a.timeframe_minutes,
        timeframes_minutes=a.timeframes_minutes,
        stop_losses=a.stop_losses,
        exit_on_bearish_flip=a.exit_on_bearish_flip,
        csv_path=a.csv_path,
        out_csv=a.out_csv,
        log_dir=a.log_dir,
        fee_bps=a.fee_bps,
        slippage_bps=a.slippage_bps,
    )


def main():
    cfg = parse_args()
    # --timeframes-minutes (multi) takes precedence over --timeframe-minutes (single).
    frames = cfg.timeframes_minutes or [cfg.timeframe_minutes]
    rows: list[dict] = []
    for tfm in frames:
        df_1min, df = load_data(cfg.start, cfg.end, tfm, cfg.csv_path)
        for sl in cfg.stop_losses:
            log_path = cfg.log_dir / f"{tfm}m_sl{sl:.2%}.csv"
            perf, equity, _ = backtest(
                df=df,
                df_1min=df_1min,
                timeframe_minutes=tfm,
                stop_loss=sl,
                exit_on_bearish_flip=cfg.exit_on_bearish_flip,
                fee_bps=cfg.fee_bps,
                slippage_bps=cfg.slippage_bps,
                log_path=log_path,
            )
            rows.append({
                "timeframe": f"{tfm}min",
                "stop_loss": sl,
                "exit_on_bearish_flip": cfg.exit_on_bearish_flip,
                "total_return": f"{perf.total_return:.2%}",
                "max_drawdown": f"{perf.max_drawdown:.2%}",
                "sharpe_ratio": f"{perf.sharpe_ratio:.2f}",
                "win_rate": f"{perf.win_rate:.2%}",
                "num_trades": perf.num_trades,
                "final_equity": f"${perf.final_equity:.2f}",
                "initial_equity": f"${perf.initial_equity:.2f}",
                "num_stop_losses": perf.num_stop_losses,
                "total_fees": f"${perf.total_fees:.4f}",
                "total_slippage_usd": f"${perf.total_slippage_usd:.4f}",
                "avg_slippage_bps": f"{perf.avg_slippage_bps:.2f}",
            })
    out = pd.DataFrame(rows)
    out.to_csv(cfg.out_csv, index=False)
    print(out.to_string(index=False))


if __name__ == "__main__":
    main()
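
For reference, a programmatic invocation equivalent to the launch.json configuration (a sketch; cli.main() reads sys.argv, which is patched here):

import sys
import cli

sys.argv = [
    "bt", "2025-03-01", "2025-09-22",
    "--timeframes-minutes", "60", "240", "480",
    "--stop-loss", "0.02", "0.05",
    "--exit-on-bearish-flip",
    "--csv", "../data/btcusd_1-min_data.csv",
]
cli.main()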

18
config.py Normal file

@@ -0,0 +1,18 @@
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Sequence


@dataclass
class CLIConfig:
    start: str
    end: str
    timeframe_minutes: int
    timeframes_minutes: list[int] | None
    stop_losses: Sequence[float]
    exit_on_bearish_flip: bool
    csv_path: Path | None
    out_csv: Path
    log_dir: Path
    fee_bps: float
    slippage_bps: float

24
data.py Normal file

@@ -0,0 +1,24 @@
from __future__ import annotations

from pathlib import Path

import pandas as pd


def load_data(start: str, end: str, timeframe_minutes: int, csv_path: Path) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Load the 1-min CSV (epoch-second timestamps), filter to [start, end],
    and resample to the requested timeframe. Returns (df_1min, df_resampled)."""
    df_1min = pd.read_csv(csv_path)
    df_1min["Timestamp"] = pd.to_datetime(df_1min["Timestamp"], unit="s", utc=True)
    df_1min = df_1min[(df_1min["Timestamp"] >= pd.Timestamp(start, tz="UTC")) &
                      (df_1min["Timestamp"] <= pd.Timestamp(end, tz="UTC"))] \
        .sort_values("Timestamp").reset_index(drop=True)
    if timeframe_minutes != 1:
        g = df_1min.set_index("Timestamp").resample(f"{timeframe_minutes}min")
        df = pd.DataFrame({
            "Open": g["Open"].first(),
            "High": g["High"].max(),
            "Low": g["Low"].min(),
            "Close": g["Close"].last(),
            "Volume": g["Volume"].sum(),
        }).dropna().reset_index()
    else:
        df = df_1min.copy()
    return df_1min, df
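
A quick sketch of the loader's contract (illustrative; assumes the CSV has an epoch-second Timestamp column plus OHLCV):

from pathlib import Path
from data import load_data

# Returns the filtered 1-min frame and the resampled frame for one timeframe.
df_1min, df_4h = load_data("2025-03-01", "2025-09-22", 240, Path("../data/btcusd_1-min_data.csv"))
print(df_4h[["Timestamp", "Open", "High", "Low", "Close", "Volume"]].head())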

3
indicators/__init__.py Normal file

@@ -0,0 +1,3 @@
from .supertrend import add_supertrends, compute_meta_trend

__all__ = ["add_supertrends", "compute_meta_trend"]

58
indicators/supertrend.py Normal file

@@ -0,0 +1,58 @@
from __future__ import annotations

import numpy as np
import pandas as pd


def _atr(high: pd.Series, low: pd.Series, close: pd.Series, period: int) -> pd.Series:
    hl = (high - low).abs()
    hc = (high - close.shift()).abs()
    lc = (low - close.shift()).abs()
    tr = pd.concat([hl, hc, lc], axis=1).max(axis=1)
    return tr.rolling(period, min_periods=period).mean()


def supertrend_series(df: pd.DataFrame, length: int, multiplier: float) -> pd.Series:
    atr = _atr(df["High"], df["Low"], df["Close"], length)
    hl2 = (df["High"] + df["Low"]) / 2
    upper = hl2 + multiplier * atr
    lower = hl2 - multiplier * atr
    trend = pd.Series(index=df.index, dtype=float)
    dir_up = True
    prev_upper = np.nan
    prev_lower = np.nan
    for i in range(len(df)):
        if i == 0 or pd.isna(atr.iat[i]):
            trend.iat[i] = np.nan
            prev_upper = upper.iat[i]
            prev_lower = lower.iat[i]
            continue
        # Ratchet the active band (standard final-band recursion): in an
        # uptrend the lower band may only rise, in a downtrend the upper
        # band may only fall. The inactive band tracks the raw value.
        cl = max(lower.iat[i], prev_lower) if dir_up else lower.iat[i]
        cu = min(upper.iat[i], prev_upper) if not dir_up else upper.iat[i]
        if dir_up and df["Close"].iat[i] < cl:
            dir_up = False
        elif not dir_up and df["Close"].iat[i] > cu:
            dir_up = True
        prev_upper = cu
        prev_lower = cl
        trend.iat[i] = cl if dir_up else cu
    return trend


def add_supertrends(df: pd.DataFrame, settings: list[tuple[int, float]]) -> pd.DataFrame:
    out = df.copy()
    for length, mult in settings:
        col = f"supertrend_{length}_{mult}"
        out[col] = supertrend_series(out, length, mult)
        out[f"bull_{length}_{mult}"] = (out["Close"] >= out[col]).astype(int)
    return out


def compute_meta_trend(df: pd.DataFrame, settings: list[tuple[int, float]]) -> pd.Series:
    bull_cols = [f"bull_{l}_{m}" for l, m in settings]
    return (df[bull_cols].sum(axis=1) == len(bull_cols)).astype(int)
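
An illustrative sanity check for the indicator and the meta trend (not part of the module; synthetic data):

import numpy as np
import pandas as pd
from indicators import add_supertrends, compute_meta_trend

# In a clean uptrend all three supertrends should sit below the close,
# so every bull_* column is 1 and the meta trend is 1.
n = 200
close = pd.Series(np.linspace(100.0, 200.0, n))
df = pd.DataFrame({"High": close + 1.0, "Low": close - 1.0, "Close": close})
settings = [(12, 3.0), (10, 1.0), (11, 2.0)]
df = add_supertrends(df, settings)
print(compute_meta_trend(df, settings).iloc[-1])  # expected: 1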

10
intrabar.py Normal file

@@ -0,0 +1,10 @@
from __future__ import annotations

import pandas as pd


def precompute_slices(df: pd.DataFrame) -> pd.DataFrame:
    return df  # hook for future use


def entry_slippage_row(price: float, qty: float, slippage_bps: float) -> float:
    # Buy fill price with flat slippage applied; qty is currently unused and
    # reserved for a future size-dependent impact model.
    return price + price * (slippage_bps / 1e4)

11
logging_utils.py Normal file

@@ -0,0 +1,11 @@
from __future__ import annotations

from pathlib import Path

import pandas as pd


def write_trade_log(trades: list[dict], path: Path) -> None:
    if not trades:
        return
    df = pd.DataFrame(trades)
    path.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(path, index=False)

556
main.py

@@ -1,556 +0,0 @@
import pandas as pd
import numpy as np
from ta.volatility import AverageTrueRange
import time
import csv
import math
import os


def load_data(since, until, csv_file):
    df = pd.read_csv(csv_file)
    df['Timestamp'] = pd.to_datetime(df['Timestamp'], unit='s')
    df = df[(df['Timestamp'] >= pd.Timestamp(since)) & (df['Timestamp'] <= pd.Timestamp(until))]
    return df


def aggregate_data(df, timeframe):
    df = df.set_index('Timestamp')
    df = df.resample(timeframe).agg({
        'Open': 'first',
        'High': 'max',
        'Low': 'min',
        'Close': 'last',
        'Volume': 'sum'
    })
    df = df.reset_index()
    return df


def calculate_okx_taker_maker_fee(amount, is_maker=False):
    fee_rate = 0.0008 if is_maker else 0.0010
    return amount * fee_rate
def calculate_supertrend(df, period, multiplier):
    """
    Calculate the Supertrend indicator for a given period and multiplier.

    Args:
        df (pd.DataFrame): DataFrame with 'High', 'Low', 'Close' columns.
        period (int): ATR period.
        multiplier (float): Multiplier for ATR.

    Returns:
        pd.Series: Supertrend values.
    """
    # Ensure we have enough data for ATR calculation
    if len(df) < period + 1:
        print(f"Warning: Not enough data for ATR period {period}. Need at least {period + 1} rows, got {len(df)}")
        return pd.Series([np.nan] * len(df), index=df.index)
    high = df['High'].values
    low = df['Low'].values
    close = df['Close'].values
    # Calculate True Range first
    tr = np.zeros_like(close)
    for i in range(1, len(close)):
        tr[i] = max(
            high[i] - low[i],           # Current high - current low
            abs(high[i] - close[i-1]),  # Current high - previous close
            abs(low[i] - close[i-1])    # Current low - previous close
        )
    # Calculate ATR: simple moving average seed, then Wilder's smoothing
    atr = np.zeros_like(close)
    atr[period] = np.mean(tr[1:period+1])  # First ATR value
    for i in range(period+1, len(close)):
        atr[i] = (atr[i-1] * (period-1) + tr[i]) / period  # Wilder's smoothing
    # Fill initial values with the first valid ATR
    atr[:period] = atr[period] if atr[period] > 0 else 0.001
    hl2 = (high + low) / 2
    upperband = hl2 + (multiplier * atr)
    lowerband = hl2 - (multiplier * atr)
    supertrend = np.full_like(close, np.nan)
    in_uptrend = True
    supertrend[0] = upperband[0]
    for i in range(1, len(close)):
        if close[i] > upperband[i-1]:
            in_uptrend = True
        elif close[i] < lowerband[i-1]:
            in_uptrend = False
        # else, keep previous trend
        if in_uptrend:
            supertrend[i] = max(lowerband[i], supertrend[i-1] if not np.isnan(supertrend[i-1]) else lowerband[i])
        else:
            supertrend[i] = min(upperband[i], supertrend[i-1] if not np.isnan(supertrend[i-1]) else upperband[i])
    return pd.Series(supertrend, index=df.index)


def add_supertrend_indicators(df):
    """
    Adds Supertrend indicators to the dataframe for the specified (period, multiplier) pairs.

    Args:
        df (pd.DataFrame): DataFrame with columns 'High', 'Low', 'Close'.

    Returns:
        pd.DataFrame: DataFrame with new Supertrend columns added.
    """
    supertrend_params = [(12, 3.0), (10, 1.0), (11, 2.0)]
    for period, multiplier in supertrend_params:
        try:
            st_col = f'supertrend_{period}_{multiplier}'
            df[st_col] = calculate_supertrend(df, period, multiplier)
        except Exception as e:
            print(f"Error calculating Supertrend {period}, {multiplier}: {e}")
            df[f'supertrend_{period}_{multiplier}'] = np.nan
    return df
def precompute_1min_slice_indices(df_aggregated, df_1min):
    """
    Precompute start and end indices for each aggregated bar using searchsorted.
    Returns a list of (start_idx, end_idx) tuples for fast iloc slicing, plus
    the argsort order used to sort the 1-min timestamps.
    """
    timestamps = df_aggregated['Timestamp'].values
    one_min_timestamps = df_1min['Timestamp'].values
    # Ensure both are sorted
    sorted_1min = np.argsort(one_min_timestamps)
    one_min_timestamps = one_min_timestamps[sorted_1min]
    indices = []
    for i in range(1, len(timestamps)):
        start, end = timestamps[i-1], timestamps[i]
        # Find indices using searchsorted (side='right' for both bounds)
        start_idx = np.searchsorted(one_min_timestamps, start, side='right')
        end_idx = np.searchsorted(one_min_timestamps, end, side='right')
        indices.append((start_idx, end_idx))
    return indices, sorted_1min


def estimate_slippage_rate(trade_usd_size, minute_row, base_slippage_rate=0.0003, impact_threshold_pct=0.10, impact_slope=0.0010):
    """
    Estimate the total slippage rate (decimal) using a hybrid model:
    - Base slippage: fixed base_slippage_rate (e.g., 0.0003 = 3 bps).
    - Extra slippage: if the trade size (USD) exceeds impact_threshold_pct * 1-min USD volume,
      add impact_slope * (trade_size / threshold - 1).

    Args:
        trade_usd_size (float): Trade notional in USD before slippage.
        minute_row (pd.Series | None): 1-min bar with 'Volume' and a price ('Close' preferred, fallback 'Open').
        base_slippage_rate (float): Base slippage in decimal.
        impact_threshold_pct (float): Threshold as a fraction of 1-min volume (e.g., 0.10 = 10%).
        impact_slope (float): Rate added per 1x over the threshold (decimal).

    Returns:
        float: Total slippage rate (>= base_slippage_rate).
    """
    if minute_row is None:
        return float(base_slippage_rate)
    try:
        minute_base_vol = float(minute_row.get('Volume', 0.0) or 0.0)
        minute_price = float(minute_row.get('Close', minute_row.get('Open', 0.0)) or 0.0)
        minute_quote_vol = minute_base_vol * minute_price
    except Exception:
        minute_quote_vol = 0.0
    if minute_quote_vol <= 0 or impact_threshold_pct <= 0:
        return float(base_slippage_rate)
    threshold_quote = minute_quote_vol * impact_threshold_pct
    if trade_usd_size <= threshold_quote:
        return float(base_slippage_rate)
    over_ratio = (trade_usd_size / threshold_quote) - 1.0
    extra_slippage = max(0.0, impact_slope * over_ratio)
    return float(base_slippage_rate + extra_slippage)
def backtest(timeframe, df_aggregated, df_1min, stop_loss_pct, progress_step=1000,
             base_slippage_rate=0.0003, impact_threshold_pct=0.10, impact_slope=0.0010):
    """
    Backtest a trading strategy based on meta supertrend logic (all three supertrends agree).
    Uses signal transitions and open prices for entry/exit to match the original implementation.
    """
    start_time = time.time()
    required_st_cols = ["supertrend_12_3.0", "supertrend_10_1.0", "supertrend_11_2.0"]
    for col in required_st_cols:
        if col not in df_aggregated.columns:
            raise ValueError(f"Missing required Supertrend column: {col}")
    # Calculate trend directions for each supertrend (-1 or 1)
    trends = []
    for col in required_st_cols:
        # Convert supertrend values to trend direction based on close price position
        trend = np.where(df_aggregated['Close'] > df_aggregated[col], 1, -1)
        trends.append(trend)
    # Stack trends and calculate the meta trend (all must agree)
    trends_arr = np.stack(trends, axis=1)
    meta_trend = np.where((trends_arr[:, 0] == trends_arr[:, 1]) & (trends_arr[:, 1] == trends_arr[:, 2]),
                          trends_arr[:, 0], 0)
    meta_trend_signal = meta_trend  # incorrect: the signal should be lagged, since this introduces lookahead bias.
    # Next step: modify the OHLCV predictor to not use supertrend as a feature (or any other
    # feature that introduces lookahead bias) and predict the next close price.
    #
    # Old code, not that efficient:
    # Add signal lagging to avoid lookahead bias
    # meta_trend_signal = np.roll(meta_trend, 1)
    # meta_trend_signal[0] = 0  # No signal for first bar

    # Precompute 1-min slice indices for each aggregated bar
    slice_indices, sorted_1min = precompute_1min_slice_indices(df_aggregated, df_1min)
    df_1min_sorted = df_1min.iloc[sorted_1min].reset_index(drop=True)
    one_min_timestamps_sorted = df_1min_sorted['Timestamp'].values
    in_position = False
    init_usd = 1000
    usd = init_usd
    coin = 0
    nb_stop_loss = 0
    trade_log = []
    equity_curve = []
    trade_results = []
    entry_price = None
    entry_time = None
    total_slippage_usd = 0.0
    total_traded_usd = 0.0
    total_steps = len(df_aggregated) - 1
    for i in range(1, len(df_aggregated)):
        open_price = df_aggregated['Open'][i]  # Use open price for entry/exit
        close_price = df_aggregated['Close'][i]
        timestamp = df_aggregated['Timestamp'][i]
        # Get previous and current meta trend signals
        prev_mt = meta_trend_signal[i-1] if i > 0 else 0
        curr_mt = meta_trend_signal[i]
        # Track equity at each bar
        equity = usd + coin * close_price
        equity_curve.append((timestamp, equity))
        # Check stop loss if in position
        if in_position:
            start_idx, end_idx = slice_indices[i-1]
            df_1min_slice = df_1min_sorted.iloc[start_idx:end_idx]
            stop_triggered = False
            if not df_1min_slice.empty:
                stop_loss_threshold = entry_price * (1 - stop_loss_pct)
                below_stop = df_1min_slice['Low'] < stop_loss_threshold
                if below_stop.any():
                    first_idx = below_stop.idxmax()
                    stop_row = df_1min_slice.loc[first_idx]
                    stop_triggered = True
                    in_position = False
                    # More realistic stop loss fill logic with slippage
                    if stop_row['Open'] < stop_loss_threshold:
                        base_exit_price = stop_row['Open']
                    else:
                        base_exit_price = stop_loss_threshold
                    trade_usd_size = float(coin * base_exit_price)
                    slip_rate = estimate_slippage_rate(trade_usd_size, stop_row, base_slippage_rate, impact_threshold_pct, impact_slope)
                    exit_price = base_exit_price * (1.0 - slip_rate)
                    exit_time = stop_row['Timestamp']
                    gross_usd = coin * exit_price
                    fee = calculate_okx_taker_maker_fee(gross_usd, is_maker=False)
                    usd = gross_usd - fee
                    trade_pnl = (exit_price - entry_price) / entry_price if entry_price else 0
                    total_slippage_usd += trade_usd_size * slip_rate
                    total_traded_usd += trade_usd_size
                    trade_results.append(trade_pnl)
                    trade_log.append({
                        'type': 'stop_loss',
                        'time': exit_time,
                        'base_price': base_exit_price,
                        'effective_price': exit_price,
                        'slippage_rate': slip_rate,
                        'usd': usd,
                        'coin': 0,
                        'pnl': trade_pnl,
                        'fee': fee
                    })
                    coin = 0
                    nb_stop_loss += 1
                    entry_price = None
                    entry_time = None
            if stop_triggered:
                continue
        # Entry condition: signal changes TO bullish (prev != 1 and curr == 1)
        if not in_position and prev_mt != 1 and curr_mt == 1:
            in_position = True
            fee = calculate_okx_taker_maker_fee(usd, is_maker=False)
            usd_after_fee = usd - fee
            # Slippage on buy increases the effective price
            try:
                ts64 = np.datetime64(timestamp)
                idx_min = int(np.searchsorted(one_min_timestamps_sorted, ts64, side='left'))
                minute_row = df_1min_sorted.iloc[idx_min] if 0 <= idx_min < len(df_1min_sorted) else None
            except Exception:
                minute_row = None
            trade_usd_size = float(usd_after_fee)
            slip_rate = estimate_slippage_rate(trade_usd_size, minute_row, base_slippage_rate, impact_threshold_pct, impact_slope)
            effective_entry_price = open_price * (1.0 + slip_rate)
            coin = usd_after_fee / effective_entry_price
            entry_price = effective_entry_price
            entry_time = timestamp
            usd = 0
            total_slippage_usd += trade_usd_size * slip_rate
            total_traded_usd += trade_usd_size
            trade_log.append({
                'type': 'buy',
                'time': timestamp,
                'base_price': open_price,
                'effective_price': effective_entry_price,
                'slippage_rate': slip_rate,
                'usd': usd,
                'coin': coin,
                'fee': fee
            })
        # Exit condition: signal changes TO bearish (prev == 1 and curr == -1)
        elif in_position and prev_mt == 1 and curr_mt == -1:
            in_position = False
            # Slippage on sell reduces the effective price
            try:
                ts64 = np.datetime64(timestamp)
                idx_min = int(np.searchsorted(one_min_timestamps_sorted, ts64, side='left'))
                minute_row = df_1min_sorted.iloc[idx_min] if 0 <= idx_min < len(df_1min_sorted) else None
            except Exception:
                minute_row = None
            base_exit_price = open_price
            trade_usd_size = float(coin * base_exit_price)
            slip_rate = estimate_slippage_rate(trade_usd_size, minute_row, base_slippage_rate, impact_threshold_pct, impact_slope)
            exit_price = base_exit_price * (1.0 - slip_rate)
            exit_time = timestamp
            gross_usd = coin * exit_price
            fee = calculate_okx_taker_maker_fee(gross_usd, is_maker=False)
            usd = gross_usd - fee
            trade_pnl = (exit_price - entry_price) / entry_price if entry_price else 0
            total_slippage_usd += trade_usd_size * slip_rate
            total_traded_usd += trade_usd_size
            trade_results.append(trade_pnl)
            trade_log.append({
                'type': 'sell',
                'time': exit_time,
                'base_price': base_exit_price,
                'effective_price': exit_price,
                'slippage_rate': slip_rate,
                'usd': usd,
                'coin': 0,
                'pnl': trade_pnl,
                'fee': fee
            })
            coin = 0
            entry_price = None
            entry_time = None
        if i % progress_step == 0 or i == total_steps:
            percent = (i / total_steps) * 100
            print(f"\rTimeframe: {timeframe},\tProgress: {percent:.1f}%\tCurrent equity: {equity:.2f}\033[K", end='', flush=True)
    # Force close any open position at the end
    if in_position:
        final_open_price = df_aggregated['Open'].iloc[-1]  # Use open price for consistency
        final_timestamp = df_aggregated['Timestamp'].iloc[-1]
        try:
            ts64 = np.datetime64(final_timestamp)
            idx_min = int(np.searchsorted(one_min_timestamps_sorted, ts64, side='left'))
            minute_row = df_1min_sorted.iloc[idx_min] if 0 <= idx_min < len(df_1min_sorted) else None
        except Exception:
            minute_row = None
        base_exit_price = final_open_price
        trade_usd_size = float(coin * base_exit_price)
        slip_rate = estimate_slippage_rate(trade_usd_size, minute_row, base_slippage_rate, impact_threshold_pct, impact_slope)
        final_effective_price = base_exit_price * (1.0 - slip_rate)
        gross_usd = coin * final_effective_price
        fee = calculate_okx_taker_maker_fee(gross_usd, is_maker=False)
        usd = gross_usd - fee
        trade_pnl = (final_effective_price - entry_price) / entry_price if entry_price else 0
        total_slippage_usd += trade_usd_size * slip_rate
        total_traded_usd += trade_usd_size
        trade_results.append(trade_pnl)
        trade_log.append({
            'type': 'forced_close',
            'time': final_timestamp,
            'base_price': base_exit_price,
            'effective_price': final_effective_price,
            'slippage_rate': slip_rate,
            'usd': usd,
            'coin': 0,
            'pnl': trade_pnl,
            'fee': fee
        })
        coin = 0
        in_position = False
        entry_price = None
    print()
    print(f"Timeframe: {timeframe},\tTotal profit: {usd - init_usd},\tNumber of stop losses: {nb_stop_loss}")
    # --- Performance Metrics ---
    equity_arr = np.array([e[1] for e in equity_curve])
    # Handle edge cases for empty or invalid equity data
    if len(equity_arr) == 0:
        print("Warning: No equity data available")
        return None
    returns = np.diff(equity_arr) / equity_arr[:-1]
    # Filter out infinite and NaN returns
    returns = returns[np.isfinite(returns)]
    total_return = (equity_arr[-1] - equity_arr[0]) / equity_arr[0] if equity_arr[0] != 0 else 0
    running_max = np.maximum.accumulate(equity_arr)
    if equity_arr[-1] <= 0.01:
        max_drawdown = -1.0
    else:
        drawdowns = (equity_arr - running_max) / running_max
        max_drawdown = drawdowns.min() if len(drawdowns) > 0 and np.isfinite(drawdowns).any() else 0
    if len(returns) > 1 and np.std(returns) > 1e-9:
        sharpe = np.mean(returns) / np.std(returns) * math.sqrt(252)
    else:
        sharpe = 0
    wins = [1 for r in trade_results if r > 0]
    win_rate = len(wins) / len(trade_results) if trade_results else 0
    num_trades = len(trade_results)
    print("Performance Metrics:")
    print(f"  Total Return: {total_return*100:.2f}%")
    print(f"  Max Drawdown: {max_drawdown*100:.2f}%")
    print(f"  Sharpe Ratio: {sharpe:.2f}")
    print(f"  Win Rate: {win_rate*100:.2f}%")
    print(f"  Number of Trades: {num_trades}")
    print(f"  Final Equity: ${equity_arr[-1]:.2f}")
    print(f"  Initial Equity: ${equity_arr[0]:.2f}")
    # --- Save Trade Log ---
    log_dir = "backtest_logs"
    os.makedirs(log_dir, exist_ok=True)
    # Format stop_loss_pct for the filename (e.g., 0.05 -> 0p05)
    stop_loss_str = f"{stop_loss_pct:.2f}".replace('.', 'p')
    log_path = os.path.join(log_dir, f"trade_log_{timeframe}_sl{stop_loss_str}.csv")
    # Calculate total fees for this backtest (0 if there were no trades)
    total_fees = sum(entry.get('fee', 0) for entry in trade_log)
    if trade_log:
        all_keys = set()
        for entry in trade_log:
            all_keys.update(entry.keys())
        all_keys = list(all_keys)
        trade_log_filled = []
        for entry in trade_log:
            filled_entry = {k: entry.get(k, None) for k in all_keys}
            trade_log_filled.append(filled_entry)
        # Write a summary header row, then the trade log header and rows
        with open(log_path, 'w', newline='') as f:
            writer = csv.writer(f)
            summary_header = [
                'elapsed_time_sec', 'total_return', 'max_drawdown', 'sharpe_ratio',
                'win_rate', 'num_trades', 'final_equity', 'initial_equity', 'num_stop_losses', 'total_fees',
                'total_slippage_usd', 'avg_slippage_bps'
            ]
            summary_values = [
                f"{time.time() - start_time:.2f}",
                f"{total_return*100:.2f}%",
                f"{max_drawdown*100:.2f}%",
                f"{sharpe:.2f}",
                f"{win_rate*100:.2f}%",
                str(num_trades),
                f"${equity_arr[-1]:.2f}",
                f"${equity_arr[0]:.2f}",
                str(nb_stop_loss),
                f"${total_fees:.4f}",
                f"${total_slippage_usd:.4f}",
                f"{(total_slippage_usd / total_traded_usd * 10000.0) if total_traded_usd > 0 else 0:.2f}"
            ]
            writer.writerow(summary_header)
            writer.writerow(summary_values)
            writer.writerow([])  # Blank row for separation
            dict_writer = csv.DictWriter(f, fieldnames=all_keys)
            dict_writer.writeheader()
            dict_writer.writerows(trade_log_filled)
        print(f"Trade log saved to {log_path}")
    else:
        print("No trades to log.")
    # Return summary metrics (excluding elapsed time)
    return {
        'timeframe': timeframe,
        'stop_loss': stop_loss_pct,
        'total_return': total_return,
        'max_drawdown': max_drawdown,
        'sharpe_ratio': sharpe,
        'win_rate': win_rate,
        'num_trades': num_trades,
        'final_equity': equity_arr[-1],
        'initial_equity': equity_arr[0],
        'num_stop_losses': nb_stop_loss,
        'total_fees': total_fees,
        'total_slippage_usd': total_slippage_usd,
        'avg_slippage_bps': (total_slippage_usd / total_traded_usd * 10000.0) if total_traded_usd > 0 else 0.0
    }
if __name__ == "__main__":
    timeframes = ["5min", "15min", "30min", "1h", "4h", "1d", "2d"]
    # timeframes = ["5min", "15min", "1h", "4h", "1d"]
    # timeframes = ["30min"]
    stoplosses = [0.1, 0.2, 0.3, 0.4, 0.5]
    # Slippage configuration (OKX Spot): base in bps, plus a volume-impact model
    slippage_base_bps = 10       # 10 bps base slippage (realistic, conservative)
    impact_threshold_pct = 0.10  # e.g., impact starts beyond 10% of 1-min volume
    impact_slope = 0.0010        # incremental slippage per 1x over the threshold
    # df_1min = load_data('2021-11-01', '2024-10-16', '../data/btcusd_1-min_data.csv')
    df_1min = load_data('2021-11-01', '2025-08-19', '../data/btcusd_okx_1-min_data.csv')
    # Prepare the summary CSV
    summary_csv_path = "backtest_summary.csv"
    summary_header = [
        'timeframe', 'stop_loss', 'total_return', 'max_drawdown', 'sharpe_ratio',
        'win_rate', 'num_trades', 'final_equity', 'initial_equity', 'num_stop_losses', 'total_fees',
        'total_slippage_usd', 'avg_slippage_bps'
    ]
    with open(summary_csv_path, 'w', newline='') as summary_file:
        writer = csv.DictWriter(summary_file, fieldnames=summary_header)
        writer.writeheader()
        for timeframe in timeframes:
            df_aggregated = aggregate_data(df_1min, timeframe)
            df_aggregated = add_supertrend_indicators(df_aggregated)
            for stop_loss_pct in stoplosses:
                summary = backtest(
                    timeframe,
                    df_aggregated,
                    df_1min,
                    stop_loss_pct=stop_loss_pct,
                    base_slippage_rate=slippage_base_bps / 10000.0,
                    impact_threshold_pct=impact_threshold_pct,
                    impact_slope=impact_slope
                )
                if summary is not None:
                    # Format values for the CSV (floats as rounded strings)
                    summary_row = {
                        'timeframe': summary['timeframe'],
                        'stop_loss': summary['stop_loss'],
                        'total_return': f"{summary['total_return']*100:.2f}%",
                        'max_drawdown': f"{summary['max_drawdown']*100:.2f}%",
                        'sharpe_ratio': f"{summary['sharpe_ratio']:.2f}",
                        'win_rate': f"{summary['win_rate']*100:.2f}%",
                        'num_trades': summary['num_trades'],
                        'final_equity': f"${summary['final_equity']:.2f}",
                        'initial_equity': f"${summary['initial_equity']:.2f}",
                        'num_stop_losses': summary['num_stop_losses'],
                        'total_fees': f"${summary['total_fees']:.4f}",
                        'total_slippage_usd': f"${summary['total_slippage_usd']:.4f}",
                        'avg_slippage_bps': f"{summary['avg_slippage_bps']:.2f}"
                    }
                    writer.writerow(summary_row)
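
For reference, the removed hybrid slippage model works out as follows (a worked example with illustrative numbers):

# Base 3 bps; impact kicks in once the trade exceeds 10% of the 1-min quote volume.
base, threshold_pct, slope = 0.0003, 0.10, 0.0010
minute_quote_vol = 300_000.0                    # $300k traded in the minute
trade_usd = 60_000.0                            # our trade notional
threshold = minute_quote_vol * threshold_pct    # $30k threshold
over_ratio = trade_usd / threshold - 1.0        # 1.0 (the trade is 2x the threshold)
rate = base + slope * over_ratio                # 0.0003 + 0.0010 = 0.0013 -> 13 bps
print(rate)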

11
market_costs.py Normal file

@@ -0,0 +1,11 @@
from __future__ import annotations

TAKER_FEE_BPS_DEFAULT = 10.0  # 0.10%


def okx_fee(fee_bps: float, notional_usd: float) -> float:
    return notional_usd * (fee_bps / 1e4)


def estimate_slippage_cost(slippage_bps: float, notional_usd: float) -> float:
    # Slippage cost in USD for the given notional (a dollar amount, not a rate).
    return notional_usd * (slippage_bps / 1e4)
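
A quick numeric check of the bps conversion (illustrative):

from market_costs import okx_fee, estimate_slippage_cost

notional = 1_000.0
print(okx_fee(10.0, notional))                # ~1.00: 10 bps of $1,000
print(estimate_slippage_cost(2.0, notional))  # ~0.20: 2 bps of $1,000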

54
metrics.py Normal file

@@ -0,0 +1,54 @@
from __future__ import annotations

from dataclasses import dataclass

import numpy as np
import pandas as pd


@dataclass
class Perf:
    total_return: float
    max_drawdown: float
    sharpe_ratio: float
    win_rate: float
    num_trades: int
    final_equity: float
    initial_equity: float
    num_stop_losses: int
    total_fees: float
    total_slippage_usd: float
    avg_slippage_bps: float


def compute_metrics(equity_curve: pd.Series, trades: list[dict]) -> Perf:
    ret = equity_curve.pct_change().fillna(0.0)
    total_return = equity_curve.iat[-1] / equity_curve.iat[0] - 1.0
    cummax = equity_curve.cummax()
    max_drawdown = (equity_curve / cummax - 1.0).min()
    if ret.std(ddof=0) > 0:
        # Annualize from the spacing of the (timestamp-indexed) equity curve;
        # the curve has one sample per aggregated bar, not per minute.
        if len(equity_curve.index) > 1:
            bar = equity_curve.index[1] - equity_curve.index[0]
            periods_per_year = pd.Timedelta(days=365) / bar
        else:
            periods_per_year = 252.0
        sharpe = (ret.mean() / ret.std(ddof=0)) * np.sqrt(periods_per_year)
    else:
        sharpe = 0.0
    closes = [t for t in trades if t.get("side") == "SELL"]
    wins = [t for t in closes if t.get("pnl", 0.0) > 0]
    win_rate = (len(wins) / len(closes)) if closes else 0.0
    fees = sum(t.get("fee", 0.0) for t in trades)
    slip = sum(t.get("slippage", 0.0) for t in trades)
    # Volume-weighted average slippage in bps over the total traded notional.
    traded_usd = sum(t.get("price", 0.0) * t.get("qty", 0.0) for t in trades)
    avg_slippage_bps = (slip / traded_usd * 1e4) if traded_usd > 0 else 0.0
    return Perf(
        total_return=float(total_return),
        max_drawdown=float(max_drawdown),
        sharpe_ratio=float(sharpe),
        win_rate=win_rate,
        num_trades=len(closes),
        final_equity=float(equity_curve.iat[-1]),
        initial_equity=float(equity_curve.iat[0]),
        num_stop_losses=sum(1 for t in closes if t.get("reason") == "stop"),
        total_fees=fees,
        total_slippage_usd=slip,
        avg_slippage_bps=float(avg_slippage_bps),
    )
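
A short sketch of the annualization above: with 60-minute bars there are 365 * 24 = 8760 bars per year, so the per-bar Sharpe is scaled by sqrt(8760) ≈ 93.6 rather than by a minute-bar factor.

import numpy as np
import pandas as pd

bar = pd.Timedelta(minutes=60)
periods_per_year = pd.Timedelta(days=365) / bar  # 8760.0
print(np.sqrt(periods_per_year))                 # ≈ 93.59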

52
trade.py Normal file

@@ -0,0 +1,52 @@
from __future__ import annotations

from dataclasses import dataclass

from market_costs import okx_fee, estimate_slippage_cost
from intrabar import entry_slippage_row


@dataclass
class TradeState:
    cash: float = 1000.0
    qty: float = 0.0
    entry_px: float | None = None
    max_px: float | None = None
    stop_loss_frac: float = 0.02
    fee_bps: float = 10.0
    slippage_bps: float = 2.0


def enter_long(state: TradeState, price: float) -> dict:
    if state.qty > 0:
        return {}
    notional = state.cash
    px = entry_slippage_row(price, 0.0, state.slippage_bps)  # buy fills above the quoted price
    slip = notional * (state.slippage_bps / 1e4)  # entry slippage cost in USD
    fee = okx_fee(state.fee_bps, notional)
    qty = notional / px
    state.qty = max(qty - fee / px, 0.0)  # fee deducted in base units
    state.cash = 0.0
    state.entry_px = px
    state.max_px = px
    return {"side": "BUY", "price": px, "qty": state.qty, "fee": fee, "slippage": slip}


def maybe_trailing_stop(state: TradeState, price: float) -> float:
    if state.qty <= 0:
        return float("inf")
    state.max_px = max(state.max_px or price, price)
    return state.max_px * (1.0 - state.stop_loss_frac)


def exit_long(state: TradeState, price: float) -> dict:
    if state.qty <= 0:
        return {}
    notional = state.qty * price
    slip = estimate_slippage_cost(state.slippage_bps, notional)
    fee = okx_fee(state.fee_bps, notional)
    cash_back = notional - slip - fee
    event = {"side": "SELL", "price": price, "qty": state.qty, "fee": fee, "slippage": slip}
    state.cash = cash_back
    state.qty = 0.0
    state.entry_px = None
    state.max_px = None
    return event
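
An illustrative round trip through the trade state machine (numbers are examples):

from trade import TradeState, enter_long, exit_long, maybe_trailing_stop

state = TradeState(cash=1_000.0, stop_loss_frac=0.02, fee_bps=10.0, slippage_bps=2.0)
buy = enter_long(state, 100.0)             # fills at 100.02 (2 bps above the quote)
trail = maybe_trailing_stop(state, 110.0)  # max_px -> 110, stop = 110 * 0.98 = 107.8
sell = exit_long(state, trail)             # cash back net of fee and slippage
print(buy["price"], trail, round(state.cash, 2))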