diff --git a/.gitignore b/.gitignore
index 0dbf2f2..736352e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -168,3 +168,5 @@ cython_debug/
 
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 #.idea/
+logs/
+*.csv
\ No newline at end of file
diff --git a/.vscode/launch.json b/.vscode/launch.json
index bb1ca3d..237a81e 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -1,16 +1,21 @@
 {
-    // Use IntelliSense to learn about possible attributes.
-    // Hover to view descriptions of existing attributes.
-    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
     "version": "0.2.0",
     "configurations": [
-
-        {
-            "name": "Python Debugger: main.py",
-            "type": "debugpy",
-            "request": "launch",
-            "program": "main.py",
-            "console": "integratedTerminal"
-        }
+        {
+            "name": "Python Debugger: cli.py",
+            "type": "debugpy",
+            "request": "launch",
+            "program": "${workspaceFolder}/cli.py",
+            "console": "integratedTerminal",
+            "cwd": "${workspaceFolder}",
+            "args": [
+                "2025-03-01", "2025-09-22",
+                "--timeframes-minutes", "60", "240", "480",
+                "--stop-loss", "0.02", "0.05",
+                "--exit-on-bearish-flip",
+                "--csv", "../data/btcusd_1-min_data.csv"
+            ],
+            "env": { "PYTHONPATH": "${workspaceFolder}" }
+        }
     ]
 }
\ No newline at end of file
diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..b9b60b2
--- /dev/null
+++ b/__init__.py
@@ -0,0 +1,13 @@
+from __future__ import annotations
+
+__all__ = [
+    "config",
+    "data",
+    "indicators",
+    "market_costs",
+    "intrabar",
+    "trade",
+    "metrics",
+    "logging_utils",
+    "backtest",
+]
\ No newline at end of file
diff --git a/backtest.py b/backtest.py
new file mode 100644
index 0000000..d4300cd
--- /dev/null
+++ b/backtest.py
@@ -0,0 +1,71 @@
+from __future__ import annotations
+import pandas as pd
+from pathlib import Path
+from trade import TradeState, enter_long, exit_long, maybe_trailing_stop
+from indicators import add_supertrends, compute_meta_trend
+from metrics import compute_metrics
+from logging_utils import write_trade_log
+
+DEFAULT_ST_SETTINGS = [(12, 3.0), (10, 1.0), (11, 2.0)]
+
+def backtest(
+    df: pd.DataFrame,
+    df_1min: pd.DataFrame,
+    timeframe_minutes: int,
+    stop_loss: float,
+    exit_on_bearish_flip: bool,
+    fee_bps: float,
+    slippage_bps: float,
+    log_path: Path | None = None,
+):
+    df = add_supertrends(df, DEFAULT_ST_SETTINGS)
+    df["meta_bull"] = compute_meta_trend(df, DEFAULT_ST_SETTINGS)
+
+    state = TradeState(stop_loss_frac=stop_loss, fee_bps=fee_bps, slippage_bps=slippage_bps)
+    equity, trades = [], []
+
+    for i, row in df.iterrows():
+        price = float(row["Close"])
+        ts = pd.Timestamp(row["Timestamp"])
+
+        # Enter whenever flat and the meta trend is bullish (level-based, so re-entry can follow a stop).
+        if state.qty <= 0 and row["meta_bull"] == 1:
+            evt = enter_long(state, price)
+            if evt:
+                evt.update({"t": ts.isoformat(), "reason": "bull_flip"})
+                trades.append(evt)
+
+        start = ts
+        end = df["Timestamp"].iat[i + 1] if i + 1 < len(df) else ts + pd.Timedelta(minutes=timeframe_minutes)
+
+        if state.qty > 0:
+            win = df_1min[(df_1min["Timestamp"] >= start) & (df_1min["Timestamp"] < end)]
+            for _, m in win.iterrows():
+                hi = float(m["High"])
+                lo = float(m["Low"])
+                state.max_px = max(state.max_px or hi, hi)
+                trail = state.max_px * (1.0 - state.stop_loss_frac)
+                if lo <= trail:
+                    evt = exit_long(state, trail)
+                    if evt:
+                        prev = trades[-1]
+                        pnl = (evt["price"] - (prev.get("price") or evt["price"])) * (prev.get("qty") or 0.0)
+                        evt.update({"t": pd.Timestamp(m["Timestamp"]).isoformat(), "reason": "stop", "pnl": pnl})
+                        trades.append(evt)
+                    break
+
+        if state.qty > 0 and exit_on_bearish_flip and row["meta_bull"] == 0:
+            evt = exit_long(state, price)
+            if evt:
+                prev = trades[-1]
+                pnl = (evt["price"] - (prev.get("price") or evt["price"])) * (prev.get("qty") or 0.0)
+                evt.update({"t": ts.isoformat(), "reason": "bearish_flip", "pnl": pnl})
+                trades.append(evt)
+
+        equity.append(state.cash + state.qty * price)
+
+    equity_curve = pd.Series(equity, index=df["Timestamp"])
+    if log_path:
+        write_trade_log(trades, log_path)
+    perf = compute_metrics(equity_curve, trades)
+    return perf, equity_curve, trades
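For orientation, a minimal sketch of driving backtest() directly, without the CLI. The flat-price frame below is fabricated for illustration; any OHLCV frames shaped like data.load_data's output work the same way:

    import pandas as pd
    from backtest import backtest

    # Hypothetical toy data: two days of flat $100 one-minute bars.
    idx = pd.date_range("2025-03-01", periods=2 * 24 * 60, freq="1min", tz="UTC")
    df_1min = pd.DataFrame({"Timestamp": idx, "Open": 100.0, "High": 100.0,
                            "Low": 100.0, "Close": 100.0, "Volume": 1.0})
    df_60 = df_1min.set_index("Timestamp").resample("60min").agg(
        {"Open": "first", "High": "max", "Low": "min", "Close": "last", "Volume": "sum"}
    ).reset_index()

    perf, equity, trades = backtest(
        df=df_60, df_1min=df_1min, timeframe_minutes=60, stop_loss=0.02,
        exit_on_bearish_flip=True, fee_bps=10.0, slippage_bps=2.0, log_path=None,
    )
    print(perf.total_return, perf.num_trades)  # slightly negative (entry costs), 0 closed trades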
diff --git a/cli.py b/cli.py
new file mode 100644
index 0000000..58b1493
--- /dev/null
+++ b/cli.py
@@ -0,0 +1,80 @@
+from __future__ import annotations
+import argparse
+from pathlib import Path
+import pandas as pd
+
+from config import CLIConfig
+from data import load_data
+from backtest import backtest
+
+def parse_args() -> CLIConfig:
+    p = argparse.ArgumentParser(prog="bt", description="Simple supertrend backtester")
+    p.add_argument("start")
+    p.add_argument("end")
+    p.add_argument("--timeframe-minutes", type=int, default=15)   # single TF
+    p.add_argument("--timeframes-minutes", nargs="+", type=int)   # multi TF, e.g. 5 15 60 240
+    p.add_argument("--stop-loss", dest="stop_losses", type=float, nargs="+", default=[0.02, 0.05])
+    p.add_argument("--exit-on-bearish-flip", action="store_true")
+    p.add_argument("--csv", dest="csv_path", type=Path, required=True)
+    p.add_argument("--out-csv", type=Path, default=Path("summary.csv"))
+    p.add_argument("--log-dir", type=Path, default=Path("./logs"))
+    p.add_argument("--fee-bps", type=float, default=10.0)
+    p.add_argument("--slippage-bps", type=float, default=2.0)
+    a = p.parse_args()
+
+    return CLIConfig(
+        start=a.start,
+        end=a.end,
+        timeframe_minutes=a.timeframe_minutes,
+        timeframes_minutes=a.timeframes_minutes,
+        stop_losses=a.stop_losses,
+        exit_on_bearish_flip=a.exit_on_bearish_flip,
+        csv_path=a.csv_path,
+        out_csv=a.out_csv,
+        log_dir=a.log_dir,
+        fee_bps=a.fee_bps,
+        slippage_bps=a.slippage_bps,
+    )
+
+def main():
+    cfg = parse_args()
+    frames = cfg.timeframes_minutes or [cfg.timeframe_minutes]
+
+    rows: list[dict] = []
+    for tfm in frames:
+        df_1min, df = load_data(cfg.start, cfg.end, tfm, cfg.csv_path)
+        for sl in cfg.stop_losses:
+            log_path = cfg.log_dir / f"{tfm}m_sl{sl:.2%}.csv"
+            perf, equity, _ = backtest(
+                df=df,
+                df_1min=df_1min,
+                timeframe_minutes=tfm,
+                stop_loss=sl,
+                exit_on_bearish_flip=cfg.exit_on_bearish_flip,
+                fee_bps=cfg.fee_bps,
+                slippage_bps=cfg.slippage_bps,
+                log_path=log_path,
+            )
+            rows.append({
+                "timeframe": f"{tfm}min",
+                "stop_loss": sl,
+                "exit_on_bearish_flip": cfg.exit_on_bearish_flip,
+                "total_return": f"{perf.total_return:.2%}",
+                "max_drawdown": f"{perf.max_drawdown:.2%}",
+                "sharpe_ratio": f"{perf.sharpe_ratio:.2f}",
+                "win_rate": f"{perf.win_rate:.2%}",
+                "num_trades": perf.num_trades,
+                "final_equity": f"${perf.final_equity:.2f}",
+                "initial_equity": f"${perf.initial_equity:.2f}",
+                "num_stop_losses": perf.num_stop_losses,
+                "total_fees": perf.total_fees,
+                "total_slippage_usd": perf.total_slippage_usd,
+                "avg_slippage_bps": perf.avg_slippage_bps,
+            })
+
+    out = pd.DataFrame(rows)
+    out.to_csv(cfg.out_csv, index=False)
+    print(out.to_string(index=False))
+
+if __name__ == "__main__":
+    main()
diff --git a/config.py b/config.py
new file mode 100644
index 0000000..70c9f3d
--- /dev/null
+++ b/config.py
@@ -0,0 +1,18 @@
+from __future__ import annotations
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Sequence
+
+@dataclass
+class CLIConfig:
+    start: str
+    end: str
+    timeframe_minutes: int
+    timeframes_minutes: list[int] | None
+    stop_losses: Sequence[float]
+    exit_on_bearish_flip: bool
+    csv_path: Path | None
+    out_csv: Path
+    log_dir: Path
+    fee_bps: float
+    slippage_bps: float
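The CLI can also be exercised programmatically; a sketch that replays the launch.json arguments through cli.main() (the CSV path is the one from the launch profile and is assumed to exist):

    import sys
    from cli import main

    sys.argv = ["bt", "2025-03-01", "2025-09-22",
                "--timeframes-minutes", "60", "240", "480",
                "--stop-loss", "0.02", "0.05",
                "--exit-on-bearish-flip",
                "--csv", "../data/btcusd_1-min_data.csv"]
    main()  # writes summary.csv plus one trade log per (timeframe, stop-loss) pair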
diff --git a/data.py b/data.py
new file mode 100644
index 0000000..1196648
--- /dev/null
+++ b/data.py
@@ -0,0 +1,24 @@
+from __future__ import annotations
+import pandas as pd
+from pathlib import Path
+
+def load_data(start: str, end: str, timeframe_minutes: int, csv_path: Path) -> tuple[pd.DataFrame, pd.DataFrame]:
+    df_1min = pd.read_csv(csv_path)
+    df_1min["Timestamp"] = pd.to_datetime(df_1min["Timestamp"], unit="s", utc=True)
+    df_1min = df_1min[(df_1min["Timestamp"] >= pd.Timestamp(start, tz="UTC")) &
+                      (df_1min["Timestamp"] <= pd.Timestamp(end, tz="UTC"))] \
+        .sort_values("Timestamp").reset_index(drop=True)
+
+    if timeframe_minutes != 1:
+        g = df_1min.set_index("Timestamp").resample(f"{timeframe_minutes}min")
+        df = pd.DataFrame({
+            "Open": g["Open"].first(),
+            "High": g["High"].max(),
+            "Low": g["Low"].min(),
+            "Close": g["Close"].last(),
+            "Volume": g["Volume"].sum(),
+        }).dropna().reset_index()
+    else:
+        df = df_1min.copy()
+
+    return df_1min, df
diff --git a/indicators/__init__.py b/indicators/__init__.py
new file mode 100644
index 0000000..016e1fe
--- /dev/null
+++ b/indicators/__init__.py
@@ -0,0 +1,3 @@
+from .supertrend import add_supertrends, compute_meta_trend
+
+__all__ = ["add_supertrends", "compute_meta_trend"]
\ No newline at end of file
diff --git a/indicators/supertrend.py b/indicators/supertrend.py
new file mode 100644
index 0000000..4a23d11
--- /dev/null
+++ b/indicators/supertrend.py
@@ -0,0 +1,62 @@
+from __future__ import annotations
+import pandas as pd
+import numpy as np
+
+
+def _atr(high: pd.Series, low: pd.Series, close: pd.Series, period: int) -> pd.Series:
+    hl = (high - low).abs()
+    hc = (high - close.shift()).abs()
+    lc = (low - close.shift()).abs()
+    tr = pd.concat([hl, hc, lc], axis=1).max(axis=1)
+    return tr.rolling(period, min_periods=period).mean()
+
+
+def supertrend_series(df: pd.DataFrame, length: int, multiplier: float) -> pd.Series:
+    atr = _atr(df["High"], df["Low"], df["Close"], length)
+    hl2 = (df["High"] + df["Low"]) / 2
+    upper = hl2 + multiplier * atr
+    lower = hl2 - multiplier * atr
+
+    trend = pd.Series(index=df.index, dtype=float)
+    close = df["Close"]
+    dir_up = True
+    fin_upper = np.nan
+    fin_lower = np.nan
+
+    for i in range(len(df)):
+        if i == 0 or pd.isna(atr.iat[i]):
+            trend.iat[i] = np.nan
+            fin_upper = upper.iat[i]
+            fin_lower = lower.iat[i]
+            continue
+
+        # Ratchet the bands: the upper band may only fall while the close holds
+        # below it; the lower band may only rise while the close holds above it.
+        fin_upper = min(upper.iat[i], fin_upper) if close.iat[i - 1] <= fin_upper else upper.iat[i]
+        fin_lower = max(lower.iat[i], fin_lower) if close.iat[i - 1] >= fin_lower else lower.iat[i]
+
+        # Flip when the close breaks through the active band.
+        if close.iat[i] > fin_upper:
+            dir_up = True
+        elif close.iat[i] < fin_lower:
+            dir_up = False
+
+        # In an uptrend the supertrend line is the ratcheted lower band;
+        # in a downtrend, the ratcheted upper band.
+        trend.iat[i] = fin_lower if dir_up else fin_upper
+
+    return trend
+
+
+def add_supertrends(df: pd.DataFrame, settings: list[tuple[int, float]]) -> pd.DataFrame:
+    out = df.copy()
+    for length, mult in settings:
+        col = f"supertrend_{length}_{mult}"
+        out[col] = supertrend_series(out, length, mult)
+        out[f"bull_{length}_{mult}"] = (out["Close"] >= out[col]).astype(int)
+    return out
+
+
+def compute_meta_trend(df: pd.DataFrame, settings: list[tuple[int, float]]) -> pd.Series:
+    bull_cols = [f"bull_{l}_{m}" for l, m in settings]
+    return (df[bull_cols].sum(axis=1) == len(bull_cols)).astype(int)
\ No newline at end of file
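A quick sanity check of the indicator stack on fabricated trending data; meta_bull should settle at 1 once all three supertrends have warmed up and agree:

    import numpy as np
    import pandas as pd
    from indicators import add_supertrends, compute_meta_trend

    # Hypothetical rising series: 200 hourly bars drifting from 100 to 120.
    idx = pd.date_range("2025-03-01", periods=200, freq="60min", tz="UTC")
    close = pd.Series(np.linspace(100.0, 120.0, 200))
    df = pd.DataFrame({"Timestamp": idx, "Open": close, "High": close * 1.001,
                       "Low": close * 0.999, "Close": close, "Volume": 1.0})

    settings = [(12, 3.0), (10, 1.0), (11, 2.0)]
    df = add_supertrends(df, settings)
    df["meta_bull"] = compute_meta_trend(df, settings)
    print(df["meta_bull"].tail())  # 1 once all three supertrends agree bullish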
diff --git a/intrabar.py b/intrabar.py
new file mode 100644
index 0000000..f107456
--- /dev/null
+++ b/intrabar.py
@@ -0,0 +1,11 @@
+from __future__ import annotations
+import pandas as pd
+
+
+def precompute_slices(df: pd.DataFrame) -> pd.DataFrame:
+    return df  # hook for future use
+
+
+def entry_slippage_row(price: float, qty: float, slippage_bps: float) -> float:
+    # qty is currently unused; kept for a future volume-impact model.
+    return price + price * (slippage_bps / 1e4)
\ No newline at end of file
diff --git a/logging_utils.py b/logging_utils.py
new file mode 100644
index 0000000..284bb7a
--- /dev/null
+++ b/logging_utils.py
@@ -0,0 +1,11 @@
+from __future__ import annotations
+from pathlib import Path
+import pandas as pd
+
+
+def write_trade_log(trades: list[dict], path: Path) -> None:
+    if not trades:
+        return
+    df = pd.DataFrame(trades)
+    path.parent.mkdir(parents=True, exist_ok=True)
+    df.to_csv(path, index=False)
\ No newline at end of file
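Both helpers are trivial to exercise in isolation; a sketch with assumed example values:

    from pathlib import Path
    from intrabar import entry_slippage_row
    from logging_utils import write_trade_log

    # 2 bps of entry slippage on a $50,000 print: fills about $10 worse.
    px = entry_slippage_row(50_000.0, 0.0, 2.0)
    print(px)  # 50010.0

    # Trade events are plain dicts; the writer creates ./logs/ if needed.
    write_trade_log([{"side": "BUY", "price": px, "qty": 0.02, "fee": 1.0}],
                    Path("./logs/example.csv"))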
diff --git a/main.py b/main.py
deleted file mode 100644
index 8f0fe91..0000000
--- a/main.py
+++ /dev/null
@@ -1,556 +0,0 @@
-import pandas as pd
-import numpy as np
-from ta.volatility import AverageTrueRange
-import time
-import csv
-import math
-import os
-
-
-def load_data(since, until, csv_file):
-    df = pd.read_csv(csv_file)
-    df['Timestamp'] = pd.to_datetime(df['Timestamp'], unit='s')
-    df = df[(df['Timestamp'] >= pd.Timestamp(since)) & (df['Timestamp'] <= pd.Timestamp(until))]
-    return df
-
-def aggregate_data(df, timeframe):
-    df = df.set_index('Timestamp')
-    df = df.resample(timeframe).agg({
-        'Open': 'first',
-        'High': 'max',
-        'Low': 'min',
-        'Close': 'last',
-        'Volume': 'sum'
-    })
-    df = df.reset_index()
-    return df
-
-def calculate_okx_taker_maker_fee(amount, is_maker=False):
-    fee_rate = 0.0008 if is_maker else 0.0010
-    return amount * fee_rate
-
-def calculate_supertrend(df, period, multiplier):
-    """
-    Calculate the Supertrend indicator for a given period and multiplier.
-    Optionally displays progress during calculation.
-    Args:
-        df (pd.DataFrame): DataFrame with 'High', 'Low', 'Close' columns.
-        period (int): ATR period.
-        multiplier (float): Multiplier for ATR.
-        progress_step (int): Step interval for progress display.
-        show_progress (bool): Whether to print progress updates.
-    Returns:
-        pd.Series: Supertrend values.
-    """
-    # Ensure we have enough data for ATR calculation
-    if len(df) < period + 1:
-        print(f"Warning: Not enough data for ATR period {period}. Need at least {period + 1} rows, got {len(df)}")
-        return pd.Series([np.nan] * len(df), index=df.index)
-
-    high = df['High'].values
-    low = df['Low'].values
-    close = df['Close'].values
-
-    # Calculate True Range first
-    tr = np.zeros_like(close)
-    for i in range(1, len(close)):
-        tr[i] = max(
-            high[i] - low[i],  # Current high - current low
-            abs(high[i] - close[i-1]),  # Current high - previous close
-            abs(low[i] - close[i-1])  # Current low - previous close
-        )
-
-    # Calculate ATR using simple moving average
-    atr = np.zeros_like(close)
-    atr[period] = np.mean(tr[1:period+1])  # First ATR value
-    for i in range(period+1, len(close)):
-        atr[i] = (atr[i-1] * (period-1) + tr[i]) / period  # Exponential-like smoothing
-
-    # Fill initial values with the first valid ATR
-    atr[:period] = atr[period] if atr[period] > 0 else 0.001
-
-    hl2 = (high + low) / 2
-    upperband = hl2 + (multiplier * atr)
-    lowerband = hl2 - (multiplier * atr)
-
-    supertrend = np.full_like(close, np.nan)
-    in_uptrend = True
-
-    supertrend[0] = upperband[0]
-    total_steps = len(close) - 1
-
-    for i in range(1, len(close)):
-        if close[i] > upperband[i-1]:
-            in_uptrend = True
-        elif close[i] < lowerband[i-1]:
-            in_uptrend = False
-        # else, keep previous trend
-
-        if in_uptrend:
-            supertrend[i] = max(lowerband[i], supertrend[i-1] if not np.isnan(supertrend[i-1]) else lowerband[i])
-        else:
-            supertrend[i] = min(upperband[i], supertrend[i-1] if not np.isnan(supertrend[i-1]) else upperband[i])
-
-    return pd.Series(supertrend, index=df.index)
-
-def add_supertrend_indicators(df):
-    """
-    Adds Supertrend indicators to the dataframe for the specified (period, multiplier) pairs.
-    Args:
-        df (pd.DataFrame): DataFrame with columns 'High', 'Low', 'Close'.
-    Returns:
-        pd.DataFrame: DataFrame with new Supertrend columns added.
-    """
-    supertrend_params = [(12, 3.0), (10, 1.0), (11, 2.0)]
-    for period, multiplier in supertrend_params:
-        try:
-            st_col = f'supertrend_{period}_{multiplier}'
-            df[st_col] = calculate_supertrend(df, period, multiplier)
-        except Exception as e:
-            print(f"Error calculating Supertrend {period}, {multiplier}: {e}")
-            df[f'supertrend_{period}_{multiplier}'] = np.nan
-    return df
-
-def precompute_1min_slice_indices(df_aggregated, df_1min):
-    """
-    Precompute start and end indices for each aggregated bar using searchsorted.
-    Returns a list of (start_idx, end_idx) tuples for fast iloc slicing.
-    """
-    timestamps = df_aggregated['Timestamp'].values
-    one_min_timestamps = df_1min['Timestamp'].values
-    # Ensure both are sorted
-    sorted_1min = np.argsort(one_min_timestamps)
-    one_min_timestamps = one_min_timestamps[sorted_1min]
-    indices = []
-    prev_idx = 0
-    for i in range(1, len(timestamps)):
-        start, end = timestamps[i-1], timestamps[i]
-        # Find indices using searchsorted (right for start, right for end)
-        start_idx = np.searchsorted(one_min_timestamps, start, side='right')
-        end_idx = np.searchsorted(one_min_timestamps, end, side='right')
-        indices.append((start_idx, end_idx))
-    return indices, sorted_1min
-
-def estimate_slippage_rate(trade_usd_size, minute_row, base_slippage_rate=0.0003, impact_threshold_pct=0.10, impact_slope=0.0010):
-    """
-    Estimate total slippage rate (decimal) using a hybrid model:
-    - Base slippage: fixed base_slippage_rate (e.g., 0.0003 = 3 bps)
-    - Extra slippage: if trade size (USD) > impact_threshold_pct * 1-min USD volume,
-      add impact_slope * (trade_size/threshold - 1)
-
-    Args:
-        trade_usd_size (float): Trade notional in USD before slippage.
-        minute_row (pd.Series|None): 1-min bar with 'Volume' and a price ('Close' preferred, fallback 'Open').
-        base_slippage_rate (float): Base slippage in decimal.
-        impact_threshold_pct (float): Threshold as fraction of 1-min volume (e.g., 0.10 = 10%).
-        impact_slope (float): Rate added per 1x over threshold (decimal).
-
-    Returns:
-        float: total slippage rate (>= base_slippage_rate).
-    """
-    if minute_row is None:
-        return float(base_slippage_rate)
-    try:
-        minute_base_vol = float(minute_row.get('Volume', 0.0) or 0.0)
-        minute_price = float(minute_row.get('Close', minute_row.get('Open', 0.0)) or 0.0)
-        minute_quote_vol = minute_base_vol * minute_price
-    except Exception:
-        minute_quote_vol = 0.0
-
-    if minute_quote_vol <= 0 or impact_threshold_pct <= 0:
-        return float(base_slippage_rate)
-
-    threshold_quote = minute_quote_vol * impact_threshold_pct
-    if trade_usd_size <= threshold_quote:
-        return float(base_slippage_rate)
-
-    over_ratio = (trade_usd_size / threshold_quote) - 1.0
-    extra_slippage = max(0.0, impact_slope * over_ratio)
-    return float(base_slippage_rate + extra_slippage)
-
-def backtest(timeframe, df_aggregated, df_1min, stop_loss_pct, progress_step=1000,
-             base_slippage_rate=0.0003, impact_threshold_pct=0.10, impact_slope=0.0010):
-    """
-    Backtest trading strategy based on meta supertrend logic (all three supertrends agree).
-    Uses signal transitions and open prices for entry/exit to match original implementation.
-    """
-    start_time = time.time()
-    required_st_cols = ["supertrend_12_3.0", "supertrend_10_1.0", "supertrend_11_2.0"]
-    for col in required_st_cols:
-        if col not in df_aggregated.columns:
-            raise ValueError(f"Missing required Supertrend column: {col}")
-
-    # Calculate trend directions for each supertrend (-1, 0, 1)
-    trends = []
-    for col in required_st_cols:
-        # Convert supertrend values to trend direction based on close price position
-        trend = np.where(df_aggregated['Close'] > df_aggregated[col], 1, -1)
-        trends.append(trend)
-
-    # Stack trends and calculate meta trend (all must agree)
-    trends_arr = np.stack(trends, axis=1)
-    meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
-                          trends_arr[:,0], 0)
-
-    meta_trend_signal = meta_trend  # incorrect: should be lagging as it introduces lookahead bias.
-    # Next step: modify OHLCV predictor to not use supertrend as a feature or any other feature
-    # that introduces lookahead bias and predict the next close price.
-    #
-    # Old code, not that efficient:
-    # Add signal lagging to avoid lookahead bias
-    # meta_trend_signal = np.roll(meta_trend, 1)
-    # meta_trend_signal[0] = 0  # No signal for first bar
-
-    # Precompute 1-min slice indices for each aggregated bar
-    slice_indices, sorted_1min = precompute_1min_slice_indices(df_aggregated, df_1min)
-    df_1min_sorted = df_1min.iloc[sorted_1min].reset_index(drop=True)
-    one_min_timestamps_sorted = df_1min_sorted['Timestamp'].values
-
-    in_position = False
-    init_usd = 1000
-    usd = init_usd
-    coin = 0
-    nb_stop_loss = 0
-    trade_log = []
-    equity_curve = []
-    trade_results = []
-    entry_price = None
-    entry_time = None
-    total_slippage_usd = 0.0
-    total_traded_usd = 0.0
-
-    total_steps = len(df_aggregated) - 1
-    for i in range(1, len(df_aggregated)):
-        open_price = df_aggregated['Open'][i]  # Use open price for entry/exit
-        close_price = df_aggregated['Close'][i]
-        timestamp = df_aggregated['Timestamp'][i]
-
-        # Get previous and current meta trend signals
-        prev_mt = meta_trend_signal[i-1] if i > 0 else 0
-        curr_mt = meta_trend_signal[i]
-
-        # Track equity at each bar
-        equity = usd + coin * close_price
-        equity_curve.append((timestamp, equity))
-
-        # Check stop loss if in position
-        if in_position:
-            start_idx, end_idx = slice_indices[i-1]
-            df_1min_slice = df_1min_sorted.iloc[start_idx:end_idx]
-            stop_triggered = False
-
-            if not df_1min_slice.empty:
-                stop_loss_threshold = entry_price * (1 - stop_loss_pct)
-                below_stop = df_1min_slice['Low'] < stop_loss_threshold
-
-                if below_stop.any():
-                    first_idx = below_stop.idxmax()
-                    stop_row = df_1min_slice.loc[first_idx]
-                    stop_triggered = True
-                    in_position = False
-
-                    # More realistic stop loss fill logic with slippage
-                    if stop_row['Open'] < stop_loss_threshold:
-                        base_exit_price = stop_row['Open']
-                    else:
-                        base_exit_price = stop_loss_threshold
-                    trade_usd_size = float(coin * base_exit_price)
-                    slip_rate = estimate_slippage_rate(trade_usd_size, stop_row, base_slippage_rate, impact_threshold_pct, impact_slope)
-                    exit_price = base_exit_price * (1.0 - slip_rate)
-
-                    exit_time = stop_row['Timestamp']
-                    gross_usd = coin * exit_price
-                    fee = calculate_okx_taker_maker_fee(gross_usd, is_maker=False)
-                    usd = gross_usd - fee
-                    trade_pnl = (exit_price - entry_price) / entry_price if entry_price else 0
-                    total_slippage_usd += trade_usd_size * slip_rate
-                    total_traded_usd += trade_usd_size
-                    trade_results.append(trade_pnl)
-                    trade_log.append({
-                        'type': 'stop_loss',
-                        'time': exit_time,
-                        'base_price': base_exit_price,
-                        'effective_price': exit_price,
-                        'slippage_rate': slip_rate,
-                        'usd': usd,
-                        'coin': 0,
-                        'pnl': trade_pnl,
-                        'fee': fee
-                    })
-                    coin = 0
-                    nb_stop_loss += 1
-                    entry_price = None
-                    entry_time = None
-
-            if stop_triggered:
-                continue
-
-        # Entry condition: signal changes TO bullish (prev != 1 and curr == 1)
-        if not in_position and prev_mt != 1 and curr_mt == 1:
-            in_position = True
-            fee = calculate_okx_taker_maker_fee(usd, is_maker=False)
-            usd_after_fee = usd - fee
-            # Slippage on buy increases price
-            try:
-                ts64 = np.datetime64(timestamp)
-                idx_min = int(np.searchsorted(one_min_timestamps_sorted, ts64, side='left'))
-                minute_row = df_1min_sorted.iloc[idx_min] if 0 <= idx_min < len(df_1min_sorted) else None
-            except Exception:
-                minute_row = None
-            trade_usd_size = float(usd_after_fee)
-            slip_rate = estimate_slippage_rate(trade_usd_size, minute_row, base_slippage_rate, impact_threshold_pct, impact_slope)
-            effective_entry_price = open_price * (1.0 + slip_rate)
-            coin = usd_after_fee / effective_entry_price
-            entry_price = effective_entry_price
-            entry_time = timestamp
-            usd = 0
-            total_slippage_usd += trade_usd_size * slip_rate
-            total_traded_usd += trade_usd_size
-            trade_log.append({
-                'type': 'buy',
-                'time': timestamp,
-                'base_price': open_price,
-                'effective_price': effective_entry_price,
-                'slippage_rate': slip_rate,
-                'usd': usd,
-                'coin': coin,
-                'fee': fee
-            })
-
-        # Exit condition: signal changes TO bearish (prev == 1 and curr == -1)
-        elif in_position and prev_mt == 1 and curr_mt == -1:
-            in_position = False
-            # Slippage on sell reduces price
-            try:
-                ts64 = np.datetime64(timestamp)
-                idx_min = int(np.searchsorted(one_min_timestamps_sorted, ts64, side='left'))
-                minute_row = df_1min_sorted.iloc[idx_min] if 0 <= idx_min < len(df_1min_sorted) else None
-            except Exception:
-                minute_row = None
-            base_exit_price = open_price
-            trade_usd_size = float(coin * base_exit_price)
-            slip_rate = estimate_slippage_rate(trade_usd_size, minute_row, base_slippage_rate, impact_threshold_pct, impact_slope)
-            exit_price = base_exit_price * (1.0 - slip_rate)
-            exit_time = timestamp
-            gross_usd = coin * exit_price
-            fee = calculate_okx_taker_maker_fee(gross_usd, is_maker=False)
-            usd = gross_usd - fee
-            trade_pnl = (exit_price - entry_price) / entry_price if entry_price else 0
-            total_slippage_usd += trade_usd_size * slip_rate
-            total_traded_usd += trade_usd_size
-            trade_results.append(trade_pnl)
-            trade_log.append({
-                'type': 'sell',
-                'time': exit_time,
-                'base_price': base_exit_price,
-                'effective_price': exit_price,
-                'slippage_rate': slip_rate,
-                'usd': usd,
-                'coin': 0,
-                'pnl': trade_pnl,
-                'fee': fee
-            })
-            coin = 0
-            entry_price = None
-            entry_time = None
-
-        if i % progress_step == 0 or i == total_steps:
-            percent = (i / total_steps) * 100
-            print(f"\rTimeframe: {timeframe},\tProgress: {percent:.1f}%\tCurrent equity: {equity:.2f}\033[K", end='', flush=True)
-
-    # Force close any open position at the end
-    if in_position:
-        final_open_price = df_aggregated['Open'].iloc[-1]  # Use open price for consistency
-        final_timestamp = df_aggregated['Timestamp'].iloc[-1]
-        try:
-            ts64 = np.datetime64(final_timestamp)
-            idx_min = int(np.searchsorted(one_min_timestamps_sorted, ts64, side='left'))
-            minute_row = df_1min_sorted.iloc[idx_min] if 0 <= idx_min < len(df_1min_sorted) else None
-        except Exception:
-            minute_row = None
-        base_exit_price = final_open_price
-        trade_usd_size = float(coin * base_exit_price)
-        slip_rate = estimate_slippage_rate(trade_usd_size, minute_row, base_slippage_rate, impact_threshold_pct, impact_slope)
-        final_effective_price = base_exit_price * (1.0 - slip_rate)
-        gross_usd = coin * final_effective_price
-        fee = calculate_okx_taker_maker_fee(gross_usd, is_maker=False)
-        usd = gross_usd - fee
-        trade_pnl = (final_effective_price - entry_price) / entry_price if entry_price else 0
-        total_slippage_usd += trade_usd_size * slip_rate
-        total_traded_usd += trade_usd_size
-        trade_results.append(trade_pnl)
-        trade_log.append({
-            'type': 'forced_close',
-            'time': final_timestamp,
-            'base_price': base_exit_price,
-            'effective_price': final_effective_price,
-            'slippage_rate': slip_rate,
-            'usd': usd,
-            'coin': 0,
-            'pnl': trade_pnl,
-            'fee': fee
-        })
-        coin = 0
-        in_position = False
-        entry_price = None
-
-    print()
-    print(f"Timeframe: {timeframe},\tTotal profit: {usd - init_usd},\tNumber of stop losses: {nb_stop_loss}")
-
-    # --- Performance Metrics ---
-    equity_arr = np.array([e[1] for e in equity_curve])
-    # Handle edge cases for empty or invalid equity data
-    if len(equity_arr) == 0:
print("Warning: No equity data available") - return None - returns = np.diff(equity_arr) / equity_arr[:-1] - # Filter out infinite and NaN returns - returns = returns[np.isfinite(returns)] - total_return = (equity_arr[-1] - equity_arr[0]) / equity_arr[0] if equity_arr[0] != 0 else 0 - running_max = np.maximum.accumulate(equity_arr) - if equity_arr[-1] <= 0.01: - max_drawdown = -1.0 - else: - drawdowns = (equity_arr - running_max) / running_max - max_drawdown = drawdowns.min() if len(drawdowns) > 0 and np.isfinite(drawdowns).any() else 0 - if len(returns) > 1 and np.std(returns) > 1e-9: - sharpe = np.mean(returns) / np.std(returns) * math.sqrt(252) - else: - sharpe = 0 - wins = [1 for r in trade_results if r > 0] - win_rate = len(wins) / len(trade_results) if trade_results else 0 - num_trades = len(trade_results) - - print(f"Performance Metrics:") - print(f" Total Return: {total_return*100:.2f}%") - print(f" Max Drawdown: {max_drawdown*100:.2f}%") - print(f" Sharpe Ratio: {sharpe:.2f}") - print(f" Win Rate: {win_rate*100:.2f}%") - print(f" Number of Trades: {num_trades}") - print(f" Final Equity: ${equity_arr[-1]:.2f}") - print(f" Initial Equity: ${equity_arr[0]:.2f}") - - # --- Save Trade Log --- - log_dir = "backtest_logs" - os.makedirs(log_dir, exist_ok=True) - # Format stop_loss_pct for filename (e.g., 0.05 -> 0p05) - stop_loss_str = f"{stop_loss_pct:.2f}".replace('.', 'p') - log_path = os.path.join(log_dir, f"trade_log_{timeframe}_sl{stop_loss_str}.csv") - if trade_log: - all_keys = set() - for entry in trade_log: - all_keys.update(entry.keys()) - all_keys = list(all_keys) - - trade_log_filled = [] - for entry in trade_log: - filled_entry = {k: entry.get(k, None) for k in all_keys} - trade_log_filled.append(filled_entry) - - # Calculate total fees for this backtest - total_fees = sum(entry.get('fee', 0) for entry in trade_log) - - # Write summary header row, then trade log header and rows - with open(log_path, 'w', newline='') as f: - writer = csv.writer(f) - summary_header = [ - 'elapsed_time_sec', 'total_return', 'max_drawdown', 'sharpe_ratio', - 'win_rate', 'num_trades', 'final_equity', 'initial_equity', 'num_stop_losses', 'total_fees', - 'total_slippage_usd', 'avg_slippage_bps' - ] - summary_values = [ - f"{time.time() - start_time:.2f}", - f"{total_return*100:.2f}%", - f"{max_drawdown*100:.2f}%", - f"{sharpe:.2f}", - f"{win_rate*100:.2f}%", - str(num_trades), - f"${equity_arr[-1]:.2f}", - f"${equity_arr[0]:.2f}", - str(nb_stop_loss), - f"${total_fees:.4f}", - f"${total_slippage_usd:.4f}", - f"{(total_slippage_usd / total_traded_usd * 10000.0) if total_traded_usd > 0 else 0:.2f}" - ] - writer.writerow(summary_header) - writer.writerow(summary_values) - writer.writerow([]) # Blank row for separation - dict_writer = csv.DictWriter(f, fieldnames=all_keys) - dict_writer.writeheader() - dict_writer.writerows(trade_log_filled) - - print(f"Trade log saved to {log_path}") - else: - print("No trades to log.") - - # Return summary metrics (excluding elapsed time) - return { - 'timeframe': timeframe, - 'stop_loss': stop_loss_pct, - 'total_return': total_return, - 'max_drawdown': max_drawdown, - 'sharpe_ratio': sharpe, - 'win_rate': win_rate, - 'num_trades': num_trades, - 'final_equity': equity_arr[-1], - 'initial_equity': equity_arr[0], - 'num_stop_losses': nb_stop_loss, - 'total_fees': total_fees if trade_log else 0, - 'total_slippage_usd': total_slippage_usd, - 'avg_slippage_bps': (total_slippage_usd / total_traded_usd * 10000.0) if total_traded_usd > 0 else 0.0 - } - -if __name__ == 
"__main__": - timeframes = ["5min", "15min", "30min", "1h", "4h", "1d", "2d"] - # timeframes = ["5min", "15min", "1h", "4h", "1d"] - # timeframes = ["30min"] - stoplosses = [0.1, 0.2, 0.3, 0.4, 0.5] - # Slippage configuration (OKX Spot): base in bps, plus volume-impact model - slippage_base_bps = 10 # 10 bps base slippage (realistic, conservative) - impact_threshold_pct = 0.10 # e.g., start impact beyond 10% of 1-min volume - impact_slope = 0.0010 # incremental slippage per 1x over threshold - - # df_1min = load_data('2021-11-01', '2024-10-16', '../data/btcusd_1-min_data.csv') - df_1min = load_data('2021-11-01', '2025-08-19', '../data/btcusd_okx_1-min_data.csv') - - # Prepare summary CSV - summary_csv_path = "backtest_summary.csv" - summary_header = [ - 'timeframe', 'stop_loss', 'total_return', 'max_drawdown', 'sharpe_ratio', - 'win_rate', 'num_trades', 'final_equity', 'initial_equity', 'num_stop_losses', 'total_fees', - 'total_slippage_usd', 'avg_slippage_bps' - ] - with open(summary_csv_path, 'w', newline='') as summary_file: - writer = csv.DictWriter(summary_file, fieldnames=summary_header) - writer.writeheader() - for timeframe in timeframes: - df_aggregated = aggregate_data(df_1min, timeframe) - df_aggregated = add_supertrend_indicators(df_aggregated) - for stop_loss_pct in stoplosses: - summary = backtest( - timeframe, - df_aggregated, - df_1min, - stop_loss_pct=stop_loss_pct, - base_slippage_rate=slippage_base_bps / 10000.0, - impact_threshold_pct=impact_threshold_pct, - impact_slope=impact_slope - ) - if summary is not None: - # Format values for CSV (e.g., floats as rounded strings) - summary_row = { - 'timeframe': summary['timeframe'], - 'stop_loss': summary['stop_loss'], - 'total_return': f"{summary['total_return']*100:.2f}%", - 'max_drawdown': f"{summary['max_drawdown']*100:.2f}%", - 'sharpe_ratio': f"{summary['sharpe_ratio']:.2f}", - 'win_rate': f"{summary['win_rate']*100:.2f}%", - 'num_trades': summary['num_trades'], - 'final_equity': f"${summary['final_equity']:.2f}", - 'initial_equity': f"${summary['initial_equity']:.2f}", - 'num_stop_losses': summary['num_stop_losses'], - 'total_fees': f"${summary['total_fees']:.4f}", - 'total_slippage_usd': f"${summary['total_slippage_usd']:.4f}", - 'avg_slippage_bps': f"{summary['avg_slippage_bps']:.2f}" - } - writer.writerow(summary_row) diff --git a/market_costs.py b/market_costs.py new file mode 100644 index 0000000..697da71 --- /dev/null +++ b/market_costs.py @@ -0,0 +1,11 @@ +from __future__ import annotations + +TAKER_FEE_BPS_DEFAULT = 10.0 # 0.10% + + +def okx_fee(fee_bps: float, notional_usd: float) -> float: + return notional_usd * (fee_bps / 1e4) + + +def estimate_slippage_rate(slippage_bps: float, notional_usd: float) -> float: + return notional_usd * (slippage_bps / 1e4) \ No newline at end of file diff --git a/metrics.py b/metrics.py new file mode 100644 index 0000000..82d80be --- /dev/null +++ b/metrics.py @@ -0,0 +1,54 @@ +from __future__ import annotations +from dataclasses import dataclass +import numpy as np +import pandas as pd + + +@dataclass +class Perf: + total_return: float + max_drawdown: float + sharpe_ratio: float + win_rate: float + num_trades: int + final_equity: float + initial_equity: float + num_stop_losses: int + total_fees: float + total_slippage_usd: float + avg_slippage_bps: float + + +def compute_metrics(equity_curve: pd.Series, trades: list[dict]) -> Perf: + ret = equity_curve.pct_change().fillna(0.0) + total_return = equity_curve.iat[-1] / equity_curve.iat[0] - 1.0 + cummax = equity_curve.cummax() + 
diff --git a/trade.py b/trade.py
new file mode 100644
index 0000000..78b97c8
--- /dev/null
+++ b/trade.py
@@ -0,0 +1,53 @@
+from __future__ import annotations
+from dataclasses import dataclass
+from market_costs import okx_fee, estimate_slippage_usd
+from intrabar import entry_slippage_row
+
+
+@dataclass
+class TradeState:
+    cash: float = 1000.0
+    qty: float = 0.0
+    entry_px: float | None = None
+    max_px: float | None = None
+    stop_loss_frac: float = 0.02
+    fee_bps: float = 10.0
+    slippage_bps: float = 2.0
+
+
+def enter_long(state: TradeState, price: float) -> dict:
+    if state.qty > 0:
+        return {}
+    px = entry_slippage_row(price, 0.0, state.slippage_bps)
+    qty = state.cash / px
+    fee = okx_fee(state.fee_bps, state.cash)
+    # Entry slippage is already embedded in px; report its USD cost for metrics.
+    slip = estimate_slippage_usd(state.slippage_bps, state.cash)
+    state.qty = max(qty - fee / px, 0.0)
+    state.cash = 0.0
+    state.entry_px = px
+    state.max_px = px
+    return {"side": "BUY", "price": px, "qty": state.qty, "fee": fee, "slippage": slip}
+
+
+def maybe_trailing_stop(state: TradeState, price: float) -> float:
+    if state.qty <= 0:
+        return float("inf")
+    state.max_px = max(state.max_px or price, price)
+    trail_px = state.max_px * (1.0 - state.stop_loss_frac)
+    return trail_px
+
+
+def exit_long(state: TradeState, price: float) -> dict:
+    if state.qty <= 0:
+        return {}
+    notional = state.qty * price
+    slip = estimate_slippage_usd(state.slippage_bps, notional)
+    fee = okx_fee(state.fee_bps, notional)
+    cash_back = notional - slip - fee
+    event = {"side": "SELL", "price": price, "qty": state.qty, "fee": fee, "slippage": slip}
+    state.cash = cash_back
+    state.qty = 0.0
+    state.entry_px = None
+    state.max_px = None
+    return event
\ No newline at end of file
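A round trip through the trade-state API, with assumed prices:

    from trade import TradeState, enter_long, exit_long, maybe_trailing_stop

    state = TradeState(cash=1000.0, stop_loss_frac=0.02, fee_bps=10.0, slippage_bps=2.0)
    buy = enter_long(state, 100.0)   # fills at 100.02 after 2 bps of slippage
    print(state.qty, buy["fee"])     # ~9.99 coins, $1.00 fee

    trail = maybe_trailing_stop(state, 110.0)
    print(trail)                     # 110 * 0.98 = 107.8

    sell = exit_long(state, trail)   # fee and slippage netted from proceeds
    print(state.cash, sell["price"])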