OHLCVPredictor/predictor.py

import pandas as pd
import numpy as np
import os
import json

# Support both package-relative imports and running the file as a plain script.
try:
    from .custom_xgboost import CustomXGBoostGPU
except ImportError:
    from custom_xgboost import CustomXGBoostGPU

try:
    from .feature_engineering import feature_engineering
except ImportError:
    from feature_engineering import feature_engineering


class OHLCVPredictor:
    """Loads a trained CustomXGBoostGPU model and runs predictions on raw OHLCV data."""

    def __init__(self, model_path):
        if not os.path.exists(model_path):
            raise FileNotFoundError(f"Model file not found: {model_path}")
        self.model = CustomXGBoostGPU.load_model(model_path)
        self.exclude_cols = self._get_excluded_features()
        self._feature_names = self._load_trained_feature_names(model_path)

    def _load_trained_feature_names(self, model_path: str):
        """Load the exact feature list saved during training, if present."""
        try:
            features_path = os.path.splitext(model_path)[0] + "_features.json"
            if os.path.exists(features_path):
                with open(features_path, "r") as f:
                    data = json.load(f)
                names = data.get("feature_names")
                if isinstance(names, list) and all(isinstance(x, str) for x in names):
                    return names
        except Exception:
            pass
        return None
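
    # Illustrative note (an assumption, not taken from the training code): the
    # sidecar read above is expected to sit next to the model file, e.g.
    # "my_model_features.json", and only its "feature_names" key is consumed:
    #
    #     {"feature_names": ["Open", "High", "Low", "Volume", "hour"]}
    #
    # The feature names shown here are hypothetical placeholders.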

    def _get_excluded_features(self):
        """Get the list of features to exclude (copied from main.py)"""
        exclude_cols = ['Timestamp', 'Close']
        exclude_cols += ['log_return_5', 'volatility_5', 'volatility_15', 'volatility_30']
        exclude_cols += [
            'bb_bbm', 'bb_bbh', 'bb_bbl', 'stoch_k', 'sma_50', 'sma_200', 'psar',
            'donchian_hband', 'donchian_lband', 'donchian_mband', 'keltner_hband', 'keltner_lband',
            'keltner_mband', 'ichimoku_a', 'ichimoku_b', 'ichimoku_base_line', 'ichimoku_conversion_line',
            'Open_lag1', 'Open_lag2', 'Open_lag3', 'High_lag1', 'High_lag2', 'High_lag3', 'Low_lag1', 'Low_lag2',
            'Low_lag3', 'Close_lag1', 'Close_lag2', 'Close_lag3', 'Open_roll_mean_15', 'Open_roll_std_15', 'Open_roll_min_15',
            'Open_roll_max_15', 'Open_roll_mean_30', 'Open_roll_min_30', 'Open_roll_max_30', 'High_roll_mean_15', 'High_roll_std_15',
            'High_roll_min_15', 'High_roll_max_15', 'Low_roll_mean_5', 'Low_roll_min_5', 'Low_roll_max_5', 'Low_roll_mean_30',
            'Low_roll_std_30', 'Low_roll_min_30', 'Low_roll_max_30', 'Close_roll_mean_5', 'Close_roll_min_5', 'Close_roll_max_5',
            'Close_roll_mean_15', 'Close_roll_std_15', 'Close_roll_min_15', 'Close_roll_max_15', 'Close_roll_mean_30',
            'Close_roll_std_30', 'Close_roll_min_30', 'Close_roll_max_30', 'Volume_roll_max_5', 'Volume_roll_max_15',
            'Volume_roll_max_30', 'supertrend_12_3.0', 'supertrend_10_1.0', 'supertrend_11_2.0',
        ]
        return exclude_cols

    def predict(self, df, csv_prefix=None):
        """Engineer features from raw OHLCV rows and return the model's predictions
        (one per nonzero-volume row; predict_prices interprets them as log returns)."""
        # Validate input DataFrame
        required_cols = ['Open', 'High', 'Low', 'Close', 'Volume', 'Timestamp']
        missing_cols = [col for col in required_cols if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Missing required columns: {missing_cols}")

        # Make a copy and preprocess
        df = df.copy()
        df = df[df['Volume'] != 0].reset_index(drop=True)

        # Convert timestamps
        if df['Timestamp'].dtype == 'object':
            df['Timestamp'] = pd.to_datetime(df['Timestamp'])
        else:
            df['Timestamp'] = pd.to_datetime(df['Timestamp'], unit='s')

        # Feature engineering
        ohlcv_cols = ['Open', 'High', 'Low', 'Close', 'Volume']
        features_dict = feature_engineering(df, csv_prefix, ohlcv_cols, 3, [5, 15, 30])
        features_df = pd.DataFrame(features_dict)
        df = pd.concat([df, features_df], axis=1)

        # Downcast and add time features (exclude Timestamp to preserve datetime)
        for col in df.columns:
            if col != 'Timestamp':  # Don't convert Timestamp to numeric
                try:
                    df[col] = pd.to_numeric(df[col], downcast='float')
                except Exception:
                    pass
        df['hour'] = df['Timestamp'].dt.hour

        # Handle NaNs
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            if df[col].isna().any():
                df[col] = df[col].fillna(df[col].mean())

        # Defragment DataFrame after all columns have been added
        df = df.copy()

        # Select features and predict
        if self._feature_names is not None:
            # Use the exact training feature names and order
            missing = [c for c in self._feature_names if c not in df.columns]
            if missing:
                raise ValueError(
                    f"Input is missing required trained features: "
                    f"{missing[:10]}{'...' if len(missing) > 10 else ''}"
                )
            feature_cols = self._feature_names
        else:
            feature_cols = [col for col in df.columns if col not in self.exclude_cols]
        X = df[feature_cols].values.astype(np.float32)
        return self.model.predict(X)

    def predict_prices(self, df, csv_prefix=None):
        """Compound predicted log returns from the first observed close into a price
        path; returns (predicted_prices, actual_close_prices) of equal length."""
        log_return_preds = self.predict(df, csv_prefix)
        df_clean = df[df['Volume'] != 0].copy()
        close_prices = df_clean['Close'].values

        # Seed with the first actual close, then roll each predicted log return
        # forward onto the previous predicted price.
        predicted_prices = [close_prices[0]]
        for i, log_ret in enumerate(log_return_preds[1:], 1):
            if i < len(close_prices):
                predicted_prices.append(predicted_prices[-1] * np.exp(log_ret))
        return np.array(predicted_prices), close_prices[:len(predicted_prices)]
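

if __name__ == "__main__":
    # Minimal usage sketch added for illustration only: the file paths and the
    # csv_prefix value below are hypothetical, not part of the original project.
    predictor = OHLCVPredictor("models/ohlcv_model.json")  # assumed path to a trained model
    raw = pd.read_csv("data/ohlcv.csv")  # needs Open/High/Low/Close/Volume/Timestamp columns
    predicted, actual = predictor.predict_prices(raw, csv_prefix="ohlcv")
    print("first predicted closes:", predicted[:5])
    print("first actual closes:", actual[:5])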