#!/usr/bin/env python3 from __future__ import annotations import argparse from dataclasses import dataclass from pathlib import Path import numpy as np import pandas as pd from hmmlearn.hmm import GaussianHMM from sklearn.preprocessing import StandardScaler from sklearn.feature_selection import SelectKBest, mutual_info_regression, RFE from sklearn.linear_model import LinearRegression from sklearn.ensemble import RandomForestRegressor import warnings warnings.filterwarnings('ignore') # ------------------------------- CLI ----------------------------------------- @dataclass class CLI: btc_csv: Path eth_csv: Path resample_rules: list[str] n_states: int horizon_min: int # CV params cv_splits: int cv_test_bars: int cv_gap_bars: int cv_seed: int cv_since: str | None # restrict sampling to recent era # Feature selection params feature_selection_method: str # 'mutual_info', 'rfe', 'random_forest', 'none' n_features: int # number of features to select # Enhanced CV params cv_method: str # 'random', 'expanding', 'rolling' def parse_args() -> CLI: p = argparse.ArgumentParser(description="BTC/ETH regime modeling with robust CV and feature selection") p.add_argument("--btc", type=Path, default=Path("btcusd_1-min_data.csv")) p.add_argument("--eth", type=Path, default=Path("ethusd_1min_ohlc.csv")) p.add_argument("--rules", default="30min,45min,1H", help="Comma-separated pandas offsets") p.add_argument("--states", type=int, default=3) p.add_argument("--horizon", type=int, default=60) # randomized CV controls p.add_argument("--cv_splits", type=int, default=8, help="number of random test windows") p.add_argument("--cv_test_bars", type=int, default=500, help="length of each test window in bars") p.add_argument("--cv_gap_bars", type=int, default=24, help="embargo gap before test window") p.add_argument("--cv_seed", type=int, default=7, help="rng seed for reproducibility") p.add_argument("--cv_since", default=None, help="only sample test starts at/after this date (e.g. 2023-01-01)") # Feature selection p.add_argument("--feature_selection", default="mutual_info", choices=['mutual_info', 'rfe', 'random_forest', 'none'], help="Feature selection method") p.add_argument("--n_features", type=int, default=10, help="Number of features to select") # Enhanced CV method p.add_argument("--cv_method", default="random", choices=['random', 'expanding', 'rolling'], help="Cross-validation method") a = p.parse_args() rules = [r.strip() for r in a.rules.split(",") if r.strip()] return CLI(a.btc, a.eth, rules, a.states, a.horizon, a.cv_splits, a.cv_test_bars, a.cv_gap_bars, a.cv_seed, a.cv_since, a.feature_selection, a.n_features, a.cv_method) # ------------------------------ IO / CLEAN ----------------------------------- def _norm_headers(df: pd.DataFrame) -> pd.DataFrame: df = df.rename(columns={c: c.strip().lower() for c in df.columns}) if "unix" in df.columns: df = df.rename(columns={"unix": "timestamp"}) if "date" in df.columns: df = df.rename(columns={"date": "timestamp"}) return df def _load_bitstamp_csv(path: Path, prefix: str) -> pd.DataFrame: df = pd.read_csv(path) df = _norm_headers(df) if "timestamp" not in df.columns: raise ValueError(f"Missing timestamp in {path}") df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s", utc=True, errors="coerce") df = df.dropna(subset=["timestamp"]).set_index("timestamp").sort_index() for c in ("open","high","low","close","volume"): if c in df.columns: df[c] = pd.to_numeric(df[c], errors="coerce", downcast="float") df = df[["open","high","low","close","volume"]].dropna() return df.add_prefix(prefix + "_") def _align_minutely(btc: pd.DataFrame, eth: pd.DataFrame) -> pd.DataFrame: idx = btc.index.intersection(eth.index) df = btc.reindex(idx).join(eth.reindex(idx), how="inner") return df.ffill(limit=60).dropna() # --------------------------- FEATURES (enhanced) ----------------------- def build_features(df: pd.DataFrame, rule: str, horizon_min: int) -> pd.DataFrame: df = df.copy() df["btc_ret"] = np.log(df["btc_close"]).diff() df["eth_ret"] = np.log(df["eth_close"]).diff() df["ratio"] = df["eth_close"]/df["btc_close"] df["ratio_ret"] = np.log(df["ratio"]).diff() # Volatility features for win in (15,30,60,120,240,360): df[f"rv_{win}m"] = df["ratio_ret"].rolling(win, min_periods=win).std() # Trend features for win in (60,240,1440): ma = df["ratio"].rolling(win, min_periods=win).mean() df[f"trend_{win}m"] = df["ratio"]/(ma+1e-12)-1.0 # Correlation features for win in (60,120,240): df[f"corr_{win}m"] = df["btc_ret"].rolling(win, min_periods=win).corr(df["eth_ret"]) # Beta and risk features cov_120 = df["eth_ret"].rolling(120, min_periods=120).cov(df["btc_ret"]) var_120 = df["btc_ret"].rolling(120, min_periods=120).var() df["beta_2h"] = cov_120/(var_120+1e-12) std_b = df["btc_ret"].rolling(120, min_periods=120).std() std_e = df["eth_ret"].rolling(120, min_periods=120).std() df["divergence_2h"] = np.abs(df["btc_ret"]/(std_b+1e-12) - df["eth_ret"]/(std_e+1e-12)) # Volume features df["volratio"] = np.log((df["eth_volume"]+1e-9)/(df["btc_volume"]+1e-9)) df["vol_sum"] = np.log(df["eth_volume"]+df["btc_volume"]+1e-9) df["vol_diff"] = (df["eth_volume"]-df["btc_volume"])/(df["eth_volume"]+df["btc_volume"]+1e-9) # Additional momentum features df["momentum_1h"] = df["ratio_ret"].rolling(60).sum() df["momentum_4h"] = df["ratio_ret"].rolling(240).sum() # Mean reversion features for win in (60, 120, 240): rolling_mean = df["ratio_ret"].rolling(win).mean() rolling_std = df["ratio_ret"].rolling(win).std() df[f"zscore_{win}m"] = (df["ratio_ret"] - rolling_mean) / (rolling_std + 1e-12) # Price position features for win in (240, 1440): high = df["ratio"].rolling(win).max() low = df["ratio"].rolling(win).min() df[f"position_{win}m"] = (df["ratio"] - low) / (high - low + 1e-12) # Aggregate to target timeframe agg = {"btc_close":"last","eth_close":"last","ratio":"last","ratio_ret":"sum"} for c in df.columns: if c not in agg: agg[c] = "mean" g = df.resample(rule).agg(agg).dropna() step_min = max(1, int(pd.Timedelta(rule).total_seconds()//60)) ahead = max(1, int(round(horizon_min/step_min))) g["fut_ret"] = g["ratio_ret"].shift(-ahead) return g.dropna() def feature_matrix(g: pd.DataFrame) -> tuple[np.ndarray,np.ndarray,list[str]]: ban = {"fut_ret","btc_close","eth_close","ratio"} keep = ("rv_","corr_","trend_","beta_","divergence_","vol","momentum_","zscore_","position_") feats = [] if "ratio_ret" in g.columns: feats.append("ratio_ret") feats += [c for c in g.columns if c not in ban and c!="ratio_ret" and any(c.startswith(p) for p in keep)] if not feats: feats = ["ratio_ret","rv_30m","rv_2h","corr_2h","ratio_trend","volratio","momentum_1h","zscore_60m"] X = g[feats].astype(np.float32).values y = g["fut_ret"].astype(np.float32).values return X, y, feats # ------------------------- Enhanced Feature Selection ------------------------ def select_features(X_train: np.ndarray, y_train: np.ndarray, X_test: np.ndarray, feature_names: list[str], method: str, n_features: int) -> tuple[np.ndarray, np.ndarray, list[str]]: """ Apply feature selection to training and test data """ if method == "none" or n_features >= len(feature_names): return X_train, X_test, feature_names if n_features <= 0: n_features = max(1, len(feature_names) // 2) try: if method == "mutual_info": selector = SelectKBest(score_func=mutual_info_regression, k=n_features) X_train_selected = selector.fit_transform(X_train, y_train) X_test_selected = selector.transform(X_test) selected_indices = selector.get_support(indices=True) selected_features = [feature_names[i] for i in selected_indices] elif method == "rfe": # Use linear regression as base estimator for RFE estimator = LinearRegression() selector = RFE(estimator, n_features_to_select=n_features, step=1) X_train_selected = selector.fit_transform(X_train, y_train) X_test_selected = selector.transform(X_test) selected_indices = selector.get_support(indices=True) selected_features = [feature_names[i] for i in selected_indices] elif method == "random_forest": # Use feature importance from random forest rf = RandomForestRegressor(n_estimators=100, random_state=42, n_jobs=-1) rf.fit(X_train, y_train) importances = rf.feature_importances_ selected_indices = np.argsort(importances)[-n_features:] X_train_selected = X_train[:, selected_indices] X_test_selected = X_test[:, selected_indices] selected_features = [feature_names[i] for i in selected_indices] else: return X_train, X_test, feature_names print(f" Selected {len(selected_features)} features: {selected_features}") return X_train_selected, X_test_selected, selected_features except Exception as e: print(f" Feature selection failed: {e}, using all features") return X_train, X_test, feature_names # ------------------------- Robust Cross-Validation Methods ------------------- def sample_random_splits( g: pd.DataFrame, n_splits: int, test_bars: int, gap_bars: int, seed: int, since: str | None ): """Original random sampling method""" rng = np.random.default_rng(seed) idx = g.index if since is not None: idx = idx[idx >= pd.Timestamp(since, tz="UTC")] valid = np.arange(len(idx) - test_bars) if len(valid) <= 0: raise ValueError("Not enough data for requested test window") starts = rng.choice(valid, size=min(n_splits, len(valid)), replace=False) for s in np.sort(starts): test_start = idx[s] test_end = idx[s + test_bars - 1] embargo_end = idx[max(0, s - gap_bars - 1)] if s - gap_bars - 1 >= 0 else None train = g.loc[:embargo_end] if embargo_end is not None else g.iloc[0:0] test = g.loc[test_start:test_end] if len(train) == 0 or len(test) < test_bars: continue yield train, test, (test_start, test_end) def sample_expanding_window_splits( g: pd.DataFrame, n_splits: int, test_bars: int, gap_bars: int, since: str | None ): """Expanding window CV - training set grows over time""" idx = g.index if since is not None: idx = idx[idx >= pd.Timestamp(since, tz="UTC")] total_bars = len(idx) min_train_size = test_bars * 2 # Minimum training set size # Calculate split points available_splits = max(1, (total_bars - min_train_size - test_bars - gap_bars) // test_bars) n_splits = min(n_splits, available_splits) if n_splits <= 0: raise ValueError("Not enough data for expanding window splits") for i in range(n_splits): test_start_idx = min_train_size + gap_bars + (i * test_bars) test_end_idx = test_start_idx + test_bars - 1 if test_end_idx >= total_bars: break train_end_idx = test_start_idx - gap_bars - 1 train = g.iloc[:train_end_idx + 1] test = g.iloc[test_start_idx:test_end_idx + 1] if len(train) < min_train_size or len(test) < test_bars: continue yield train, test, (idx[test_start_idx], idx[test_end_idx]) def sample_rolling_window_splits( g: pd.DataFrame, n_splits: int, test_bars: int, gap_bars: int, since: str | None ): """Rolling window CV - fixed training window size""" idx = g.index if since is not None: idx = idx[idx >= pd.Timestamp(since, tz="UTC")] total_bars = len(idx) train_bars = test_bars * 3 # Fixed training window size # Calculate split points available_splits = max(1, (total_bars - train_bars - test_bars - gap_bars) // test_bars) n_splits = min(n_splits, available_splits) if n_splits <= 0: raise ValueError("Not enough data for rolling window splits") for i in range(n_splits): train_start_idx = i * test_bars train_end_idx = train_start_idx + train_bars - 1 test_start_idx = train_end_idx + gap_bars + 1 test_end_idx = test_start_idx + test_bars - 1 if test_end_idx >= total_bars: break train = g.iloc[train_start_idx:train_end_idx + 1] test = g.iloc[test_start_idx:test_end_idx + 1] if len(train) < train_bars or len(test) < test_bars: continue yield train, test, (idx[test_start_idx], idx[test_end_idx]) def get_cv_splits(g: pd.DataFrame, cv_method: str, n_splits: int, test_bars: int, gap_bars: int, seed: int, since: str | None): """Dispatch to appropriate CV method""" if cv_method == "random": return sample_random_splits(g, n_splits, test_bars, gap_bars, seed, since) elif cv_method == "expanding": return sample_expanding_window_splits(g, n_splits, test_bars, gap_bars, since) elif cv_method == "rolling": return sample_rolling_window_splits(g, n_splits, test_bars, gap_bars, since) else: raise ValueError(f"Unknown CV method: {cv_method}") # ------------------------------ Model / Fit ----------------------------------- def fit_and_predict_train_test(train: pd.DataFrame, test: pd.DataFrame, n_states: int, feature_selection_method: str, n_features: int): Xtr, ytr, feats = feature_matrix(train) Xte, yte, _ = feature_matrix(test) # Apply feature selection Xtr_sel, Xte_sel, selected_feats = select_features( Xtr, ytr, Xte, feats, feature_selection_method, n_features ) scaler = StandardScaler() Xtr_s = scaler.fit_transform(Xtr_sel) Xte_s = scaler.transform(Xte_sel) hmm = GaussianHMM(n_components=n_states, covariance_type="diag", n_iter=300, random_state=7) hmm.fit(Xtr_s) st_tr = hmm.predict(Xtr_s) st_te = hmm.predict(Xte_s) means = {s: float(np.nanmean(ytr[st_tr == s])) for s in range(n_states)} small = np.nanpercentile(np.abs(list(means.values())), 30) state_to_stance = {s: (1 if m > +small else (-1 if m < -small else 0)) for s, m in means.items()} preds = np.vectorize(state_to_stance.get)(st_te).astype(np.int8) return preds, yte, state_to_stance, selected_feats, hmm def metrics(y: np.ndarray, preds: np.ndarray, rule: str) -> dict[str,float]: T = min(len(y), len(preds)); y, preds = y[:T], preds[:T] pnl = preds * y hit = (np.sign(preds) == np.sign(y)).mean() if T else np.nan bars_per_day = int(round(24 * 60 / max(1, int(pd.Timedelta(rule).total_seconds() // 60)))) ann = np.sqrt(365 * bars_per_day) sharpe = float(np.nanmean(pnl) / (np.nanstd(pnl) + 1e-12) * ann) # Additional metrics positive_returns = (pnl > 0).sum() total_trades = len(pnl) win_rate = positive_returns / total_trades if total_trades > 0 else 0 profit_factor = abs(pnl[pnl > 0].sum() / (pnl[pnl < 0].sum() + 1e-12)) return { "hit_rate": float(hit), "ann_sharpe": sharpe, "n_points": int(T), "win_rate": win_rate, "profit_factor": profit_factor, "total_return": float(pnl.sum()) } # ------------------------------ Runner --------------------------------------- def run_rule_mc(minute: pd.DataFrame, rule: str, n_states: int, horizon_min: int, cv, feature_selection_method: str, n_features: int) -> dict: g = build_features(minute, rule, horizon_min) rows = [] feature_importance = {} for i, (train, test, (ts, te)) in enumerate(get_cv_splits(g, cv.cv_method, cv.cv_splits, cv.cv_test_bars, cv.cv_gap_bars, cv.cv_seed, cv.cv_since)): print(f" Split {i+1}: Train {len(train)} bars, Test {len(test)} bars") preds, ytest, state_map, feats, hmm = fit_and_predict_train_test( train, test, n_states, feature_selection_method, n_features ) m = metrics(ytest, preds, rule) rows.append({ "hit_rate": m["hit_rate"], "ann_sharpe": m["ann_sharpe"], "n_points": m["n_points"], "test_span": (ts, te), "win_rate": m["win_rate"], "profit_factor": m["profit_factor"], "total_return": m["total_return"] }) # Track feature usage for feat in feats: feature_importance[feat] = feature_importance.get(feat, 0) + 1 if not rows: return { "rule": rule, "hit_mean": np.nan, "sharpe_mean": np.nan, "splits": 0, "hit_std": np.nan, "sharpe_std": np.nan, "win_rate_mean": np.nan, "profit_factor_mean": np.nan } hits = np.array([r["hit_rate"] for r in rows], dtype=float) sharpes = np.array([r["ann_sharpe"] for r in rows], dtype=float) win_rates = np.array([r["win_rate"] for r in rows], dtype=float) profit_factors = np.array([r["profit_factor"] for r in rows], dtype=float) # Sort features by importance sorted_features = sorted(feature_importance.items(), key=lambda x: x[1], reverse=True) top_features = [f[0] for f in sorted_features[:5]] # Top 5 most frequently selected features return { "rule": rule, "hit_mean": float(np.nanmean(hits)), "hit_std": float(np.nanstd(hits)), "sharpe_mean": float(np.nanmean(sharpes)), "sharpe_std": float(np.nanstd(sharpes)), "win_rate_mean": float(np.nanmean(win_rates)), "profit_factor_mean": float(np.nanmean(profit_factors)), "splits": len(rows), "top_features": top_features } # ------------------------------ MAIN ----------------------------------------- def main(args: CLI) -> None: print("Loading data...") btc = _load_bitstamp_csv(args.btc_csv, "btc") eth = _load_bitstamp_csv(args.eth_csv, "eth") minute = _align_minutely(btc, eth) print(f"Aligned data: {len(minute)} minutes") class CV: pass cv = CV() cv.cv_splits = args.cv_splits cv.cv_test_bars = args.cv_test_bars cv.cv_gap_bars = args.cv_gap_bars cv.cv_seed = args.cv_seed cv.cv_since = args.cv_since cv.cv_method = args.cv_method results = [] for rule in args.resample_rules: print(f"\nProcessing rule: {rule}") result = run_rule_mc(minute, rule, args.n_states, args.horizon_min, cv, args.feature_selection_method, args.n_features) results.append(result) df = pd.DataFrame(results).sort_values(by="sharpe_mean", ascending=False) print("\n" + "="*80) print("ENHANCED RESULTS: Randomized time-split comparison with Feature Selection") print("="*80) print(f"States={args.n_states} | HorizonMin={args.horizon_min} | Splits={args.cv_splits}") print(f"TestBars={args.cv_test_bars} | GapBars={args.cv_gap_bars} | Since={args.cv_since}") print(f"CV Method={args.cv_method} | Feature Selection={args.feature_selection_method} | N Features={args.n_features}") print("="*80) if not df.empty: df["hit"] = df["hit_mean"].round(4).astype(str) + " ± " + df["hit_std"].round(4).astype(str) df["sharpe"] = df["sharpe_mean"].round(4).astype(str) + " ± " + df["sharpe_std"].round(4).astype(str) df["win_rate"] = (df["win_rate_mean"] * 100).round(2).astype(str) + "%" df["profit_factor"] = df["profit_factor_mean"].round(3).astype(str) display_cols = ["rule", "splits", "hit", "sharpe", "win_rate", "profit_factor"] print(df[display_cols].to_string(index=False)) # Show top features for best performing rule best_rule = df.iloc[0] print(f"\nTop features for best rule '{best_rule['rule']}':") for i, feat in enumerate(best_rule.get('top_features', [])[:5]): print(f" {i+1}. {feat}") else: print("No valid splits found") if __name__ == "__main__": args = parse_args() main(args)