Files
lowkey_backtest/strategies/regime_strategy.py
Simon Moisy 10bb371054 Implement Regime Reversion Strategy and remove regime_detection.py
- Introduced `RegimeReversionStrategy` for ML-based regime detection and mean reversion trading.
- Added feature engineering and model training logic within the new strategy.
- Removed the deprecated `regime_detection.py` file to streamline the codebase.
- Updated the strategy factory to include the new regime strategy configuration.
2026-01-13 21:55:34 +08:00

281 lines
12 KiB
Python

import pandas as pd
import numpy as np
import ta
import vectorbt as vbt
from sklearn.ensemble import RandomForestClassifier
from strategies.base import BaseStrategy
from engine.market import MarketType
from engine.data_manager import DataManager
from engine.logging_config import get_logger
logger = get_logger(__name__)
class RegimeReversionStrategy(BaseStrategy):
"""
ML-Based Regime Detection & Mean Reversion Strategy.
Logic:
1. Tracks the BTC/ETH Spread and its Z-Score (24h window).
2. Uses a Random Forest model to predict if an extreme Z-Score will revert profitably.
3. Features: Spread Technicals (RSI, ROC) + On-Chain Flows (Inflow, Funding).
4. Entry: When Model Probability > 0.5.
5. Exit: Z-Score reversion to 0 or SL/TP.
Walk-Forward Training:
- Trains on first `train_ratio` of data (default 70%)
- Generates signals only for remaining test period (30%)
- Eliminates look-ahead bias for realistic backtest results
"""
def __init__(self,
model_path: str = "data/regime_model.pkl",
horizon: int = 96, # 4 Days based on research
z_window: int = 24,
stop_loss: float = 0.06, # 6% to survive 2% avg MAE
take_profit: float = 0.05, # Swing target
train_ratio: float = 0.7 # Walk-forward: train on first 70%
):
super().__init__()
self.model_path = model_path
self.horizon = horizon
self.z_window = z_window
self.stop_loss = stop_loss
self.take_profit = take_profit
self.train_ratio = train_ratio
# Default Strategy Config
self.default_market_type = MarketType.PERPETUAL
self.default_leverage = 1
self.dm = DataManager()
self.model = None
self.feature_cols = None
self.train_end_idx = None # Will store the training cutoff point
def run(self, close, **kwargs):
"""
Execute the strategy logic.
We assume this strategy is run on ETH-USDT (the active asset).
We will fetch BTC-USDT internally to calculate the spread.
"""
# 1. Identify Context
# We need BTC data aligned with the incoming ETH 'close' series
start_date = close.index.min()
end_date = close.index.max()
logger.info("Fetching BTC context data...")
try:
# Load BTC data (Context) - Must match the timeframe of the backtest
# Research was done on 1h candles, so strategy should be run on 1h
df_btc = self.dm.load_data("okx", "BTC-USDT", "1h", MarketType.SPOT)
# Align BTC to ETH (close)
df_btc = df_btc.reindex(close.index, method='ffill')
btc_close = df_btc['close']
except Exception as e:
logger.error(f"Failed to load BTC context: {e}")
empty = self.create_empty_signals(close)
return empty, empty, empty, empty
# 2. Construct DataFrames for Feature Engineering
# We need volume/high/low for features, but 'run' signature primarily gives 'close'.
# kwargs might have high/low/volume if passed by Backtester.run_strategy
eth_vol = kwargs.get('volume')
if eth_vol is None:
logger.warning("Volume data missing. Feature calculation might fail.")
# Fallback or error handling
eth_vol = pd.Series(0, index=close.index)
# Construct dummy dfs for prepare_features
# We only really need Close and Volume for the current feature set
df_a = pd.DataFrame({'close': btc_close, 'volume': df_btc['volume']})
df_b = pd.DataFrame({'close': close, 'volume': eth_vol})
# 3. Load On-Chain Data (CryptoQuant)
# We use the saved CSV for training/inference
# In a live setting, this would query the API for recent data
cq_df = None
try:
cq_path = "data/cq_training_data.csv"
cq_df = pd.read_csv(cq_path, index_col='timestamp', parse_dates=True)
if cq_df.index.tz is None:
cq_df.index = cq_df.index.tz_localize('UTC')
except Exception:
logger.warning("CryptoQuant data not found. Running without on-chain features.")
# 4. Calculate Features
features = self.prepare_features(df_a, df_b, cq_df)
# 5. Walk-Forward Split
# Train on first `train_ratio` of data, test on remainder
n_samples = len(features)
train_size = int(n_samples * self.train_ratio)
train_features = features.iloc[:train_size]
test_features = features.iloc[train_size:]
train_end_date = train_features.index[-1]
test_start_date = test_features.index[0]
logger.info(
f"Walk-Forward Split: Train={len(train_features)} bars "
f"(until {train_end_date.strftime('%Y-%m-%d')}), "
f"Test={len(test_features)} bars "
f"(from {test_start_date.strftime('%Y-%m-%d')})"
)
# 6. Train Model on Training Period ONLY
if self.model is None:
logger.info("Training Regime Model on training period only...")
self.model, self.feature_cols = self.train_model(train_features)
# 7. Predict on TEST Period ONLY
# Use valid columns only
X_test = test_features[self.feature_cols].fillna(0)
X_test = X_test.replace([np.inf, -np.inf], 0)
# Predict Probabilities for test period
probs = self.model.predict_proba(X_test)[:, 1]
# 8. Generate Entry Signals (TEST period only)
# If Z > 1 (Spread High, ETH Expensive) -> Short ETH
# If Z < -1 (Spread Low, ETH Cheap) -> Long ETH
short_signal_test = (probs > 0.5) & (test_features['z_score'].values > 1.0)
long_signal_test = (probs > 0.5) & (test_features['z_score'].values < -1.0)
# Create full-length signal series (False for training period)
long_entries = pd.Series(False, index=close.index)
short_entries = pd.Series(False, index=close.index)
# Map test signals to their correct indices
test_idx = test_features.index
for i, idx in enumerate(test_idx):
if idx in close.index:
long_entries.loc[idx] = bool(long_signal_test[i])
short_entries.loc[idx] = bool(short_signal_test[i])
# 9. Generate Exits
# Exit when Z-Score crosses back through 0 (mean reversion complete)
z_reindexed = features['z_score'].reindex(close.index, fill_value=0)
# Exit Long when Z > 0, Exit Short when Z < 0
long_exits = z_reindexed > 0
short_exits = z_reindexed < 0
# Log signal counts for verification
n_long = long_entries.sum()
n_short = short_entries.sum()
logger.info(f"Generated {n_long} long signals, {n_short} short signals (test period only)")
return long_entries, long_exits, short_entries, short_exits
def prepare_features(self, df_btc, df_eth, cq_df=None):
"""Replicate research feature engineering"""
# Align
common = df_btc.index.intersection(df_eth.index)
df_a = df_btc.loc[common].copy()
df_b = df_eth.loc[common].copy()
# Spread
spread = df_b['close'] / df_a['close']
# Z-Score
rolling_mean = spread.rolling(window=self.z_window).mean()
rolling_std = spread.rolling(window=self.z_window).std()
z_score = (spread - rolling_mean) / rolling_std
# Technicals
spread_rsi = ta.momentum.RSIIndicator(spread, window=14).rsi()
spread_roc = spread.pct_change(periods=5) * 100
spread_change_1h = spread.pct_change(periods=1)
# Volume
vol_ratio = df_b['volume'] / df_a['volume']
vol_ratio_ma = vol_ratio.rolling(window=12).mean()
# Volatility
ret_a = df_a['close'].pct_change()
ret_b = df_b['close'].pct_change()
vol_a = ret_a.rolling(window=self.z_window).std()
vol_b = ret_b.rolling(window=self.z_window).std()
vol_spread_ratio = vol_b / vol_a
features = pd.DataFrame(index=spread.index)
features['spread'] = spread
features['z_score'] = z_score
features['spread_rsi'] = spread_rsi
features['spread_roc'] = spread_roc
features['spread_change_1h'] = spread_change_1h
features['vol_ratio'] = vol_ratio
features['vol_ratio_rel'] = vol_ratio / vol_ratio_ma
features['vol_diff_ratio'] = vol_spread_ratio
# CQ Merge
if cq_df is not None:
cq_aligned = cq_df.reindex(features.index, method='ffill')
if 'btc_funding' in cq_aligned.columns and 'eth_funding' in cq_aligned.columns:
cq_aligned['funding_diff'] = cq_aligned['eth_funding'] - cq_aligned['btc_funding']
if 'btc_inflow' in cq_aligned.columns and 'eth_inflow' in cq_aligned.columns:
cq_aligned['inflow_ratio'] = cq_aligned['eth_inflow'] / (cq_aligned['btc_inflow'] + 1)
features = features.join(cq_aligned)
return features.dropna()
def train_model(self, train_features):
"""
Train Random Forest on training data only.
This method receives ONLY the training subset of features,
ensuring no look-ahead bias. The model learns from historical
patterns and is then applied to unseen test data.
Args:
train_features: DataFrame containing features for training period only
"""
threshold = 0.005
horizon = self.horizon
# Define targets using ONLY training data
# For Short Spread (Z > 1): Did spread drop below target within horizon?
future_min = train_features['spread'].rolling(window=horizon).min().shift(-horizon)
target_short = train_features['spread'] * (1 - threshold)
success_short = (train_features['z_score'] > 1.0) & (future_min < target_short)
# For Long Spread (Z < -1): Did spread rise above target within horizon?
future_max = train_features['spread'].rolling(window=horizon).max().shift(-horizon)
target_long = train_features['spread'] * (1 + threshold)
success_long = (train_features['z_score'] < -1.0) & (future_max > target_long)
targets = np.select([success_short, success_long], [1, 1], default=0)
# Build model
model = RandomForestClassifier(
n_estimators=300, max_depth=5, min_samples_leaf=30,
class_weight={0: 1, 1: 3}, random_state=42
)
# Exclude non-feature columns
exclude = ['spread']
cols = [c for c in train_features.columns if c not in exclude]
# Clean features
X_train = train_features[cols].fillna(0)
X_train = X_train.replace([np.inf, -np.inf], 0)
# Remove rows with NaN targets (from rolling window at end of training period)
valid_mask = ~np.isnan(targets) & ~np.isinf(targets)
# Also check for rows where future data doesn't exist (shift created NaNs)
valid_mask = valid_mask & (future_min.notna().values) & (future_max.notna().values)
X_train_clean = X_train[valid_mask]
targets_clean = targets[valid_mask]
logger.info(f"Training on {len(X_train_clean)} valid samples (removed {len(X_train) - len(X_train_clean)} with incomplete future data)")
model.fit(X_train_clean, targets_clean)
return model, cols