"""
Regime Detection Research Script with Walk-Forward Training.

Tests multiple holding horizons to find optimal parameters without look-ahead bias.

Usage:
    uv run python research/regime_detection.py [options]

Options:
    --days DAYS              Number of days of data (default: 90)
    --start DATE             Start date (YYYY-MM-DD), overrides --days
    --end DATE               End date (YYYY-MM-DD), defaults to now
    --output PATH            Output CSV path
                             (default: research/horizon_optimization_results.csv)
    --output-horizon PATH    Optional file to write the best-net-PnL horizon to
"""

import argparse
import sys
import os

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import pandas as pd
import numpy as np
import ta
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, f1_score

from engine.data_manager import DataManager
from engine.market import MarketType
from engine.logging_config import get_logger

logger = get_logger(__name__)

# Configuration
TRAIN_RATIO = 0.7  # 70% train, 30% test
PROFIT_THRESHOLD = 0.005  # 0.5% profit target
STOP_LOSS_PCT = 0.06  # 6% stop loss
Z_WINDOW = 24
Z_ENTRY = 1.0  # |z-score| must exceed this for a bar to be a trade candidate
FEE_RATE = 0.001  # 0.1% round-trip fee
DEFAULT_DAYS = 90  # Default lookback period in days

# Position returned by _first_hit when a barrier is never touched; larger than
# any real position, so "first_tp < first_sl" comparisons stay valid.
_NO_HIT = np.iinfo(np.int64).max


def load_data(days: int = DEFAULT_DAYS, start_date: str = None, end_date: str = None):
    """
    Load and align BTC/ETH data.

    Args:
        days: Number of days of historical data (default: 90)
        start_date: Optional start date (YYYY-MM-DD), overrides days
        end_date: Optional end date (YYYY-MM-DD), defaults to now. A bare date
            resolves to 00:00 UTC, so later bars on that day are excluded.

    Returns:
        Tuple of (df_btc, df_eth) DataFrames restricted to [start, end] and to
        the timestamps present in both series.
    """
    dm = DataManager()
    # NOTE(review): assumes DataManager returns a tz-aware (UTC) DatetimeIndex;
    # comparing a naive index with the UTC bounds below would raise. Confirm.
    df_btc = dm.load_data("okx", "BTC-USDT", "1h", MarketType.SPOT)
    df_eth = dm.load_data("okx", "ETH-USDT", "1h", MarketType.SPOT)

    # Determine date range
    end = pd.Timestamp(end_date, tz="UTC") if end_date else pd.Timestamp.now(tz="UTC")
    if start_date:
        start = pd.Timestamp(start_date, tz="UTC")
    else:
        start = end - pd.Timedelta(days=days)

    df_btc = df_btc[(df_btc.index >= start) & (df_btc.index <= end)]
    df_eth = df_eth[(df_eth.index >= start) & (df_eth.index <= end)]

    # Align indices so every row has both legs of the spread
    common = df_btc.index.intersection(df_eth.index)
    df_btc = df_btc.loc[common]
    df_eth = df_eth.loc[common]

    if common.empty:
        logger.warning(f"No overlapping BTC/ETH bars between {start} and {end}")
    logger.info(f"Loaded {len(common)} aligned hourly bars from {start} to {end}")
    return df_btc, df_eth


def load_cryptoquant_data():
    """
    Load CryptoQuant on-chain data if available.

    Returns:
        DataFrame indexed by UTC timestamps, or None if the CSV cannot be read.
        Loading is deliberately best-effort: any failure is logged as a warning
        and the pipeline continues without on-chain features.
    """
    try:
        cq_path = "data/cq_training_data.csv"
        cq_df = pd.read_csv(cq_path, index_col='timestamp', parse_dates=True)
        if cq_df.index.tz is None:
            cq_df.index = cq_df.index.tz_localize('UTC')
        logger.info(f"Loaded CryptoQuant data: {len(cq_df)} rows")
        return cq_df
    except Exception as e:
        logger.warning(f"CryptoQuant data not available: {e}")
        return None


def calculate_features(df_btc, df_eth, cq_df=None):
    """
    Calculate all features for the model.

    Args:
        df_btc: BTC bars with 'close' and 'volume' columns.
        df_eth: ETH bars on the same index as df_btc.
        cq_df: Optional CryptoQuant frame, forward-filled onto the bar index.

    Returns:
        Feature DataFrame (including the raw 'spread' column) with every row
        containing a NaN dropped. All rolling windows look backward only.
    """
    spread = df_eth['close'] / df_btc['close']

    # Z-Score of the ETH/BTC ratio over a rolling window
    rolling_mean = spread.rolling(window=Z_WINDOW).mean()
    rolling_std = spread.rolling(window=Z_WINDOW).std()
    z_score = (spread - rolling_mean) / rolling_std

    # Technicals
    spread_rsi = ta.momentum.RSIIndicator(spread, window=14).rsi()
    spread_roc = spread.pct_change(periods=5) * 100
    spread_change_1h = spread.pct_change(periods=1)

    # Volume
    vol_ratio = df_eth['volume'] / df_btc['volume']
    vol_ratio_ma = vol_ratio.rolling(window=12).mean()

    # Volatility
    ret_btc = df_btc['close'].pct_change()
    ret_eth = df_eth['close'].pct_change()
    vol_btc = ret_btc.rolling(window=Z_WINDOW).std()
    vol_eth = ret_eth.rolling(window=Z_WINDOW).std()
    vol_spread_ratio = vol_eth / vol_btc

    features = pd.DataFrame(index=spread.index)
    features['spread'] = spread
    features['z_score'] = z_score
    features['spread_rsi'] = spread_rsi
    features['spread_roc'] = spread_roc
    features['spread_change_1h'] = spread_change_1h
    features['vol_ratio'] = vol_ratio
    features['vol_ratio_rel'] = vol_ratio / vol_ratio_ma
    features['vol_diff_ratio'] = vol_spread_ratio

    # Add CQ features if available
    if cq_df is not None:
        # reindex(method='ffill') requires a monotonic index; sort defensively.
        # NOTE(review): if CQ rows are stamped at the start of the period they
        # aggregate, forward-filling them into that period leaks future data.
        # Confirm the timestamp convention.
        cq_aligned = cq_df.sort_index().reindex(features.index, method='ffill')
        if 'btc_funding' in cq_aligned.columns and 'eth_funding' in cq_aligned.columns:
            cq_aligned['funding_diff'] = cq_aligned['eth_funding'] - cq_aligned['btc_funding']
        if 'btc_inflow' in cq_aligned.columns and 'eth_inflow' in cq_aligned.columns:
            cq_aligned['inflow_ratio'] = cq_aligned['eth_inflow'] / (cq_aligned['btc_inflow'] + 1)
        features = features.join(cq_aligned)

    return features.dropna()


def _trade_direction(z):
    """Return -1 (short the spread), +1 (long the spread) or 0 (no signal) for a z-score."""
    if z > Z_ENTRY:
        return -1
    if z < -Z_ENTRY:
        return 1
    return 0


def _first_hit(mask):
    """Return the position of the first True in *mask*, or _NO_HIT if there is none."""
    mask = np.asarray(mask, dtype=bool)
    return int(np.argmax(mask)) if mask.any() else _NO_HIT


def _barrier_hits(entry, future, direction):
    """
    Locate the first take-profit and stop-loss touches of a spread trade.

    Args:
        entry: Spread value at entry.
        future: Spread values of the bars after entry, oldest first.
        direction: -1 for a short spread trade, +1 for a long one.

    Returns:
        (first_tp, first_sl): 0-based positions in *future* of the first
        take-profit / stop-loss touch, or _NO_HIT if the level is never reached.
    """
    future = np.asarray(future, dtype=float)
    if direction < 0:
        # Short: profit when the spread falls, stopped out when it rises.
        hit_tp = future <= entry * (1 - PROFIT_THRESHOLD)
        hit_sl = future >= entry * (1 + STOP_LOSS_PCT)
    else:
        # Long: profit when the spread rises, stopped out when it falls.
        hit_tp = future >= entry * (1 + PROFIT_THRESHOLD)
        hit_sl = future <= entry * (1 - STOP_LOSS_PCT)
    return _first_hit(hit_tp), _first_hit(hit_sl)


def calculate_targets(features, horizon):
    """
    Calculate target labels for a given horizon.

    Uses path-dependent labeling: success means hitting the profit target
    BEFORE the stop loss within the `horizon` bars after entry. Only bars with
    |z_score| > Z_ENTRY can be labelled 1 (short if z > Z_ENTRY, long if
    z < -Z_ENTRY); every other bar is 0.

    Returns:
        (targets, valid_mask, None, None). targets is an int array of 0/1.
        valid_mask is a bool Series that is True where all `horizon` future
        bars exist. The trailing Nones are kept for backward compatibility.
    """
    spread = features['spread'].to_numpy(dtype=float)
    z_score = features['z_score'].to_numpy(dtype=float)
    n = len(spread)
    targets = np.zeros(n, dtype=int)

    # Rows with complete future data. Clamp at 0: with horizon >= n a negative
    # slice end would otherwise mark the *leading* rows as valid.
    n_valid = max(n - horizon, 0)
    valid_mask = np.zeros(n, dtype=bool)
    valid_mask[:n_valid] = True

    # Only signal bars can succeed, so skip the rest for efficiency
    candidates = np.flatnonzero((z_score > Z_ENTRY) | (z_score < -Z_ENTRY))
    for i in candidates:
        if i >= n_valid:
            continue
        future = spread[i + 1:i + 1 + horizon]
        first_tp, first_sl = _barrier_hits(spread[i], future, _trade_direction(z_score[i]))
        # Success only if TP is touched strictly before SL (never touched = _NO_HIT)
        targets[i] = int(first_tp < first_sl)

    return targets, pd.Series(valid_mask, index=features.index), None, None


def calculate_mae(features, predictions, test_idx, horizon):
    """
    Calculate the average Maximum Adverse Excursion for predicted trades.

    For each test bar predicted as 1 that carries a z-score signal, measures the
    worst move against the position over the entry bar plus the next `horizon`
    bars (the same window calculate_targets labels over). Including the entry
    bar floors each excursion at 0. Stop-loss/take-profit exits are ignored.

    Returns:
        Mean MAE in percent, or 0.0 if no trade could be evaluated.
    """
    spread_all = features['spread']
    z_all = features['z_score']
    n = len(features)

    mae_values = []
    for idx, pred in zip(test_idx, predictions):
        if pred != 1:
            continue
        direction = _trade_direction(z_all.loc[idx])
        if direction == 0:
            continue  # no entry signal, so no defined trade direction

        loc = features.index.get_loc(idx)
        window = spread_all.iloc[loc:min(loc + 1 + horizon, n)]
        if len(window) < 2:
            continue  # no future bar to measure against
        entry = spread_all.iloc[loc]

        if direction < 0:
            max_adverse = (window.max() - entry) / entry
        else:
            max_adverse = (entry - window.min()) / entry
        mae_values.append(max_adverse * 100)  # As percentage

    return float(np.mean(mae_values)) if mae_values else 0.0


def calculate_net_profit(features, predictions, test_idx, horizon):
    """
    Calculate estimated net profit including fees.

    Enforces 'one trade at a time' and simulates SL/TP exits over the `horizon`
    bars after entry. Only predicted bars with a z-score signal are traded.
    Exits fill at the close of the bar that crossed the barrier, not at the
    exact barrier price. Per-trade returns are summed, not compounded.
    Assumes test_idx is a contiguous run of features.index (as passed by
    test_horizon), because the cooldown is counted in test positions.

    Returns:
        (total_pnl, n_trades): summed net return as a fraction, and trade count.
    """
    spread_all = features['spread']
    z_all = features['z_score']
    n = len(features)

    total_pnl = 0.0
    n_trades = 0
    next_trade_pos = 0  # first test position at which a new trade may open

    for pos, (idx, pred) in enumerate(zip(test_idx, predictions)):
        # Skip if we are still in a trade or no trade is predicted
        if pos < next_trade_pos or pred != 1:
            continue
        direction = _trade_direction(z_all.loc[idx])
        if direction == 0:
            continue

        loc = features.index.get_loc(idx)
        entry = spread_all.iloc[loc]
        # Same look-ahead window as calculate_targets: the `horizon` bars after entry
        future = spread_all.iloc[loc + 1:min(loc + 1 + horizon, n)].to_numpy(dtype=float)
        if len(future) == 0:
            continue

        first_tp, first_sl = _barrier_hits(entry, future, direction)
        exit_offset = min(first_tp, first_sl)
        if exit_offset == _NO_HIT:
            exit_offset = len(future) - 1  # held to horizon (or end of data)
        pnl = direction * (future[exit_offset] - entry) / entry

        # Subtract fees
        total_pnl += pnl - FEE_RATE
        n_trades += 1
        # The trade occupies bars up to and including its exit bar
        next_trade_pos = pos + exit_offset + 1

    return total_pnl, n_trades


def test_horizon(features, horizon):
    """
    Test a single horizon with walk-forward training.

    Trains on the first TRAIN_RATIO of bars and evaluates on the remainder.
    Training rows whose labels would look into the test period are purged.

    Returns:
        Dict of metrics, or None if there is too little (or single-class)
        training data or too few valid test rows.
    """
    targets, valid_mask, _, _ = calculate_targets(features, horizon)

    # Walk-forward split
    n_samples = len(features)
    train_size = int(n_samples * TRAIN_RATIO)

    train_features = features.iloc[:train_size]
    test_features = features.iloc[train_size:]
    train_targets = targets[:train_size]
    test_targets = targets[train_size:]

    valid = valid_mask.to_numpy()
    train_valid = valid[:train_size].copy()
    test_valid = valid[train_size:]
    # Purge: the label of train row i uses rows i+1..i+horizon, so the last
    # `horizon` train rows would otherwise be labelled with test-period prices.
    train_valid[max(train_size - horizon, 0):] = False

    # Prepare training data (only valid rows)
    exclude = ['spread']
    cols = [c for c in features.columns if c not in exclude]
    X_train = train_features[cols].fillna(0).replace([np.inf, -np.inf], 0)
    X_train_valid = X_train[train_valid]
    y_train_valid = train_targets[train_valid]

    if len(X_train_valid) < 50:
        return None  # Not enough training data
    if len(np.unique(y_train_valid)) < 2:
        return None  # Single-class labels: nothing for the classifier to learn

    # Train model
    model = RandomForestClassifier(
        n_estimators=300,
        max_depth=5,
        min_samples_leaf=30,
        class_weight={0: 1, 1: 3},
        random_state=42,
    )
    model.fit(X_train_valid, y_train_valid)

    # Predict on test set
    X_test = test_features[cols].fillna(0).replace([np.inf, -np.inf], 0)
    predictions = model.predict(X_test)

    # Only evaluate F1 on valid test rows (those with complete future data)
    y_test_valid = test_targets[test_valid]
    pred_valid = predictions[test_valid]
    if len(y_test_valid) < 10:
        return None

    f1 = f1_score(y_test_valid, pred_valid, zero_division=0)

    # MAE and Net Profit use ALL test predictions (trades near the end are truncated)
    test_idx = test_features.index
    avg_mae = calculate_mae(features, predictions, test_idx, horizon)
    net_pnl, n_trades = calculate_net_profit(features, predictions, test_idx, horizon)

    return {
        'horizon': horizon,
        'f1_score': f1,
        'avg_mae': avg_mae,
        'net_pnl': net_pnl,
        'n_trades': n_trades,
        'train_samples': len(X_train_valid),
        'test_samples': len(X_test),
    }


def test_horizons(features, horizons):
    """
    Test multiple horizons, print one summary line per horizon, and return the results.

    Returns:
        List of metric dicts from test_horizon; skipped horizons are omitted.
    """
    results = []

    print("\n" + "=" * 80)
    print("WALK-FORWARD HORIZON OPTIMIZATION")
    print(f"Train Ratio: {TRAIN_RATIO*100:.0f}% | Profit Target: {PROFIT_THRESHOLD*100:.1f}% | Stop Loss: {STOP_LOSS_PCT*100:.1f}% | Fee Rate: {FEE_RATE*100:.2f}%")
    print("=" * 80)

    for h in horizons:
        result = test_horizon(features, h)
        if result:
            results.append(result)
            print(f"Horizon {h:3d}h: F1={result['f1_score']:.3f}, "
                  f"MAE={result['avg_mae']:.2f}%, "
                  f"Net PnL={result['net_pnl']*100:.2f}%, "
                  f"Trades={result['n_trades']}")

    return results


def parse_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description="Regime detection research - test multiple horizons"
    )
    parser.add_argument(
        "--days",
        type=int,
        default=DEFAULT_DAYS,
        help=f"Number of days of data (default: {DEFAULT_DAYS})"
    )
    parser.add_argument(
        "--start",
        type=str,
        default=None,
        help="Start date (YYYY-MM-DD), overrides --days"
    )
    parser.add_argument(
        "--end",
        type=str,
        default=None,
        help="End date (YYYY-MM-DD), defaults to now"
    )
    parser.add_argument(
        "--output",
        type=str,
        default="research/horizon_optimization_results.csv",
        help="Output CSV path"
    )
    parser.add_argument(
        "--output-horizon",
        type=str,
        default=None,
        help="Path to save the best horizon (integer) to a file"
    )
    return parser.parse_args()


def _ensure_parent_dir(path):
    """Create the parent directory of *path* if it does not exist yet."""
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)


def main():
    """
    Main research function.

    Loads data, builds features, and evaluates horizons from 6h to 150h in 6h
    steps. Prints the best horizon by F1, net PnL and MAE, writes all results
    to --output and, if requested, the best-net-PnL horizon to --output-horizon.

    Returns:
        The results DataFrame, or None if no horizon produced a result.
    """
    args = parse_args()

    # Load data with dynamic date range
    df_btc, df_eth = load_data(
        days=args.days,
        start_date=args.start,
        end_date=args.end
    )
    cq_df = load_cryptoquant_data()

    # Calculate features
    features = calculate_features(df_btc, df_eth, cq_df)
    logger.info(f"Calculated {len(features)} feature rows with {len(features.columns)} columns")

    # Test horizons from 6h to 150h
    horizons = list(range(6, 151, 6))  # 6, 12, 18, ..., 150
    results = test_horizons(features, horizons)

    if not results:
        print("No valid results!")
        return None

    # Find best by different metrics
    results_df = pd.DataFrame(results)

    print("\n" + "=" * 80)
    print("BEST HORIZONS BY METRIC")
    print("=" * 80)

    best_f1 = results_df.loc[results_df['f1_score'].idxmax()]
    print(f"Best F1 Score: {best_f1['horizon']:.0f}h (F1={best_f1['f1_score']:.3f})")

    best_pnl = results_df.loc[results_df['net_pnl'].idxmax()]
    print(f"Best Net PnL: {best_pnl['horizon']:.0f}h (PnL={best_pnl['net_pnl']*100:.2f}%)")

    lowest_mae = results_df.loc[results_df['avg_mae'].idxmin()]
    print(f"Lowest MAE: {lowest_mae['horizon']:.0f}h (MAE={lowest_mae['avg_mae']:.2f}%)")

    # Save results
    _ensure_parent_dir(args.output)
    results_df.to_csv(args.output, index=False)
    print(f"\nResults saved to {args.output}")

    # Save best horizon if requested
    if args.output_horizon:
        # NOTE: the horizon is selected on the same test window it is scored on,
        # so its PnL is an optimistic (in-sample) estimate.
        best_h = int(best_pnl['horizon'])
        _ensure_parent_dir(args.output_horizon)
        with open(args.output_horizon, 'w') as f:
            f.write(str(best_h))
        print(f"Best horizon {best_h}h saved to {args.output_horizon}")

    return results_df


if __name__ == "__main__":
    main()