# lowkey_backtest/backtest_mvrv.py

import argparse
import os
import pickle
from pathlib import Path

import numpy as np
import pandas as pd

import strategy_config as config
from trade import TradeState, enter_long, exit_long
from logging_utils import write_trade_log
from metrics import compute_metrics


def backtest_mvrv(
    df_features: pd.DataFrame,
    df_1min: pd.DataFrame,
    initial_cash: float = 10000.0,
    log_path: Path | None = None,
    test_only: bool = True,  # only backtest on the test set to avoid train/test leakage
):
print("--- Starting MVRV Strategy Backtest ---")
# 1. Load Model and Generate Predictions
print(f"Loading model from {config.MODEL_PATH}...")
with open(config.MODEL_PATH, 'rb') as f:
model = pickle.load(f)
# Load split info to identify test set boundary
split_info_path = config.MODEL_PATH.replace('.pkl', '_split.pkl')
if test_only and os.path.exists(split_info_path):
with open(split_info_path, 'rb') as f:
split_info = pickle.load(f)
test_start_idx = split_info['test_start_idx']
print(f"Filtering to TEST SET ONLY (starting at index {test_start_idx})")
print(f" Train size was: {split_info['train_size']}, Test size: {split_info['test_size']}")
# Filter features to test set only
df_features = df_features.iloc[test_start_idx:].copy()
# Filter 1min data to match the test period
test_start_ts = df_features.index[0]
df_1min = df_1min[df_1min['Timestamp'] >= test_start_ts].copy()
print(f"Backtest period: {df_features.index[0]} to {df_features.index[-1]}")
elif test_only:
print("WARNING: Split info not found. Running on FULL dataset (includes training data!).")
# Prepare features for prediction
# Only use columns that were used in training
# We rely on config.FEATURE_NAMES, but we must check what's in df_features
# The model expects specific columns.
X = df_features[config.FEATURE_NAMES]
print("Generating predictions...")
probs = model.predict_proba(X)[:, 1]
df_features['signal_prob'] = probs
# 2. Setup Backtest Loop
state = TradeState(
cash=initial_cash,
fee_bps=config.FEES_PERCENT * 10000, # Convert to bps
slippage_bps=config.SLIPPAGE_PERCENT * 10000
)
    equity = []
    trades = []

    # Dynamic SL/TP levels for the open position
    current_sl_price = 0.0
    current_tp_price = 0.0

    # Pre-calculate entry signals to speed up the loop.
    # Entry logic: prob > threshold AND funding > filter AND regime not overheated.
    # Regime filter (from the source strategy): the market is "overheated" when
    # mvrv_z > MVRV_Z_THRESH or raw nupl > NUPL_THRESH; features.py computes
    # the MVRV/NUPL Z-scores. Missing columns default to 0 (safe fallback).
    s_prob = df_features['signal_prob']
    funding = df_features['funding_rate'] if 'funding_rate' in df_features.columns else pd.Series(0, index=df_features.index)
    # Prefer the normalized 'mvrv_z' column (the source strategy used mvrv_z > 1.5 for overheated)
    mvrv_z = df_features['mvrv_z'] if 'mvrv_z' in df_features.columns else pd.Series(0, index=df_features.index)
    # The source strategy used raw 'nupl' > 0.6 for overheated
    nupl = df_features['nupl'] if 'nupl' in df_features.columns else pd.Series(0, index=df_features.index)

    # Regime filter: tradeable only when NOT overheated
    is_overheated = (mvrv_z > config.MVRV_Z_THRESH) | (nupl > config.NUPL_THRESH)
    regime_can_trade = ~is_overheated

    # Entry signal
    entry_signals = (
        (s_prob > config.PROB_THRESHOLD) &
        (funding > config.FUNDING_FILTER) &
        regime_can_trade
    )
    df_features['entry_signal'] = entry_signals
    print(f"Total Entry Signals: {entry_signals.sum()}")

    # Main loop: df_features is 1H, df_1min is 1m. While in a trade we scan the
    # 1m data within the hour for SL/TP hits; while flat we act on the signal
    # of the previous completed hourly candle. prepare_data resampled with
    # '1h', which pandas labels on the left by default: the 10:00 row covers
    # 10:00-11:00, so its signal is only known at the candle close (11:00) and
    # becomes executable at the open of the next bin.
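    # Quick illustration of the left-label convention (assuming pandas
    # defaults, which prepare_data.py appears to rely on):
    #   s = pd.Series([1, 2], index=pd.to_datetime(['2024-01-01 10:30',
    #                                               '2024-01-01 11:30']))
    #   s.resample('1h').sum()  # -> 10:00 -> 1, 11:00 -> 2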
    for i in range(len(df_features) - 1):
        # Current 1H candle (completed) and the next one
        row = df_features.iloc[i]
        next_row = df_features.iloc[i + 1]
        ts_start = row.name     # timestamp of this row (e.g. 10:00)
        ts_end = next_row.name  # timestamp of the next row (e.g. 11:00)

        # 1m data for the interval [ts_start, ts_end)
        # Note: df_1min['Timestamp'] must already be datetime
        mask = (df_1min['Timestamp'] >= ts_start) & (df_1min['Timestamp'] < ts_end)
        chunk_1min = df_1min.loc[mask]
        # 1. Manage an existing position (exit logic). Remember whether we
        # started this candle with a position, so an exit here cannot be
        # followed by a re-entry within the same candle.
        started_with_position = state.qty > 0
        if state.qty > 0:
            # Scan the 1m candles for SL/TP hits
            for _, m_row in chunk_1min.iterrows():
                m_high = m_row['High']
                m_low = m_row['Low']
                m_ts = m_row['Timestamp']
                # Check SL
                if m_low <= current_sl_price:
                    evt = exit_long(state, current_sl_price)  # execute at the SL price
                    if evt:
                        prev = trades[-1]  # the matching entry event
                        pnl = (evt["price"] - prev["price"]) * prev["qty"]
                        evt.update({"t": m_ts.isoformat(), "reason": "stop_loss", "pnl": pnl})
                        trades.append(evt)
                    break  # position closed; stop scanning
                # Check TP
                if m_high >= current_tp_price:
                    evt = exit_long(state, current_tp_price)  # execute at the TP price
                    if evt:
                        prev = trades[-1]  # the matching entry event
                        pnl = (evt["price"] - prev["price"]) * prev["qty"]
                        evt.update({"t": m_ts.isoformat(), "reason": "take_profit", "pnl": pnl})
                        trades.append(evt)
                    break  # position closed; stop scanning
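            # If a single 1m candle touches both levels, the SL check above
            # runs first, so the ambiguity is resolved pessimistically.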

        # 2. Check for a new entry (only if flat for the entire candle).
        # The signal of row i-1 (the last completed candle) becomes executable
        # at the start of interval i, i.e. at the first available 1m open.
        if state.qty <= 0 and not started_with_position:
            if i > 0:
                prev_row = df_features.iloc[i - 1]
                if prev_row['entry_signal']:
                    # Enter long at the open of the current interval
                    # (first 1m open if available)
                    entry_price = row['open']
                    if not chunk_1min.empty:
                        entry_price = chunk_1min.iloc[0]['Open']
                    # ATR-based SL/TP bracket
                    atr = prev_row['atr']
                    if pd.isna(atr) or atr == 0:
                        atr = row['open'] * 0.01  # fallback: 1% of price
                    sl_dist = atr * config.SL_ATR_MULT
                    tp_dist = atr * config.TP_ATR_MULT
                    current_sl_price = entry_price - sl_dist
                    current_tp_price = entry_price + tp_dist
                    evt = enter_long(state, entry_price)
                    if evt:
                        evt.update({
                            "t": ts_start.isoformat(),
                            "reason": "signal_entry",
                            "sl": current_sl_price,
                            "tp": current_tp_price
                        })
                        trades.append(evt)

        # Update the equity curve (mark-to-market at the close of the hour)
        current_price = row['close']
        val = state.cash + (state.qty * current_price)
        equity.append({'timestamp': ts_start, 'equity': val})

    # Build the equity series
    equity_df = pd.DataFrame(equity).set_index('timestamp')
    equity_curve = equity_df['equity']

    # Save logs
    if log_path:
        write_trade_log(trades, log_path)

    # Metrics (hourly bars: 252 trading days * 24 hours = 6048 periods/year)
    perf = compute_metrics(equity_curve, trades, periods_per_year=252 * 24)
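    # Note: 252 * 24 is an equity-market convention; for 24/7 crypto markets,
    # 365 * 24 = 8760 periods/year is the common alternative.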

    # Print summary
    print("\n--- Backtest Summary ---")
    print(f"Total Return: {perf.total_return * 100:.2f}%")
    print(f"Sharpe Ratio: {perf.sharpe_ratio:.2f}")
    print(f"Max Drawdown: {perf.max_drawdown * 100:.2f}%")
    print(f"Total Trades: {perf.num_trades}")

    return perf, equity_curve, trades
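
# Minimal programmatic usage sketch (illustrative; the names below are
# placeholders, while run() shows the full CLI flow):
#   df_feat = pd.read_csv(config.FEATURES_PATH, parse_dates=['timestamp'],
#                         index_col='timestamp')
#   perf, curve, trade_list = backtest_mvrv(df_feat, df_1min_frame)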


def run():
    parser = argparse.ArgumentParser()
    parser.add_argument("--csv", required=True, help="Path to 1m/15m OHLCV CSV")
    args = parser.parse_args()

    # Load the 1m data
    print(f"Loading 1m/15m data from {args.csv}...")
    df_1min = pd.read_csv(args.csv)
    # Normalize the timestamp column, auto-detecting the epoch unit
    if 'Timestamp' in df_1min.columns:
        ts_max = df_1min['Timestamp'].max()
        if ts_max < 3_000_000_000:
            unit = 's'
        elif ts_max < 3_000_000_000_000:
            unit = 'ms'
        else:
            unit = None
        df_1min['Timestamp'] = pd.to_datetime(df_1min['Timestamp'], unit=unit)
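        # Unit heuristic: epoch seconds stay below ~3e9 until the year 2065,
        # so larger values are milliseconds; anything past ~3e12 falls through
        # to pandas' default interpretation (nanoseconds when unit is None).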
    elif 'Date' in df_1min.columns:
        df_1min['Timestamp'] = pd.to_datetime(df_1min['Date'])
    df_1min = df_1min.sort_values('Timestamp')

    # Load the 1H features
    print(f"Loading features from {config.FEATURES_PATH}...")
    if not os.path.exists(config.FEATURES_PATH):
        print("Error: features.csv not found. Run prepare_data.py first.")
        return
    df_features = pd.read_csv(config.FEATURES_PATH, parse_dates=['timestamp'], index_col='timestamp')

    # Run the backtest
    backtest_mvrv(df_features, df_1min, log_path=Path("logs/mvrv_trade_log.csv"))


if __name__ == "__main__":
    run()
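
# Example invocation (the CSV path is illustrative):
#   python backtest_mvrv.py --csv data/BTCUSDT_1m.csv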