Refactor main script and introduce CLI for OHLCV Predictor. Consolidate functionality into a new package structure, enhancing modularity. Update README to reflect new features and usage instructions, including the requirement for a companion feature list JSON. Add configuration classes for better parameter management and streamline data loading and preprocessing.

This commit is contained in:
Simon Moisy
2025-08-12 16:06:05 +08:00
parent 70da858aac
commit 289d11b0a8
14 changed files with 4361 additions and 327 deletions

View File

@@ -0,0 +1,14 @@
"""OHLCV Predictor package."""
__all__ = [
"config",
"data",
"preprocess",
"selection",
"metrics",
"model",
"pipeline",
"cli",
]

29
ohlcvpredictor/cli.py Normal file
View File

@@ -0,0 +1,29 @@
import argparse
from .config import RunConfig, DataConfig
from .pipeline import run_pipeline
def build_arg_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser for the pipeline entry point.

    Returns:
        argparse.ArgumentParser: parser accepting --csv, --min-date, --max-date.
    """
    p = argparse.ArgumentParser(description="OHLCV Predictor Pipeline")
    # `required=False` is the argparse default for optional flags, so the
    # former explicit `required=False` on every argument was redundant noise.
    p.add_argument(
        "--csv",
        dest="csv_path",
        default="../data/btcusd_1-min_data.csv",
        help="Path to the input OHLCV CSV file",
    )
    p.add_argument(
        "--min-date",
        dest="min_date",
        default="2017-06-01",
        help="Inclusive lower bound applied to the data timestamp filter",
    )
    p.add_argument(
        "--max-date",
        dest="max_date",
        default=None,
        help="Optional inclusive upper bound for the timestamp filter",
    )
    return p
def main() -> None:
    """CLI entry point: parse arguments, run the pipeline, print a metrics summary."""
    args = build_arg_parser().parse_args()
    data_cfg = DataConfig(
        csv_path=args.csv_path,
        min_date=args.min_date,
        max_date=args.max_date,
    )
    metrics = run_pipeline(RunConfig(data=data_cfg))
    summary = (
        f"RMSE={metrics['rmse']:.6f}, MAPE={metrics['mape']:.4f}%, "
        f"R2={metrics['r2']:.6f}, DirAcc={metrics['directional_accuracy']:.4f}"
    )
    print(summary)


if __name__ == "__main__":
    main()

65
ohlcvpredictor/config.py Normal file
View File

@@ -0,0 +1,65 @@
from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class DataConfig:
    """Configuration for data loading and basic filtering."""

    # Path to the input OHLCV CSV; must contain 'Timestamp' and 'Close'
    # columns (data.load_and_filter_data raises ValueError otherwise).
    csv_path: str
    # Inclusive lower bound applied to the parsed 'Timestamp' column.
    min_date: str = "2017-06-01"
    # Optional inclusive upper bound; None disables the upper filter.
    max_date: Optional[str] = None
    # When True, rows with Volume == 0 are dropped before date filtering.
    drop_volume_zero: bool = True
@dataclass
class FeatureConfig:
    """Configuration for feature engineering."""

    # OHLCV column names forwarded to feature_engineering.
    ohlcv_cols: List[str] = field(default_factory=lambda: ["Open", "High", "Low", "Close", "Volume"])
    # Lag depth forwarded to feature_engineering (presumably the number of
    # lagged copies generated per column — confirm against feature_engineering).
    lags: int = 3
    # Rolling-window sizes (in rows) forwarded to feature_engineering.
    window_sizes: List[int] = field(default_factory=lambda: [5, 15, 30])
@dataclass
class PreprocessConfig:
    """Configuration for preprocessing and NaN handling."""

    # True: mean-impute NaNs in numeric columns; False: drop rows containing
    # NaNs (see preprocess.handle_nans).
    impute_nans: bool = True
@dataclass
class PruningConfig:
    """Configuration for feature pruning and CV."""

    # Whether to run walk-forward cross-validation to average feature importances.
    do_walk_forward_cv: bool = True
    # Number of walk-forward CV splits.
    n_splits: int = 5
    # Whether to prune features automatically (see selection.prune_features).
    auto_prune: bool = True
    # When CV importances are available, keep only the top_k features by
    # averaged importance.
    top_k: int = 150
    # Hand-curated feature names that selection.prune_features always drops,
    # regardless of importance.
    known_low_features: List[str] = field(
        default_factory=lambda: [
            "supertrend_12_3.0",
            "supertrend_10_1.0",
            "supertrend_11_2.0",
            "supertrend_trend_12_3.0",
            "supertrend_trend_10_1.0",
            "supertrend_trend_11_2.0",
            "hour",
        ]
    )
@dataclass
class OutputConfig:
    """Configuration for outputs and artifacts."""

    # Directory where charts are written; created if missing by the pipeline.
    charts_dir: str = "charts"
    # Cumulative per-feature results CSV; the pipeline appends to it across runs.
    results_csv: str = "../data/cumulative_feature_results.csv"
    # Path where the trained model is saved; the companion feature-list JSON
    # is written alongside it with a "_features.json" suffix.
    model_output_path: str = "../data/xgboost_model_all_features.json"
@dataclass
class RunConfig:
    """Top-level configuration grouping for a pipeline run."""

    # Required: where and how to load the input data.
    data: DataConfig
    # Optional sub-configs; each defaults to its declared defaults.
    features: FeatureConfig = field(default_factory=FeatureConfig)
    preprocess: PreprocessConfig = field(default_factory=PreprocessConfig)
    pruning: PruningConfig = field(default_factory=PruningConfig)
    output: OutputConfig = field(default_factory=OutputConfig)

38
ohlcvpredictor/data.py Normal file
View File

@@ -0,0 +1,38 @@
from typing import Tuple
import os
import pandas as pd
import numpy as np
from .config import DataConfig
def load_and_filter_data(cfg: DataConfig) -> pd.DataFrame:
    """Load CSV, filter, and convert timestamp.

    Reads the CSV at cfg.csv_path, optionally drops zero-volume rows,
    converts 'Timestamp' from epoch seconds to datetime, applies the
    min/max date window, and appends a 'log_return' target column.

    Raises:
        FileNotFoundError: if cfg.csv_path does not exist.
        ValueError: if 'Timestamp' or 'Close' columns are missing.
    """
    if not os.path.exists(cfg.csv_path):
        raise FileNotFoundError(f"CSV not found: {cfg.csv_path}")

    frame = pd.read_csv(cfg.csv_path)

    # Drop degenerate zero-volume rows before anything else, if configured.
    if cfg.drop_volume_zero and 'Volume' in frame.columns:
        frame = frame[frame['Volume'] != 0]

    if 'Timestamp' not in frame.columns:
        raise ValueError("Expected 'Timestamp' column in input CSV")
    frame['Timestamp'] = pd.to_datetime(frame['Timestamp'], unit='s')

    # Apply the inclusive date window (string comparisons work via pandas).
    frame = frame[frame['Timestamp'] >= cfg.min_date]
    if cfg.max_date:
        frame = frame[frame['Timestamp'] <= cfg.max_date]

    if 'Close' not in frame.columns:
        raise ValueError("Expected 'Close' column in input CSV")
    # Target: one-step log return; first row is NaN by construction.
    frame['log_return'] = np.log(frame['Close'] / frame['Close'].shift(1))
    return frame

26
ohlcvpredictor/metrics.py Normal file
View File

@@ -0,0 +1,26 @@
from typing import Dict, Tuple
import numpy as np
from sklearn.metrics import mean_squared_error, r2_score
def compute_price_series_from_log_returns(start_price: float, log_returns: np.ndarray) -> np.ndarray:
    """Reconstruct price series from log returns starting at start_price.

    price[i] = start_price * exp(sum(log_returns[:i+1])). Vectorized with
    np.cumsum instead of the former Python loop with per-element list
    appends and scalar exp calls; returns an empty array for empty input.
    """
    log_returns = np.asarray(log_returns, dtype=float)
    return start_price * np.exp(np.cumsum(log_returns))
def compute_metrics_from_prices(actual_prices: np.ndarray, predicted_prices: np.ndarray) -> Dict[str, float]:
    """Compute RMSE, MAPE, R2, and directional accuracy given price series."""
    rmse = float(np.sqrt(mean_squared_error(actual_prices, predicted_prices)))

    # MAPE: zero actual prices are mapped to NaN so nanmean skips them.
    with np.errstate(divide='ignore', invalid='ignore'):
        denom = np.where(actual_prices == 0, np.nan, actual_prices)
        pct_err = np.abs((actual_prices - predicted_prices) / denom)
    mape = float(np.nanmean(pct_err) * 100.0)

    r2 = float(r2_score(actual_prices, predicted_prices))

    # Directional accuracy: fraction of consecutive steps where actual and
    # predicted series move in the same direction (0.0 for series of length < 2).
    dir_actual = np.sign(np.diff(actual_prices))
    dir_pred = np.sign(np.diff(predicted_prices))
    dir_acc = float((dir_actual == dir_pred).mean()) if dir_actual.size else 0.0

    return {"rmse": rmse, "mape": mape, "r2": r2, "directional_accuracy": dir_acc}

28
ohlcvpredictor/model.py Normal file
View File

@@ -0,0 +1,28 @@
from typing import Dict, List, Tuple
import numpy as np
from custom_xgboost import CustomXGBoostGPU
def train_model(
    X_train: np.ndarray,
    X_test: np.ndarray,
    y_train: np.ndarray,
    y_test: np.ndarray,
    eval_metric: str = 'rmse',
):
    """Train the XGBoost model and return the fitted wrapper.

    Thin wrapper over CustomXGBoostGPU: constructs the wrapper with the
    train/test split and calls its train() with the given eval metric.

    Args:
        X_train: training feature matrix.
        X_test: held-out feature matrix (used by the wrapper during training).
        y_train: training targets.
        y_test: held-out targets.
        eval_metric: metric name passed to CustomXGBoostGPU.train.

    Returns:
        The fitted CustomXGBoostGPU instance.
    """
    model = CustomXGBoostGPU(X_train, X_test, y_train, y_test)
    model.train(eval_metric=eval_metric)
    return model
def predict(model: CustomXGBoostGPU, X: np.ndarray) -> np.ndarray:
    """Predict using the trained model.

    Thin pass-through to CustomXGBoostGPU.predict so pipeline code depends
    on this module rather than on the wrapper class directly.
    """
    return model.predict(X)
def get_feature_importance(model: CustomXGBoostGPU, feature_names: List[str]) -> Dict[str, float]:
    """Return a feature-name -> importance mapping from the trained model.

    Delegates entirely to CustomXGBoostGPU.get_feature_importance.
    """
    return model.get_feature_importance(feature_names)

125
ohlcvpredictor/pipeline.py Normal file
View File

@@ -0,0 +1,125 @@
from __future__ import annotations
from typing import Dict, List, Tuple
import os
import csv
import json
import numpy as np
import pandas as pd
from .config import RunConfig
from .data import load_and_filter_data
from .preprocess import add_basic_time_features, downcast_numeric_columns, handle_nans
from .selection import build_feature_list, prune_features
from .model import train_model, predict, get_feature_importance
from .metrics import compute_price_series_from_log_returns, compute_metrics_from_prices
from evaluation import walk_forward_cv
from feature_engineering import feature_engineering
from plot_results import plot_prediction_error_distribution
def ensure_charts_dir(path: str) -> None:
    """Create *path* (including parents) if it does not already exist.

    os.makedirs with exist_ok=True already tolerates a pre-existing
    directory, so the former os.path.exists() pre-check was a redundant
    check-then-act race; a single call is both simpler and safer.
    """
    os.makedirs(path, exist_ok=True)
def run_pipeline(cfg: RunConfig) -> Dict[str, float]:
    """Run the end-to-end pipeline: load data, engineer features, prune,
    train, evaluate, and persist artifacts.

    Returns:
        Test-set metrics dict with keys 'rmse', 'mape', 'r2',
        'directional_accuracy' (from compute_metrics_from_prices).
    """
    # Setup outputs
    ensure_charts_dir(cfg.output.charts_dir)
    # Load and target
    df = load_and_filter_data(cfg.data)
    # Features
    features_dict = feature_engineering(
        df,
        os.path.splitext(os.path.basename(cfg.data.csv_path))[0],
        cfg.features.ohlcv_cols,
        cfg.features.lags,
        cfg.features.window_sizes,
    )
    features_df = pd.DataFrame(features_dict)
    # NOTE(review): pd.concat aligns on index; df keeps its post-filter index
    # while features_df may carry a fresh RangeIndex — confirm that
    # feature_engineering preserves df's index, otherwise rows misalign here.
    df = pd.concat([df, features_df], axis=1)
    # Preprocess
    df = downcast_numeric_columns(df)
    df = add_basic_time_features(df)
    df = handle_nans(df, cfg.preprocess)
    # Feature selection and pruning
    feature_cols = build_feature_list(df.columns)
    X = df[feature_cols].values.astype(np.float32)
    y = df["log_return"].values.astype(np.float32)
    # Chronological 80/20 split — no shuffling, since this is time-series data.
    split_idx = int(len(X) * 0.8)
    # NOTE(review): X_train/X_test are unused below (train_model re-slices
    # df on the pruned columns); kept as-is pending confirmation.
    X_train, X_test = X[:split_idx], X[split_idx:]
    y_train, y_test = y[:split_idx], y[split_idx:]
    importance_avg = None
    if cfg.pruning.do_walk_forward_cv:
        metrics_avg, importance_avg = walk_forward_cv(X, y, feature_cols, n_splits=cfg.pruning.n_splits)
        # Optional: you may log or return metrics_avg
    kept_feature_cols = prune_features(feature_cols, importance_avg, cfg.pruning) if cfg.pruning.auto_prune else feature_cols
    # Train model on the pruned feature matrix, re-sliced from df.
    model = train_model(
        df[kept_feature_cols].values.astype(np.float32)[:split_idx],
        df[kept_feature_cols].values.astype(np.float32)[split_idx:],
        y[:split_idx],
        y[split_idx:],
        eval_metric='rmse',
    )
    # Save model
    model.save_model(cfg.output.model_output_path)
    # Persist the exact feature list used for training next to the model
    try:
        features_path = os.path.splitext(cfg.output.model_output_path)[0] + "_features.json"
        with open(features_path, "w") as f:
            json.dump({"feature_names": kept_feature_cols}, f)
    except Exception:
        # Feature list persistence is optional; avoid breaking the run on failure
        pass
    # Predict
    X_test_kept = df[kept_feature_cols].values.astype(np.float32)[split_idx:]
    test_preds = predict(model, X_test_kept)
    # Reconstruct price series from log returns, anchored at the close price
    # at the start of the test window.
    close_prices = df['Close'].values
    start_price = close_prices[split_idx]
    actual_prices = compute_price_series_from_log_returns(start_price, y_test)
    predicted_prices = compute_price_series_from_log_returns(start_price, test_preds)
    # Metrics
    metrics = compute_metrics_from_prices(actual_prices, predicted_prices)
    # Plot prediction error distribution to charts dir (parity with previous behavior)
    try:
        plot_prediction_error_distribution(predicted_prices, actual_prices, prefix="all_features")
    except Exception:
        # plotting is optional; ignore failures in headless environments
        pass
    # Persist per-feature metrics and importances
    feat_importance = get_feature_importance(model, kept_feature_cols)
    # Write the header once, then append one row per kept feature.
    if not os.path.exists(cfg.output.results_csv):
        with open(cfg.output.results_csv, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['feature', 'rmse', 'mape', 'r2', 'directional_accuracy', 'feature_importance'])
    with open(cfg.output.results_csv, 'a', newline='') as f:
        writer = csv.writer(f)
        # The same run-level metrics are repeated on every feature row;
        # only 'feature_importance' varies per feature.
        for feature in kept_feature_cols:
            importance = feat_importance.get(feature, 0.0)
            row = [feature]
            for key in ['rmse', 'mape', 'r2', 'directional_accuracy']:
                val = metrics[key]
                row.append(f"{val:.10f}")
            row.append(f"{importance:.6f}")
            writer.writerow(row)
    return metrics

View File

@@ -0,0 +1,39 @@
from typing import List
import pandas as pd
import numpy as np
from .config import PreprocessConfig
def add_basic_time_features(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with an 'hour' column derived from 'Timestamp'.

    'Timestamp' is (re)parsed with errors='coerce', so unparseable values
    become NaT and yield NaN hours; the input frame is not mutated.
    """
    out = df.copy()
    timestamps = pd.to_datetime(out['Timestamp'], errors='coerce')
    out['Timestamp'] = timestamps
    out['hour'] = timestamps.dt.hour
    return out
def downcast_numeric_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Downcast numeric columns to save memory.

    Attempts pd.to_numeric(downcast='float') on every column; columns that
    cannot be converted (e.g. strings, datetimes) raise and are left as-is.
    Returns a new frame; the input is not mutated.
    """
    result = df.copy()
    for name in result.columns:
        try:
            result[name] = pd.to_numeric(result[name], downcast='float')
        except Exception:
            # Non-numeric column: leave it untouched.
            continue
    return result
def handle_nans(df: pd.DataFrame, cfg: PreprocessConfig) -> pd.DataFrame:
    """Impute NaNs (mean) or drop rows, based on config.

    When cfg.impute_nans is True, each numeric column's NaNs are replaced
    with that column's mean; otherwise rows containing any NaN are dropped
    and the index is reset. The input frame is not mutated.
    """
    result = df.copy()
    if not cfg.impute_nans:
        # Guard clause: drop-rows mode returns immediately.
        return result.dropna().reset_index(drop=True)
    for col in result.select_dtypes(include=[np.number]).columns:
        result[col] = result[col].fillna(result[col].mean())
    return result

View File

@@ -0,0 +1,59 @@
from typing import Dict, Iterable, List, Sequence, Set, Tuple
import numpy as np
from .config import PruningConfig
# Columns excluded from the model's feature set: raw inputs used to build the
# target ('Timestamp', 'Close') plus engineered columns deliberately dropped.
EXCLUDE_BASE_FEATURES: List[str] = [
    'Timestamp', 'Close',
    'log_return_5', 'volatility_5', 'volatility_15', 'volatility_30',
    'bb_bbm', 'bb_bbh', 'bb_bbl', 'stoch_k', 'sma_50', 'sma_200', 'psar',
    'donchian_hband', 'donchian_lband', 'donchian_mband', 'keltner_hband', 'keltner_lband',
    'keltner_mband', 'ichimoku_a', 'ichimoku_b', 'ichimoku_base_line', 'ichimoku_conversion_line',
    'Open_lag1', 'Open_lag2', 'Open_lag3', 'High_lag1', 'High_lag2', 'High_lag3', 'Low_lag1', 'Low_lag2',
    'Low_lag3', 'Close_lag1', 'Close_lag2', 'Close_lag3', 'Open_roll_mean_15', 'Open_roll_std_15', 'Open_roll_min_15',
    'Open_roll_max_15', 'Open_roll_mean_30', 'Open_roll_min_30', 'Open_roll_max_30', 'High_roll_mean_15', 'High_roll_std_15',
    'High_roll_min_15', 'High_roll_max_15', 'Low_roll_mean_5', 'Low_roll_min_5', 'Low_roll_max_5', 'Low_roll_mean_30',
    'Low_roll_std_30', 'Low_roll_min_30', 'Low_roll_max_30', 'Close_roll_mean_5', 'Close_roll_min_5', 'Close_roll_max_5',
    'Close_roll_mean_15', 'Close_roll_std_15', 'Close_roll_min_15', 'Close_roll_max_15', 'Close_roll_mean_30',
    'Close_roll_std_30', 'Close_roll_min_30', 'Close_roll_max_30', 'Volume_roll_max_5', 'Volume_roll_max_15',
    'Volume_roll_max_30', 'supertrend_12_3.0', 'supertrend_10_1.0', 'supertrend_11_2.0',
]

# Precomputed set view of the exclusion list for O(1) membership tests.
_EXCLUDE_SET: Set[str] = set(EXCLUDE_BASE_FEATURES)


def build_feature_list(all_columns: Sequence[str]) -> List[str]:
    """Return the model feature list by excluding base columns and targets.

    Uses the precomputed set view so filtering is O(n) in the number of
    columns, instead of the former O(n*m) scan of the exclusion list.
    """
    return [col for col in all_columns if col not in _EXCLUDE_SET]
def prune_features(
    feature_cols: Sequence[str],
    importance_avg: Dict[str, float] | None,
    cfg: PruningConfig,
) -> List[str]:
    """Decide which features to keep using averaged importances and rules.

    Pruning rules, applied cumulatively:
      1. If CV importances are available, keep only the cfg.top_k features
         with the highest averaged importance.
      2. Always drop cfg.known_low_features.
      3. If a 'park_vol_{w}' column exists for a window w, drop the
         'gk_vol_{w}'/'rs_vol_{w}'/'yz_vol_{w}' alternatives at that window.

    Returns the kept feature names in their original order.
    """
    # Hoisted set view: `in feature_cols` on a list was an O(n) scan inside
    # every loop below; the set makes each membership test O(1).
    feature_set = set(feature_cols)
    prune_set: Set[str] = set()
    if importance_avg is not None:
        ranked = sorted(importance_avg.items(), key=lambda kv: kv[1], reverse=True)
        keep_names = {name for name, _ in ranked[: cfg.top_k]}
        prune_set.update(name for name in feature_cols if name not in keep_names)
    prune_set.update(name for name in cfg.known_low_features if name in feature_set)
    # If Parkinson vol exists, drop alternatives at same window
    for w in (5, 15, 30):
        if f'park_vol_{w}' in feature_set:
            prune_set.update(
                alt
                for alt in (f'gk_vol_{w}', f'rs_vol_{w}', f'yz_vol_{w}')
                if alt in feature_set
            )
    return [c for c in feature_cols if c not in prune_set]