421 lines
14 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
from dataclasses import dataclass
from pathlib import Path
import numpy as np
import pandas as pd
from hmmlearn.hmm import GaussianHMM
from sklearn.preprocessing import StandardScaler
import joblib
# ============================== CLI ==========================================
@dataclass
class CLI:
btc_csv: Path
eth_csv: Path
resample_rules: list[str]
n_states: int
horizon_min: int
folder_save_path: str | None
# CV params
cv_splits: int
cv_test_bars: int
cv_gap_bars: int
cv_seed: int
cv_since: str | None # restrict sampling to recent era
def parse_args() -> CLI:
p = argparse.ArgumentParser(description="BTC/ETH regime modeling with properly embargoed time splits")
p.add_argument("--btc", type=Path, default=Path("btcusd_1-min_data.csv"))
p.add_argument("--eth", type=Path, default=Path("ethusd_1min_ohlc.csv"))
p.add_argument("--rules", default="30min,45min,1H", help="Comma-separated pandas offsets")
p.add_argument("--states", type=int, default=3)
p.add_argument("--horizon", type=int, default=60, help="Forward horizon in minutes for the target")
p.add_argument("--folder_save_path", default=None, help="Folder path to save fitted HMM models (optional)")
# randomized CV controls
p.add_argument("--cv_splits", type=int, default=8, help="number of random test windows")
p.add_argument("--cv_test_bars", type=int, default=500, help="length of each test window in bars")
p.add_argument("--cv_gap_bars", type=int, default=24, help="extra embargo bars beyond the minimum computed gap")
p.add_argument("--cv_seed", type=int, default=7, help="rng seed for reproducibility")
p.add_argument("--cv_since", default=None, help="only sample test starts at/after this date (e.g. 2023-01-01)")
a = p.parse_args()
rules = [r.strip() for r in a.rules.split(",") if r.strip()]
return CLI(
btc_csv=a.btc,
eth_csv=a.eth,
resample_rules=rules,
n_states=a.states,
horizon_min=a.horizon,
folder_save_path=a.folder_save_path,
cv_splits=a.cv_splits,
cv_test_bars=a.cv_test_bars,
cv_gap_bars=a.cv_gap_bars,
cv_seed=a.cv_seed,
cv_since=a.cv_since,
)
# ============================ IO / CLEAN =====================================
def _norm_headers(df: pd.DataFrame) -> pd.DataFrame:
df = df.rename(columns={c: c.strip().lower() for c in df.columns})
if "unix" in df.columns:
df = df.rename(columns={"unix": "timestamp"})
if "date" in df.columns:
df = df.rename(columns={"date": "timestamp"})
return df
def _load_bitstamp_csv(path: Path, prefix: str) -> pd.DataFrame:
df = pd.read_csv(path)
df = _norm_headers(df)
if "timestamp" not in df.columns:
raise ValueError(f"Missing timestamp in {path}")
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s", utc=True, errors="coerce")
df = df.dropna(subset=["timestamp"]).set_index("timestamp").sort_index()
for c in ("open", "high", "low", "close", "volume"):
if c in df.columns:
df[c] = pd.to_numeric(df[c], errors="coerce", downcast="float")
df = df[["open", "high", "low", "close", "volume"]].dropna()
return df.add_prefix(prefix + "_")
def _align_minutely(btc: pd.DataFrame, eth: pd.DataFrame) -> pd.DataFrame:
idx = btc.index.intersection(eth.index)
df = btc.reindex(idx).join(eth.reindex(idx), how="inner")
return df.ffill(limit=60).dropna()
# ======================= FEATURES / TARGET ===================================
def build_features(df: pd.DataFrame, rule: str, horizon_min: int) -> pd.DataFrame:
df = df.copy()
# base returns
df["btc_ret"] = np.log(df["btc_close"]).diff()
df["eth_ret"] = np.log(df["eth_close"]).diff()
df["ratio"] = df["eth_close"] / df["btc_close"]
df["ratio_ret"] = np.log(df["ratio"]).diff()
# volatility (minutes)
for win in (15, 30, 60, 120, 240, 360):
df[f"rv_{win}m"] = df["ratio_ret"].rolling(win, min_periods=win).std()
# trend vs long MA (minutes)
for win in (60, 240, 1440):
ma = df["ratio"].rolling(win, min_periods=win).mean()
df[f"trend_{win}m"] = df["ratio"] / (ma + 1e-12) - 1.0
# rolling correlation (minutes)
for win in (60, 120, 240):
df[f"corr_{win}m"] = df["btc_ret"].rolling(win, min_periods=win).corr(df["eth_ret"])
# beta-like measure over 120m
cov_120 = df["eth_ret"].rolling(120, min_periods=120).cov(df["btc_ret"])
var_120 = df["btc_ret"].rolling(120, min_periods=120).var()
df["beta_2h"] = cov_120 / (var_120 + 1e-12)
# divergence and volume structure
std_b = df["btc_ret"].rolling(120, min_periods=120).std()
std_e = df["eth_ret"].rolling(120, min_periods=120).std()
df["divergence_2h"] = np.abs(df["btc_ret"] / (std_b + 1e-12) - df["eth_ret"] / (std_e + 1e-12))
df["volratio"] = np.log((df["eth_volume"] + 1e-9) / (df["btc_volume"] + 1e-9))
df["vol_sum"] = np.log(df["eth_volume"] + df["btc_volume"] + 1e-9)
df["vol_diff"] = (df["eth_volume"] - df["btc_volume"]) / (df["eth_volume"] + df["btc_volume"] + 1e-9)
# convenience aliases
df["rv_2h"] = df.get("rv_120m", df["ratio_ret"].rolling(120, min_periods=120).std())
df["corr_2h"] = df.get("corr_120m", df["btc_ret"].rolling(120, min_periods=120).corr(df["eth_ret"]))
df["ratio_trend"] = df.get(
"trend_1440m",
df["ratio"] / (df["ratio"].rolling(1440, min_periods=1440).mean() + 1e-12) - 1.0,
)
# aggregate to rule
agg = {"btc_close": "last", "eth_close": "last", "ratio": "last", "ratio_ret": "sum"}
for c in df.columns:
if c not in agg:
agg[c] = "mean"
g = df.resample(rule).agg(agg).dropna()
step_min = max(1, int(pd.Timedelta(rule).total_seconds() // 60))
ahead = max(1, int(round(horizon_min / step_min)))
g["fut_ret"] = g["ratio_ret"].shift(-ahead)
return g.dropna()
def feature_matrix(g: pd.DataFrame) -> tuple[np.ndarray, np.ndarray, list[str]]:
ban = {"fut_ret", "btc_close", "eth_close", "ratio"}
keep = ("rv_", "corr_", "trend_", "beta_", "divergence_", "vol")
feats: list[str] = []
if "ratio_ret" in g.columns:
feats.append("ratio_ret")
feats += [
c for c in g.columns
if c not in ban and c != "ratio_ret" and any(c.startswith(p) for p in keep)
]
if not feats:
feats = ["ratio_ret", "rv_30m", "rv_2h", "corr_2h", "ratio_trend", "volratio"]
X = g[feats].astype(np.float32).values
y = g["fut_ret"].astype(np.float32).values
return X, y, feats
# ====================== OVERLAP-/LEAKAGE-AWARE UTILITIES =====================
def max_lookback_minutes() -> int:
# From feature construction: the maximum rolling window is 1440 minutes.
return 1440
def bars_from_minutes(rule: str, minutes: int) -> int:
step_min = max(1, int(pd.Timedelta(rule).total_seconds() // 60))
return int(np.ceil(minutes / step_min))
def sample_random_splits(
g: pd.DataFrame,
rule: str,
n_splits: int,
test_bars: int,
gap_bars_extra: int,
seed: int,
since: str | None,
horizon_min: int,
):
"""
Random test windows with an embargo that guarantees disjoint information sets.
Embargo (in bars) = max(gap_bars_extra, ceil((max_lookback + horizon_min)/rule_minutes)).
Train uses only data strictly before (test_start - embargo).
"""
rng = np.random.default_rng(seed)
idx = g.index
if since is not None:
idx = idx[idx >= pd.Timestamp(since, tz="UTC")]
if len(idx) <= test_bars:
return
# Compute minimal embargo based on lookback + horizon
gap_min_bars = bars_from_minutes(rule, max_lookback_minutes() + horizon_min)
embargo_bars = int(max(gap_bars_extra, gap_min_bars))
# Valid start indices ensure full test window fits
valid = np.arange(len(idx) - test_bars)
if len(valid) <= 0:
return
starts = rng.choice(valid, size=min(n_splits, len(valid)), replace=False)
starts = np.sort(starts)
for s in starts:
test_start = idx[s]
test_end = idx[s + test_bars - 1]
# Train: strictly before test_start - embargo_bars
left_end_pos = s - embargo_bars - 1
if left_end_pos < 0:
# No room for non-overlapping training information
continue
embargo_end = idx[left_end_pos]
train = g.loc[:embargo_end]
test = g.loc[test_start:test_end]
if len(train) == 0 or len(test) < test_bars:
continue
yield train, test, (test_start, test_end), embargo_bars
# ============================ MODEL / FIT ====================================
def fit_and_predict_train_test(
train: pd.DataFrame,
test: pd.DataFrame,
n_states: int,
full_save_path: str | None = None,
):
Xtr, ytr, feats = feature_matrix(train)
Xte, yte, _ = feature_matrix(test)
scaler = StandardScaler()
Xtr_s = scaler.fit_transform(Xtr)
Xte_s = scaler.transform(Xte)
hmm = GaussianHMM(n_components=n_states, covariance_type="diag", n_iter=300, random_state=7)
hmm.fit(Xtr_s)
st_tr = hmm.predict(Xtr_s)
st_te = hmm.predict(Xte_s)
# Map HMM states to stances using state-wise mean of future returns in TRAIN
means = {s: float(np.nanmean(ytr[st_tr == s])) for s in range(n_states)}
small = np.nanpercentile(np.abs(list(means.values())), 30)
state_to_stance = {s: (1 if m > +small else (-1 if m < -small else 0)) for s, m in means.items()}
preds = np.vectorize(state_to_stance.get)(st_te).astype(np.int8)
if full_save_path:
Path(full_save_path).parent.mkdir(parents=True, exist_ok=True)
joblib.dump(
{"hmm": hmm, "scaler": scaler, "features": feats, "state_to_stance": state_to_stance},
full_save_path,
)
print(f"Model saved: {full_save_path}")
return preds, yte, state_to_stance, feats
# ============================= METRICS =======================================
def metrics_nonoverlap(y: np.ndarray, preds: np.ndarray, rule: str, horizon_min: int) -> dict[str, float]:
"""
Score only every 'ahead'-th point to remove overlap of forward windows.
Adjust annualization for reduced sampling frequency.
"""
T = min(len(y), len(preds))
if T == 0:
return {"hit_rate": np.nan, "ann_sharpe": np.nan, "n_points": 0}
y = y[:T]
preds = preds[:T]
step_min = max(1, int(pd.Timedelta(rule).total_seconds() // 60))
ahead = max(1, int(round(horizon_min / step_min)))
# Use the last index of each non-overlapping forward window
idx = np.arange(ahead - 1, T, ahead)
if len(idx) == 0:
return {"hit_rate": np.nan, "ann_sharpe": np.nan, "n_points": 0}
y_s = y[idx]
p_s = preds[idx]
pnl = p_s * y_s
hit = float((np.sign(p_s) == np.sign(y_s)).mean())
bars_per_day = int(round(24 * 60 / step_min))
# We only take one observation per 'ahead' bars
eff_obs_per_day = bars_per_day / ahead
ann = np.sqrt(365 * max(eff_obs_per_day, 1e-12))
sharpe = float(np.nanmean(pnl) / (np.nanstd(pnl) + 1e-12) * ann)
return {"hit_rate": hit, "ann_sharpe": sharpe, "n_points": int(len(idx))}
# ============================== RUNNER =======================================
def run_rule_mc(
minute: pd.DataFrame,
rule: str,
n_states: int,
horizon_min: int,
cv: object,
folder_save_path: str | None,
) -> dict:
g = build_features(minute, rule, horizon_min)
rows = []
for train, test, (ts, te), embargo_bars in sample_random_splits(
g=g,
rule=rule,
n_splits=cv.cv_splits,
test_bars=cv.cv_test_bars,
gap_bars_extra=cv.cv_gap_bars,
seed=cv.cv_seed,
since=cv.cv_since,
horizon_min=horizon_min,
):
full_save_path = None
if folder_save_path:
full_save_path = f"{folder_save_path}/hmm_btc_eth_{rule}_{horizon_min}.joblib"
preds, ytest, state_map, feats = fit_and_predict_train_test(
train, test, n_states, full_save_path
)
m = metrics_nonoverlap(ytest, preds, rule, horizon_min)
rows.append(
{
"hit_rate": m["hit_rate"],
"ann_sharpe": m["ann_sharpe"],
"n_points": m["n_points"],
"test_span": (ts, te),
"embargo_bars": embargo_bars,
}
)
if not rows:
return {
"rule": rule,
"hit_mean": np.nan,
"hit_std": np.nan,
"sharpe_mean": np.nan,
"sharpe_std": np.nan,
"splits": 0,
}
hits = np.array([r["hit_rate"] for r in rows], dtype=float)
sharpes = np.array([r["ann_sharpe"] for r in rows], dtype=float)
return {
"rule": rule,
"hit_mean": float(np.nanmean(hits)),
"hit_std": float(np.nanstd(hits)),
"sharpe_mean": float(np.nanmean(sharpes)),
"sharpe_std": float(np.nanstd(sharpes)),
"splits": len(rows),
}
def main(args: CLI) -> None:
btc = _load_bitstamp_csv(args.btc_csv, "btc")
eth = _load_bitstamp_csv(args.eth_csv, "eth")
minute = _align_minutely(btc, eth)
class CV:
pass
cv = CV()
cv.cv_splits = args.cv_splits
cv.cv_test_bars = args.cv_test_bars
cv.cv_gap_bars = args.cv_gap_bars
cv.cv_seed = args.cv_seed
cv.cv_since = args.cv_since
results = [
run_rule_mc(minute, rule, args.n_states, args.horizon_min, cv, args.folder_save_path)
for rule in args.resample_rules
]
df = pd.DataFrame(results).sort_values(by="sharpe_mean", ascending=False)
print("# Randomized time-split comparison (embargo = max(user_gap, ceil((lookback+horizon)/rule)))")
print(
f"States={args.n_states} HorizonMin={args.horizon_min} Splits={args.cv_splits} "
f"TestBars={args.cv_test_bars} ExtraGapBars={args.cv_gap_bars} Since={args.cv_since}"
)
if not df.empty:
df["hit"] = df["hit_mean"].round(4).astype(str) + " ± " + df["hit_std"].round(4).astype(str)
df["sharpe"] = df["sharpe_mean"].round(4).astype(str) + " ± " + df["sharpe_std"].round(4).astype(str)
print(df[["rule", "splits", "hit", "sharpe"]].to_string(index=False))
else:
print("No valid splits found")
if __name__ == "__main__":
args = parse_args()
main(args)