#!/usr/bin/env python3
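"""BTC/ETH regime modeling: Gaussian HMM states, embargoed walk-forward CV,
and optional feature selection.

Pipeline: load minute-level BTC and ETH OHLCV CSVs, align them, engineer
ratio/volatility/correlation features, resample to each target bar size, fit a
GaussianHMM per training split, map hidden states to long/flat/short stances,
and score those stances on embargoed out-of-sample windows.

Example invocation (a sketch; the script name is illustrative and the default
CSVs are assumed to sit in the working directory):

    python regime_hmm.py --rules 30min,1h --states 3 --cv_method expanding
"""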
from __future__ import annotations

import argparse
import warnings
from dataclasses import dataclass
from pathlib import Path
from types import SimpleNamespace

import numpy as np
import pandas as pd
from hmmlearn.hmm import GaussianHMM
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import RFE, SelectKBest, mutual_info_regression
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler

# Silence noisy convergence/runtime warnings from hmmlearn and numpy;
# comment this out when debugging.
warnings.filterwarnings("ignore")

# ------------------------------- CLI -----------------------------------------
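# Mirrors the argparse flags below; parse_args() constructs CLI positionally,
# so field order here must match the CLI(...) call at the end of parse_args().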
@dataclass
class CLI:
    btc_csv: Path
    eth_csv: Path
    resample_rules: list[str]
    n_states: int
    horizon_min: int
    # CV params
    cv_splits: int
    cv_test_bars: int
    cv_gap_bars: int
    cv_seed: int
    cv_since: str | None  # restrict test-window sampling to a recent era
    # Feature selection params
    feature_selection_method: str  # 'mutual_info', 'rfe', 'random_forest', 'none'
    n_features: int  # number of features to select
    # Enhanced CV params
    cv_method: str  # 'random', 'expanding', 'rolling'

def parse_args() -> CLI:
    p = argparse.ArgumentParser(description="BTC/ETH regime modeling with robust CV and feature selection")
    p.add_argument("--btc", type=Path, default=Path("btcusd_1-min_data.csv"))
    p.add_argument("--eth", type=Path, default=Path("ethusd_1min_ohlc.csv"))
    # Lowercase "1h": pandas >= 2.2 deprecates the uppercase "H" offset alias.
    p.add_argument("--rules", default="30min,45min,1h", help="Comma-separated pandas offsets")
    p.add_argument("--states", type=int, default=3)
    p.add_argument("--horizon", type=int, default=60)

    # randomized CV controls
    p.add_argument("--cv_splits", type=int, default=8, help="number of random test windows")
    p.add_argument("--cv_test_bars", type=int, default=500, help="length of each test window in bars")
    p.add_argument("--cv_gap_bars", type=int, default=24, help="embargo gap before test window")
    p.add_argument("--cv_seed", type=int, default=7, help="rng seed for reproducibility")
    p.add_argument("--cv_since", default=None, help="only sample test starts at/after this date (e.g. 2023-01-01)")

    # Feature selection
    p.add_argument("--feature_selection", default="mutual_info",
                   choices=["mutual_info", "rfe", "random_forest", "none"],
                   help="Feature selection method")
    p.add_argument("--n_features", type=int, default=10, help="Number of features to select")

    # Enhanced CV method
    p.add_argument("--cv_method", default="random", choices=["random", "expanding", "rolling"],
                   help="Cross-validation method")

    a = p.parse_args()
    rules = [r.strip() for r in a.rules.split(",") if r.strip()]
    return CLI(a.btc, a.eth, rules, a.states, a.horizon, a.cv_splits, a.cv_test_bars,
               a.cv_gap_bars, a.cv_seed, a.cv_since, a.feature_selection, a.n_features, a.cv_method)

# ------------------------------ IO / CLEAN -----------------------------------
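# Loaders assume Bitstamp-style exports: an epoch-seconds "Timestamp" (or
# "unix"/"date") column plus open/high/low/close/volume. Headers are lowercased
# first so either capitalization parses.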
def _norm_headers(df: pd.DataFrame) -> pd.DataFrame:
    df = df.rename(columns={c: c.strip().lower() for c in df.columns})
    if "unix" in df.columns:
        df = df.rename(columns={"unix": "timestamp"})
    if "date" in df.columns:
        df = df.rename(columns={"date": "timestamp"})
    return df


def _load_bitstamp_csv(path: Path, prefix: str) -> pd.DataFrame:
    df = pd.read_csv(path)
    df = _norm_headers(df)
    if "timestamp" not in df.columns:
        raise ValueError(f"Missing timestamp in {path}")
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s", utc=True, errors="coerce")
    df = df.dropna(subset=["timestamp"]).set_index("timestamp").sort_index()
    for c in ("open", "high", "low", "close", "volume"):
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce", downcast="float")
    df = df[["open", "high", "low", "close", "volume"]].dropna()
    return df.add_prefix(prefix + "_")


def _align_minutely(btc: pd.DataFrame, eth: pd.DataFrame) -> pd.DataFrame:
    idx = btc.index.intersection(eth.index)
    df = btc.reindex(idx).join(eth.reindex(idx), how="inner")
    # Forward-fill at most 60 minutes so short gaps don't drop whole stretches.
    return df.ffill(limit=60).dropna()

# --------------------------- FEATURES (enhanced) -----------------------
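# Features are built on the minute grid and mean-aggregated to the target bar;
# per-bar ratio log-returns are summed, so they compose correctly. The label
# "fut_ret" is the ratio return of the bar ~horizon_min minutes ahead, shifted
# back so each row conditions only on information available at its bar close.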
def build_features(df: pd.DataFrame, rule: str, horizon_min: int) -> pd.DataFrame:
    df = df.copy()
    df["btc_ret"] = np.log(df["btc_close"]).diff()
    df["eth_ret"] = np.log(df["eth_close"]).diff()
    df["ratio"] = df["eth_close"] / df["btc_close"]
    df["ratio_ret"] = np.log(df["ratio"]).diff()

    # Volatility features
    for win in (15, 30, 60, 120, 240, 360):
        df[f"rv_{win}m"] = df["ratio_ret"].rolling(win, min_periods=win).std()

    # Trend features
    for win in (60, 240, 1440):
        ma = df["ratio"].rolling(win, min_periods=win).mean()
        df[f"trend_{win}m"] = df["ratio"] / (ma + 1e-12) - 1.0

    # Correlation features
    for win in (60, 120, 240):
        df[f"corr_{win}m"] = df["btc_ret"].rolling(win, min_periods=win).corr(df["eth_ret"])

    # Beta and risk features
    cov_120 = df["eth_ret"].rolling(120, min_periods=120).cov(df["btc_ret"])
    var_120 = df["btc_ret"].rolling(120, min_periods=120).var()
    df["beta_2h"] = cov_120 / (var_120 + 1e-12)

    std_b = df["btc_ret"].rolling(120, min_periods=120).std()
    std_e = df["eth_ret"].rolling(120, min_periods=120).std()
    df["divergence_2h"] = np.abs(df["btc_ret"] / (std_b + 1e-12) - df["eth_ret"] / (std_e + 1e-12))

    # Volume features
    df["volratio"] = np.log((df["eth_volume"] + 1e-9) / (df["btc_volume"] + 1e-9))
    df["vol_sum"] = np.log(df["eth_volume"] + df["btc_volume"] + 1e-9)
    df["vol_diff"] = (df["eth_volume"] - df["btc_volume"]) / (df["eth_volume"] + df["btc_volume"] + 1e-9)

    # Additional momentum features
    df["momentum_1h"] = df["ratio_ret"].rolling(60).sum()
    df["momentum_4h"] = df["ratio_ret"].rolling(240).sum()

    # Mean reversion features
    for win in (60, 120, 240):
        rolling_mean = df["ratio_ret"].rolling(win).mean()
        rolling_std = df["ratio_ret"].rolling(win).std()
        df[f"zscore_{win}m"] = (df["ratio_ret"] - rolling_mean) / (rolling_std + 1e-12)

    # Price position features (where the ratio sits within its recent range)
    for win in (240, 1440):
        high = df["ratio"].rolling(win).max()
        low = df["ratio"].rolling(win).min()
        df[f"position_{win}m"] = (df["ratio"] - low) / (high - low + 1e-12)

    # Aggregate to target timeframe: prices take the last tick within the bar,
    # returns sum, everything else is averaged.
    agg = {"btc_close": "last", "eth_close": "last", "ratio": "last", "ratio_ret": "sum"}
    for c in df.columns:
        if c not in agg:
            agg[c] = "mean"

    g = df.resample(rule).agg(agg).dropna()
    step_min = max(1, int(pd.Timedelta(rule).total_seconds() // 60))
    ahead = max(1, int(round(horizon_min / step_min)))
    g["fut_ret"] = g["ratio_ret"].shift(-ahead)

    return g.dropna()

def feature_matrix(g: pd.DataFrame) -> tuple[np.ndarray, np.ndarray, list[str]]:
    ban = {"fut_ret", "btc_close", "eth_close", "ratio"}
    keep = ("rv_", "corr_", "trend_", "beta_", "divergence_", "vol", "momentum_", "zscore_", "position_")
    feats = []

    if "ratio_ret" in g.columns:
        feats.append("ratio_ret")
    feats += [c for c in g.columns if c not in ban and c != "ratio_ret" and any(c.startswith(p) for p in keep)]

    if not feats:
        # Fallback to a small core set, using names build_features actually emits.
        feats = ["ratio_ret", "rv_30m", "rv_120m", "corr_120m", "trend_60m",
                 "volratio", "momentum_1h", "zscore_60m"]

    X = g[feats].astype(np.float32).values
    y = g["fut_ret"].astype(np.float32).values

    return X, y, feats

# ------------------------- Enhanced Feature Selection ------------------------
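# Selectors are fit on the training window only and then applied unchanged to
# the test window, keeping the selection step itself free of look-ahead bias.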
def select_features(X_train: np.ndarray, y_train: np.ndarray,
                    X_test: np.ndarray, feature_names: list[str],
                    method: str, n_features: int) -> tuple[np.ndarray, np.ndarray, list[str]]:
    """Apply feature selection fitted on the training data to train and test."""
    if method == "none" or n_features >= len(feature_names):
        return X_train, X_test, feature_names

    if n_features <= 0:
        n_features = max(1, len(feature_names) // 2)

    try:
        if method == "mutual_info":
            selector = SelectKBest(score_func=mutual_info_regression, k=n_features)
            X_train_selected = selector.fit_transform(X_train, y_train)
            X_test_selected = selector.transform(X_test)
            selected_indices = selector.get_support(indices=True)
            selected_features = [feature_names[i] for i in selected_indices]

        elif method == "rfe":
            # Use linear regression as the base estimator for RFE
            estimator = LinearRegression()
            selector = RFE(estimator, n_features_to_select=n_features, step=1)
            X_train_selected = selector.fit_transform(X_train, y_train)
            X_test_selected = selector.transform(X_test)
            selected_indices = selector.get_support(indices=True)
            selected_features = [feature_names[i] for i in selected_indices]

        elif method == "random_forest":
            # Rank by random-forest feature importance and keep the top n
            rf = RandomForestRegressor(n_estimators=100, random_state=42, n_jobs=-1)
            rf.fit(X_train, y_train)
            importances = rf.feature_importances_
            selected_indices = np.argsort(importances)[-n_features:]
            X_train_selected = X_train[:, selected_indices]
            X_test_selected = X_test[:, selected_indices]
            selected_features = [feature_names[i] for i in selected_indices]

        else:
            return X_train, X_test, feature_names

        print(f"  Selected {len(selected_features)} features: {selected_features}")
        return X_train_selected, X_test_selected, selected_features

    except Exception as e:
        print(f"  Feature selection failed: {e}, using all features")
        return X_train, X_test, feature_names

# ------------------------- Robust Cross-Validation Methods -------------------
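# All three schemes keep an embargo of `gap_bars` bars between the end of the
# training data and the start of each test window, so bars whose labels look
# into the test period are excluded from training:
#
#     ...train...][ gap (embargo) ][ test window ]...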
def sample_random_splits(
    g: pd.DataFrame,
    n_splits: int,
    test_bars: int,
    gap_bars: int,
    seed: int,
    since: str | None,
):
    """Random sampling: test windows drawn uniformly; train is all history before the embargo."""
    rng = np.random.default_rng(seed)
    idx = g.index

    if since is not None:
        idx = idx[idx >= pd.Timestamp(since, tz="UTC")]

    valid = np.arange(len(idx) - test_bars)
    if len(valid) <= 0:
        raise ValueError("Not enough data for requested test window")

    starts = rng.choice(valid, size=min(n_splits, len(valid)), replace=False)

    for s in np.sort(starts):
        test_start = idx[s]
        test_end = idx[s + test_bars - 1]
        embargo_end = idx[s - gap_bars - 1] if s - gap_bars - 1 >= 0 else None
        train = g.loc[:embargo_end] if embargo_end is not None else g.iloc[0:0]
        test = g.loc[test_start:test_end]

        if len(train) == 0 or len(test) < test_bars:
            continue

        yield train, test, (test_start, test_end)

def sample_expanding_window_splits(
    g: pd.DataFrame,
    n_splits: int,
    test_bars: int,
    gap_bars: int,
    since: str | None,
):
    """Expanding-window CV: the training set grows over time."""
    # Restrict to the requested era up front: the positional indices below
    # must refer to the same frame they are later used to slice.
    if since is not None:
        g = g.loc[g.index >= pd.Timestamp(since, tz="UTC")]
    idx = g.index

    total_bars = len(idx)
    min_train_size = test_bars * 2  # minimum training set size

    # Calculate how many non-overlapping test windows fit
    available_splits = max(1, (total_bars - min_train_size - test_bars - gap_bars) // test_bars)
    n_splits = min(n_splits, available_splits)

    if n_splits <= 0:
        raise ValueError("Not enough data for expanding window splits")

    for i in range(n_splits):
        test_start_idx = min_train_size + gap_bars + (i * test_bars)
        test_end_idx = test_start_idx + test_bars - 1

        if test_end_idx >= total_bars:
            break

        train_end_idx = test_start_idx - gap_bars - 1
        train = g.iloc[:train_end_idx + 1]
        test = g.iloc[test_start_idx:test_end_idx + 1]

        if len(train) < min_train_size or len(test) < test_bars:
            continue

        yield train, test, (idx[test_start_idx], idx[test_end_idx])

def sample_rolling_window_splits(
    g: pd.DataFrame,
    n_splits: int,
    test_bars: int,
    gap_bars: int,
    since: str | None,
):
    """Rolling-window CV: a fixed-size training window that slides forward."""
    # Restrict to the requested era up front, as in the expanding variant.
    if since is not None:
        g = g.loc[g.index >= pd.Timestamp(since, tz="UTC")]
    idx = g.index

    total_bars = len(idx)
    train_bars = test_bars * 3  # fixed training window size

    # Calculate how many windows fit
    available_splits = max(1, (total_bars - train_bars - test_bars - gap_bars) // test_bars)
    n_splits = min(n_splits, available_splits)

    if n_splits <= 0:
        raise ValueError("Not enough data for rolling window splits")

    for i in range(n_splits):
        train_start_idx = i * test_bars
        train_end_idx = train_start_idx + train_bars - 1
        test_start_idx = train_end_idx + gap_bars + 1
        test_end_idx = test_start_idx + test_bars - 1

        if test_end_idx >= total_bars:
            break

        train = g.iloc[train_start_idx:train_end_idx + 1]
        test = g.iloc[test_start_idx:test_end_idx + 1]

        if len(train) < train_bars or len(test) < test_bars:
            continue

        yield train, test, (idx[test_start_idx], idx[test_end_idx])

def get_cv_splits(g: pd.DataFrame, cv_method: str, n_splits: int, test_bars: int,
                  gap_bars: int, seed: int, since: str | None):
    """Dispatch to the selected CV scheme."""
    if cv_method == "random":
        return sample_random_splits(g, n_splits, test_bars, gap_bars, seed, since)
    elif cv_method == "expanding":
        return sample_expanding_window_splits(g, n_splits, test_bars, gap_bars, since)
    elif cv_method == "rolling":
        return sample_rolling_window_splits(g, n_splits, test_bars, gap_bars, since)
    else:
        raise ValueError(f"Unknown CV method: {cv_method}")

# ------------------------------ Model / Fit -----------------------------------
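# Regime -> stance mapping: the HMM is fit unsupervised on scaled train
# features; each hidden state is then labeled by its mean future return on the
# training split. States whose |mean| is below the 30th percentile of state
# means are traded flat (0); the others go long (+1) or short (-1).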
def fit_and_predict_train_test(train: pd.DataFrame, test: pd.DataFrame,
                               n_states: int, feature_selection_method: str,
                               n_features: int):
    Xtr, ytr, feats = feature_matrix(train)
    Xte, yte, _ = feature_matrix(test)

    # Apply feature selection (fit on train only)
    Xtr_sel, Xte_sel, selected_feats = select_features(
        Xtr, ytr, Xte, feats, feature_selection_method, n_features
    )

    # Scale with train statistics only
    scaler = StandardScaler()
    Xtr_s = scaler.fit_transform(Xtr_sel)
    Xte_s = scaler.transform(Xte_sel)

    hmm = GaussianHMM(n_components=n_states, covariance_type="diag", n_iter=300, random_state=7)
    hmm.fit(Xtr_s)

    st_tr = hmm.predict(Xtr_s)
    st_te = hmm.predict(Xte_s)

    # Label each state by its mean future return in-sample; a state never
    # visited in training gets a NaN mean, which falls through to stance 0.
    means = {s: float(np.nanmean(ytr[st_tr == s])) for s in range(n_states)}
    small = np.nanpercentile(np.abs(list(means.values())), 30)
    state_to_stance = {s: (1 if m > +small else (-1 if m < -small else 0)) for s, m in means.items()}
    preds = np.vectorize(state_to_stance.get)(st_te).astype(np.int8)

    return preds, yte, state_to_stance, selected_feats, hmm

def metrics(y: np.ndarray, preds: np.ndarray, rule: str) -> dict[str, float]:
    T = min(len(y), len(preds))
    y, preds = y[:T], preds[:T]
    pnl = preds * y
    hit = (np.sign(preds) == np.sign(y)).mean() if T else np.nan
    # Annualize assuming 365 days of `rule`-sized bars
    bars_per_day = int(round(24 * 60 / max(1, int(pd.Timedelta(rule).total_seconds() // 60))))
    ann = np.sqrt(365 * bars_per_day)
    sharpe = float(np.nanmean(pnl) / (np.nanstd(pnl) + 1e-12) * ann)

    # Additional metrics
    positive_returns = (pnl > 0).sum()
    total_trades = len(pnl)
    win_rate = positive_returns / total_trades if total_trades > 0 else 0.0
    profit_factor = abs(pnl[pnl > 0].sum() / (pnl[pnl < 0].sum() + 1e-12))

    return {
        "hit_rate": float(hit),
        "ann_sharpe": sharpe,
        "n_points": int(T),
        "win_rate": float(win_rate),
        "profit_factor": float(profit_factor),
        "total_return": float(pnl.sum()),
    }

# ------------------------------ Runner ---------------------------------------
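# One HMM is refit from scratch per split, so runtime grows with
# splits x rules. Feature usage is tallied across splits to report which
# inputs the selector keeps most often.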
def run_rule_mc(minute: pd.DataFrame, rule: str, n_states: int,
                horizon_min: int, cv, feature_selection_method: str,
                n_features: int) -> dict:
    g = build_features(minute, rule, horizon_min)
    rows = []
    feature_importance = {}

    for i, (train, test, (ts, te)) in enumerate(get_cv_splits(g, cv.cv_method, cv.cv_splits,
                                                              cv.cv_test_bars, cv.cv_gap_bars,
                                                              cv.cv_seed, cv.cv_since)):
        print(f"  Split {i+1}: Train {len(train)} bars, Test {len(test)} bars")

        preds, ytest, state_map, feats, hmm = fit_and_predict_train_test(
            train, test, n_states, feature_selection_method, n_features
        )

        m = metrics(ytest, preds, rule)
        rows.append({
            "hit_rate": m["hit_rate"],
            "ann_sharpe": m["ann_sharpe"],
            "n_points": m["n_points"],
            "test_span": (ts, te),
            "win_rate": m["win_rate"],
            "profit_factor": m["profit_factor"],
            "total_return": m["total_return"],
        })

        # Track how often each feature survives selection
        for feat in feats:
            feature_importance[feat] = feature_importance.get(feat, 0) + 1

    if not rows:
        return {
            "rule": rule, "hit_mean": np.nan, "sharpe_mean": np.nan,
            "splits": 0, "hit_std": np.nan, "sharpe_std": np.nan,
            "win_rate_mean": np.nan, "profit_factor_mean": np.nan,
            "top_features": [],
        }

    hits = np.array([r["hit_rate"] for r in rows], dtype=float)
    sharpes = np.array([r["ann_sharpe"] for r in rows], dtype=float)
    win_rates = np.array([r["win_rate"] for r in rows], dtype=float)
    profit_factors = np.array([r["profit_factor"] for r in rows], dtype=float)

    # Rank features by how often they were selected across splits
    sorted_features = sorted(feature_importance.items(), key=lambda x: x[1], reverse=True)
    top_features = [f[0] for f in sorted_features[:5]]  # five most frequently selected

    return {
        "rule": rule,
        "hit_mean": float(np.nanmean(hits)),
        "hit_std": float(np.nanstd(hits)),
        "sharpe_mean": float(np.nanmean(sharpes)),
        "sharpe_std": float(np.nanstd(sharpes)),
        "win_rate_mean": float(np.nanmean(win_rates)),
        "profit_factor_mean": float(np.nanmean(profit_factors)),
        "splits": len(rows),
        "top_features": top_features,
    }

# ------------------------------ MAIN -----------------------------------------
def main(args: CLI) -> None:
    print("Loading data...")
    btc = _load_bitstamp_csv(args.btc_csv, "btc")
    eth = _load_bitstamp_csv(args.eth_csv, "eth")
    minute = _align_minutely(btc, eth)
    print(f"Aligned data: {len(minute)} minutes")

    # Bundle the CV knobs for run_rule_mc
    cv = SimpleNamespace(
        cv_splits=args.cv_splits,
        cv_test_bars=args.cv_test_bars,
        cv_gap_bars=args.cv_gap_bars,
        cv_seed=args.cv_seed,
        cv_since=args.cv_since,
        cv_method=args.cv_method,
    )

    results = []
    for rule in args.resample_rules:
        print(f"\nProcessing rule: {rule}")
        result = run_rule_mc(minute, rule, args.n_states, args.horizon_min, cv,
                             args.feature_selection_method, args.n_features)
        results.append(result)

    df = pd.DataFrame(results).sort_values(by="sharpe_mean", ascending=False)

    print("\n" + "=" * 80)
    print("ENHANCED RESULTS: Randomized time-split comparison with Feature Selection")
    print("=" * 80)
    print(f"States={args.n_states} | HorizonMin={args.horizon_min} | Splits={args.cv_splits}")
    print(f"TestBars={args.cv_test_bars} | GapBars={args.cv_gap_bars} | Since={args.cv_since}")
    print(f"CV Method={args.cv_method} | Feature Selection={args.feature_selection_method} | N Features={args.n_features}")
    print("=" * 80)

    if not df.empty:
        df["hit"] = df["hit_mean"].round(4).astype(str) + " ± " + df["hit_std"].round(4).astype(str)
        df["sharpe"] = df["sharpe_mean"].round(4).astype(str) + " ± " + df["sharpe_std"].round(4).astype(str)
        df["win_rate"] = (df["win_rate_mean"] * 100).round(2).astype(str) + "%"
        df["profit_factor"] = df["profit_factor_mean"].round(3).astype(str)

        display_cols = ["rule", "splits", "hit", "sharpe", "win_rate", "profit_factor"]
        print(df[display_cols].to_string(index=False))

        # Show top features for the best-performing rule
        best_rule = df.iloc[0]
        print(f"\nTop features for best rule '{best_rule['rule']}':")
        for i, feat in enumerate(best_rule.get("top_features", [])[:5]):
            print(f"  {i+1}. {feat}")
    else:
        print("No valid splits found")


if __name__ == "__main__":
    args = parse_args()
    main(args)