#!/usr/bin/env python3 from __future__ import annotations import argparse from dataclasses import dataclass from pathlib import Path import numpy as np import pandas as pd from hmmlearn.hmm import GaussianHMM from sklearn.preprocessing import StandardScaler # ------------------------------- CLI ----------------------------------------- @dataclass class CLI: btc_csv: Path eth_csv: Path resample_rules: list[str] n_states: int horizon_min: int # CV params cv_splits: int cv_test_bars: int cv_gap_bars: int cv_seed: int cv_since: str | None # restrict sampling to recent era def parse_args() -> CLI: p = argparse.ArgumentParser(description="BTC/ETH regime modeling with randomized time splits") p.add_argument("--btc", type=Path, default=Path("btcusd_1-min_data.csv")) p.add_argument("--eth", type=Path, default=Path("ethusd_1min_ohlc.csv")) p.add_argument("--rules", default="30min,45min,1H", help="Comma-separated pandas offsets") p.add_argument("--states", type=int, default=3) p.add_argument("--horizon", type=int, default=60) # randomized CV controls p.add_argument("--cv_splits", type=int, default=8, help="number of random test windows") p.add_argument("--cv_test_bars", type=int, default=500, help="length of each test window in bars") p.add_argument("--cv_gap_bars", type=int, default=24, help="embargo gap before test window") p.add_argument("--cv_seed", type=int, default=7, help="rng seed for reproducibility") p.add_argument("--cv_since", default=None, help="only sample test starts at/after this date (e.g. 2023-01-01)") a = p.parse_args() rules = [r.strip() for r in a.rules.split(",") if r.strip()] return CLI(a.btc, a.eth, rules, a.states, a.horizon, a.cv_splits, a.cv_test_bars, a.cv_gap_bars, a.cv_seed, a.cv_since) # ------------------------------ IO / CLEAN ----------------------------------- def _norm_headers(df: pd.DataFrame) -> pd.DataFrame: df = df.rename(columns={c: c.strip().lower() for c in df.columns}) if "unix" in df.columns: df = df.rename(columns={"unix": "timestamp"}) if "date" in df.columns: df = df.rename(columns={"date": "timestamp"}) return df def _load_bitstamp_csv(path: Path, prefix: str) -> pd.DataFrame: df = pd.read_csv(path) df = _norm_headers(df) if "timestamp" not in df.columns: raise ValueError(f"Missing timestamp in {path}") df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s", utc=True, errors="coerce") df = df.dropna(subset=["timestamp"]).set_index("timestamp").sort_index() for c in ("open","high","low","close","volume"): if c in df.columns: df[c] = pd.to_numeric(df[c], errors="coerce", downcast="float") df = df[["open","high","low","close","volume"]].dropna() return df.add_prefix(prefix + "_") def _align_minutely(btc: pd.DataFrame, eth: pd.DataFrame) -> pd.DataFrame: idx = btc.index.intersection(eth.index) df = btc.reindex(idx).join(eth.reindex(idx), how="inner") return df.ffill(limit=60).dropna() # --------------------------- FEATURES (same as before) ----------------------- def build_features(df: pd.DataFrame, rule: str, horizon_min: int) -> pd.DataFrame: df = df.copy() df["btc_ret"] = np.log(df["btc_close"]).diff() df["eth_ret"] = np.log(df["eth_close"]).diff() df["ratio"] = df["eth_close"]/df["btc_close"] df["ratio_ret"] = np.log(df["ratio"]).diff() for win in (15,30,60,120,240,360): df[f"rv_{win}m"] = df["ratio_ret"].rolling(win, min_periods=win).std() for win in (60,240,1440): ma = df["ratio"].rolling(win, min_periods=win).mean() df[f"trend_{win}m"] = df["ratio"]/(ma+1e-12)-1.0 for win in (60,120,240): df[f"corr_{win}m"] = df["btc_ret"].rolling(win, min_periods=win).corr(df["eth_ret"]) cov_120 = df["eth_ret"].rolling(120, min_periods=120).cov(df["btc_ret"]) var_120 = df["btc_ret"].rolling(120, min_periods=120).var() df["beta_2h"] = cov_120/(var_120+1e-12) std_b = df["btc_ret"].rolling(120, min_periods=120).std() std_e = df["eth_ret"].rolling(120, min_periods=120).std() df["divergence_2h"] = np.abs(df["btc_ret"]/(std_b+1e-12) - df["eth_ret"]/(std_e+1e-12)) df["volratio"] = np.log((df["eth_volume"]+1e-9)/(df["btc_volume"]+1e-9)) df["vol_sum"] = np.log(df["eth_volume"]+df["btc_volume"]+1e-9) df["vol_diff"] = (df["eth_volume"]-df["btc_volume"])/(df["eth_volume"]+df["btc_volume"]+1e-9) df["rv_2h"] = df.get("rv_120m", df["ratio_ret"].rolling(120, min_periods=120).std()) df["corr_2h"] = df.get("corr_120m", df["btc_ret"].rolling(120, min_periods=120).corr(df["eth_ret"])) df["ratio_trend"] = df.get("trend_1440m", df["ratio"]/(df["ratio"].rolling(1440, min_periods=1440).mean()+1e-12)-1.0) agg = {"btc_close":"last","eth_close":"last","ratio":"last","ratio_ret":"sum"} for c in df.columns: if c not in agg: agg[c] = "mean" g = df.resample(rule).agg(agg).dropna() step_min = max(1, int(pd.Timedelta(rule).total_seconds()//60)) ahead = max(1, int(round(horizon_min/step_min))) g["fut_ret"] = g["ratio_ret"].shift(-ahead) return g.dropna() def feature_matrix(g: pd.DataFrame) -> tuple[np.ndarray,np.ndarray,list[str]]: ban = {"fut_ret","btc_close","eth_close","ratio"} keep = ("rv_","corr_","trend_","beta_","divergence_","vol") feats = [] if "ratio_ret" in g.columns: feats.append("ratio_ret") feats += [c for c in g.columns if c not in ban and c!="ratio_ret" and any(c.startswith(p) for p in keep)] if not feats: feats = ["ratio_ret","rv_30m","rv_2h","corr_2h","ratio_trend","volratio"] X = g[feats].astype(np.float32).values y = g["fut_ret"].astype(np.float32).values return X,y,feats # ------------------------- Randomized time splits ----------------------------- def sample_random_splits( g: pd.DataFrame, n_splits: int, test_bars: int, gap_bars: int, seed: int, since: str | None ): rng = np.random.default_rng(seed) idx = g.index if since is not None: idx = idx[idx >= pd.Timestamp(since, tz="UTC")] # valid start indices ensure full test window fits valid = np.arange(len(idx) - test_bars) if len(valid) <= 0: raise ValueError("Not enough data for requested test window") starts = rng.choice(valid, size=min(n_splits, len(valid)), replace=False) for s in np.sort(starts): test_start = idx[s] test_end = idx[s + test_bars - 1] # train uses all data strictly before (test_start - gap) embargo_end = idx[max(0, s - gap_bars - 1)] if s - gap_bars - 1 >= 0 else None train = g.loc[:embargo_end] if embargo_end is not None else g.iloc[0:0] test = g.loc[test_start:test_end] if len(train) == 0 or len(test) < test_bars: # skip degenerate continue yield train, test, (test_start, test_end) # ------------------------------ Model / Fit ----------------------------------- def fit_and_predict_train_test(train: pd.DataFrame, test: pd.DataFrame, n_states: int): Xtr, ytr, feats = feature_matrix(train) Xte, yte, _ = feature_matrix(test) scaler = StandardScaler() Xtr_s = scaler.fit_transform(Xtr) Xte_s = scaler.transform(Xte) hmm = GaussianHMM(n_components=n_states, covariance_type="diag", n_iter=300, random_state=7) hmm.fit(Xtr_s) st_tr = hmm.predict(Xtr_s) st_te = hmm.predict(Xte_s) means = {s: float(np.nanmean(ytr[st_tr == s])) for s in range(n_states)} small = np.nanpercentile(np.abs(list(means.values())), 30) state_to_stance = {s: (1 if m > +small else (-1 if m < -small else 0)) for s, m in means.items()} preds = np.vectorize(state_to_stance.get)(st_te).astype(np.int8) return preds, yte, state_to_stance, feats def metrics(y: np.ndarray, preds: np.ndarray, rule: str) -> dict[str,float]: T = min(len(y), len(preds)); y, preds = y[:T], preds[:T] pnl = preds * y hit = (np.sign(preds) == np.sign(y)).mean() if T else np.nan bars_per_day = int(round(24 * 60 / max(1, int(pd.Timedelta(rule).total_seconds() // 60)))) ann = np.sqrt(365 * bars_per_day) sharpe = float(np.nanmean(pnl) / (np.nanstd(pnl) + 1e-12) * ann) return {"hit_rate": float(hit), "ann_sharpe": sharpe, "n_points": int(T)} # ------------------------------ Runner --------------------------------------- def run_rule_mc(minute: pd.DataFrame, rule: str, n_states: int, horizon_min: int, cv) -> dict: g = build_features(minute, rule, horizon_min) rows = [] for train, test, (ts, te) in sample_random_splits(g, cv.cv_splits, cv.cv_test_bars, cv.cv_gap_bars, cv.cv_seed, cv.cv_since): preds, ytest, state_map, feats = fit_and_predict_train_test(train, test, n_states) m = metrics(ytest, preds, rule) rows.append({"hit_rate": m["hit_rate"], "ann_sharpe": m["ann_sharpe"], "n_points": m["n_points"], "test_span": (ts, te)}) if not rows: return {"rule": rule, "hit_mean": np.nan, "sharpe_mean": np.nan, "splits": 0, "hit_std": np.nan, "sharpe_std": np.nan} hits = np.array([r["hit_rate"] for r in rows], dtype=float) sharpes = np.array([r["ann_sharpe"] for r in rows], dtype=float) return { "rule": rule, "hit_mean": float(np.nanmean(hits)), "hit_std": float(np.nanstd(hits)), "sharpe_mean": float(np.nanmean(sharpes)), "sharpe_std": float(np.nanstd(sharpes)), "splits": len(rows), } # ------------------------------ MAIN ----------------------------------------- def main(args: CLI) -> None: btc = _load_bitstamp_csv(args.btc_csv, "btc") eth = _load_bitstamp_csv(args.eth_csv, "eth") minute = _align_minutely(btc, eth) class CV: pass cv = CV(); cv.cv_splits=args.cv_splits; cv.cv_test_bars=args.cv_test_bars; cv.cv_gap_bars=args.cv_gap_bars cv.cv_seed=args.cv_seed; cv.cv_since=args.cv_since results = [run_rule_mc(minute, rule, args.n_states, args.horizon_min, cv) for rule in args.resample_rules] df = pd.DataFrame(results).sort_values(by="sharpe_mean", ascending=False) print("# Randomized time-split comparison") print(f"States={args.n_states} HorizonMin={args.horizon_min} Splits={args.cv_splits} TestBars={args.cv_test_bars} GapBars={args.cv_gap_bars} Since={args.cv_since}") if not df.empty: df["hit"] = df["hit_mean"].round(4).astype(str) + " ± " + df["hit_std"].round(4).astype(str) df["sharpe"]= df["sharpe_mean"].round(4).astype(str) + " ± " + df["sharpe_std"].round(4).astype(str) print(df[["rule","splits","hit","sharpe"]].to_string(index=False)) else: print("No valid splits found") if __name__ == "__main__": args = parse_args() main(args)