From c4aa965a984bb6d8b262d02755ac4c5da023d355 Mon Sep 17 00:00:00 2001 From: Simon Moisy Date: Fri, 9 Jan 2026 19:53:01 +0800 Subject: [PATCH] Add initial implementation of backtesting framework with CLI interface. Introduce core modules for data loading, trade management, performance metrics, and logging. Include Supertrend indicator calculations and slippage estimation. Update .gitignore to exclude logs and CSV files. --- .gitignore | 2 + .vscode/launch.json | 30 ++- __init__.py | 13 + backtest.py | 70 +++++ cli.py | 80 ++++++ config.py | 18 ++ data.py | 24 ++ indicators/__init__.py | 3 + indicators/supertrend.py | 58 ++++ intrabar.py | 10 + logging_utils.py | 11 + main.py | 556 --------------------------------------- market_costs.py | 11 + metrics.py | 54 ++++ trade.py | 52 ++++ 15 files changed, 424 insertions(+), 568 deletions(-) create mode 100644 __init__.py create mode 100644 backtest.py create mode 100644 cli.py create mode 100644 config.py create mode 100644 data.py create mode 100644 indicators/__init__.py create mode 100644 indicators/supertrend.py create mode 100644 intrabar.py create mode 100644 logging_utils.py delete mode 100644 main.py create mode 100644 market_costs.py create mode 100644 metrics.py create mode 100644 trade.py diff --git a/.gitignore b/.gitignore index 0dbf2f2..736352e 100644 --- a/.gitignore +++ b/.gitignore @@ -168,3 +168,5 @@ cython_debug/ # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ +./logs/ +*.csv \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json index bb1ca3d..237a81e 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -1,16 +1,22 @@ { - // Use IntelliSense to learn about possible attributes. - // Hover to view descriptions of existing attributes. - // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 "version": "0.2.0", "configurations": [ - - { - "name": "Python Debugger: main.py", - "type": "debugpy", - "request": "launch", - "program": "main.py", - "console": "integratedTerminal" - } + { + "name": "Python Debugger: cli.py", + "type": "debugpy", + "request": "launch", + "program": "${workspaceFolder}/cli.py", + "console": "integratedTerminal", + "cwd": "${workspaceFolder}", + "args": [ + "2025-03-01", "2025-09-22", + "--timeframes-minutes", "60", "240", "480", + "--stop-loss", "0.02", "0.05", + "--exit-on-bearish-flip", + "--csv", "../data/btcusd_1-min_data.csv" + ], + "env": { "PYTHONPATH": "${workspaceFolder}" } + } ] -} \ No newline at end of file + } + \ No newline at end of file diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..b9b60b2 --- /dev/null +++ b/__init__.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +__all__ = [ + "config", + "data", + "indicators", + "market_costs", + "intrabar", + "trade", + "metrics", + "logging_utils", + "backtest", +] \ No newline at end of file diff --git a/backtest.py b/backtest.py new file mode 100644 index 0000000..d4300cd --- /dev/null +++ b/backtest.py @@ -0,0 +1,70 @@ +from __future__ import annotations +import pandas as pd +from pathlib import Path +from trade import TradeState, enter_long, exit_long, maybe_trailing_stop +from indicators import add_supertrends, compute_meta_trend +from metrics import compute_metrics +from logging_utils import write_trade_log + +DEFAULT_ST_SETTINGS = [(12, 3.0), (10, 1.0), (11, 2.0)] + +def backtest( + df: pd.DataFrame, + df_1min: pd.DataFrame, + timeframe_minutes: int, + stop_loss: float, + exit_on_bearish_flip: bool, + fee_bps: float, + slippage_bps: float, + log_path: Path | None = None, +): + df = add_supertrends(df, DEFAULT_ST_SETTINGS) + df["meta_bull"] = compute_meta_trend(df, DEFAULT_ST_SETTINGS) + + state = TradeState(stop_loss_frac=stop_loss, fee_bps=fee_bps, slippage_bps=slippage_bps) + equity, trades = [], [] + + for i, row in df.iterrows(): + price = float(row["Close"]) + ts = pd.Timestamp(row["Timestamp"]) + + if state.qty <= 0 and row["meta_bull"] == 1: + evt = enter_long(state, price) + if evt: + evt.update({"t": ts.isoformat(), "reason": "bull_flip"}) + trades.append(evt) + + start = ts + end = df["Timestamp"].iat[i + 1] if i + 1 < len(df) else ts + pd.Timedelta(minutes=timeframe_minutes) + + if state.qty > 0: + win = df_1min[(df_1min["Timestamp"] >= start) & (df_1min["Timestamp"] < end)] + for _, m in win.iterrows(): + hi = float(m["High"]) + lo = float(m["Low"]) + state.max_px = max(state.max_px or hi, hi) + trail = state.max_px * (1.0 - state.stop_loss_frac) + if lo <= trail: + evt = exit_long(state, trail) + if evt: + prev = trades[-1] + pnl = (evt["price"] - (prev.get("price") or evt["price"])) * (prev.get("qty") or 0.0) + evt.update({"t": pd.Timestamp(m["Timestamp"]).isoformat(), "reason": "stop", "pnl": pnl}) + trades.append(evt) + break + + if state.qty > 0 and exit_on_bearish_flip and row["meta_bull"] == 0: + evt = exit_long(state, price) + if evt: + prev = trades[-1] + pnl = (evt["price"] - (prev.get("price") or evt["price"])) * (prev.get("qty") or 0.0) + evt.update({"t": ts.isoformat(), "reason": "bearish_flip", "pnl": pnl}) + trades.append(evt) + + equity.append(state.cash + state.qty * price) + + equity_curve = pd.Series(equity, index=df["Timestamp"]) + if log_path: + write_trade_log(trades, log_path) + perf = compute_metrics(equity_curve, trades) + return perf, equity_curve, trades diff --git a/cli.py b/cli.py new file mode 100644 index 0000000..58b1493 --- /dev/null +++ b/cli.py @@ -0,0 +1,80 @@ +from __future__ import annotations +import argparse +from pathlib import Path +import pandas as pd + +from config import CLIConfig +from data import load_data +from backtest import backtest + +def parse_args() -> CLIConfig: + p = argparse.ArgumentParser(prog="bt", description="Simple supertrend backtester") + p.add_argument("start") + p.add_argument("end") + p.add_argument("--timeframe-minutes", type=int, default=15) # single TF + p.add_argument("--timeframes-minutes", nargs="+", type=int) # multi TF: e.g. 5 15 60 240 + p.add_argument("--stop-loss", dest="stop_losses", type=float, nargs="+", default=[0.02, 0.05]) + p.add_argument("--exit-on-bearish-flip", action="store_true") + p.add_argument("--csv", dest="csv_path", type=Path, required=True) + p.add_argument("--out-csv", type=Path, default=Path("summary.csv")) + p.add_argument("--log-dir", type=Path, default=Path("./logs")) + p.add_argument("--fee-bps", type=float, default=10.0) + p.add_argument("--slippage-bps", type=float, default=2.0) + a = p.parse_args() + + return CLIConfig( + start=a.start, + end=a.end, + timeframe_minutes=a.timeframe_minutes, + timeframes_minutes=a.timeframes_minutes, + stop_losses=a.stop_losses, + exit_on_bearish_flip=a.exit_on_bearish_flip, + csv_path=a.csv_path, + out_csv=a.out_csv, + log_dir=a.log_dir, + fee_bps=a.fee_bps, + slippage_bps=a.slippage_bps, + ) + +def main(): + cfg = parse_args() + frames = cfg.timeframes_minutes or [cfg.timeframe_minutes] + + rows: list[dict] = [] + for tfm in frames: + df_1min, df = load_data(cfg.start, cfg.end, tfm, cfg.csv_path) + for sl in cfg.stop_losses: + log_path = cfg.log_dir / f"{tfm}m_sl{sl:.2%}.csv" + perf, equity, _ = backtest( + df=df, + df_1min=df_1min, + timeframe_minutes=tfm, + stop_loss=sl, + exit_on_bearish_flip=cfg.exit_on_bearish_flip, + fee_bps=cfg.fee_bps, + slippage_bps=cfg.slippage_bps, + log_path=log_path, + ) + rows.append({ + "timeframe": f"{tfm}min", + "stop_loss": sl, + "exit_on_bearish_flip": cfg.exit_on_bearish_flip, + "total_return": f"{perf.total_return:.2%}", + "max_drawdown": f"{perf.max_drawdown:.2%}", + "sharpe_ratio": f"{perf.sharpe_ratio:.2f}", + "win_rate": f"{perf.win_rate:.2%}", + "num_trades": perf.num_trades, + "final_equity": f"${perf.final_equity:.2f}", + "initial_equity": f"${perf.initial_equity:.2f}", + "num_stop_losses": perf.num_stop_losses, + "total_fees": perf.total_fees, + "total_slippage_usd": perf.total_slippage_usd, + "avg_slippage_bps": perf.avg_slippage_bps, + }) + + out = pd.DataFrame(rows) + out.to_csv(cfg.out_csv, index=False) + print(out.to_string(index=False)) + +if __name__ == "__main__": + main() diff --git a/config.py b/config.py new file mode 100644 index 0000000..70c9f3d --- /dev/null +++ b/config.py @@ -0,0 +1,18 @@ +from __future__ import annotations +from dataclasses import dataclass +from pathlib import Path +from typing import Sequence + +@dataclass +class CLIConfig: + start: str + end: str + timeframe_minutes: int + timeframes_minutes: list[int] | None + stop_losses: Sequence[float] + exit_on_bearish_flip: bool + csv_path: Path | None + out_csv: Path + log_dir: Path + fee_bps: float + slippage_bps: float diff --git a/data.py b/data.py new file mode 100644 index 0000000..1196648 --- /dev/null +++ b/data.py @@ -0,0 +1,24 @@ +from __future__ import annotations +import pandas as pd +from pathlib import Path + +def load_data(start: str, end: str, timeframe_minutes: int, csv_path: Path) -> tuple[pd.DataFrame, pd.DataFrame]: + df_1min = pd.read_csv(csv_path) + df_1min["Timestamp"] = pd.to_datetime(df_1min["Timestamp"], unit="s", utc=True) + df_1min = df_1min[(df_1min["Timestamp"] >= pd.Timestamp(start, tz="UTC")) & + (df_1min["Timestamp"] <= pd.Timestamp(end, tz="UTC"))] \ + .sort_values("Timestamp").reset_index(drop=True) + + if timeframe_minutes != 1: + g = df_1min.set_index("Timestamp").resample(f"{timeframe_minutes}min") + df = pd.DataFrame({ + "Open": g["Open"].first(), + "High": g["High"].max(), + "Low": g["Low"].min(), + "Close": g["Close"].last(), + "Volume": g["Volume"].sum(), + }).dropna().reset_index() + else: + df = df_1min.copy() + + return df_1min, df diff --git a/indicators/__init__.py b/indicators/__init__.py new file mode 100644 index 0000000..016e1fe --- /dev/null +++ b/indicators/__init__.py @@ -0,0 +1,3 @@ +from .supertrend import add_supertrends, compute_meta_trend + +__all__ = ["add_supertrends", "compute_meta_trend"] \ No newline at end of file diff --git a/indicators/supertrend.py b/indicators/supertrend.py new file mode 100644 index 0000000..4a23d11 --- /dev/null +++ b/indicators/supertrend.py @@ -0,0 +1,58 @@ +from __future__ import annotations +import pandas as pd +import numpy as np + + +def _atr(high: pd.Series, low: pd.Series, close: pd.Series, period: int) -> pd.Series: + hl = (high - low).abs() + hc = (high - close.shift()).abs() + lc = (low - close.shift()).abs() + tr = pd.concat([hl, hc, lc], axis=1).max(axis=1) + return tr.rolling(period, min_periods=period).mean() + + +def supertrend_series(df: pd.DataFrame, length: int, multiplier: float) -> pd.Series: + atr = _atr(df["High"], df["Low"], df["Close"], length) + hl2 = (df["High"] + df["Low"]) / 2 + upper = hl2 + multiplier * atr + lower = hl2 - multiplier * atr + + trend = pd.Series(index=df.index, dtype=float) + dir_up = True + prev_upper = np.nan + prev_lower = np.nan + + for i in range(len(df)): + if i == 0 or pd.isna(atr.iat[i]): + trend.iat[i] = np.nan + prev_upper = upper.iat[i] + prev_lower = lower.iat[i] + continue + + cu = min(upper.iat[i], prev_upper) if dir_up else upper.iat[i] + cl = max(lower.iat[i], prev_lower) if not dir_up else lower.iat[i] + + if df["Close"].iat[i] > cu: + dir_up = True + elif df["Close"].iat[i] < cl: + dir_up = False + + prev_upper = cu if dir_up else upper.iat[i] + prev_lower = lower.iat[i] if dir_up else cl + trend.iat[i] = cl if dir_up else cu + + return trend + + +def add_supertrends(df: pd.DataFrame, settings: list[tuple[int, float]]) -> pd.DataFrame: + out = df.copy() + for length, mult in settings: + col = f"supertrend_{length}_{mult}" + out[col] = supertrend_series(out, length, mult) + out[f"bull_{length}_{mult}"] = (out["Close"] >= out[col]).astype(int) + return out + + +def compute_meta_trend(df: pd.DataFrame, settings: list[tuple[int, float]]) -> pd.Series: + bull_cols = [f"bull_{l}_{m}" for l, m in settings] + return (df[bull_cols].sum(axis=1) == len(bull_cols)).astype(int) \ No newline at end of file diff --git a/intrabar.py b/intrabar.py new file mode 100644 index 0000000..f107456 --- /dev/null +++ b/intrabar.py @@ -0,0 +1,10 @@ +from __future__ import annotations +import pandas as pd + + +def precompute_slices(df: pd.DataFrame) -> pd.DataFrame: + return df # hook for future use + + +def entry_slippage_row(price: float, qty: float, slippage_bps: float) -> float: + return price + price * (slippage_bps / 1e4) \ No newline at end of file diff --git a/logging_utils.py b/logging_utils.py new file mode 100644 index 0000000..284bb7a --- /dev/null +++ b/logging_utils.py @@ -0,0 +1,11 @@ +from __future__ import annotations +from pathlib import Path +import pandas as pd + + +def write_trade_log(trades: list[dict], path: Path) -> None: + if not trades: + return + df = pd.DataFrame(trades) + path.parent.mkdir(parents=True, exist_ok=True) + df.to_csv(path, index=False) \ No newline at end of file diff --git a/main.py b/main.py deleted file mode 100644 index 8f0fe91..0000000 --- a/main.py +++ /dev/null @@ -1,556 +0,0 @@ -import pandas as pd -import numpy as np -from ta.volatility import AverageTrueRange -import time -import csv -import math -import os - - -def load_data(since, until, csv_file): - df = pd.read_csv(csv_file) - df['Timestamp'] = pd.to_datetime(df['Timestamp'], unit='s') - df = df[(df['Timestamp'] >= pd.Timestamp(since)) & (df['Timestamp'] <= pd.Timestamp(until))] - return df - -def aggregate_data(df, timeframe): - df = df.set_index('Timestamp') - df = df.resample(timeframe).agg({ - 'Open': 'first', - 'High': 'max', - 'Low': 'min', - 'Close': 'last', - 'Volume': 'sum' - }) - df = df.reset_index() - return df - -def calculate_okx_taker_maker_fee(amount, is_maker=False): - fee_rate = 0.0008 if is_maker else 0.0010 - return amount * fee_rate - -def calculate_supertrend(df, period, multiplier): - """ - Calculate the Supertrend indicator for a given period and multiplier. - Optionally displays progress during calculation. - Args: - df (pd.DataFrame): DataFrame with 'High', 'Low', 'Close' columns. - period (int): ATR period. - multiplier (float): Multiplier for ATR. - progress_step (int): Step interval for progress display. - show_progress (bool): Whether to print progress updates. - Returns: - pd.Series: Supertrend values. - """ - # Ensure we have enough data for ATR calculation - if len(df) < period + 1: - print(f"Warning: Not enough data for ATR period {period}. Need at least {period + 1} rows, got {len(df)}") - return pd.Series([np.nan] * len(df), index=df.index) - - high = df['High'].values - low = df['Low'].values - close = df['Close'].values - - # Calculate True Range first - tr = np.zeros_like(close) - for i in range(1, len(close)): - tr[i] = max( - high[i] - low[i], # Current high - current low - abs(high[i] - close[i-1]), # Current high - previous close - abs(low[i] - close[i-1]) # Current low - previous close - ) - - # Calculate ATR using simple moving average - atr = np.zeros_like(close) - atr[period] = np.mean(tr[1:period+1]) # First ATR value - for i in range(period+1, len(close)): - atr[i] = (atr[i-1] * (period-1) + tr[i]) / period # Exponential-like smoothing - - # Fill initial values with the first valid ATR - atr[:period] = atr[period] if atr[period] > 0 else 0.001 - - hl2 = (high + low) / 2 - upperband = hl2 + (multiplier * atr) - lowerband = hl2 - (multiplier * atr) - - supertrend = np.full_like(close, np.nan) - in_uptrend = True - - supertrend[0] = upperband[0] - total_steps = len(close) - 1 - - for i in range(1, len(close)): - if close[i] > upperband[i-1]: - in_uptrend = True - elif close[i] < lowerband[i-1]: - in_uptrend = False - # else, keep previous trend - - if in_uptrend: - supertrend[i] = max(lowerband[i], supertrend[i-1] if not np.isnan(supertrend[i-1]) else lowerband[i]) - else: - supertrend[i] = min(upperband[i], supertrend[i-1] if not np.isnan(supertrend[i-1]) else upperband[i]) - - return pd.Series(supertrend, index=df.index) - -def add_supertrend_indicators(df): - """ - Adds Supertrend indicators to the dataframe for the specified (period, multiplier) pairs. - Args: - df (pd.DataFrame): DataFrame with columns 'High', 'Low', 'Close'. - Returns: - pd.DataFrame: DataFrame with new Supertrend columns added. - """ - supertrend_params = [(12, 3.0), (10, 1.0), (11, 2.0)] - for period, multiplier in supertrend_params: - try: - st_col = f'supertrend_{period}_{multiplier}' - df[st_col] = calculate_supertrend(df, period, multiplier) - except Exception as e: - print(f"Error calculating Supertrend {period}, {multiplier}: {e}") - df[f'supertrend_{period}_{multiplier}'] = np.nan - return df - -def precompute_1min_slice_indices(df_aggregated, df_1min): - """ - Precompute start and end indices for each aggregated bar using searchsorted. - Returns a list of (start_idx, end_idx) tuples for fast iloc slicing. - """ - timestamps = df_aggregated['Timestamp'].values - one_min_timestamps = df_1min['Timestamp'].values - # Ensure both are sorted - sorted_1min = np.argsort(one_min_timestamps) - one_min_timestamps = one_min_timestamps[sorted_1min] - indices = [] - prev_idx = 0 - for i in range(1, len(timestamps)): - start, end = timestamps[i-1], timestamps[i] - # Find indices using searchsorted (right for start, right for end) - start_idx = np.searchsorted(one_min_timestamps, start, side='right') - end_idx = np.searchsorted(one_min_timestamps, end, side='right') - indices.append((start_idx, end_idx)) - return indices, sorted_1min - -def estimate_slippage_rate(trade_usd_size, minute_row, base_slippage_rate=0.0003, impact_threshold_pct=0.10, impact_slope=0.0010): - """ - Estimate total slippage rate (decimal) using a hybrid model: - - Base slippage: fixed base_slippage_rate (e.g., 0.0003 = 3 bps) - - Extra slippage: if trade size (USD) > impact_threshold_pct * 1-min USD volume, - add impact_slope * (trade_size/threshold - 1) - - Args: - trade_usd_size (float): Trade notional in USD before slippage. - minute_row (pd.Series|None): 1-min bar with 'Volume' and a price ('Close' preferred, fallback 'Open'). - base_slippage_rate (float): Base slippage in decimal. - impact_threshold_pct (float): Threshold as fraction of 1-min volume (e.g., 0.10 = 10%). - impact_slope (float): Rate added per 1x over threshold (decimal). - - Returns: - float: total slippage rate (>= base_slippage_rate). - """ - if minute_row is None: - return float(base_slippage_rate) - try: - minute_base_vol = float(minute_row.get('Volume', 0.0) or 0.0) - minute_price = float(minute_row.get('Close', minute_row.get('Open', 0.0)) or 0.0) - minute_quote_vol = minute_base_vol * minute_price - except Exception: - minute_quote_vol = 0.0 - - if minute_quote_vol <= 0 or impact_threshold_pct <= 0: - return float(base_slippage_rate) - - threshold_quote = minute_quote_vol * impact_threshold_pct - if trade_usd_size <= threshold_quote: - return float(base_slippage_rate) - - over_ratio = (trade_usd_size / threshold_quote) - 1.0 - extra_slippage = max(0.0, impact_slope * over_ratio) - return float(base_slippage_rate + extra_slippage) - -def backtest(timeframe, df_aggregated, df_1min, stop_loss_pct, progress_step=1000, - base_slippage_rate=0.0003, impact_threshold_pct=0.10, impact_slope=0.0010): - """ - Backtest trading strategy based on meta supertrend logic (all three supertrends agree). - Uses signal transitions and open prices for entry/exit to match original implementation. - """ - start_time = time.time() - required_st_cols = ["supertrend_12_3.0", "supertrend_10_1.0", "supertrend_11_2.0"] - for col in required_st_cols: - if col not in df_aggregated.columns: - raise ValueError(f"Missing required Supertrend column: {col}") - - # Calculate trend directions for each supertrend (-1, 0, 1) - trends = [] - for col in required_st_cols: - # Convert supertrend values to trend direction based on close price position - trend = np.where(df_aggregated['Close'] > df_aggregated[col], 1, -1) - trends.append(trend) - - # Stack trends and calculate meta trend (all must agree) - trends_arr = np.stack(trends, axis=1) - meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]), - trends_arr[:,0], 0) - - meta_trend_signal = meta_trend #incorrect: should be lagging as it introduces lookahead bias. - # Next step: modify OHLCV predictor to not use supertrend as a feature or anyother feature - # that introduces lookahead bias and predict the next close price. - # - # Old code, not that efficient: - # Add signal lagging to avoid lookahead bias - # meta_trend_signal = np.roll(meta_trend, 1) - # meta_trend_signal[0] = 0 # No signal for first bar - - # Precompute 1-min slice indices for each aggregated bar - slice_indices, sorted_1min = precompute_1min_slice_indices(df_aggregated, df_1min) - df_1min_sorted = df_1min.iloc[sorted_1min].reset_index(drop=True) - one_min_timestamps_sorted = df_1min_sorted['Timestamp'].values - - in_position = False - init_usd = 1000 - usd = init_usd - coin = 0 - nb_stop_loss = 0 - trade_log = [] - equity_curve = [] - trade_results = [] - entry_price = None - entry_time = None - total_slippage_usd = 0.0 - total_traded_usd = 0.0 - - total_steps = len(df_aggregated) - 1 - for i in range(1, len(df_aggregated)): - open_price = df_aggregated['Open'][i] # Use open price for entry/exit - close_price = df_aggregated['Close'][i] - timestamp = df_aggregated['Timestamp'][i] - - # Get previous and current meta trend signals - prev_mt = meta_trend_signal[i-1] if i > 0 else 0 - curr_mt = meta_trend_signal[i] - - # Track equity at each bar - equity = usd + coin * close_price - equity_curve.append((timestamp, equity)) - - # Check stop loss if in position - if in_position: - start_idx, end_idx = slice_indices[i-1] - df_1min_slice = df_1min_sorted.iloc[start_idx:end_idx] - stop_triggered = False - - if not df_1min_slice.empty: - stop_loss_threshold = entry_price * (1 - stop_loss_pct) - below_stop = df_1min_slice['Low'] < stop_loss_threshold - - if below_stop.any(): - first_idx = below_stop.idxmax() - stop_row = df_1min_slice.loc[first_idx] - stop_triggered = True - in_position = False - - # More realistic stop loss fill logic with slippage - if stop_row['Open'] < stop_loss_threshold: - base_exit_price = stop_row['Open'] - else: - base_exit_price = stop_loss_threshold - trade_usd_size = float(coin * base_exit_price) - slip_rate = estimate_slippage_rate(trade_usd_size, stop_row, base_slippage_rate, impact_threshold_pct, impact_slope) - exit_price = base_exit_price * (1.0 - slip_rate) - - exit_time = stop_row['Timestamp'] - gross_usd = coin * exit_price - fee = calculate_okx_taker_maker_fee(gross_usd, is_maker=False) - usd = gross_usd - fee - trade_pnl = (exit_price - entry_price) / entry_price if entry_price else 0 - total_slippage_usd += trade_usd_size * slip_rate - total_traded_usd += trade_usd_size - trade_results.append(trade_pnl) - trade_log.append({ - 'type': 'stop_loss', - 'time': exit_time, - 'base_price': base_exit_price, - 'effective_price': exit_price, - 'slippage_rate': slip_rate, - 'usd': usd, - 'coin': 0, - 'pnl': trade_pnl, - 'fee': fee - }) - coin = 0 - nb_stop_loss += 1 - entry_price = None - entry_time = None - - if stop_triggered: - continue - - # Entry condition: signal changes TO bullish (prev != 1 and curr == 1) - if not in_position and prev_mt != 1 and curr_mt == 1: - in_position = True - fee = calculate_okx_taker_maker_fee(usd, is_maker=False) - usd_after_fee = usd - fee - # Slippage on buy increases price - try: - ts64 = np.datetime64(timestamp) - idx_min = int(np.searchsorted(one_min_timestamps_sorted, ts64, side='left')) - minute_row = df_1min_sorted.iloc[idx_min] if 0 <= idx_min < len(df_1min_sorted) else None - except Exception: - minute_row = None - trade_usd_size = float(usd_after_fee) - slip_rate = estimate_slippage_rate(trade_usd_size, minute_row, base_slippage_rate, impact_threshold_pct, impact_slope) - effective_entry_price = open_price * (1.0 + slip_rate) - coin = usd_after_fee / effective_entry_price - entry_price = effective_entry_price - entry_time = timestamp - usd = 0 - total_slippage_usd += trade_usd_size * slip_rate - total_traded_usd += trade_usd_size - trade_log.append({ - 'type': 'buy', - 'time': timestamp, - 'base_price': open_price, - 'effective_price': effective_entry_price, - 'slippage_rate': slip_rate, - 'usd': usd, - 'coin': coin, - 'fee': fee - }) - - # Exit condition: signal changes TO bearish (prev == 1 and curr == -1) - elif in_position and prev_mt == 1 and curr_mt == -1: - in_position = False - # Slippage on sell reduces price - try: - ts64 = np.datetime64(timestamp) - idx_min = int(np.searchsorted(one_min_timestamps_sorted, ts64, side='left')) - minute_row = df_1min_sorted.iloc[idx_min] if 0 <= idx_min < len(df_1min_sorted) else None - except Exception: - minute_row = None - base_exit_price = open_price - trade_usd_size = float(coin * base_exit_price) - slip_rate = estimate_slippage_rate(trade_usd_size, minute_row, base_slippage_rate, impact_threshold_pct, impact_slope) - exit_price = base_exit_price * (1.0 - slip_rate) - exit_time = timestamp - gross_usd = coin * exit_price - fee = calculate_okx_taker_maker_fee(gross_usd, is_maker=False) - usd = gross_usd - fee - trade_pnl = (exit_price - entry_price) / entry_price if entry_price else 0 - total_slippage_usd += trade_usd_size * slip_rate - total_traded_usd += trade_usd_size - trade_results.append(trade_pnl) - trade_log.append({ - 'type': 'sell', - 'time': exit_time, - 'base_price': base_exit_price, - 'effective_price': exit_price, - 'slippage_rate': slip_rate, - 'usd': usd, - 'coin': 0, - 'pnl': trade_pnl, - 'fee': fee - }) - coin = 0 - entry_price = None - entry_time = None - - if i % progress_step == 0 or i == total_steps: - percent = (i / total_steps) * 100 - print(f"\rTimeframe: {timeframe},\tProgress: {percent:.1f}%\tCurrent equity: {equity:.2f}\033[K", end='', flush=True) - - # Force close any open position at the end - if in_position: - final_open_price = df_aggregated['Open'].iloc[-1] # Use open price for consistency - final_timestamp = df_aggregated['Timestamp'].iloc[-1] - try: - ts64 = np.datetime64(final_timestamp) - idx_min = int(np.searchsorted(one_min_timestamps_sorted, ts64, side='left')) - minute_row = df_1min_sorted.iloc[idx_min] if 0 <= idx_min < len(df_1min_sorted) else None - except Exception: - minute_row = None - base_exit_price = final_open_price - trade_usd_size = float(coin * base_exit_price) - slip_rate = estimate_slippage_rate(trade_usd_size, minute_row, base_slippage_rate, impact_threshold_pct, impact_slope) - final_effective_price = base_exit_price * (1.0 - slip_rate) - gross_usd = coin * final_effective_price - fee = calculate_okx_taker_maker_fee(gross_usd, is_maker=False) - usd = gross_usd - fee - trade_pnl = (final_effective_price - entry_price) / entry_price if entry_price else 0 - total_slippage_usd += trade_usd_size * slip_rate - total_traded_usd += trade_usd_size - trade_results.append(trade_pnl) - trade_log.append({ - 'type': 'forced_close', - 'time': final_timestamp, - 'base_price': base_exit_price, - 'effective_price': final_effective_price, - 'slippage_rate': slip_rate, - 'usd': usd, - 'coin': 0, - 'pnl': trade_pnl, - 'fee': fee - }) - coin = 0 - in_position = False - entry_price = None - - print() - print(f"Timeframe: {timeframe},\tTotal profit: {usd - init_usd},\tNumber of stop losses: {nb_stop_loss}") - - # --- Performance Metrics --- - equity_arr = np.array([e[1] for e in equity_curve]) - # Handle edge cases for empty or invalid equity data - if len(equity_arr) == 0: - print("Warning: No equity data available") - return None - returns = np.diff(equity_arr) / equity_arr[:-1] - # Filter out infinite and NaN returns - returns = returns[np.isfinite(returns)] - total_return = (equity_arr[-1] - equity_arr[0]) / equity_arr[0] if equity_arr[0] != 0 else 0 - running_max = np.maximum.accumulate(equity_arr) - if equity_arr[-1] <= 0.01: - max_drawdown = -1.0 - else: - drawdowns = (equity_arr - running_max) / running_max - max_drawdown = drawdowns.min() if len(drawdowns) > 0 and np.isfinite(drawdowns).any() else 0 - if len(returns) > 1 and np.std(returns) > 1e-9: - sharpe = np.mean(returns) / np.std(returns) * math.sqrt(252) - else: - sharpe = 0 - wins = [1 for r in trade_results if r > 0] - win_rate = len(wins) / len(trade_results) if trade_results else 0 - num_trades = len(trade_results) - - print(f"Performance Metrics:") - print(f" Total Return: {total_return*100:.2f}%") - print(f" Max Drawdown: {max_drawdown*100:.2f}%") - print(f" Sharpe Ratio: {sharpe:.2f}") - print(f" Win Rate: {win_rate*100:.2f}%") - print(f" Number of Trades: {num_trades}") - print(f" Final Equity: ${equity_arr[-1]:.2f}") - print(f" Initial Equity: ${equity_arr[0]:.2f}") - - # --- Save Trade Log --- - log_dir = "backtest_logs" - os.makedirs(log_dir, exist_ok=True) - # Format stop_loss_pct for filename (e.g., 0.05 -> 0p05) - stop_loss_str = f"{stop_loss_pct:.2f}".replace('.', 'p') - log_path = os.path.join(log_dir, f"trade_log_{timeframe}_sl{stop_loss_str}.csv") - if trade_log: - all_keys = set() - for entry in trade_log: - all_keys.update(entry.keys()) - all_keys = list(all_keys) - - trade_log_filled = [] - for entry in trade_log: - filled_entry = {k: entry.get(k, None) for k in all_keys} - trade_log_filled.append(filled_entry) - - # Calculate total fees for this backtest - total_fees = sum(entry.get('fee', 0) for entry in trade_log) - - # Write summary header row, then trade log header and rows - with open(log_path, 'w', newline='') as f: - writer = csv.writer(f) - summary_header = [ - 'elapsed_time_sec', 'total_return', 'max_drawdown', 'sharpe_ratio', - 'win_rate', 'num_trades', 'final_equity', 'initial_equity', 'num_stop_losses', 'total_fees', - 'total_slippage_usd', 'avg_slippage_bps' - ] - summary_values = [ - f"{time.time() - start_time:.2f}", - f"{total_return*100:.2f}%", - f"{max_drawdown*100:.2f}%", - f"{sharpe:.2f}", - f"{win_rate*100:.2f}%", - str(num_trades), - f"${equity_arr[-1]:.2f}", - f"${equity_arr[0]:.2f}", - str(nb_stop_loss), - f"${total_fees:.4f}", - f"${total_slippage_usd:.4f}", - f"{(total_slippage_usd / total_traded_usd * 10000.0) if total_traded_usd > 0 else 0:.2f}" - ] - writer.writerow(summary_header) - writer.writerow(summary_values) - writer.writerow([]) # Blank row for separation - dict_writer = csv.DictWriter(f, fieldnames=all_keys) - dict_writer.writeheader() - dict_writer.writerows(trade_log_filled) - - print(f"Trade log saved to {log_path}") - else: - print("No trades to log.") - - # Return summary metrics (excluding elapsed time) - return { - 'timeframe': timeframe, - 'stop_loss': stop_loss_pct, - 'total_return': total_return, - 'max_drawdown': max_drawdown, - 'sharpe_ratio': sharpe, - 'win_rate': win_rate, - 'num_trades': num_trades, - 'final_equity': equity_arr[-1], - 'initial_equity': equity_arr[0], - 'num_stop_losses': nb_stop_loss, - 'total_fees': total_fees if trade_log else 0, - 'total_slippage_usd': total_slippage_usd, - 'avg_slippage_bps': (total_slippage_usd / total_traded_usd * 10000.0) if total_traded_usd > 0 else 0.0 - } - -if __name__ == "__main__": - timeframes = ["5min", "15min", "30min", "1h", "4h", "1d", "2d"] - # timeframes = ["5min", "15min", "1h", "4h", "1d"] - # timeframes = ["30min"] - stoplosses = [0.1, 0.2, 0.3, 0.4, 0.5] - # Slippage configuration (OKX Spot): base in bps, plus volume-impact model - slippage_base_bps = 10 # 10 bps base slippage (realistic, conservative) - impact_threshold_pct = 0.10 # e.g., start impact beyond 10% of 1-min volume - impact_slope = 0.0010 # incremental slippage per 1x over threshold - - # df_1min = load_data('2021-11-01', '2024-10-16', '../data/btcusd_1-min_data.csv') - df_1min = load_data('2021-11-01', '2025-08-19', '../data/btcusd_okx_1-min_data.csv') - - # Prepare summary CSV - summary_csv_path = "backtest_summary.csv" - summary_header = [ - 'timeframe', 'stop_loss', 'total_return', 'max_drawdown', 'sharpe_ratio', - 'win_rate', 'num_trades', 'final_equity', 'initial_equity', 'num_stop_losses', 'total_fees', - 'total_slippage_usd', 'avg_slippage_bps' - ] - with open(summary_csv_path, 'w', newline='') as summary_file: - writer = csv.DictWriter(summary_file, fieldnames=summary_header) - writer.writeheader() - for timeframe in timeframes: - df_aggregated = aggregate_data(df_1min, timeframe) - df_aggregated = add_supertrend_indicators(df_aggregated) - for stop_loss_pct in stoplosses: - summary = backtest( - timeframe, - df_aggregated, - df_1min, - stop_loss_pct=stop_loss_pct, - base_slippage_rate=slippage_base_bps / 10000.0, - impact_threshold_pct=impact_threshold_pct, - impact_slope=impact_slope - ) - if summary is not None: - # Format values for CSV (e.g., floats as rounded strings) - summary_row = { - 'timeframe': summary['timeframe'], - 'stop_loss': summary['stop_loss'], - 'total_return': f"{summary['total_return']*100:.2f}%", - 'max_drawdown': f"{summary['max_drawdown']*100:.2f}%", - 'sharpe_ratio': f"{summary['sharpe_ratio']:.2f}", - 'win_rate': f"{summary['win_rate']*100:.2f}%", - 'num_trades': summary['num_trades'], - 'final_equity': f"${summary['final_equity']:.2f}", - 'initial_equity': f"${summary['initial_equity']:.2f}", - 'num_stop_losses': summary['num_stop_losses'], - 'total_fees': f"${summary['total_fees']:.4f}", - 'total_slippage_usd': f"${summary['total_slippage_usd']:.4f}", - 'avg_slippage_bps': f"{summary['avg_slippage_bps']:.2f}" - } - writer.writerow(summary_row) diff --git a/market_costs.py b/market_costs.py new file mode 100644 index 0000000..697da71 --- /dev/null +++ b/market_costs.py @@ -0,0 +1,11 @@ +from __future__ import annotations + +TAKER_FEE_BPS_DEFAULT = 10.0 # 0.10% + + +def okx_fee(fee_bps: float, notional_usd: float) -> float: + return notional_usd * (fee_bps / 1e4) + + +def estimate_slippage_rate(slippage_bps: float, notional_usd: float) -> float: + return notional_usd * (slippage_bps / 1e4) \ No newline at end of file diff --git a/metrics.py b/metrics.py new file mode 100644 index 0000000..82d80be --- /dev/null +++ b/metrics.py @@ -0,0 +1,54 @@ +from __future__ import annotations +from dataclasses import dataclass +import numpy as np +import pandas as pd + + +@dataclass +class Perf: + total_return: float + max_drawdown: float + sharpe_ratio: float + win_rate: float + num_trades: int + final_equity: float + initial_equity: float + num_stop_losses: int + total_fees: float + total_slippage_usd: float + avg_slippage_bps: float + + +def compute_metrics(equity_curve: pd.Series, trades: list[dict]) -> Perf: + ret = equity_curve.pct_change().fillna(0.0) + total_return = equity_curve.iat[-1] / equity_curve.iat[0] - 1.0 + cummax = equity_curve.cummax() + dd = (equity_curve / cummax - 1.0).min() + max_drawdown = dd + + if ret.std(ddof=0) > 0: + sharpe = (ret.mean() / ret.std(ddof=0)) * np.sqrt(252 * 24 * 60) # minute bars -> annualized + else: + sharpe = 0.0 + + closes = [t for t in trades if t.get("side") == "SELL"] + wins = [t for t in closes if t.get("pnl", 0.0) > 0] + win_rate = (len(wins) / len(closes)) if closes else 0.0 + + fees = sum(t.get("fee", 0.0) for t in trades) + slip = sum(t.get("slippage", 0.0) for t in trades) + slippage_bps = [t.get("slippage_bps", 0.0) for t in trades if "slippage_bps" in t] + + return Perf( + total_return=total_return, + max_drawdown=max_drawdown, + sharpe_ratio=sharpe, + win_rate=win_rate, + num_trades=len(closes), + final_equity=float(equity_curve.iat[-1]), + initial_equity=float(equity_curve.iat[0]), + num_stop_losses=sum(1 for t in closes if t.get("reason") == "stop"), + total_fees=fees, + total_slippage_usd=slip, + avg_slippage_bps=float(np.mean(slippage_bps)) if slippage_bps else 0.0, + ) \ No newline at end of file diff --git a/trade.py b/trade.py new file mode 100644 index 0000000..78b97c8 --- /dev/null +++ b/trade.py @@ -0,0 +1,52 @@ +from __future__ import annotations +from dataclasses import dataclass +import pandas as pd +from market_costs import okx_fee, estimate_slippage_rate +from intrabar import entry_slippage_row + + +@dataclass +class TradeState: + cash: float = 1000.0 + qty: float = 0.0 + entry_px: float | None = None + max_px: float | None = None + stop_loss_frac: float = 0.02 + fee_bps: float = 10.0 + slippage_bps: float = 2.0 + + +def enter_long(state: TradeState, price: float) -> dict: + if state.qty > 0: + return {} + px = entry_slippage_row(price, 0.0, state.slippage_bps) + qty = state.cash / px + fee = okx_fee(state.fee_bps, state.cash) + state.qty = max(qty - fee / px, 0.0) + state.cash = 0.0 + state.entry_px = px + state.max_px = px + return {"side": "BUY", "price": px, "qty": state.qty, "fee": fee} + + +def maybe_trailing_stop(state: TradeState, price: float) -> float: + if state.qty <= 0: + return float("inf") + state.max_px = max(state.max_px or price, price) + trail_px = state.max_px * (1.0 - state.stop_loss_frac) + return trail_px + + +def exit_long(state: TradeState, price: float) -> dict: + if state.qty <= 0: + return {} + notional = state.qty * price + slip = estimate_slippage_rate(state.slippage_bps, notional) + fee = okx_fee(state.fee_bps, notional) + cash_back = notional - slip - fee + event = {"side": "SELL", "price": price, "qty": state.qty, "fee": fee, "slippage": slip} + state.cash = cash_back + state.qty = 0.0 + state.entry_px = None + state.max_px = None + return event \ No newline at end of file