from __future__ import annotations

from typing import Dict

import csv
import json
import os

import numpy as np
import pandas as pd

from .config import RunConfig
from .data import load_and_filter_data
from .preprocess import add_basic_time_features, downcast_numeric_columns, handle_nans
from .selection import build_feature_list, prune_features
from .model import train_model, predict, get_feature_importance
from .metrics import compute_price_series_from_log_returns, compute_metrics_from_prices
from evaluation import walk_forward_cv
from feature_engineering import feature_engineering
from plot_results import plot_prediction_error_distribution


def ensure_charts_dir(path: str) -> None:
    # exist_ok=True already makes this a no-op if the directory exists
    os.makedirs(path, exist_ok=True)


def run_pipeline(cfg: RunConfig) -> Dict[str, float]:
    # Set up outputs
    ensure_charts_dir(cfg.output.charts_dir)

    # Load data and target
    df = load_and_filter_data(cfg.data)

    # Feature engineering
    features_dict = feature_engineering(
        df,
        os.path.splitext(os.path.basename(cfg.data.csv_path))[0],
        cfg.features.ohlcv_cols,
        cfg.features.lags,
        cfg.features.window_sizes,
    )
    features_df = pd.DataFrame(features_dict)
    df = pd.concat([df, features_df], axis=1)

    # Preprocess
    df = downcast_numeric_columns(df)
    df = add_basic_time_features(df)
    df = handle_nans(df, cfg.preprocess)

    # Feature selection and pruning
    feature_cols = build_feature_list(df.columns)
    X = df[feature_cols].values.astype(np.float32)
    y = df["log_return"].values.astype(np.float32)

    # Chronological 80/20 split (no shuffling for time series)
    split_idx = int(len(X) * 0.8)
    y_train, y_test = y[:split_idx], y[split_idx:]

    importance_avg = None
    if cfg.pruning.do_walk_forward_cv:
        metrics_avg, importance_avg = walk_forward_cv(
            X, y, feature_cols, n_splits=cfg.pruning.n_splits
        )
        # Optional: metrics_avg can be logged or returned alongside the final metrics

    kept_feature_cols = (
        prune_features(feature_cols, importance_avg, cfg.pruning)
        if cfg.pruning.auto_prune
        else feature_cols
    )

    # Train model on the kept features (matrix rebuilt after pruning)
    X_kept = df[kept_feature_cols].values.astype(np.float32)
    model = train_model(
        X_kept[:split_idx],
        X_kept[split_idx:],
        y_train,
        y_test,
        eval_metric="rmse",
    )

    # Save model
    model.save_model(cfg.output.model_output_path)

    # Persist the exact feature list used for training next to the model
    try:
        features_path = os.path.splitext(cfg.output.model_output_path)[0] + "_features.json"
        with open(features_path, "w") as f:
            json.dump({"feature_names": kept_feature_cols}, f)
    except Exception:
        # Feature list persistence is optional; avoid breaking the run on failure
        pass

    # Predict on the held-out period
    test_preds = predict(model, X_kept[split_idx:])

    # Reconstruct price series from log returns
    close_prices = df["Close"].values
    start_price = close_prices[split_idx]
    actual_prices = compute_price_series_from_log_returns(start_price, y_test)
    predicted_prices = compute_price_series_from_log_returns(start_price, test_preds)

    # Metrics
    metrics = compute_metrics_from_prices(actual_prices, predicted_prices)

    # Plot prediction error distribution to the charts dir (parity with previous behavior)
    try:
        plot_prediction_error_distribution(predicted_prices, actual_prices, prefix="all_features")
    except Exception:
        # Plotting is optional; ignore failures in headless environments
        pass

    # Persist per-feature importances alongside the run-level metrics
    # (the metric columns repeat the same run-level values on every feature row)
    feat_importance = get_feature_importance(model, kept_feature_cols)
    if not os.path.exists(cfg.output.results_csv):
        with open(cfg.output.results_csv, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(
                ["feature", "rmse", "mape", "r2", "directional_accuracy", "feature_importance"]
            )
    with open(cfg.output.results_csv, "a", newline="") as f:
        writer = csv.writer(f)
        for feature in kept_feature_cols:
            importance = feat_importance.get(feature, 0.0)
            row = [feature]
            for key in ["rmse", "mape", "r2", "directional_accuracy"]:
                row.append(f"{metrics[key]:.10f}")
            row.append(f"{importance:.6f}")
            writer.writerow(row)

    return metrics
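

# Illustrative sketch only (not invoked by run_pipeline): the price reconstruction above is
# assumed to follow the standard identity price_t = start_price * exp(cumsum(log_returns)),
# which is what compute_price_series_from_log_returns is expected to implement; the exact
# helper semantics may differ, and the numbers below are made up purely for demonstration.
if __name__ == "__main__":
    demo_start_price = 100.0
    demo_log_returns = np.array([0.01, -0.02, 0.005], dtype=np.float32)
    demo_prices = demo_start_price * np.exp(np.cumsum(demo_log_returns))
    print(demo_prices)  # approximately [101.005  99.005  99.501]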