#!/usr/bin/env python3
from __future__ import annotations
import argparse
from dataclasses import dataclass
from pathlib import Path
import numpy as np
import pandas as pd
from hmmlearn.hmm import GaussianHMM
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, mutual_info_regression, RFE
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
import warnings
warnings.filterwarnings('ignore')
# ------------------------------- CLI -----------------------------------------
@dataclass
class CLI:
    btc_csv: Path
    eth_csv: Path
    resample_rules: list[str]
    n_states: int
    horizon_min: int
    # CV params
    cv_splits: int
    cv_test_bars: int
    cv_gap_bars: int
    cv_seed: int
    cv_since: str | None  # restrict sampling of test windows to a recent era
    # Feature selection params
    feature_selection_method: str  # 'mutual_info', 'rfe', 'random_forest', 'none'
    n_features: int  # number of features to select
    # Enhanced CV params
    cv_method: str  # 'random', 'expanding', 'rolling'
def parse_args() -> CLI:
    p = argparse.ArgumentParser(description="BTC/ETH regime modeling with robust CV and feature selection")
    p.add_argument("--btc", type=Path, default=Path("btcusd_1-min_data.csv"))
    p.add_argument("--eth", type=Path, default=Path("ethusd_1min_ohlc.csv"))
    # lowercase "1h": the uppercase "H" alias is deprecated in pandas >= 2.2
    p.add_argument("--rules", default="30min,45min,1h", help="Comma-separated pandas offset aliases")
    p.add_argument("--states", type=int, default=3)
    p.add_argument("--horizon", type=int, default=60, help="prediction horizon in minutes")
    # randomized CV controls
    p.add_argument("--cv_splits", type=int, default=8, help="number of test windows")
    p.add_argument("--cv_test_bars", type=int, default=500, help="length of each test window in bars")
    p.add_argument("--cv_gap_bars", type=int, default=24, help="embargo gap before each test window")
    p.add_argument("--cv_seed", type=int, default=7, help="rng seed for reproducibility")
    p.add_argument("--cv_since", default=None, help="only sample test starts at/after this date (e.g. 2023-01-01)")
    # Feature selection
    p.add_argument("--feature_selection", default="mutual_info",
                   choices=["mutual_info", "rfe", "random_forest", "none"],
                   help="Feature selection method")
    p.add_argument("--n_features", type=int, default=10, help="Number of features to select")
    # Enhanced CV method
    p.add_argument("--cv_method", default="random", choices=["random", "expanding", "rolling"],
                   help="Cross-validation method")
    a = p.parse_args()
    rules = [r.strip() for r in a.rules.split(",") if r.strip()]
    return CLI(a.btc, a.eth, rules, a.states, a.horizon, a.cv_splits, a.cv_test_bars,
               a.cv_gap_bars, a.cv_seed, a.cv_since, a.feature_selection, a.n_features, a.cv_method)
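# Illustrative invocation (script name is a placeholder, not from a real run):
#   python regime_cv.py --btc btcusd_1-min_data.csv --eth ethusd_1min_ohlc.csv \
#       --rules 30min,1h --states 3 --cv_method rolling --feature_selection rfe --n_features 8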
# ------------------------------ IO / CLEAN -----------------------------------
def _norm_headers(df: pd.DataFrame) -> pd.DataFrame:
    df = df.rename(columns={c: c.strip().lower() for c in df.columns})
    if "unix" in df.columns:
        df = df.rename(columns={"unix": "timestamp"})
    if "date" in df.columns:
        df = df.rename(columns={"date": "timestamp"})
    return df
def _load_bitstamp_csv(path: Path, prefix: str) -> pd.DataFrame:
    df = pd.read_csv(path)
    df = _norm_headers(df)
    if "timestamp" not in df.columns:
        raise ValueError(f"Missing timestamp in {path}")
    # Epoch-second columns parse with unit="s"; a renamed "date" column of
    # string dates would silently coerce to all-NaT under unit="s", so fall
    # back to a plain datetime parse when the column is mostly non-numeric.
    ts_num = pd.to_numeric(df["timestamp"], errors="coerce")
    if ts_num.notna().mean() > 0.5:
        df["timestamp"] = pd.to_datetime(ts_num, unit="s", utc=True, errors="coerce")
    else:
        df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True, errors="coerce")
    df = df.dropna(subset=["timestamp"]).set_index("timestamp").sort_index()
    for c in ("open", "high", "low", "close", "volume"):
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce", downcast="float")
    df = df[["open", "high", "low", "close", "volume"]].dropna()
    return df.add_prefix(prefix + "_")
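# Assumed input shape (an assumption based on what the loader tolerates, values
# illustrative): one row per minute, epoch-second timestamps, OHLCV columns:
#   Timestamp,Open,High,Low,Close,Volume
#   1514764800,13880.0,13895.4,13850.0,13865.3,12.4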
def _align_minutely(btc: pd.DataFrame, eth: pd.DataFrame) -> pd.DataFrame:
    # Keep only minutes present in both feeds; forward-fill at most one hour of gaps.
    idx = btc.index.intersection(eth.index)
    df = btc.reindex(idx).join(eth.reindex(idx), how="inner")
    return df.ffill(limit=60).dropna()
# --------------------------- FEATURES (enhanced) ------------------------------
def build_features(df: pd.DataFrame, rule: str, horizon_min: int) -> pd.DataFrame:
    df = df.copy()
    df["btc_ret"] = np.log(df["btc_close"]).diff()
    df["eth_ret"] = np.log(df["eth_close"]).diff()
    df["ratio"] = df["eth_close"] / df["btc_close"]
    df["ratio_ret"] = np.log(df["ratio"]).diff()
    # Volatility features: realized vol of the ratio return over several windows
    for win in (15, 30, 60, 120, 240, 360):
        df[f"rv_{win}m"] = df["ratio_ret"].rolling(win, min_periods=win).std()
    # Trend features: deviation of the ratio from its rolling mean
    for win in (60, 240, 1440):
        ma = df["ratio"].rolling(win, min_periods=win).mean()
        df[f"trend_{win}m"] = df["ratio"] / (ma + 1e-12) - 1.0
    # Correlation features
    for win in (60, 120, 240):
        df[f"corr_{win}m"] = df["btc_ret"].rolling(win, min_periods=win).corr(df["eth_ret"])
    # Beta and risk features
    cov_120 = df["eth_ret"].rolling(120, min_periods=120).cov(df["btc_ret"])
    var_120 = df["btc_ret"].rolling(120, min_periods=120).var()
    df["beta_2h"] = cov_120 / (var_120 + 1e-12)
    std_b = df["btc_ret"].rolling(120, min_periods=120).std()
    std_e = df["eth_ret"].rolling(120, min_periods=120).std()
    df["divergence_2h"] = np.abs(df["btc_ret"] / (std_b + 1e-12) - df["eth_ret"] / (std_e + 1e-12))
    # Volume features
    df["volratio"] = np.log((df["eth_volume"] + 1e-9) / (df["btc_volume"] + 1e-9))
    df["vol_sum"] = np.log(df["eth_volume"] + df["btc_volume"] + 1e-9)
    df["vol_diff"] = (df["eth_volume"] - df["btc_volume"]) / (df["eth_volume"] + df["btc_volume"] + 1e-9)
    # Momentum features
    df["momentum_1h"] = df["ratio_ret"].rolling(60).sum()
    df["momentum_4h"] = df["ratio_ret"].rolling(240).sum()
    # Mean-reversion features
    for win in (60, 120, 240):
        rolling_mean = df["ratio_ret"].rolling(win).mean()
        rolling_std = df["ratio_ret"].rolling(win).std()
        df[f"zscore_{win}m"] = (df["ratio_ret"] - rolling_mean) / (rolling_std + 1e-12)
    # Price-position features: where the ratio sits inside its rolling range
    for win in (240, 1440):
        high = df["ratio"].rolling(win).max()
        low = df["ratio"].rolling(win).min()
        df[f"position_{win}m"] = (df["ratio"] - low) / (high - low + 1e-12)
    # Aggregate to the target timeframe: last price, summed return, mean of the rest
    agg = {"btc_close": "last", "eth_close": "last", "ratio": "last", "ratio_ret": "sum"}
    for c in df.columns:
        if c not in agg:
            agg[c] = "mean"
    g = df.resample(rule).agg(agg).dropna()
    step_min = max(1, int(pd.Timedelta(rule).total_seconds() // 60))
    ahead = max(1, int(round(horizon_min / step_min)))
    # Target: the ratio return of the single bar `ahead` steps ahead
    # (not the cumulative return over the whole horizon).
    g["fut_ret"] = g["ratio_ret"].shift(-ahead)
    return g.dropna()
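# Worked example of the horizon arithmetic above: with rule="30min" and
# horizon_min=60, step_min = 30 and ahead = round(60 / 30) = 2, so fut_ret at
# bar t is the resampled ratio_ret of the single bar t+2 (one hour later).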
def feature_matrix(g: pd.DataFrame) -> tuple[np.ndarray, np.ndarray, list[str]]:
    ban = {"fut_ret", "btc_close", "eth_close", "ratio"}
    keep = ("rv_", "corr_", "trend_", "beta_", "divergence_", "vol", "momentum_", "zscore_", "position_")
    feats = []
    if "ratio_ret" in g.columns:
        feats.append("ratio_ret")
    feats += [c for c in g.columns if c not in ban and c != "ratio_ret" and any(c.startswith(p) for p in keep)]
    if not feats:
        # Fallback restricted to column names that build_features actually creates
        feats = ["ratio_ret", "rv_30m", "rv_120m", "corr_120m", "trend_60m", "volratio", "momentum_1h", "zscore_60m"]
    X = g[feats].astype(np.float32).values
    y = g["fut_ret"].astype(np.float32).values
    return X, y, feats
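# Shape sketch (illustrative): for a frame g produced by build_features,
# >>> X, y, feats = feature_matrix(g)
# X is (n_bars, n_features) float32, y is (n_bars,), and len(feats) == X.shape[1].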
# ------------------------- Enhanced Feature Selection ------------------------
def select_features(X_train: np.ndarray, y_train: np.ndarray,
                    X_test: np.ndarray, feature_names: list[str],
                    method: str, n_features: int) -> tuple[np.ndarray, np.ndarray, list[str]]:
    """Fit feature selection on the training split only, then apply it to the test split."""
    if method == "none" or n_features >= len(feature_names):
        return X_train, X_test, feature_names
    if n_features <= 0:
        n_features = max(1, len(feature_names) // 2)
    try:
        if method == "mutual_info":
            selector = SelectKBest(score_func=mutual_info_regression, k=n_features)
            X_train_selected = selector.fit_transform(X_train, y_train)
            X_test_selected = selector.transform(X_test)
            selected_indices = selector.get_support(indices=True)
            selected_features = [feature_names[i] for i in selected_indices]
        elif method == "rfe":
            # Linear regression as the base estimator for recursive feature elimination
            estimator = LinearRegression()
            selector = RFE(estimator, n_features_to_select=n_features, step=1)
            X_train_selected = selector.fit_transform(X_train, y_train)
            X_test_selected = selector.transform(X_test)
            selected_indices = selector.get_support(indices=True)
            selected_features = [feature_names[i] for i in selected_indices]
        elif method == "random_forest":
            # Rank features by random-forest impurity importance
            rf = RandomForestRegressor(n_estimators=100, random_state=42, n_jobs=-1)
            rf.fit(X_train, y_train)
            importances = rf.feature_importances_
            selected_indices = np.argsort(importances)[-n_features:]
            X_train_selected = X_train[:, selected_indices]
            X_test_selected = X_test[:, selected_indices]
            selected_features = [feature_names[i] for i in selected_indices]
        else:
            return X_train, X_test, feature_names
        print(f"  Selected {len(selected_features)} features: {selected_features}")
        return X_train_selected, X_test_selected, selected_features
    except Exception as e:
        print(f"  Feature selection failed: {e}, using all features")
        return X_train, X_test, feature_names
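# Minimal usage sketch on synthetic data (shapes illustrative, not a real run):
# >>> rng = np.random.default_rng(0)
# >>> Xtr, Xte = rng.normal(size=(200, 12)), rng.normal(size=(50, 12))
# >>> ytr = 0.5 * Xtr[:, 0] + rng.normal(scale=0.1, size=200)
# >>> names = [f"f{i}" for i in range(12)]
# >>> Xtr2, Xte2, kept = select_features(Xtr, ytr, Xte, names, "mutual_info", 4)
# >>> Xtr2.shape, Xte2.shape  # (200, 4), (50, 4); "f0" should rank highly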
# ------------------------- Robust Cross-Validation Methods -------------------
def sample_random_splits(
    g: pd.DataFrame,
    n_splits: int,
    test_bars: int,
    gap_bars: int,
    seed: int,
    since: str | None
):
    """Randomly sampled test windows with an embargo gap before each one.

    `since` only constrains where test windows may start; training may still
    use earlier history (everything up to the embargo cutoff).
    """
    rng = np.random.default_rng(seed)
    idx = g.index
    start_lo = 0
    if since is not None:
        start_lo = idx.searchsorted(pd.Timestamp(since, tz="UTC"))
    valid = np.arange(start_lo, len(idx) - test_bars)
    if len(valid) <= 0:
        raise ValueError("Not enough data for requested test window")
    starts = rng.choice(valid, size=min(n_splits, len(valid)), replace=False)
    for s in np.sort(starts):
        test_start = idx[s]
        test_end = idx[s + test_bars - 1]
        # Embargo: exclude the gap_bars bars immediately before the test window
        embargo_end = idx[s - gap_bars - 1] if s - gap_bars - 1 >= 0 else None
        train = g.loc[:embargo_end] if embargo_end is not None else g.iloc[0:0]
        test = g.loc[test_start:test_end]
        if len(train) == 0 or len(test) < test_bars:
            continue
        yield train, test, (test_start, test_end)
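# Sketch of driving the sampler directly (assumes g came from build_features):
# >>> for train, test, (ts, te) in sample_random_splits(g, 3, 500, 24, seed=7, since=None):
# ...     print(ts, te, len(train), len(test))
# Every yielded train frame ends at least gap_bars + 1 bars before ts.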
def sample_expanding_window_splits(
    g: pd.DataFrame,
    n_splits: int,
    test_bars: int,
    gap_bars: int,
    since: str | None
):
    """Expanding-window CV: the training set grows over time."""
    # Trim the whole frame here so positional slicing below and the reported
    # spans stay aligned (slicing only the index would misalign g.iloc).
    if since is not None:
        g = g.loc[g.index >= pd.Timestamp(since, tz="UTC")]
    idx = g.index
    total_bars = len(idx)
    min_train_size = test_bars * 2  # minimum training-set size
    # Calculate split points
    available_splits = max(1, (total_bars - min_train_size - test_bars - gap_bars) // test_bars)
    n_splits = min(n_splits, available_splits)
    if n_splits <= 0 or total_bars < min_train_size + gap_bars + test_bars:
        raise ValueError("Not enough data for expanding window splits")
    for i in range(n_splits):
        test_start_idx = min_train_size + gap_bars + i * test_bars
        test_end_idx = test_start_idx + test_bars - 1
        if test_end_idx >= total_bars:
            break
        train_end_idx = test_start_idx - gap_bars - 1
        train = g.iloc[:train_end_idx + 1]
        test = g.iloc[test_start_idx:test_end_idx + 1]
        if len(train) < min_train_size or len(test) < test_bars:
            continue
        yield train, test, (idx[test_start_idx], idx[test_end_idx])
def sample_rolling_window_splits(
    g: pd.DataFrame,
    n_splits: int,
    test_bars: int,
    gap_bars: int,
    since: str | None
):
    """Rolling-window CV: a fixed-size training window slides with the test window."""
    # As in the expanding variant, trim the frame itself so positions align.
    if since is not None:
        g = g.loc[g.index >= pd.Timestamp(since, tz="UTC")]
    idx = g.index
    total_bars = len(idx)
    train_bars = test_bars * 3  # fixed training-window size
    # Calculate split points
    available_splits = max(1, (total_bars - train_bars - test_bars - gap_bars) // test_bars)
    n_splits = min(n_splits, available_splits)
    if n_splits <= 0 or total_bars < train_bars + gap_bars + test_bars:
        raise ValueError("Not enough data for rolling window splits")
    for i in range(n_splits):
        train_start_idx = i * test_bars
        train_end_idx = train_start_idx + train_bars - 1
        test_start_idx = train_end_idx + gap_bars + 1
        test_end_idx = test_start_idx + test_bars - 1
        if test_end_idx >= total_bars:
            break
        train = g.iloc[train_start_idx:train_end_idx + 1]
        test = g.iloc[test_start_idx:test_end_idx + 1]
        if len(train) < train_bars or len(test) < test_bars:
            continue
        yield train, test, (idx[test_start_idx], idx[test_end_idx])
def get_cv_splits(g: pd.DataFrame, cv_method: str, n_splits: int, test_bars: int,
                  gap_bars: int, seed: int, since: str | None):
    """Dispatch to the requested CV method."""
    if cv_method == "random":
        return sample_random_splits(g, n_splits, test_bars, gap_bars, seed, since)
    elif cv_method == "expanding":
        return sample_expanding_window_splits(g, n_splits, test_bars, gap_bars, since)
    elif cv_method == "rolling":
        return sample_rolling_window_splits(g, n_splits, test_bars, gap_bars, since)
    else:
        raise ValueError(f"Unknown CV method: {cv_method}")
# ------------------------------ Model / Fit -----------------------------------
def fit_and_predict_train_test(train: pd.DataFrame, test: pd.DataFrame,
                               n_states: int, feature_selection_method: str,
                               n_features: int):
    Xtr, ytr, feats = feature_matrix(train)
    Xte, yte, _ = feature_matrix(test)
    # Feature selection and scaling are fitted on the training split only
    Xtr_sel, Xte_sel, selected_feats = select_features(
        Xtr, ytr, Xte, feats, feature_selection_method, n_features
    )
    scaler = StandardScaler()
    Xtr_s = scaler.fit_transform(Xtr_sel)
    Xte_s = scaler.transform(Xte_sel)
    hmm = GaussianHMM(n_components=n_states, covariance_type="diag", n_iter=300, random_state=7)
    hmm.fit(Xtr_s)
    st_tr = hmm.predict(Xtr_s)
    st_te = hmm.predict(Xte_s)
    # Map each state to a stance via its mean forward return on the training
    # split; states never visited in training fall through to a neutral stance.
    means = {s: (float(np.nanmean(ytr[st_tr == s])) if np.any(st_tr == s) else np.nan)
             for s in range(n_states)}
    small = np.nanpercentile(np.abs(list(means.values())), 30)
    state_to_stance = {s: (1 if m > +small else (-1 if m < -small else 0)) for s, m in means.items()}
    preds = np.array([state_to_stance[s] for s in st_te], dtype=np.int8)
    return preds, yte, state_to_stance, selected_feats, hmm
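# Worked example of the stance mapping: if the per-state mean forward returns
# on the training split were {0: +0.0020, 1: -0.0030, 2: +0.0001}, the 30th
# percentile of their absolute values is ~0.00124, so state 0 maps to +1
# (long the ratio), state 1 to -1 (short), and state 2 to 0 (flat).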
def metrics(y: np.ndarray, preds: np.ndarray, rule: str) -> dict[str, float]:
    T = min(len(y), len(preds))
    y, preds = y[:T], preds[:T]
    pnl = preds * y
    # Note: a flat (0) stance only counts as a hit when the return is exactly zero
    hit = (np.sign(preds) == np.sign(y)).mean() if T else np.nan
    # Annualize assuming 24/7 crypto trading: sqrt(365 * bars per day)
    bars_per_day = int(round(24 * 60 / max(1, int(pd.Timedelta(rule).total_seconds() // 60))))
    ann = np.sqrt(365 * bars_per_day)
    sharpe = float(np.nanmean(pnl) / (np.nanstd(pnl) + 1e-12) * ann)
    # Additional metrics
    positive_returns = (pnl > 0).sum()
    total_trades = len(pnl)
    win_rate = positive_returns / total_trades if total_trades > 0 else 0.0
    profit_factor = abs(pnl[pnl > 0].sum() / (pnl[pnl < 0].sum() + 1e-12))
    return {
        "hit_rate": float(hit),
        "ann_sharpe": sharpe,
        "n_points": int(T),
        "win_rate": float(win_rate),
        "profit_factor": float(profit_factor),
        "total_return": float(pnl.sum()),
    }
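# Worked example of the annualization: 30-minute bars give 48 bars per day in
# a 24/7 market, so ann = sqrt(365 * 48) = sqrt(17520) ~= 132.4, and a per-bar
# mean/std ratio of 0.01 reports an annualized Sharpe of ~1.32.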
# ------------------------------ Runner ---------------------------------------
def run_rule_mc(minute: pd.DataFrame, rule: str, n_states: int,
                horizon_min: int, cv, feature_selection_method: str,
                n_features: int) -> dict:
    g = build_features(minute, rule, horizon_min)
    rows = []
    feature_importance = {}
    for i, (train, test, (ts, te)) in enumerate(get_cv_splits(g, cv.cv_method, cv.cv_splits,
                                                              cv.cv_test_bars, cv.cv_gap_bars,
                                                              cv.cv_seed, cv.cv_since)):
        print(f"  Split {i+1}: Train {len(train)} bars, Test {len(test)} bars")
        preds, ytest, state_map, feats, hmm = fit_and_predict_train_test(
            train, test, n_states, feature_selection_method, n_features
        )
        m = metrics(ytest, preds, rule)
        rows.append({
            "hit_rate": m["hit_rate"],
            "ann_sharpe": m["ann_sharpe"],
            "n_points": m["n_points"],
            "test_span": (ts, te),
            "win_rate": m["win_rate"],
            "profit_factor": m["profit_factor"],
            "total_return": m["total_return"],
        })
        # Count how often each feature survives selection across splits
        for feat in feats:
            feature_importance[feat] = feature_importance.get(feat, 0) + 1
    if not rows:
        return {
            "rule": rule, "hit_mean": np.nan, "sharpe_mean": np.nan,
            "splits": 0, "hit_std": np.nan, "sharpe_std": np.nan,
            "win_rate_mean": np.nan, "profit_factor_mean": np.nan,
            "top_features": [],
        }
    hits = np.array([r["hit_rate"] for r in rows], dtype=float)
    sharpes = np.array([r["ann_sharpe"] for r in rows], dtype=float)
    win_rates = np.array([r["win_rate"] for r in rows], dtype=float)
    profit_factors = np.array([r["profit_factor"] for r in rows], dtype=float)
    # Rank features by how many splits selected them
    sorted_features = sorted(feature_importance.items(), key=lambda x: x[1], reverse=True)
    top_features = [f[0] for f in sorted_features[:5]]  # five most frequently selected
    return {
        "rule": rule,
        "hit_mean": float(np.nanmean(hits)),
        "hit_std": float(np.nanstd(hits)),
        "sharpe_mean": float(np.nanmean(sharpes)),
        "sharpe_std": float(np.nanstd(sharpes)),
        "win_rate_mean": float(np.nanmean(win_rates)),
        "profit_factor_mean": float(np.nanmean(profit_factors)),
        "splits": len(rows),
        "top_features": top_features,
    }
# ------------------------------ MAIN -----------------------------------------
def main(args: CLI) -> None:
    print("Loading data...")
    btc = _load_bitstamp_csv(args.btc_csv, "btc")
    eth = _load_bitstamp_csv(args.eth_csv, "eth")
    minute = _align_minutely(btc, eth)
    print(f"Aligned data: {len(minute)} minutes")
    results = []
    for rule in args.resample_rules:
        print(f"\nProcessing rule: {rule}")
        # The CLI dataclass already carries the cv_* fields, so pass it straight through
        result = run_rule_mc(minute, rule, args.n_states, args.horizon_min, args,
                             args.feature_selection_method, args.n_features)
        results.append(result)
    df = pd.DataFrame(results).sort_values(by="sharpe_mean", ascending=False)
    print("\n" + "=" * 80)
    print("ENHANCED RESULTS: time-split comparison with feature selection")
    print("=" * 80)
    print(f"States={args.n_states} | HorizonMin={args.horizon_min} | Splits={args.cv_splits}")
    print(f"TestBars={args.cv_test_bars} | GapBars={args.cv_gap_bars} | Since={args.cv_since}")
    print(f"CV Method={args.cv_method} | Feature Selection={args.feature_selection_method} | N Features={args.n_features}")
    print("=" * 80)
    if not df.empty:
        df["hit"] = df["hit_mean"].round(4).astype(str) + " ± " + df["hit_std"].round(4).astype(str)
        df["sharpe"] = df["sharpe_mean"].round(4).astype(str) + " ± " + df["sharpe_std"].round(4).astype(str)
        df["win_rate"] = (df["win_rate_mean"] * 100).round(2).astype(str) + "%"
        df["profit_factor"] = df["profit_factor_mean"].round(3).astype(str)
        display_cols = ["rule", "splits", "hit", "sharpe", "win_rate", "profit_factor"]
        print(df[display_cols].to_string(index=False))
        # Show the top features for the best-performing rule, if any splits succeeded
        best_rule = df.iloc[0]
        top = best_rule.get("top_features")
        if isinstance(top, list) and top:
            print(f"\nTop features for best rule '{best_rule['rule']}':")
            for i, feat in enumerate(top[:5]):
                print(f"  {i+1}. {feat}")
    else:
        print("No valid splits found")

if __name__ == "__main__":
    args = parse_args()
    main(args)