Compare commits

..

3 Commits

17 changed files with 4725 additions and 291 deletions

23
.vscode/launch.json vendored Normal file
View File

@@ -0,0 +1,23 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: main.py",
"type": "debugpy",
"request": "launch",
"program": "main.py",
"console": "integratedTerminal",
"cwd": "${workspaceFolder}",
"args": [
"--csv",
"../data/btcusd_1-min_data.csv",
"--min-date",
"2017-06-01"
]
}
]
}

View File

@@ -1,38 +1,30 @@
# OHLCV Predictor - Simple Inference # OHLCV Predictor - Inference (Quick Reference)
Refactored for easy reuse in other projects. For full instructions, see the main README.
## Usage ## Minimal usage
```python ```python
from predictor import OHLCVPredictor from predictor import OHLCVPredictor
predictor = OHLCVPredictor('model.json') predictor = OHLCVPredictor('../data/xgboost_model_all_features.json')
predictions = predictor.predict(your_ohlcv_dataframe) predictions = predictor.predict(your_ohlcv_dataframe)
``` ```
## Files Needed
Copy these 5 files to your other project:
1. `predictor.py`
2. `custom_xgboost.py`
3. `feature_engineering.py`
4. `technical_indicator_functions.py`
5. `xgboost_model_all_features.json`
## Data Requirements
Your DataFrame needs these columns: Your DataFrame needs these columns:
- `Open`, `High`, `Low`, `Close`, `Volume`, `Timestamp` - `Timestamp`, `Open`, `High`, `Low`, `Close`, `Volume`, `log_return`
## Dependencies Note: If you are only running inference (not training with `main.py`), compute `log_return` first:
```python
import numpy as np
df['log_return'] = np.log(df['Close'] / df['Close'].shift(1))
``` ```
xgboost >= 3.0.2
pandas >= 2.2.3 ## Files to reuse in other projects
numpy >= 2.2.3
scikit-learn >= 1.6.1 - `predictor.py`
ta >= 0.11.0 - `custom_xgboost.py`
numba >= 0.61.2 - `feature_engineering.py`
``` - `technical_indicator_functions.py`
- your trained model file (e.g., `xgboost_model_all_features.json`)

120
README.md
View File

@@ -1,2 +1,122 @@
# OHLCVPredictor # OHLCVPredictor
End-to-end pipeline for engineering OHLCV features, training an XGBoost regressor (GPU by default), and running inference via a small, reusable predictor API.
## Quickstart (uv)
Prereqs:
- Python 3.12+
- `uv` installed (see `https://docs.astral.sh/uv/`)
Install dependencies:
```powershell
uv sync
```
Run training (expects an input CSV; see Data Requirements):
```powershell
uv run python main.py
```
Run the inference demo:
```powershell
uv run python inference_example.py
```
## Data requirements
Your input DataFrame/CSV must include these columns:
- `Timestamp`, `Open`, `High`, `Low`, `Close`, `Volume`, `log_return`
Notes:
- `Timestamp` can be either a pandas datetime-like column or Unix seconds (int). During inference, the predictor will try to parse strings as datetimes; non-object dtypes are treated as Unix seconds.
- `log_return` should be computed as:
```python
df['log_return'] = np.log(df['Close'] / df['Close'].shift(1))
```
The training script (`main.py`) computes it automatically. For standalone inference, ensure it exists before calling the predictor.
- The training script filters out rows with `Volume == 0` and focuses on data newer than `2017-06-01` by default.
## Training workflow
The training entrypoint is `main.py`:
- Reads the CSV at `../data/btcusd_1-min_data.csv` by default. Adjust `csv_path` in `main.py` to point to your data, or move your CSV to that path.
- Engineers a large set of technical and OHLCV-derived features (see `feature_engineering.py` and `technical_indicator_functions.py`).
- Optionally performs walk-forward cross validation to compute averaged feature importances.
- Prunes low-importance and redundant features, trains XGBoost (GPU by default), and saves artifacts.
- Produces charts with Plotly into `charts/`.
Outputs produced by training:
- Model: `../data/xgboost_model_all_features.json`
- Feature list: `../data/xgboost_model_all_features_features.json` (exact feature names and order used for training)
- Results CSV: `../data/cumulative_feature_results.csv`
- Charts: files under `charts/` (e.g., `all_features_prediction_error_distribution.html`)
Run:
```powershell
uv run python main.py
```
If you do not have a CUDA-capable GPU, set the device to CPU (see GPU/CPU section).
## Inference usage
You can reuse the predictor in other projects or run the included example.
Minimal example:
```python
from predictor import OHLCVPredictor
import numpy as np
predictor = OHLCVPredictor('../data/xgboost_model_all_features.json')
# df must contain: Timestamp, Open, High, Low, Close, Volume, log_return
log_returns = predictor.predict(df)
prices_pred, prices_actual = predictor.predict_prices(df)
```
Run the comprehensive demo:
```powershell
uv run python inference_example.py
```
Files needed to embed the predictor in another project:
- `predictor.py`
- `custom_xgboost.py`
- `feature_engineering.py`
- `technical_indicator_functions.py`
- your trained model file (e.g., `xgboost_model_all_features.json`)
- the companion feature list JSON saved next to the model (same basename with `_features.json`)
## GPU/CPU notes
Training uses XGBoost with `device='cuda'` by default (see `custom_xgboost.py`). If you do not have a CUDA-capable GPU or drivers:
- Change the parameter in `CustomXGBoostGPU.train()` from `device='cuda'` to `device='cpu'`, or
- Pass `device='cpu'` when calling `train()` wherever applicable.
Inference works on CPU even if the model was trained on GPU.
## Dependencies
The project is managed via `pyproject.toml` and `uv`. Key runtime deps include:
- `xgboost`, `pandas`, `numpy`, `scikit-learn`, `ta`, `numba`
- `dash`/Plotly for charts (Plotly is used by `plot_results.py`)
Install using:
```powershell
uv sync
```
## Troubleshooting
- KeyError: `'log_return'` during inference: ensure your input DataFrame includes `log_return` as described above.
- Model file not found: confirm the path passed to `OHLCVPredictor(...)` matches where training saved it (default `../data/xgboost_model_all_features.json`).
- Feature mismatch (e.g., XGBoost "Number of columns does not match"): ensure you use the model together with its companion feature list JSON. The predictor will automatically use it if present. If missing, retrain with the current code so the feature list is generated.
- Empty/old charts: delete the `charts/` folder and rerun training.
- Memory issues: consider downcasting or using smaller windows; the code already downcasts numerics where possible.

File diff suppressed because one or more lines are too long

77
evaluation.py Normal file
View File

@@ -0,0 +1,77 @@
import numpy as np
from typing import Dict, List, Tuple
try:
from .custom_xgboost import CustomXGBoostGPU
except ImportError:
from custom_xgboost import CustomXGBoostGPU
from sklearn.metrics import mean_squared_error, r2_score
def _compute_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> Tuple[float, float, float, float]:
"""Compute RMSE, MAPE, R2, and directional accuracy.
Returns:
(rmse, mape, r2, directional_accuracy)
"""
rmse = float(np.sqrt(mean_squared_error(y_true, y_pred)))
with np.errstate(divide='ignore', invalid='ignore'):
mape_arr = np.abs((y_true - y_pred) / np.where(y_true == 0, np.nan, y_true))
mape = float(np.nanmean(mape_arr) * 100.0)
r2 = float(r2_score(y_true, y_pred))
direction_actual = np.sign(np.diff(y_true))
direction_pred = np.sign(np.diff(y_pred))
min_len = min(len(direction_actual), len(direction_pred))
if min_len == 0:
dir_acc = 0.0
else:
dir_acc = float((direction_actual[:min_len] == direction_pred[:min_len]).mean())
return rmse, mape, r2, dir_acc
def walk_forward_cv(
X: np.ndarray,
y: np.ndarray,
feature_names: List[str],
n_splits: int = 5,
) -> Tuple[Dict[str, float], Dict[str, float]]:
"""Run simple walk-forward CV and aggregate metrics and feature importances.
Returns:
metrics_avg: Average metrics across folds {rmse, mape, r2, dir_acc}
importance_avg: Average feature importance across folds {feature -> importance}
"""
num_samples = len(X)
fold_size = num_samples // (n_splits + 1)
if fold_size <= 0:
raise ValueError("Not enough samples for walk-forward CV")
metrics_accum = {"rmse": [], "mape": [], "r2": [], "dir_acc": []}
importance_sum = {name: 0.0 for name in feature_names}
for i in range(1, n_splits + 1):
train_end = i * fold_size
test_end = (i + 1) * fold_size if i < n_splits else num_samples
X_train, y_train = X[:train_end], y[:train_end]
X_test, y_test = X[train_end:test_end], y[train_end:test_end]
if len(X_test) == 0:
continue
model = CustomXGBoostGPU(X_train, X_test, y_train, y_test)
model.train(eval_metric='rmse')
preds = model.predict(X_test)
rmse, mape, r2, dir_acc = _compute_metrics(y_test, preds)
metrics_accum["rmse"].append(rmse)
metrics_accum["mape"].append(mape)
metrics_accum["r2"].append(r2)
metrics_accum["dir_acc"].append(dir_acc)
fold_importance = model.get_feature_importance(feature_names)
for name, val in fold_importance.items():
importance_sum[name] += float(val)
metrics_avg = {k: float(np.mean(v)) if len(v) > 0 else float('nan') for k, v in metrics_accum.items()}
importance_avg = {k: (importance_sum[k] / n_splits) for k in feature_names}
return metrics_avg, importance_avg

View File

@@ -433,4 +433,157 @@ def feature_engineering(df, csv_prefix, ohlcv_cols, lags, window_sizes):
np.save(st_file, features_dict[st_name].values) np.save(st_file, features_dict[st_name].values)
np.save(st_trend_file, features_dict[st_trend_name].values) np.save(st_trend_file, features_dict[st_trend_name].values)
# --- OHLCV-only additional features ---
# Helper for caching single-series features using the same pattern as above
def _save_or_load_feature(name, series):
if csv_prefix:
feature_file = f'../data/{csv_prefix}_{name}.npy'
if os.path.exists(feature_file):
arr = np.load(feature_file)
features_dict[name] = pd.Series(arr, index=df.index)
else:
# Ensure pandas Series with correct index
series = pd.Series(series, index=df.index)
features_dict[name] = series
np.save(feature_file, series.values)
else:
series = pd.Series(series, index=df.index)
features_dict[name] = series
eps = 1e-9
# Candle shape/position
body = (df['Close'] - df['Open']).abs()
rng = (df['High'] - df['Low'])
upper_wick = df['High'] - df[['Open', 'Close']].max(axis=1)
lower_wick = df[['Open', 'Close']].min(axis=1) - df['Low']
_save_or_load_feature('candle_body', body)
_save_or_load_feature('candle_upper_wick', upper_wick)
_save_or_load_feature('candle_lower_wick', lower_wick)
_save_or_load_feature('candle_body_to_range', body / (rng + eps))
_save_or_load_feature('candle_upper_wick_to_range', upper_wick / (rng + eps))
_save_or_load_feature('candle_lower_wick_to_range', lower_wick / (rng + eps))
_save_or_load_feature('close_pos_in_bar', (df['Close'] - df['Low']) / (rng + eps))
for w in window_sizes:
roll_max = df['High'].rolling(w).max()
roll_min = df['Low'].rolling(w).min()
close_pos_roll = (df['Close'] - roll_min) / ((roll_max - roll_min) + eps)
_save_or_load_feature(f'close_pos_in_roll_{w}', close_pos_roll)
# Range-based volatility (Parkinson, GarmanKlass, RogersSatchell, YangZhang)
log_hl = np.log((df['High'] / df['Low']).replace(0, np.nan))
log_co = np.log((df['Close'] / df['Open']).replace(0, np.nan))
log_close = np.log(df['Close'].replace(0, np.nan))
ret1 = log_close.diff()
for w in window_sizes:
# Parkinson
parkinson_var = (log_hl.pow(2)).rolling(w).mean() / (4.0 * np.log(2.0))
_save_or_load_feature(f'park_vol_{w}', np.sqrt(parkinson_var.clip(lower=0)))
# GarmanKlass
gk_var = 0.5 * (log_hl.pow(2)).rolling(w).mean() - (2.0 * np.log(2.0) - 1.0) * (log_co.pow(2)).rolling(w).mean()
_save_or_load_feature(f'gk_vol_{w}', np.sqrt(gk_var.clip(lower=0)))
# RogersSatchell
u = np.log((df['High'] / df['Close']).replace(0, np.nan))
d = np.log((df['Low'] / df['Close']).replace(0, np.nan))
uo = np.log((df['High'] / df['Open']).replace(0, np.nan))
do = np.log((df['Low'] / df['Open']).replace(0, np.nan))
rs_term = u * uo + d * do
rs_var = rs_term.rolling(w).mean()
_save_or_load_feature(f'rs_vol_{w}', np.sqrt(rs_var.clip(lower=0)))
# YangZhang
g = np.log((df['Open'] / df['Close'].shift(1)).replace(0, np.nan))
u_yz = np.log((df['High'] / df['Open']).replace(0, np.nan))
d_yz = np.log((df['Low'] / df['Open']).replace(0, np.nan))
c_yz = np.log((df['Close'] / df['Open']).replace(0, np.nan))
sigma_g2 = g.rolling(w).var()
sigma_c2 = c_yz.rolling(w).var()
sigma_rs = (u_yz * (u_yz - c_yz) + d_yz * (d_yz - c_yz)).rolling(w).mean()
k = 0.34 / (1.34 + (w + 1.0) / max(w - 1.0, 1.0))
yz_var = sigma_g2 + k * sigma_c2 + (1.0 - k) * sigma_rs
_save_or_load_feature(f'yz_vol_{w}', np.sqrt(yz_var.clip(lower=0)))
# Trend strength: rolling linear-regression slope and R² of log price
def _linreg_slope(arr):
y = np.asarray(arr, dtype=float)
n = y.size
x = np.arange(n, dtype=float)
xmean = (n - 1.0) / 2.0
ymean = np.nanmean(y)
xm = x - xmean
ym = y - ymean
cov = np.nansum(xm * ym)
varx = np.nansum(xm * xm) + eps
return cov / varx
def _linreg_r2(arr):
y = np.asarray(arr, dtype=float)
n = y.size
x = np.arange(n, dtype=float)
xmean = (n - 1.0) / 2.0
ymean = np.nanmean(y)
slope = _linreg_slope(arr)
intercept = ymean - slope * xmean
yhat = slope * x + intercept
ss_tot = np.nansum((y - ymean) ** 2)
ss_res = np.nansum((y - yhat) ** 2)
return 1.0 - ss_res / (ss_tot + eps)
for w in window_sizes:
_save_or_load_feature(f'lr_slope_log_close_{w}', log_close.rolling(w).apply(_linreg_slope, raw=True))
_save_or_load_feature(f'lr_r2_log_close_{w}', log_close.rolling(w).apply(_linreg_r2, raw=True))
# EMA(7), EMA(21), their slopes and spread
ema_7 = df['Close'].ewm(span=7, adjust=False).mean()
ema_21 = df['Close'].ewm(span=21, adjust=False).mean()
_save_or_load_feature('ema_7', ema_7)
_save_or_load_feature('ema_21', ema_21)
_save_or_load_feature('ema_7_slope', ema_7.pct_change())
_save_or_load_feature('ema_21_slope', ema_21.pct_change())
_save_or_load_feature('ema_7_21_spread', ema_7 - ema_21)
# VWAP over windows and distance of Close from VWAP
tp = (df['High'] + df['Low'] + df['Close']) / 3.0
for w in window_sizes:
vwap_w = (tp * df['Volume']).rolling(w).sum() / (df['Volume'].rolling(w).sum() + eps)
_save_or_load_feature(f'vwap_{w}', vwap_w)
_save_or_load_feature(f'vwap_dist_{w}', (df['Close'] - vwap_w) / (vwap_w + eps))
# Autocorrelation of log returns at lags 15 (rolling window 30)
for lag in range(1, 6):
ac = ret1.rolling(30).corr(ret1.shift(lag))
_save_or_load_feature(f'ret_autocorr_lag{lag}_30', ac)
# Rolling skewness and kurtosis of returns (15, 30)
for w in [15, 30]:
_save_or_load_feature(f'ret_skew_{w}', ret1.rolling(w).skew())
_save_or_load_feature(f'ret_kurt_{w}', ret1.rolling(w).kurt())
# Volume z-score and return-volume rolling correlation (15, 30)
for w in [15, 30]:
vol_mean = df['Volume'].rolling(w).mean()
vol_std = df['Volume'].rolling(w).std()
_save_or_load_feature(f'volume_zscore_{w}', (df['Volume'] - vol_mean) / (vol_std + eps))
_save_or_load_feature(f'ret_vol_corr_{w}', ret1.rolling(w).corr(df['Volume']))
# Cyclical time features and relative volume vs hour-of-day average
try:
hours = pd.to_datetime(df['Timestamp']).dt.hour
except Exception:
try:
hours = pd.to_datetime(df['Timestamp'], unit='s', errors='coerce').dt.hour
except Exception:
hours = pd.Series(np.nan, index=df.index)
_save_or_load_feature('sin_hour', np.sin(2.0 * np.pi * (hours.fillna(0)) / 24.0))
_save_or_load_feature('cos_hour', np.cos(2.0 * np.pi * (hours.fillna(0)) / 24.0))
hourly_mean_vol = df['Volume'].groupby(hours).transform('mean')
_save_or_load_feature('relative_volume_hour', df['Volume'] / (hourly_mean_vol + eps))
return features_dict return features_dict

268
main.py
View File

@@ -1,269 +1,7 @@
import sys from ohlcvpredictor.cli import main
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import pandas as pd
import numpy as np
from custom_xgboost import CustomXGBoostGPU
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from plot_results import plot_prediction_error_distribution, plot_direction_transition_heatmap
import time
from numba import njit
import csv
from feature_engineering import feature_engineering
from sklearn.feature_selection import VarianceThreshold
charts_dir = 'charts'
if not os.path.exists(charts_dir):
os.makedirs(charts_dir)
def run_indicator(func, *args): if __name__ == "__main__":
return func(*args) main()
def run_indicator_job(job):
import time
func, *args = job
indicator_name = func.__name__
start = time.time()
result = func(*args)
elapsed = time.time() - start
print(f'Indicator {indicator_name} computed in {elapsed:.4f} seconds')
return result
if __name__ == '__main__':
IMPUTE_NANS = True # Set to True to impute NaNs, False to drop rows with NaNs
csv_path = '../data/btcusd_1-min_data.csv'
csv_prefix = os.path.splitext(os.path.basename(csv_path))[0]
print('Reading CSV and filtering data...')
df = pd.read_csv(csv_path)
df = df[df['Volume'] != 0]
min_date = '2017-06-01'
print('Converting Timestamp and filtering by date...')
df['Timestamp'] = pd.to_datetime(df['Timestamp'], unit='s')
df = df[df['Timestamp'] >= min_date]
lags = 3
print('Calculating log returns as the new target...')
df['log_return'] = np.log(df['Close'] / df['Close'].shift(1))
ohlcv_cols = ['Open', 'High', 'Low', 'Close', 'Volume']
window_sizes = [5, 15, 30] # in minutes, adjust as needed
features_dict = {}
print('Starting feature computation...')
feature_start_time = time.time()
features_dict = feature_engineering(df, csv_prefix, ohlcv_cols, lags, window_sizes)
print('Concatenating all new features to DataFrame...')
features_df = pd.DataFrame(features_dict)
df = pd.concat([df, features_df], axis=1)
# feature_cols_for_variance = [col for col in features_df.columns if features_df[col].dtype in [np.float32, np.float64, float, int, np.int32, np.int64]]
# if feature_cols_for_variance:
# selector = VarianceThreshold(threshold=1e-5)
# filtered_features = selector.fit_transform(features_df[feature_cols_for_variance])
# kept_mask = selector.get_support()
# kept_feature_names = [col for col, keep in zip(feature_cols_for_variance, kept_mask) if keep]
# print(f"Features removed by low variance: {[col for col, keep in zip(feature_cols_for_variance, kept_mask) if not keep]}")
# # Only keep the selected features in features_df and df
# features_df = features_df[kept_feature_names]
# for col in feature_cols_for_variance:
# if col not in kept_feature_names:
# df.drop(col, axis=1, inplace=True)
# else:
# print("No numeric features found for variance thresholding.")
# Remove highly correlated features (keep only one from each correlated group)
# corr_matrix = features_df.corr().abs()
# upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
# to_drop = [column for column in upper.columns if any(upper[column] > 0.95)]
# if to_drop:
# print(f"Features removed due to high correlation: {to_drop}")
# features_df = features_df.drop(columns=to_drop)
# df = df.drop(columns=to_drop)
# else:
# print("No highly correlated features found for removal.")
print('Downcasting float columns to save memory...')
for col in df.columns:
try:
df[col] = pd.to_numeric(df[col], downcast='float')
except Exception:
pass
# Add time features (exclude 'dayofweek')
print('Adding hour feature...')
df['Timestamp'] = pd.to_datetime(df['Timestamp'], errors='coerce')
df['hour'] = df['Timestamp'].dt.hour
# Handle NaNs after all feature engineering
if IMPUTE_NANS:
print('Imputing NaNs after feature engineering (using mean imputation)...')
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
df[col] = df[col].fillna(df[col].mean())
# If you want to impute non-numeric columns differently, add logic here
else:
print('Dropping NaNs after feature engineering...')
df = df.dropna().reset_index(drop=True)
# Exclude 'Timestamp', 'Close', 'log_return', and any future target columns from features
print('Selecting feature columns...')
exclude_cols = ['Timestamp', 'Close']
exclude_cols += ['log_return_5', 'volatility_5', 'volatility_15', 'volatility_30']
exclude_cols += ['bb_bbm', 'bb_bbh', 'bb_bbl', 'stoch_k', 'sma_50', 'sma_200', 'psar',
'donchian_hband', 'donchian_lband', 'donchian_mband', 'keltner_hband', 'keltner_lband',
'keltner_mband', 'ichimoku_a', 'ichimoku_b', 'ichimoku_base_line', 'ichimoku_conversion_line',
'Open_lag1', 'Open_lag2', 'Open_lag3', 'High_lag1', 'High_lag2', 'High_lag3', 'Low_lag1', 'Low_lag2',
'Low_lag3', 'Close_lag1', 'Close_lag2', 'Close_lag3', 'Open_roll_mean_15', 'Open_roll_std_15', 'Open_roll_min_15',
'Open_roll_max_15', 'Open_roll_mean_30', 'Open_roll_min_30', 'Open_roll_max_30', 'High_roll_mean_15', 'High_roll_std_15',
'High_roll_min_15', 'High_roll_max_15', 'Low_roll_mean_5', 'Low_roll_min_5', 'Low_roll_max_5', 'Low_roll_mean_30',
'Low_roll_std_30', 'Low_roll_min_30', 'Low_roll_max_30', 'Close_roll_mean_5', 'Close_roll_min_5', 'Close_roll_max_5',
'Close_roll_mean_15', 'Close_roll_std_15', 'Close_roll_min_15', 'Close_roll_max_15', 'Close_roll_mean_30',
'Close_roll_std_30', 'Close_roll_min_30', 'Close_roll_max_30', 'Volume_roll_max_5', 'Volume_roll_max_15',
'Volume_roll_max_30', 'supertrend_12_3.0', 'supertrend_10_1.0', 'supertrend_11_2.0']
feature_cols = [col for col in df.columns if col not in exclude_cols]
print('Features used for training:', feature_cols)
# from xgboost import XGBRegressor
# from sklearn.model_selection import GridSearchCV
# # Prepare data for grid search
# X = df[feature_cols].values.astype(np.float32)
# y = df["log_return"].values.astype(np.float32)
# split_idx = int(len(X) * 0.8)
# X_train, X_test = X[:split_idx], X[split_idx:]
# y_train, y_test = y[:split_idx], y[split_idx:]
# # Define parameter grid
# param_grid = {
# 'learning_rate': [0.01, 0.05, 0.1],
# 'max_depth': [3, 5, 7],
# 'n_estimators': [100, 200],
# 'subsample': [0.8, 1.0],
# 'colsample_bytree': [0.8, 1.0],
# }
# print('Starting grid search for XGBoost hyperparameters...')
# xgb_model = XGBRegressor(objective='reg:squarederror', tree_method='hist', device='cuda', eval_metric='mae', verbosity=0)
# grid_search = GridSearchCV(xgb_model, param_grid, cv=3, scoring='neg_mean_absolute_error', verbose=2, n_jobs=-1)
# grid_search.fit(X_train, y_train)
# print('Best parameters found:', grid_search.best_params_)
# # Use best estimator for predictions
# best_model = grid_search.best_estimator_
# test_preds = best_model.predict(X_test)
# rmse = np.sqrt(mean_squared_error(y_test, test_preds))
# # Reconstruct price series from log returns
# if 'Close' in df.columns:
# close_prices = df['Close'].values
# else:
# close_prices = pd.read_csv(csv_path)['Close'].values
# start_price = close_prices[split_idx]
# actual_prices = [start_price]
# for r_ in y_test:
# actual_prices.append(actual_prices[-1] * np.exp(r_))
# actual_prices = np.array(actual_prices[1:])
# predicted_prices = [start_price]
# for r_ in test_preds:
# predicted_prices.append(predicted_prices[-1] * np.exp(r_))
# predicted_prices = np.array(predicted_prices[1:])
# mae = mean_absolute_error(actual_prices, predicted_prices)
# r2 = r2_score(actual_prices, predicted_prices)
# direction_actual = np.sign(np.diff(actual_prices))
# direction_pred = np.sign(np.diff(predicted_prices))
# directional_accuracy = (direction_actual == direction_pred).mean()
# mape = np.mean(np.abs((actual_prices - predicted_prices) / actual_prices)) * 100
# print(f'Grid search results: RMSE={rmse:.4f}, MAE={mae:.4f}, R2={r2:.4f}, MAPE={mape:.2f}%, DirAcc={directional_accuracy*100:.2f}%')
# plot_prefix = f'all_features_gridsearch'
# plot_prediction_error_distribution(predicted_prices, actual_prices, prefix=plot_prefix)
# sys.exit(0)
# Prepare CSV for results
results_csv = '../data/cumulative_feature_results.csv'
if not os.path.exists(results_csv):
with open(results_csv, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['num_features', 'added feature', 'rmse', 'mae', 'r2', 'mape', 'directional_accuracy', 'feature_importance'])
try:
X = df[feature_cols].values.astype(np.float32)
y = df["log_return"].values.astype(np.float32)
split_idx = int(len(X) * 0.8)
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]
test_timestamps = df['Timestamp'].values[split_idx:]
model = CustomXGBoostGPU(X_train, X_test, y_train, y_test)
booster = model.train(eval_metric='rmse')
# colsample_bytree=1.0,
# learning_rate=0.05,
# max_depth=7,
# n_estimators=200,
# subsample=0.8
# )
model.save_model(f'../data/xgboost_model_all_features.json')
test_preds = model.predict(X_test)
rmse = np.sqrt(mean_squared_error(y_test, test_preds))
# Reconstruct price series from log returns
if 'Close' in df.columns:
close_prices = df['Close'].values
else:
close_prices = pd.read_csv(csv_path)['Close'].values
start_price = close_prices[split_idx]
actual_prices = [start_price]
for r_ in y_test:
actual_prices.append(actual_prices[-1] * np.exp(r_))
actual_prices = np.array(actual_prices[1:])
predicted_prices = [start_price]
for r_ in test_preds:
predicted_prices.append(predicted_prices[-1] * np.exp(r_))
predicted_prices = np.array(predicted_prices[1:])
# mae = mean_absolute_error(actual_prices, predicted_prices)
r2 = r2_score(actual_prices, predicted_prices)
direction_actual = np.sign(np.diff(actual_prices))
direction_pred = np.sign(np.diff(predicted_prices))
directional_accuracy = (direction_actual == direction_pred).mean()
mape = np.mean(np.abs((actual_prices - predicted_prices) / actual_prices)) * 100
# Save results to CSV for all features used in this run
feature_importance_dict = model.get_feature_importance(feature_cols)
with open(results_csv, 'a', newline='') as f:
writer = csv.writer(f)
for feature in feature_cols:
importance = feature_importance_dict.get(feature, 0.0)
fi_str = format(importance, ".6f")
row = [feature]
for val in [rmse, mape, r2, directional_accuracy]:
if isinstance(val, float):
row.append(format(val, '.10f'))
else:
row.append(val)
row.append(fi_str)
writer.writerow(row)
print('Feature importances and results saved for all features used in this run.')
# Plotting for this run
# plot_prefix = f'cumulative_{n}_features'
# plot_prediction_error_distribution(predicted_prices, actual_prices, prefix=plot_prefix)
# plot_direction_transition_heatmap(actual_prices, predicted_prices, prefix=plot_prefix)
except Exception as e:
print(f'Cumulative feature run failed: {e}')
print(f'All cumulative feature runs completed. Results saved to {results_csv}')
plot_prefix = f'all_features'
plot_prediction_error_distribution(predicted_prices, actual_prices, prefix=plot_prefix)
sys.exit(0)

View File

@@ -0,0 +1,14 @@
"""OHLCV Predictor package."""
__all__ = [
"config",
"data",
"preprocess",
"selection",
"metrics",
"model",
"pipeline",
"cli",
]

29
ohlcvpredictor/cli.py Normal file
View File

@@ -0,0 +1,29 @@
import argparse
from .config import RunConfig, DataConfig
from .pipeline import run_pipeline
def build_arg_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(description="OHLCV Predictor Pipeline")
p.add_argument("--csv", dest="csv_path", required=False, default="../data/btcusd_1-min_data.csv")
p.add_argument("--min-date", dest="min_date", required=False, default="2017-06-01")
p.add_argument("--max-date", dest="max_date", required=False, default=None)
return p
def main() -> None:
parser = build_arg_parser()
args = parser.parse_args()
run_cfg = RunConfig(
data=DataConfig(csv_path=args.csv_path, min_date=args.min_date, max_date=args.max_date)
)
metrics = run_pipeline(run_cfg)
print(
f"RMSE={metrics['rmse']:.6f}, MAPE={metrics['mape']:.4f}%, R2={metrics['r2']:.6f}, DirAcc={metrics['directional_accuracy']:.4f}"
)
if __name__ == "__main__":
main()

65
ohlcvpredictor/config.py Normal file
View File

@@ -0,0 +1,65 @@
from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class DataConfig:
"""Configuration for data loading and basic filtering."""
csv_path: str
min_date: str = "2017-06-01"
max_date: Optional[str] = None
drop_volume_zero: bool = True
@dataclass
class FeatureConfig:
"""Configuration for feature engineering."""
ohlcv_cols: List[str] = field(default_factory=lambda: ["Open", "High", "Low", "Close", "Volume"])
lags: int = 3
window_sizes: List[int] = field(default_factory=lambda: [5, 15, 30])
@dataclass
class PreprocessConfig:
"""Configuration for preprocessing and NaN handling."""
impute_nans: bool = True
@dataclass
class PruningConfig:
"""Configuration for feature pruning and CV."""
do_walk_forward_cv: bool = True
n_splits: int = 5
auto_prune: bool = True
top_k: int = 150
known_low_features: List[str] = field(
default_factory=lambda: [
"supertrend_12_3.0",
"supertrend_10_1.0",
"supertrend_11_2.0",
"supertrend_trend_12_3.0",
"supertrend_trend_10_1.0",
"supertrend_trend_11_2.0",
"hour",
]
)
@dataclass
class OutputConfig:
"""Configuration for outputs and artifacts."""
charts_dir: str = "charts"
results_csv: str = "../data/cumulative_feature_results.csv"
model_output_path: str = "../data/xgboost_model_all_features.json"
@dataclass
class RunConfig:
"""Top-level configuration grouping for a pipeline run."""
data: DataConfig
features: FeatureConfig = field(default_factory=FeatureConfig)
preprocess: PreprocessConfig = field(default_factory=PreprocessConfig)
pruning: PruningConfig = field(default_factory=PruningConfig)
output: OutputConfig = field(default_factory=OutputConfig)

38
ohlcvpredictor/data.py Normal file
View File

@@ -0,0 +1,38 @@
from typing import Tuple
import os
import pandas as pd
import numpy as np
from .config import DataConfig
def load_and_filter_data(cfg: DataConfig) -> pd.DataFrame:
"""Load CSV, filter, and convert timestamp.
- Reads the CSV at cfg.csv_path
- Drops rows with Volume == 0 if configured
- Converts 'Timestamp' from seconds to datetime and filters by cfg.min_date
- Adds 'log_return' target column
"""
if not os.path.exists(cfg.csv_path):
raise FileNotFoundError(f"CSV not found: {cfg.csv_path}")
df = pd.read_csv(cfg.csv_path)
if cfg.drop_volume_zero and 'Volume' in df.columns:
df = df[df['Volume'] != 0]
if 'Timestamp' not in df.columns:
raise ValueError("Expected 'Timestamp' column in input CSV")
df['Timestamp'] = pd.to_datetime(df['Timestamp'], unit='s')
df = df[df['Timestamp'] >= cfg.min_date]
if cfg.max_date:
df = df[df['Timestamp'] <= cfg.max_date]
if 'Close' not in df.columns:
raise ValueError("Expected 'Close' column in input CSV")
df['log_return'] = np.log(df['Close'] / df['Close'].shift(1))
return df

26
ohlcvpredictor/metrics.py Normal file
View File

@@ -0,0 +1,26 @@
from typing import Dict, Tuple
import numpy as np
from sklearn.metrics import mean_squared_error, r2_score
def compute_price_series_from_log_returns(start_price: float, log_returns: np.ndarray) -> np.ndarray:
"""Reconstruct price series from log returns starting at start_price."""
prices = [start_price]
for r in log_returns:
prices.append(prices[-1] * float(np.exp(r)))
return np.asarray(prices[1:])
def compute_metrics_from_prices(actual_prices: np.ndarray, predicted_prices: np.ndarray) -> Dict[str, float]:
"""Compute RMSE, MAPE, R2, and directional accuracy given price series."""
rmse = float(np.sqrt(mean_squared_error(actual_prices, predicted_prices)))
with np.errstate(divide='ignore', invalid='ignore'):
mape_arr = np.abs((actual_prices - predicted_prices) / np.where(actual_prices == 0, np.nan, actual_prices))
mape = float(np.nanmean(mape_arr) * 100.0)
r2 = float(r2_score(actual_prices, predicted_prices))
direction_actual = np.sign(np.diff(actual_prices))
direction_pred = np.sign(np.diff(predicted_prices))
dir_acc = float((direction_actual == direction_pred).mean()) if len(direction_actual) > 0 else 0.0
return {"rmse": rmse, "mape": mape, "r2": r2, "directional_accuracy": dir_acc}

28
ohlcvpredictor/model.py Normal file
View File

@@ -0,0 +1,28 @@
from typing import Dict, List, Tuple
import numpy as np
from custom_xgboost import CustomXGBoostGPU
def train_model(
X_train: np.ndarray,
X_test: np.ndarray,
y_train: np.ndarray,
y_test: np.ndarray,
eval_metric: str = 'rmse',
):
"""Train the XGBoost model and return the fitted wrapper."""
model = CustomXGBoostGPU(X_train, X_test, y_train, y_test)
model.train(eval_metric=eval_metric)
return model
def predict(model: CustomXGBoostGPU, X: np.ndarray) -> np.ndarray:
"""Predict using the trained model."""
return model.predict(X)
def get_feature_importance(model: CustomXGBoostGPU, feature_names: List[str]) -> Dict[str, float]:
return model.get_feature_importance(feature_names)

125
ohlcvpredictor/pipeline.py Normal file
View File

@@ -0,0 +1,125 @@
from __future__ import annotations
from typing import Dict, List, Tuple
import os
import csv
import json
import numpy as np
import pandas as pd
from .config import RunConfig
from .data import load_and_filter_data
from .preprocess import add_basic_time_features, downcast_numeric_columns, handle_nans
from .selection import build_feature_list, prune_features
from .model import train_model, predict, get_feature_importance
from .metrics import compute_price_series_from_log_returns, compute_metrics_from_prices
from evaluation import walk_forward_cv
from feature_engineering import feature_engineering
from plot_results import plot_prediction_error_distribution
def ensure_charts_dir(path: str) -> None:
if not os.path.exists(path):
os.makedirs(path, exist_ok=True)
def run_pipeline(cfg: RunConfig) -> Dict[str, float]:
# Setup outputs
ensure_charts_dir(cfg.output.charts_dir)
# Load and target
df = load_and_filter_data(cfg.data)
# Features
features_dict = feature_engineering(
df,
os.path.splitext(os.path.basename(cfg.data.csv_path))[0],
cfg.features.ohlcv_cols,
cfg.features.lags,
cfg.features.window_sizes,
)
features_df = pd.DataFrame(features_dict)
df = pd.concat([df, features_df], axis=1)
# Preprocess
df = downcast_numeric_columns(df)
df = add_basic_time_features(df)
df = handle_nans(df, cfg.preprocess)
# Feature selection and pruning
feature_cols = build_feature_list(df.columns)
X = df[feature_cols].values.astype(np.float32)
y = df["log_return"].values.astype(np.float32)
split_idx = int(len(X) * 0.8)
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]
importance_avg = None
if cfg.pruning.do_walk_forward_cv:
metrics_avg, importance_avg = walk_forward_cv(X, y, feature_cols, n_splits=cfg.pruning.n_splits)
# Optional: you may log or return metrics_avg
kept_feature_cols = prune_features(feature_cols, importance_avg, cfg.pruning) if cfg.pruning.auto_prune else feature_cols
# Train model
model = train_model(
df[kept_feature_cols].values.astype(np.float32)[:split_idx],
df[kept_feature_cols].values.astype(np.float32)[split_idx:],
y[:split_idx],
y[split_idx:],
eval_metric='rmse',
)
# Save model
model.save_model(cfg.output.model_output_path)
# Persist the exact feature list used for training next to the model
try:
features_path = os.path.splitext(cfg.output.model_output_path)[0] + "_features.json"
with open(features_path, "w") as f:
json.dump({"feature_names": kept_feature_cols}, f)
except Exception:
# Feature list persistence is optional; avoid breaking the run on failure
pass
# Predict
X_test_kept = df[kept_feature_cols].values.astype(np.float32)[split_idx:]
test_preds = predict(model, X_test_kept)
# Reconstruct price series
close_prices = df['Close'].values
start_price = close_prices[split_idx]
actual_prices = compute_price_series_from_log_returns(start_price, y_test)
predicted_prices = compute_price_series_from_log_returns(start_price, test_preds)
# Metrics
metrics = compute_metrics_from_prices(actual_prices, predicted_prices)
# Plot prediction error distribution to charts dir (parity with previous behavior)
try:
plot_prediction_error_distribution(predicted_prices, actual_prices, prefix="all_features")
except Exception:
# plotting is optional; ignore failures in headless environments
pass
# Persist per-feature metrics and importances
feat_importance = get_feature_importance(model, kept_feature_cols)
if not os.path.exists(cfg.output.results_csv):
with open(cfg.output.results_csv, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['feature', 'rmse', 'mape', 'r2', 'directional_accuracy', 'feature_importance'])
with open(cfg.output.results_csv, 'a', newline='') as f:
writer = csv.writer(f)
for feature in kept_feature_cols:
importance = feat_importance.get(feature, 0.0)
row = [feature]
for key in ['rmse', 'mape', 'r2', 'directional_accuracy']:
val = metrics[key]
row.append(f"{val:.10f}")
row.append(f"{importance:.6f}")
writer.writerow(row)
return metrics

View File

@@ -0,0 +1,39 @@
from typing import List
import pandas as pd
import numpy as np
from .config import PreprocessConfig
def add_basic_time_features(df: pd.DataFrame) -> pd.DataFrame:
"""Add basic time features such as hour-of-day."""
df = df.copy()
df['Timestamp'] = pd.to_datetime(df['Timestamp'], errors='coerce')
df['hour'] = df['Timestamp'].dt.hour
return df
def downcast_numeric_columns(df: pd.DataFrame) -> pd.DataFrame:
"""Downcast numeric columns to save memory."""
df = df.copy()
for col in df.columns:
try:
df[col] = pd.to_numeric(df[col], downcast='float')
except Exception:
# ignore non-numeric columns
pass
return df
def handle_nans(df: pd.DataFrame, cfg: PreprocessConfig) -> pd.DataFrame:
"""Impute NaNs (mean) or drop rows, based on config."""
df = df.copy()
if cfg.impute_nans:
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
df[col] = df[col].fillna(df[col].mean())
else:
df = df.dropna().reset_index(drop=True)
return df

View File

@@ -0,0 +1,59 @@
from typing import Dict, Iterable, List, Sequence, Set, Tuple
import numpy as np
from .config import PruningConfig
EXCLUDE_BASE_FEATURES: List[str] = [
'Timestamp', 'Close',
'log_return_5', 'volatility_5', 'volatility_15', 'volatility_30',
'bb_bbm', 'bb_bbh', 'bb_bbl', 'stoch_k', 'sma_50', 'sma_200', 'psar',
'donchian_hband', 'donchian_lband', 'donchian_mband', 'keltner_hband', 'keltner_lband',
'keltner_mband', 'ichimoku_a', 'ichimoku_b', 'ichimoku_base_line', 'ichimoku_conversion_line',
'Open_lag1', 'Open_lag2', 'Open_lag3', 'High_lag1', 'High_lag2', 'High_lag3', 'Low_lag1', 'Low_lag2',
'Low_lag3', 'Close_lag1', 'Close_lag2', 'Close_lag3', 'Open_roll_mean_15', 'Open_roll_std_15', 'Open_roll_min_15',
'Open_roll_max_15', 'Open_roll_mean_30', 'Open_roll_min_30', 'Open_roll_max_30', 'High_roll_mean_15', 'High_roll_std_15',
'High_roll_min_15', 'High_roll_max_15', 'Low_roll_mean_5', 'Low_roll_min_5', 'Low_roll_max_5', 'Low_roll_mean_30',
'Low_roll_std_30', 'Low_roll_min_30', 'Low_roll_max_30', 'Close_roll_mean_5', 'Close_roll_min_5', 'Close_roll_max_5',
'Close_roll_mean_15', 'Close_roll_std_15', 'Close_roll_min_15', 'Close_roll_max_15', 'Close_roll_mean_30',
'Close_roll_std_30', 'Close_roll_min_30', 'Close_roll_max_30', 'Volume_roll_max_5', 'Volume_roll_max_15',
'Volume_roll_max_30', 'supertrend_12_3.0', 'supertrend_10_1.0', 'supertrend_11_2.0',
]
def build_feature_list(all_columns: Sequence[str]) -> List[str]:
"""Return the model feature list by excluding base columns and targets."""
return [col for col in all_columns if col not in EXCLUDE_BASE_FEATURES]
def prune_features(
feature_cols: Sequence[str],
importance_avg: Dict[str, float] | None,
cfg: PruningConfig,
) -> List[str]:
"""Decide which features to keep using averaged importances and rules."""
prune_set: Set[str] = set()
if importance_avg is not None:
sorted_feats = sorted(importance_avg.items(), key=lambda kv: kv[1], reverse=True)
keep_names = set(name for name, _ in sorted_feats[: cfg.top_k])
for name in feature_cols:
if name not in keep_names:
prune_set.add(name)
for name in cfg.known_low_features:
if name in feature_cols:
prune_set.add(name)
# If Parkinson vol exists, drop alternatives at same window
for w in [5, 15, 30]:
park = f'park_vol_{w}'
if park in feature_cols:
for alt in [f'gk_vol_{w}', f'rs_vol_{w}', f'yz_vol_{w}']:
if alt in feature_cols:
prune_set.add(alt)
kept = [c for c in feature_cols if c not in prune_set]
return kept

View File

@@ -1,6 +1,7 @@
import pandas as pd import pandas as pd
import numpy as np import numpy as np
import os import os
import json
try: try:
from .custom_xgboost import CustomXGBoostGPU from .custom_xgboost import CustomXGBoostGPU
@@ -19,6 +20,21 @@ class OHLCVPredictor:
self.model = CustomXGBoostGPU.load_model(model_path) self.model = CustomXGBoostGPU.load_model(model_path)
self.exclude_cols = self._get_excluded_features() self.exclude_cols = self._get_excluded_features()
self._feature_names = self._load_trained_feature_names(model_path)
def _load_trained_feature_names(self, model_path: str):
"""Load the exact feature list saved during training, if present."""
try:
features_path = os.path.splitext(model_path)[0] + "_features.json"
if os.path.exists(features_path):
with open(features_path, "r") as f:
data = json.load(f)
names = data.get("feature_names")
if isinstance(names, list) and all(isinstance(x, str) for x in names):
return names
except Exception:
pass
return None
def _get_excluded_features(self): def _get_excluded_features(self):
"""Get the list of features to exclude (copied from main.py)""" """Get the list of features to exclude (copied from main.py)"""
@@ -80,6 +96,13 @@ class OHLCVPredictor:
df = df.copy() df = df.copy()
# Select features and predict # Select features and predict
if self._feature_names is not None:
# Use the exact training feature names and order
missing = [c for c in self._feature_names if c not in df.columns]
if missing:
raise ValueError(f"Input is missing required trained features: {missing[:10]}{'...' if len(missing)>10 else ''}")
feature_cols = self._feature_names
else:
feature_cols = [col for col in df.columns if col not in self.exclude_cols] feature_cols = [col for col in df.columns if col not in self.exclude_cols]
X = df[feature_cols].values.astype(np.float32) X = df[feature_cols].values.astype(np.float32)
return self.model.predict(X) return self.model.predict(X)