diff --git a/custom_xgboost.py b/custom_xgboost.py
index dd5a58a..4429af1 100644
--- a/custom_xgboost.py
+++ b/custom_xgboost.py
@@ -15,8 +15,8 @@ class CustomXGBoostGPU:
             'tree_method': 'hist',
             'device': 'cuda',
             'objective': 'reg:squarederror',
-            'eval_metric': 'rmse',
-            'verbosity': 1,
+            'eval_metric': 'mae',
+            'verbosity': 0,
         }
         params.update(xgb_params)
         self.params = params  # Store params for later access
@@ -37,3 +37,12 @@ class CustomXGBoostGPU:
         if self.model is None:
             raise ValueError('Model not trained yet.')
         self.model.save_model(file_path)
+
+    def get_feature_importance(self, feature_names):
+        if self.model is None:
+            raise ValueError('Model not trained yet.')
+        # get_score returns a dict with keys like 'f0', 'f1', ...
+        score_dict = self.model.get_score(importance_type='weight')
+        # Map the positional keys back to the provided feature names
+        importances = [score_dict.get(f'f{i}', 0.0) for i in range(len(feature_names))]
+        return dict(zip(feature_names, importances))
diff --git a/feature_engineering.py b/feature_engineering.py
new file mode 100644
index 0000000..7d1eeb2
--- /dev/null
+++ b/feature_engineering.py
@@ -0,0 +1,401 @@
+import os
+import numpy as np
+import pandas as pd
+import pandas_ta as ta  # pandas_ta (not the `ta` package) provides the supertrend function used below
+from technical_indicator_functions import *
+
+def feature_engineering(df, csv_prefix, ohlcv_cols, lags, window_sizes):
+    feature_file = f'../data/{csv_prefix}_rsi.npy'
+    features_dict = {}
+
+    if os.path.exists(feature_file):
+        print(f'A Loading cached feature: {feature_file}')
+        arr = np.load(feature_file)
+        features_dict['rsi'] = pd.Series(arr, index=df.index)
+    else:
+        print('Calculating feature: rsi')
+        _, values = calc_rsi(df['Close'])
+        features_dict['rsi'] = values
+        np.save(feature_file, values.values)
+        print(f'Saved feature: {feature_file}')
+
+    # MACD
+    feature_file = f'../data/{csv_prefix}_macd.npy'
+    if os.path.exists(feature_file):
+        print(f'A Loading cached feature: {feature_file}')
+        arr = np.load(feature_file)
+        features_dict['macd'] = pd.Series(arr, index=df.index)
+    else:
+        print('Calculating feature: macd')
+        _, values = calc_macd(df['Close'])
+        features_dict['macd'] = values
+        np.save(feature_file, values.values)
+        print(f'Saved feature: {feature_file}')
+
+    # ATR
+    feature_file = f'../data/{csv_prefix}_atr.npy'
+    if os.path.exists(feature_file):
+        print(f'A Loading cached feature: {feature_file}')
+        arr = np.load(feature_file)
+        features_dict['atr'] = pd.Series(arr, index=df.index)
+    else:
+        print('Calculating feature: atr')
+        _, values = calc_atr(df['High'], df['Low'], df['Close'])
+        features_dict['atr'] = values
+        np.save(feature_file, values.values)
+        print(f'Saved feature: {feature_file}')
+
+    # CCI
+    feature_file = f'../data/{csv_prefix}_cci.npy'
+    if os.path.exists(feature_file):
+        print(f'A Loading cached feature: {feature_file}')
+        arr = np.load(feature_file)
+        features_dict['cci'] = pd.Series(arr, index=df.index)
+    else:
+        print('Calculating feature: cci')
+        _, values = calc_cci(df['High'], df['Low'], df['Close'])
+        features_dict['cci'] = values
+        np.save(feature_file, values.values)
+        print(f'Saved feature: {feature_file}')
+
+    # Williams %R
+    feature_file = f'../data/{csv_prefix}_williams_r.npy'
+    if os.path.exists(feature_file):
+        print(f'A Loading cached feature: {feature_file}')
+        arr = np.load(feature_file)
+        features_dict['williams_r'] = pd.Series(arr, index=df.index)
+    else:
+        print('Calculating feature: williams_r')
+        _, values = calc_williamsr(df['High'], df['Low'], df['Close'])
+        features_dict['williams_r'] = values
+        np.save(feature_file, values.values)
+        print(f'Saved feature: 
{feature_file}') + + # EMA 14 + feature_file = f'../data/{csv_prefix}_ema_14.npy' + if os.path.exists(feature_file): + print(f'A Loading cached feature: {feature_file}') + arr = np.load(feature_file) + features_dict['ema_14'] = pd.Series(arr, index=df.index) + else: + print('Calculating feature: ema_14') + _, values = calc_ema(df['Close']) + features_dict['ema_14'] = values + np.save(feature_file, values.values) + print(f'Saved feature: {feature_file}') + + # OBV + feature_file = f'../data/{csv_prefix}_obv.npy' + if os.path.exists(feature_file): + print(f'A Loading cached feature: {feature_file}') + arr = np.load(feature_file) + features_dict['obv'] = pd.Series(arr, index=df.index) + else: + print('Calculating feature: obv') + _, values = calc_obv(df['Close'], df['Volume']) + features_dict['obv'] = values + np.save(feature_file, values.values) + print(f'Saved feature: {feature_file}') + + # CMF + feature_file = f'../data/{csv_prefix}_cmf.npy' + if os.path.exists(feature_file): + print(f'A Loading cached feature: {feature_file}') + arr = np.load(feature_file) + features_dict['cmf'] = pd.Series(arr, index=df.index) + else: + print('Calculating feature: cmf') + _, values = calc_cmf(df['High'], df['Low'], df['Close'], df['Volume']) + features_dict['cmf'] = values + np.save(feature_file, values.values) + print(f'Saved feature: {feature_file}') + + # ROC 10 + feature_file = f'../data/{csv_prefix}_roc_10.npy' + if os.path.exists(feature_file): + print(f'A Loading cached feature: {feature_file}') + arr = np.load(feature_file) + features_dict['roc_10'] = pd.Series(arr, index=df.index) + else: + print('Calculating feature: roc_10') + _, values = calc_roc(df['Close']) + features_dict['roc_10'] = values + np.save(feature_file, values.values) + print(f'Saved feature: {feature_file}') + + # DPO 20 + feature_file = f'../data/{csv_prefix}_dpo_20.npy' + if os.path.exists(feature_file): + print(f'A Loading cached feature: {feature_file}') + arr = np.load(feature_file) + features_dict['dpo_20'] = pd.Series(arr, index=df.index) + else: + print('Calculating feature: dpo_20') + _, values = calc_dpo(df['Close']) + features_dict['dpo_20'] = values + np.save(feature_file, values.values) + print(f'Saved feature: {feature_file}') + + # Ultimate Oscillator + feature_file = f'../data/{csv_prefix}_ultimate_osc.npy' + if os.path.exists(feature_file): + print(f'A Loading cached feature: {feature_file}') + arr = np.load(feature_file) + features_dict['ultimate_osc'] = pd.Series(arr, index=df.index) + else: + print('Calculating feature: ultimate_osc') + _, values = calc_ultimate(df['High'], df['Low'], df['Close']) + features_dict['ultimate_osc'] = values + np.save(feature_file, values.values) + print(f'Saved feature: {feature_file}') + + # Daily Return + feature_file = f'../data/{csv_prefix}_daily_return.npy' + if os.path.exists(feature_file): + print(f'A Loading cached feature: {feature_file}') + arr = np.load(feature_file) + features_dict['daily_return'] = pd.Series(arr, index=df.index) + else: + print('Calculating feature: daily_return') + _, values = calc_daily_return(df['Close']) + features_dict['daily_return'] = values + np.save(feature_file, values.values) + print(f'Saved feature: {feature_file}') + + # Multi-column indicators + # Bollinger Bands + print('Calculating multi-column indicator: bollinger') + result = calc_bollinger(df['Close']) + for subname, values in result: + print(f"Adding subfeature: {subname}") + sub_feature_file = f'../data/{csv_prefix}_{subname}.npy' + if os.path.exists(sub_feature_file): + 
print(f'B Loading cached feature: {sub_feature_file}') + arr = np.load(sub_feature_file) + features_dict[subname] = pd.Series(arr, index=df.index) + else: + features_dict[subname] = values + np.save(sub_feature_file, values.values) + print(f'Saved feature: {sub_feature_file}') + + # Stochastic Oscillator + print('Calculating multi-column indicator: stochastic') + result = calc_stochastic(df['High'], df['Low'], df['Close']) + for subname, values in result: + print(f"Adding subfeature: {subname}") + sub_feature_file = f'../data/{csv_prefix}_{subname}.npy' + if os.path.exists(sub_feature_file): + print(f'B Loading cached feature: {sub_feature_file}') + arr = np.load(sub_feature_file) + features_dict[subname] = pd.Series(arr, index=df.index) + else: + features_dict[subname] = values + np.save(sub_feature_file, values.values) + print(f'Saved feature: {sub_feature_file}') + + # SMA + print('Calculating multi-column indicator: sma') + result = calc_sma(df['Close']) + for subname, values in result: + print(f"Adding subfeature: {subname}") + sub_feature_file = f'../data/{csv_prefix}_{subname}.npy' + if os.path.exists(sub_feature_file): + print(f'B Loading cached feature: {sub_feature_file}') + arr = np.load(sub_feature_file) + features_dict[subname] = pd.Series(arr, index=df.index) + else: + features_dict[subname] = values + np.save(sub_feature_file, values.values) + print(f'Saved feature: {sub_feature_file}') + + # PSAR + print('Calculating multi-column indicator: psar') + result = calc_psar(df['High'], df['Low'], df['Close']) + for subname, values in result: + print(f"Adding subfeature: {subname}") + sub_feature_file = f'../data/{csv_prefix}_{subname}.npy' + if os.path.exists(sub_feature_file): + print(f'B Loading cached feature: {sub_feature_file}') + arr = np.load(sub_feature_file) + features_dict[subname] = pd.Series(arr, index=df.index) + else: + features_dict[subname] = values + np.save(sub_feature_file, values.values) + print(f'Saved feature: {sub_feature_file}') + + # Donchian Channel + print('Calculating multi-column indicator: donchian') + result = calc_donchian(df['High'], df['Low'], df['Close']) + for subname, values in result: + print(f"Adding subfeature: {subname}") + sub_feature_file = f'../data/{csv_prefix}_{subname}.npy' + if os.path.exists(sub_feature_file): + print(f'B Loading cached feature: {sub_feature_file}') + arr = np.load(sub_feature_file) + features_dict[subname] = pd.Series(arr, index=df.index) + else: + features_dict[subname] = values + np.save(sub_feature_file, values.values) + print(f'Saved feature: {sub_feature_file}') + + # Keltner Channel + print('Calculating multi-column indicator: keltner') + result = calc_keltner(df['High'], df['Low'], df['Close']) + for subname, values in result: + print(f"Adding subfeature: {subname}") + sub_feature_file = f'../data/{csv_prefix}_{subname}.npy' + if os.path.exists(sub_feature_file): + print(f'B Loading cached feature: {sub_feature_file}') + arr = np.load(sub_feature_file) + features_dict[subname] = pd.Series(arr, index=df.index) + else: + features_dict[subname] = values + np.save(sub_feature_file, values.values) + print(f'Saved feature: {sub_feature_file}') + + # Ichimoku + print('Calculating multi-column indicator: ichimoku') + result = calc_ichimoku(df['High'], df['Low']) + for subname, values in result: + print(f"Adding subfeature: {subname}") + sub_feature_file = f'../data/{csv_prefix}_{subname}.npy' + if os.path.exists(sub_feature_file): + print(f'B Loading cached feature: {sub_feature_file}') + arr = 
np.load(sub_feature_file) + features_dict[subname] = pd.Series(arr, index=df.index) + else: + features_dict[subname] = values + np.save(sub_feature_file, values.values) + print(f'Saved feature: {sub_feature_file}') + + # Elder Ray + print('Calculating multi-column indicator: elder_ray') + result = calc_elder_ray(df['Close'], df['Low'], df['High']) + for subname, values in result: + print(f"Adding subfeature: {subname}") + sub_feature_file = f'../data/{csv_prefix}_{subname}.npy' + if os.path.exists(sub_feature_file): + print(f'B Loading cached feature: {sub_feature_file}') + arr = np.load(sub_feature_file) + features_dict[subname] = pd.Series(arr, index=df.index) + else: + features_dict[subname] = values + np.save(sub_feature_file, values.values) + print(f'Saved feature: {sub_feature_file}') + + # Prepare lags, rolling stats, log returns, and volatility features sequentially + # Lags + for col in ohlcv_cols: + for lag in range(1, lags + 1): + feature_name = f'{col}_lag{lag}' + feature_file = f'../data/{csv_prefix}_{feature_name}.npy' + if os.path.exists(feature_file): + print(f'C Loading cached feature: {feature_file}') + features_dict[feature_name] = np.load(feature_file) + else: + print(f'Computing lag feature: {feature_name}') + result = compute_lag(df, col, lag) + features_dict[feature_name] = result + np.save(feature_file, result.values) + print(f'Saved feature: {feature_file}') + # Rolling statistics + for col in ohlcv_cols: + for window in window_sizes: + if (col == 'Open' and window == 5): + continue + if (col == 'High' and window == 5): + continue + if (col == 'High' and window == 30): + continue + if (col == 'Low' and window == 15): + continue + for stat in ['mean', 'std', 'min', 'max']: + feature_name = f'{col}_roll_{stat}_{window}' + feature_file = f'../data/{csv_prefix}_{feature_name}.npy' + if os.path.exists(feature_file): + print(f'D Loading cached feature: {feature_file}') + features_dict[feature_name] = np.load(feature_file) + else: + print(f'Computing rolling stat feature: {feature_name}') + result = compute_rolling(df, col, stat, window) + features_dict[feature_name] = result + np.save(feature_file, result.values) + print(f'Saved feature: {feature_file}') + # Log returns for different horizons + for horizon in [5, 15, 30]: + feature_name = f'log_return_{horizon}' + feature_file = f'../data/{csv_prefix}_{feature_name}.npy' + if os.path.exists(feature_file): + print(f'E Loading cached feature: {feature_file}') + features_dict[feature_name] = np.load(feature_file) + else: + print(f'Computing log return feature: {feature_name}') + result = compute_log_return(df, horizon) + features_dict[feature_name] = result + np.save(feature_file, result.values) + print(f'Saved feature: {feature_file}') + # Volatility + for window in window_sizes: + feature_name = f'volatility_{window}' + feature_file = f'../data/{csv_prefix}_{feature_name}.npy' + if os.path.exists(feature_file): + print(f'F Loading cached feature: {feature_file}') + features_dict[feature_name] = np.load(feature_file) + else: + print(f'Computing volatility feature: {feature_name}') + result = compute_volatility(df, window) + features_dict[feature_name] = result + np.save(feature_file, result.values) + print(f'Saved feature: {feature_file}') + + # --- Additional Technical Indicator Features --- + # ADX + adx_names = ['adx', 'adx_pos', 'adx_neg'] + adx_files = [f'../data/{csv_prefix}_{name}.npy' for name in adx_names] + if all(os.path.exists(f) for f in adx_files): + print('G Loading cached features: ADX') + for name, f in 
zip(adx_names, adx_files): + arr = np.load(f) + features_dict[name] = pd.Series(arr, index=df.index) + else: + print('Calculating multi-column indicator: adx') + result = calc_adx(df['High'], df['Low'], df['Close']) + for subname, values in result: + sub_feature_file = f'../data/{csv_prefix}_{subname}.npy' + features_dict[subname] = values + np.save(sub_feature_file, values.values) + print(f'Saved feature: {sub_feature_file}') + + # Force Index + feature_file = f'../data/{csv_prefix}_force_index.npy' + if os.path.exists(feature_file): + print(f'K Loading cached feature: {feature_file}') + arr = np.load(feature_file) + features_dict['force_index'] = pd.Series(arr, index=df.index) + else: + print('Calculating feature: force_index') + _, values = calc_force_index(df['Close'], df['Volume']) + features_dict['force_index'] = values + np.save(feature_file, values.values) + print(f'Saved feature: {feature_file}') + + # Supertrend indicators + for period, multiplier in [(12, 3.0), (10, 1.0), (11, 2.0)]: + st_name = f'supertrend_{period}_{multiplier}' + st_trend_name = f'supertrend_trend_{period}_{multiplier}' + st_file = f'../data/{csv_prefix}_{st_name}.npy' + st_trend_file = f'../data/{csv_prefix}_{st_trend_name}.npy' + if os.path.exists(st_file) and os.path.exists(st_trend_file): + print(f'L Loading cached features: {st_file}, {st_trend_file}') + features_dict[st_name] = pd.Series(np.load(st_file), index=df.index) + features_dict[st_trend_name] = pd.Series(np.load(st_trend_file), index=df.index) + else: + print(f'Calculating Supertrend indicator: {st_name}') + st = ta.supertrend(df['High'], df['Low'], df['Close'], length=period, multiplier=multiplier) + features_dict[st_name] = st[f'SUPERT_{period}_{multiplier}'] + features_dict[st_trend_name] = st[f'SUPERTd_{period}_{multiplier}'] + np.save(st_file, features_dict[st_name].values) + np.save(st_trend_file, features_dict[st_trend_name].values) + print(f'Saved features: {st_file}, {st_trend_file}') + + return features_dict diff --git a/main.py b/main.py index bebf09b..9cda3e1 100644 --- a/main.py +++ b/main.py @@ -9,7 +9,13 @@ from plot_results import plot_prediction_error_distribution, plot_direction_tran import time from numba import njit import csv -import ta +import pandas_ta as ta +from feature_engineering import feature_engineering +from sklearn.feature_selection import VarianceThreshold + +charts_dir = 'charts' +if not os.path.exists(charts_dir): + os.makedirs(charts_dir) def run_indicator(func, *args): return func(*args) @@ -24,254 +30,6 @@ def run_indicator_job(job): print(f'Indicator {indicator_name} computed in {elapsed:.4f} seconds') return result -def calc_rsi(close): - from ta.momentum import RSIIndicator - return ('rsi', RSIIndicator(close, window=14).rsi()) - -def calc_macd(close): - from ta.trend import MACD - return ('macd', MACD(close).macd()) - -def calc_bollinger(close): - from ta.volatility import BollingerBands - bb = BollingerBands(close=close, window=20, window_dev=2) - return [ - ('bb_bbm', bb.bollinger_mavg()), - ('bb_bbh', bb.bollinger_hband()), - ('bb_bbl', bb.bollinger_lband()), - ('bb_bb_width', bb.bollinger_hband() - bb.bollinger_lband()) - ] - -def calc_stochastic(high, low, close): - from ta.momentum import StochasticOscillator - stoch = StochasticOscillator(high=high, low=low, close=close, window=14, smooth_window=3) - return [ - ('stoch_k', stoch.stoch()), - ('stoch_d', stoch.stoch_signal()) - ] - -def calc_atr(high, low, close): - from ta.volatility import AverageTrueRange - atr = AverageTrueRange(high=high, 
low=low, close=close, window=14) - return ('atr', atr.average_true_range()) - -def calc_cci(high, low, close): - from ta.trend import CCIIndicator - cci = CCIIndicator(high=high, low=low, close=close, window=20) - return ('cci', cci.cci()) - -def calc_williamsr(high, low, close): - from ta.momentum import WilliamsRIndicator - willr = WilliamsRIndicator(high=high, low=low, close=close, lbp=14) - return ('williams_r', willr.williams_r()) - -def calc_ema(close): - from ta.trend import EMAIndicator - ema = EMAIndicator(close=close, window=14) - return ('ema_14', ema.ema_indicator()) - -def calc_obv(close, volume): - from ta.volume import OnBalanceVolumeIndicator - obv = OnBalanceVolumeIndicator(close=close, volume=volume) - return ('obv', obv.on_balance_volume()) - -def calc_cmf(high, low, close, volume): - from ta.volume import ChaikinMoneyFlowIndicator - cmf = ChaikinMoneyFlowIndicator(high=high, low=low, close=close, volume=volume, window=20) - return ('cmf', cmf.chaikin_money_flow()) - -def calc_sma(close): - from ta.trend import SMAIndicator - return [ - ('sma_50', SMAIndicator(close, window=50).sma_indicator()), - ('sma_200', SMAIndicator(close, window=200).sma_indicator()) - ] - -def calc_roc(close): - from ta.momentum import ROCIndicator - return ('roc_10', ROCIndicator(close, window=10).roc()) - -def calc_momentum(close): - return ('momentum_10', close - close.shift(10)) - -def calc_psar(high, low, close): - # Use the Numba-accelerated fast_psar function for speed - psar_values = fast_psar(np.array(high), np.array(low), np.array(close)) - return [('psar', pd.Series(psar_values, index=close.index))] - -def calc_donchian(high, low, close): - from ta.volatility import DonchianChannel - donchian = DonchianChannel(high, low, close, window=20) - return [ - ('donchian_hband', donchian.donchian_channel_hband()), - ('donchian_lband', donchian.donchian_channel_lband()), - ('donchian_mband', donchian.donchian_channel_mband()) - ] - -def calc_keltner(high, low, close): - from ta.volatility import KeltnerChannel - keltner = KeltnerChannel(high, low, close, window=20) - return [ - ('keltner_hband', keltner.keltner_channel_hband()), - ('keltner_lband', keltner.keltner_channel_lband()), - ('keltner_mband', keltner.keltner_channel_mband()) - ] - -def calc_dpo(close): - from ta.trend import DPOIndicator - return ('dpo_20', DPOIndicator(close, window=20).dpo()) - -def calc_ultimate(high, low, close): - from ta.momentum import UltimateOscillator - return ('ultimate_osc', UltimateOscillator(high, low, close).ultimate_oscillator()) - -def calc_ichimoku(high, low): - from ta.trend import IchimokuIndicator - ichimoku = IchimokuIndicator(high, low, window1=9, window2=26, window3=52) - return [ - ('ichimoku_a', ichimoku.ichimoku_a()), - ('ichimoku_b', ichimoku.ichimoku_b()), - ('ichimoku_base_line', ichimoku.ichimoku_base_line()), - ('ichimoku_conversion_line', ichimoku.ichimoku_conversion_line()) - ] - -def calc_elder_ray(close, low, high): - from ta.trend import EMAIndicator - ema = EMAIndicator(close, window=13).ema_indicator() - return [ - ('elder_ray_bull', ema - low), - ('elder_ray_bear', ema - high) - ] - -def calc_daily_return(close): - from ta.others import DailyReturnIndicator - return ('daily_return', DailyReturnIndicator(close).daily_return()) - -@njit -def fast_psar(high, low, close, af=0.02, max_af=0.2): - length = len(close) - psar = np.zeros(length) - bull = True - af_step = af - ep = low[0] - psar[0] = low[0] - for i in range(1, length): - prev_psar = psar[i-1] - if bull: - psar[i] = prev_psar 
+ af_step * (ep - prev_psar) - if low[i] < psar[i]: - bull = False - psar[i] = ep - af_step = af - ep = low[i] - else: - if high[i] > ep: - ep = high[i] - af_step = min(af_step + af, max_af) - else: - psar[i] = prev_psar + af_step * (ep - prev_psar) - if high[i] > psar[i]: - bull = True - psar[i] = ep - af_step = af - ep = high[i] - else: - if low[i] < ep: - ep = low[i] - af_step = min(af_step + af, max_af) - return psar - -def compute_lag(df, col, lag): - return df[col].shift(lag) - -def compute_rolling(df, col, stat, window): - if stat == 'mean': - return df[col].rolling(window).mean() - elif stat == 'std': - return df[col].rolling(window).std() - elif stat == 'min': - return df[col].rolling(window).min() - elif stat == 'max': - return df[col].rolling(window).max() - -def compute_log_return(df, horizon): - return np.log(df['Close'] / df['Close'].shift(horizon)) - -def compute_volatility(df, window): - return df['log_return'].rolling(window).std() - -def run_feature_job(job, df): - feature_name, func, *args = job - print(f'Computing feature: {feature_name}') - result = func(df, *args) - return feature_name, result - -def calc_adx(high, low, close): - from ta.trend import ADXIndicator - adx = ADXIndicator(high=high, low=low, close=close, window=14) - return [ - ('adx', adx.adx()), - ('adx_pos', adx.adx_pos()), - ('adx_neg', adx.adx_neg()) - ] - -def calc_trix(close): - from ta.trend import TRIXIndicator - trix = TRIXIndicator(close=close, window=15) - return ('trix', trix.trix()) - -def calc_vortex(high, low, close): - from ta.trend import VortexIndicator - vortex = VortexIndicator(high=high, low=low, close=close, window=14) - return [ - ('vortex_pos', vortex.vortex_indicator_pos()), - ('vortex_neg', vortex.vortex_indicator_neg()) - ] - -def calc_kama(close): - import pandas_ta as ta - kama = ta.kama(close, length=10) - return ('kama', kama) - -def calc_force_index(close, volume): - from ta.volume import ForceIndexIndicator - fi = ForceIndexIndicator(close=close, volume=volume, window=13) - return ('force_index', fi.force_index()) - -def calc_eom(high, low, volume): - from ta.volume import EaseOfMovementIndicator - eom = EaseOfMovementIndicator(high=high, low=low, volume=volume, window=14) - return ('eom', eom.ease_of_movement()) - -def calc_mfi(high, low, close, volume): - from ta.volume import MFIIndicator - mfi = MFIIndicator(high=high, low=low, close=close, volume=volume, window=14) - return ('mfi', mfi.money_flow_index()) - -def calc_adi(high, low, close, volume): - from ta.volume import AccDistIndexIndicator - adi = AccDistIndexIndicator(high=high, low=low, close=close, volume=volume) - return ('adi', adi.acc_dist_index()) - -def calc_tema(close): - import pandas_ta as ta - tema = ta.tema(close, length=10) - return ('tema', tema) - -def calc_stochrsi(close): - from ta.momentum import StochRSIIndicator - stochrsi = StochRSIIndicator(close=close, window=14, smooth1=3, smooth2=3) - return [ - ('stochrsi', stochrsi.stochrsi()), - ('stochrsi_k', stochrsi.stochrsi_k()), - ('stochrsi_d', stochrsi.stochrsi_d()) - ] - -def calc_awesome_oscillator(high, low): - from ta.momentum import AwesomeOscillatorIndicator - ao = AwesomeOscillatorIndicator(high=high, low=low, window1=5, window2=34) - return ('awesome_osc', ao.awesome_oscillator()) - if __name__ == '__main__': IMPUTE_NANS = True # Set to True to impute NaNs, False to drop rows with NaNs csv_path = '../data/btcusd_1-min_data.csv' @@ -298,412 +56,38 @@ if __name__ == '__main__': print('Starting feature computation...') feature_start_time = 
time.time() - - # --- Technical Indicator Features: Calculate or Load from Cache --- - print('Calculating or loading technical indicator features...') - # RSI - feature_file = f'../data/{csv_prefix}_rsi.npy' - if os.path.exists(feature_file): - print(f'A Loading cached feature: {feature_file}') - arr = np.load(feature_file) - features_dict['rsi'] = pd.Series(arr, index=df.index) - else: - print('Calculating feature: rsi') - _, values = calc_rsi(df['Close']) - features_dict['rsi'] = values - np.save(feature_file, values.values) - print(f'Saved feature: {feature_file}') - - # MACD - feature_file = f'../data/{csv_prefix}_macd.npy' - if os.path.exists(feature_file): - print(f'A Loading cached feature: {feature_file}') - arr = np.load(feature_file) - features_dict['macd'] = pd.Series(arr, index=df.index) - else: - print('Calculating feature: macd') - _, values = calc_macd(df['Close']) - features_dict['macd'] = values - np.save(feature_file, values.values) - print(f'Saved feature: {feature_file}') - - # ATR - feature_file = f'../data/{csv_prefix}_atr.npy' - if os.path.exists(feature_file): - print(f'A Loading cached feature: {feature_file}') - arr = np.load(feature_file) - features_dict['atr'] = pd.Series(arr, index=df.index) - else: - print('Calculating feature: atr') - _, values = calc_atr(df['High'], df['Low'], df['Close']) - features_dict['atr'] = values - np.save(feature_file, values.values) - print(f'Saved feature: {feature_file}') - - # CCI - feature_file = f'../data/{csv_prefix}_cci.npy' - if os.path.exists(feature_file): - print(f'A Loading cached feature: {feature_file}') - arr = np.load(feature_file) - features_dict['cci'] = pd.Series(arr, index=df.index) - else: - print('Calculating feature: cci') - _, values = calc_cci(df['High'], df['Low'], df['Close']) - features_dict['cci'] = values - np.save(feature_file, values.values) - print(f'Saved feature: {feature_file}') - - # Williams %R - feature_file = f'../data/{csv_prefix}_williams_r.npy' - if os.path.exists(feature_file): - print(f'A Loading cached feature: {feature_file}') - arr = np.load(feature_file) - features_dict['williams_r'] = pd.Series(arr, index=df.index) - else: - print('Calculating feature: williams_r') - _, values = calc_williamsr(df['High'], df['Low'], df['Close']) - features_dict['williams_r'] = values - np.save(feature_file, values.values) - print(f'Saved feature: {feature_file}') - - # EMA 14 - feature_file = f'../data/{csv_prefix}_ema_14.npy' - if os.path.exists(feature_file): - print(f'A Loading cached feature: {feature_file}') - arr = np.load(feature_file) - features_dict['ema_14'] = pd.Series(arr, index=df.index) - else: - print('Calculating feature: ema_14') - _, values = calc_ema(df['Close']) - features_dict['ema_14'] = values - np.save(feature_file, values.values) - print(f'Saved feature: {feature_file}') - - # OBV - feature_file = f'../data/{csv_prefix}_obv.npy' - if os.path.exists(feature_file): - print(f'A Loading cached feature: {feature_file}') - arr = np.load(feature_file) - features_dict['obv'] = pd.Series(arr, index=df.index) - else: - print('Calculating feature: obv') - _, values = calc_obv(df['Close'], df['Volume']) - features_dict['obv'] = values - np.save(feature_file, values.values) - print(f'Saved feature: {feature_file}') - - # CMF - feature_file = f'../data/{csv_prefix}_cmf.npy' - if os.path.exists(feature_file): - print(f'A Loading cached feature: {feature_file}') - arr = np.load(feature_file) - features_dict['cmf'] = pd.Series(arr, index=df.index) - else: - print('Calculating feature: cmf') 
- _, values = calc_cmf(df['High'], df['Low'], df['Close'], df['Volume']) - features_dict['cmf'] = values - np.save(feature_file, values.values) - print(f'Saved feature: {feature_file}') - - # ROC 10 - feature_file = f'../data/{csv_prefix}_roc_10.npy' - if os.path.exists(feature_file): - print(f'A Loading cached feature: {feature_file}') - arr = np.load(feature_file) - features_dict['roc_10'] = pd.Series(arr, index=df.index) - else: - print('Calculating feature: roc_10') - _, values = calc_roc(df['Close']) - features_dict['roc_10'] = values - np.save(feature_file, values.values) - print(f'Saved feature: {feature_file}') - - # DPO 20 - feature_file = f'../data/{csv_prefix}_dpo_20.npy' - if os.path.exists(feature_file): - print(f'A Loading cached feature: {feature_file}') - arr = np.load(feature_file) - features_dict['dpo_20'] = pd.Series(arr, index=df.index) - else: - print('Calculating feature: dpo_20') - _, values = calc_dpo(df['Close']) - features_dict['dpo_20'] = values - np.save(feature_file, values.values) - print(f'Saved feature: {feature_file}') - - # Ultimate Oscillator - feature_file = f'../data/{csv_prefix}_ultimate_osc.npy' - if os.path.exists(feature_file): - print(f'A Loading cached feature: {feature_file}') - arr = np.load(feature_file) - features_dict['ultimate_osc'] = pd.Series(arr, index=df.index) - else: - print('Calculating feature: ultimate_osc') - _, values = calc_ultimate(df['High'], df['Low'], df['Close']) - features_dict['ultimate_osc'] = values - np.save(feature_file, values.values) - print(f'Saved feature: {feature_file}') - - # Daily Return - feature_file = f'../data/{csv_prefix}_daily_return.npy' - if os.path.exists(feature_file): - print(f'A Loading cached feature: {feature_file}') - arr = np.load(feature_file) - features_dict['daily_return'] = pd.Series(arr, index=df.index) - else: - print('Calculating feature: daily_return') - _, values = calc_daily_return(df['Close']) - features_dict['daily_return'] = values - np.save(feature_file, values.values) - print(f'Saved feature: {feature_file}') - - # Multi-column indicators - # Bollinger Bands - print('Calculating multi-column indicator: bollinger') - result = calc_bollinger(df['Close']) - for subname, values in result: - print(f"Adding subfeature: {subname}") - sub_feature_file = f'../data/{csv_prefix}_{subname}.npy' - if os.path.exists(sub_feature_file): - print(f'B Loading cached feature: {sub_feature_file}') - arr = np.load(sub_feature_file) - features_dict[subname] = pd.Series(arr, index=df.index) - else: - features_dict[subname] = values - np.save(sub_feature_file, values.values) - print(f'Saved feature: {sub_feature_file}') - - # Stochastic Oscillator - print('Calculating multi-column indicator: stochastic') - result = calc_stochastic(df['High'], df['Low'], df['Close']) - for subname, values in result: - print(f"Adding subfeature: {subname}") - sub_feature_file = f'../data/{csv_prefix}_{subname}.npy' - if os.path.exists(sub_feature_file): - print(f'B Loading cached feature: {sub_feature_file}') - arr = np.load(sub_feature_file) - features_dict[subname] = pd.Series(arr, index=df.index) - else: - features_dict[subname] = values - np.save(sub_feature_file, values.values) - print(f'Saved feature: {sub_feature_file}') - - # SMA - print('Calculating multi-column indicator: sma') - result = calc_sma(df['Close']) - for subname, values in result: - print(f"Adding subfeature: {subname}") - sub_feature_file = f'../data/{csv_prefix}_{subname}.npy' - if os.path.exists(sub_feature_file): - print(f'B Loading cached feature: 
{sub_feature_file}') - arr = np.load(sub_feature_file) - features_dict[subname] = pd.Series(arr, index=df.index) - else: - features_dict[subname] = values - np.save(sub_feature_file, values.values) - print(f'Saved feature: {sub_feature_file}') - - # PSAR - print('Calculating multi-column indicator: psar') - result = calc_psar(df['High'], df['Low'], df['Close']) - for subname, values in result: - print(f"Adding subfeature: {subname}") - sub_feature_file = f'../data/{csv_prefix}_{subname}.npy' - if os.path.exists(sub_feature_file): - print(f'B Loading cached feature: {sub_feature_file}') - arr = np.load(sub_feature_file) - features_dict[subname] = pd.Series(arr, index=df.index) - else: - features_dict[subname] = values - np.save(sub_feature_file, values.values) - print(f'Saved feature: {sub_feature_file}') - - # Donchian Channel - print('Calculating multi-column indicator: donchian') - result = calc_donchian(df['High'], df['Low'], df['Close']) - for subname, values in result: - print(f"Adding subfeature: {subname}") - sub_feature_file = f'../data/{csv_prefix}_{subname}.npy' - if os.path.exists(sub_feature_file): - print(f'B Loading cached feature: {sub_feature_file}') - arr = np.load(sub_feature_file) - features_dict[subname] = pd.Series(arr, index=df.index) - else: - features_dict[subname] = values - np.save(sub_feature_file, values.values) - print(f'Saved feature: {sub_feature_file}') - - # Keltner Channel - print('Calculating multi-column indicator: keltner') - result = calc_keltner(df['High'], df['Low'], df['Close']) - for subname, values in result: - print(f"Adding subfeature: {subname}") - sub_feature_file = f'../data/{csv_prefix}_{subname}.npy' - if os.path.exists(sub_feature_file): - print(f'B Loading cached feature: {sub_feature_file}') - arr = np.load(sub_feature_file) - features_dict[subname] = pd.Series(arr, index=df.index) - else: - features_dict[subname] = values - np.save(sub_feature_file, values.values) - print(f'Saved feature: {sub_feature_file}') - - # Ichimoku - print('Calculating multi-column indicator: ichimoku') - result = calc_ichimoku(df['High'], df['Low']) - for subname, values in result: - print(f"Adding subfeature: {subname}") - sub_feature_file = f'../data/{csv_prefix}_{subname}.npy' - if os.path.exists(sub_feature_file): - print(f'B Loading cached feature: {sub_feature_file}') - arr = np.load(sub_feature_file) - features_dict[subname] = pd.Series(arr, index=df.index) - else: - features_dict[subname] = values - np.save(sub_feature_file, values.values) - print(f'Saved feature: {sub_feature_file}') - - # Elder Ray - print('Calculating multi-column indicator: elder_ray') - result = calc_elder_ray(df['Close'], df['Low'], df['High']) - for subname, values in result: - print(f"Adding subfeature: {subname}") - sub_feature_file = f'../data/{csv_prefix}_{subname}.npy' - if os.path.exists(sub_feature_file): - print(f'B Loading cached feature: {sub_feature_file}') - arr = np.load(sub_feature_file) - features_dict[subname] = pd.Series(arr, index=df.index) - else: - features_dict[subname] = values - np.save(sub_feature_file, values.values) - print(f'Saved feature: {sub_feature_file}') - - # Prepare lags, rolling stats, log returns, and volatility features sequentially - # Lags - for col in ohlcv_cols: - for lag in range(1, lags + 1): - feature_name = f'{col}_lag{lag}' - feature_file = f'../data/{csv_prefix}_{feature_name}.npy' - if os.path.exists(feature_file): - print(f'C Loading cached feature: {feature_file}') - features_dict[feature_name] = np.load(feature_file) - else: - 
print(f'Computing lag feature: {feature_name}') - result = compute_lag(df, col, lag) - features_dict[feature_name] = result - np.save(feature_file, result.values) - print(f'Saved feature: {feature_file}') - # Rolling statistics - for col in ohlcv_cols: - for window in window_sizes: - if (col == 'Open' and window == 5): - continue - if (col == 'High' and window == 5): - continue - if (col == 'High' and window == 30): - continue - if (col == 'Low' and window == 15): - continue - for stat in ['mean', 'std', 'min', 'max']: - feature_name = f'{col}_roll_{stat}_{window}' - feature_file = f'../data/{csv_prefix}_{feature_name}.npy' - if os.path.exists(feature_file): - print(f'D Loading cached feature: {feature_file}') - features_dict[feature_name] = np.load(feature_file) - else: - print(f'Computing rolling stat feature: {feature_name}') - result = compute_rolling(df, col, stat, window) - features_dict[feature_name] = result - np.save(feature_file, result.values) - print(f'Saved feature: {feature_file}') - # Log returns for different horizons - for horizon in [5, 15, 30]: - feature_name = f'log_return_{horizon}' - feature_file = f'../data/{csv_prefix}_{feature_name}.npy' - if os.path.exists(feature_file): - print(f'E Loading cached feature: {feature_file}') - features_dict[feature_name] = np.load(feature_file) - else: - print(f'Computing log return feature: {feature_name}') - result = compute_log_return(df, horizon) - features_dict[feature_name] = result - np.save(feature_file, result.values) - print(f'Saved feature: {feature_file}') - # Volatility - for window in window_sizes: - feature_name = f'volatility_{window}' - feature_file = f'../data/{csv_prefix}_{feature_name}.npy' - if os.path.exists(feature_file): - print(f'F Loading cached feature: {feature_file}') - features_dict[feature_name] = np.load(feature_file) - else: - print(f'Computing volatility feature: {feature_name}') - result = compute_volatility(df, window) - features_dict[feature_name] = result - np.save(feature_file, result.values) - print(f'Saved feature: {feature_file}') - - # --- Additional Technical Indicator Features --- - # ADX - adx_names = ['adx', 'adx_pos', 'adx_neg'] - adx_files = [f'../data/{csv_prefix}_{name}.npy' for name in adx_names] - if all(os.path.exists(f) for f in adx_files): - print('G Loading cached features: ADX') - for name, f in zip(adx_names, adx_files): - arr = np.load(f) - features_dict[name] = pd.Series(arr, index=df.index) - else: - print('Calculating multi-column indicator: adx') - result = calc_adx(df['High'], df['Low'], df['Close']) - for subname, values in result: - sub_feature_file = f'../data/{csv_prefix}_{subname}.npy' - features_dict[subname] = values - np.save(sub_feature_file, values.values) - print(f'Saved feature: {sub_feature_file}') - - # Force Index - feature_file = f'../data/{csv_prefix}_force_index.npy' - if os.path.exists(feature_file): - print(f'K Loading cached feature: {feature_file}') - arr = np.load(feature_file) - features_dict['force_index'] = pd.Series(arr, index=df.index) - else: - print('Calculating feature: force_index') - _, values = calc_force_index(df['Close'], df['Volume']) - features_dict['force_index'] = values - np.save(feature_file, values.values) - print(f'Saved feature: {feature_file}') - - # Supertrend indicators - for period, multiplier in [(12, 3.0), (10, 1.0), (11, 2.0)]: - st_name = f'supertrend_{period}_{multiplier}' - st_trend_name = f'supertrend_trend_{period}_{multiplier}' - st_file = f'../data/{csv_prefix}_{st_name}.npy' - st_trend_file = 
f'../data/{csv_prefix}_{st_trend_name}.npy' - if os.path.exists(st_file) and os.path.exists(st_trend_file): - print(f'L Loading cached features: {st_file}, {st_trend_file}') - features_dict[st_name] = pd.Series(np.load(st_file), index=df.index) - features_dict[st_trend_name] = pd.Series(np.load(st_trend_file), index=df.index) - else: - print(f'Calculating Supertrend indicator: {st_name}') - st = ta.supertrend(df['High'], df['Low'], df['Close'], length=period, multiplier=multiplier) - features_dict[st_name] = st[f'SUPERT_{period}_{multiplier}'] - features_dict[st_trend_name] = st[f'SUPERTd_{period}_{multiplier}'] - np.save(st_file, features_dict[st_name].values) - np.save(st_trend_file, features_dict[st_trend_name].values) - print(f'Saved features: {st_file}, {st_trend_file}') - - # Concatenate all new features at once + features_dict = feature_engineering(df, csv_prefix, ohlcv_cols, lags, window_sizes) print('Concatenating all new features to DataFrame...') + features_df = pd.DataFrame(features_dict) - print("Columns in features_df:", features_df.columns.tolist()) - print("All-NaN columns in features_df:", features_df.columns[features_df.isna().all()].tolist()) df = pd.concat([df, features_df], axis=1) - # Print all columns after concatenation - print("All columns in df after concat:", df.columns.tolist()) + # feature_cols_for_variance = [col for col in features_df.columns if features_df[col].dtype in [np.float32, np.float64, float, int, np.int32, np.int64]] + # if feature_cols_for_variance: + # selector = VarianceThreshold(threshold=1e-5) + # filtered_features = selector.fit_transform(features_df[feature_cols_for_variance]) + # kept_mask = selector.get_support() + # kept_feature_names = [col for col, keep in zip(feature_cols_for_variance, kept_mask) if keep] + # print(f"Features removed by low variance: {[col for col, keep in zip(feature_cols_for_variance, kept_mask) if not keep]}") + # # Only keep the selected features in features_df and df + # features_df = features_df[kept_feature_names] + # for col in feature_cols_for_variance: + # if col not in kept_feature_names: + # df.drop(col, axis=1, inplace=True) + # else: + # print("No numeric features found for variance thresholding.") + + # Remove highly correlated features (keep only one from each correlated group) + # corr_matrix = features_df.corr().abs() + # upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)) + # to_drop = [column for column in upper.columns if any(upper[column] > 0.95)] + # if to_drop: + # print(f"Features removed due to high correlation: {to_drop}") + # features_df = features_df.drop(columns=to_drop) + # df = df.drop(columns=to_drop) + # else: + # print("No highly correlated features found for removal.") - # Downcast all float columns to save memory print('Downcasting float columns to save memory...') for col in df.columns: try: @@ -729,76 +113,158 @@ if __name__ == '__main__': # Exclude 'Timestamp', 'Close', 'log_return', and any future target columns from features print('Selecting feature columns...') - exclude_cols = ['Timestamp', 'Close', 'log_return', 'log_return_5', 'log_return_15', 'log_return_30'] + exclude_cols = ['Timestamp', 'Close'] + exclude_cols += ['log_return_5', 'volatility_5', 'volatility_15', 'volatility_30'] + exclude_cols += ['bb_bbm', 'bb_bbh', 'bb_bbl', 'stoch_k', 'sma_50', 'sma_200', 'psar', + 'donchian_hband', 'donchian_lband', 'donchian_mband', 'keltner_hband', 'keltner_lband', + 'keltner_mband', 'ichimoku_a', 'ichimoku_b', 'ichimoku_base_line', 
'ichimoku_conversion_line', + 'Open_lag1', 'Open_lag2', 'Open_lag3', 'High_lag1', 'High_lag2', 'High_lag3', 'Low_lag1', 'Low_lag2', + 'Low_lag3', 'Close_lag1', 'Close_lag2', 'Close_lag3', 'Open_roll_mean_15', 'Open_roll_std_15', 'Open_roll_min_15', + 'Open_roll_max_15', 'Open_roll_mean_30', 'Open_roll_min_30', 'Open_roll_max_30', 'High_roll_mean_15', 'High_roll_std_15', + 'High_roll_min_15', 'High_roll_max_15', 'Low_roll_mean_5', 'Low_roll_min_5', 'Low_roll_max_5', 'Low_roll_mean_30', + 'Low_roll_std_30', 'Low_roll_min_30', 'Low_roll_max_30', 'Close_roll_mean_5', 'Close_roll_min_5', 'Close_roll_max_5', + 'Close_roll_mean_15', 'Close_roll_std_15', 'Close_roll_min_15', 'Close_roll_max_15', 'Close_roll_mean_30', + 'Close_roll_std_30', 'Close_roll_min_30', 'Close_roll_max_30', 'Volume_roll_max_5', 'Volume_roll_max_15', + 'Volume_roll_max_30', 'supertrend_12_3.0', 'supertrend_10_1.0', 'supertrend_11_2.0'] + feature_cols = [col for col in df.columns if col not in exclude_cols] print('Features used for training:', feature_cols) + # from xgboost import XGBRegressor + # from sklearn.model_selection import GridSearchCV + + # # Prepare data for grid search + # X = df[feature_cols].values.astype(np.float32) + # y = df["log_return"].values.astype(np.float32) + # split_idx = int(len(X) * 0.8) + # X_train, X_test = X[:split_idx], X[split_idx:] + # y_train, y_test = y[:split_idx], y[split_idx:] + + # # Define parameter grid + # param_grid = { + # 'learning_rate': [0.01, 0.05, 0.1], + # 'max_depth': [3, 5, 7], + # 'n_estimators': [100, 200], + # 'subsample': [0.8, 1.0], + # 'colsample_bytree': [0.8, 1.0], + # } + + # print('Starting grid search for XGBoost hyperparameters...') + # xgb_model = XGBRegressor(objective='reg:squarederror', tree_method='hist', device='cuda', eval_metric='mae', verbosity=0) + # grid_search = GridSearchCV(xgb_model, param_grid, cv=3, scoring='neg_mean_absolute_error', verbose=2, n_jobs=-1) + # grid_search.fit(X_train, y_train) + # print('Best parameters found:', grid_search.best_params_) + + # # Use best estimator for predictions + # best_model = grid_search.best_estimator_ + # test_preds = best_model.predict(X_test) + # rmse = np.sqrt(mean_squared_error(y_test, test_preds)) + + # # Reconstruct price series from log returns + # if 'Close' in df.columns: + # close_prices = df['Close'].values + # else: + # close_prices = pd.read_csv(csv_path)['Close'].values + # start_price = close_prices[split_idx] + # actual_prices = [start_price] + # for r_ in y_test: + # actual_prices.append(actual_prices[-1] * np.exp(r_)) + # actual_prices = np.array(actual_prices[1:]) + # predicted_prices = [start_price] + # for r_ in test_preds: + # predicted_prices.append(predicted_prices[-1] * np.exp(r_)) + # predicted_prices = np.array(predicted_prices[1:]) + + # mae = mean_absolute_error(actual_prices, predicted_prices) + # r2 = r2_score(actual_prices, predicted_prices) + # direction_actual = np.sign(np.diff(actual_prices)) + # direction_pred = np.sign(np.diff(predicted_prices)) + # directional_accuracy = (direction_actual == direction_pred).mean() + # mape = np.mean(np.abs((actual_prices - predicted_prices) / actual_prices)) * 100 + + # print(f'Grid search results: RMSE={rmse:.4f}, MAE={mae:.4f}, R2={r2:.4f}, MAPE={mape:.2f}%, DirAcc={directional_accuracy*100:.2f}%') + + # plot_prefix = f'all_features_gridsearch' + # plot_prediction_error_distribution(predicted_prices, actual_prices, prefix=plot_prefix) + + # sys.exit(0) + # Prepare CSV for results - results_csv = '../data/leave_one_out_results.csv' + 
results_csv = '../data/cumulative_feature_results.csv'
     if not os.path.exists(results_csv):
         with open(results_csv, 'w', newline='') as f:
             writer = csv.writer(f)
-            writer.writerow(['left_out_feature', 'used_features', 'rmse', 'mae', 'r2', 'mape', 'directional_accuracy'])
+            writer.writerow(['feature', 'rmse', 'mae', 'r2', 'mape', 'directional_accuracy', 'feature_importance'])
 
-    total_features = len(feature_cols)
-    never_leave_out = {'Open', 'High', 'Low', 'Close', 'Volume'}
-    for idx, left_out in enumerate(feature_cols):
-        if left_out in never_leave_out:
-            continue
-        used = [f for f in feature_cols if f != left_out]
-        print(f'\n=== Leave-one-out {idx+1}/{total_features}: left out {left_out} ===')
-        try:
-            # Prepare X and y for this combination
-            X = df[used].values.astype(np.float32)
-            y = df["log_return"].values.astype(np.float32)
-            split_idx = int(len(X) * 0.8)
-            X_train, X_test = X[:split_idx], X[split_idx:]
-            y_train, y_test = y[:split_idx], y[split_idx:]
-            test_timestamps = df['Timestamp'].values[split_idx:]
+    try:
+        X = df[feature_cols].values.astype(np.float32)
+        y = df["log_return"].values.astype(np.float32)
+        split_idx = int(len(X) * 0.8)
+        X_train, X_test = X[:split_idx], X[split_idx:]
+        y_train, y_test = y[:split_idx], y[split_idx:]
+        test_timestamps = df['Timestamp'].values[split_idx:]
 
-            model = CustomXGBoostGPU(X_train, X_test, y_train, y_test)
-            booster = model.train()
-            model.save_model(f'../data/xgboost_model_wo_{left_out}.json')
+        model = CustomXGBoostGPU(X_train, X_test, y_train, y_test)
+        booster = model.train(
+            colsample_bytree=1.0,
+            learning_rate=0.05,
+            max_depth=7,
+            n_estimators=200,
+            subsample=0.8
+        )
+        model.save_model('../data/xgboost_model_all_features.json')
 
-            test_preds = model.predict(X_test)
-            rmse = np.sqrt(mean_squared_error(y_test, test_preds))
+        test_preds = model.predict(X_test)
+        rmse = np.sqrt(mean_squared_error(y_test, test_preds))
 
-            # Reconstruct price series from log returns
-            if 'Close' in df.columns:
-                close_prices = df['Close'].values
-            else:
-                close_prices = pd.read_csv(csv_path)['Close'].values
-            start_price = close_prices[split_idx]
-            actual_prices = [start_price]
-            for r_ in y_test:
-                actual_prices.append(actual_prices[-1] * np.exp(r_))
-            actual_prices = np.array(actual_prices[1:])
-            predicted_prices = [start_price]
-            for r_ in test_preds:
-                predicted_prices.append(predicted_prices[-1] * np.exp(r_))
-            predicted_prices = np.array(predicted_prices[1:])
+        # Reconstruct price series from log returns
+        if 'Close' in df.columns:
+            close_prices = df['Close'].values
+        else:
+            close_prices = pd.read_csv(csv_path)['Close'].values
+        start_price = close_prices[split_idx]
+        actual_prices = [start_price]
+        for r_ in y_test:
+            actual_prices.append(actual_prices[-1] * np.exp(r_))
+        actual_prices = np.array(actual_prices[1:])
+        predicted_prices = [start_price]
+        for r_ in test_preds:
+            predicted_prices.append(predicted_prices[-1] * np.exp(r_))
+        predicted_prices = np.array(predicted_prices[1:])
 
-            mae = mean_absolute_error(actual_prices, predicted_prices)
-            r2 = r2_score(actual_prices, predicted_prices)
-            direction_actual = np.sign(np.diff(actual_prices))
-            direction_pred = np.sign(np.diff(predicted_prices))
-            directional_accuracy = (direction_actual == direction_pred).mean()
-            mape = np.mean(np.abs((actual_prices - predicted_prices) / actual_prices)) * 100
+        mae = mean_absolute_error(actual_prices, predicted_prices)
+        r2 = r2_score(actual_prices, predicted_prices)
+        direction_actual = np.sign(np.diff(actual_prices))
+        direction_pred = np.sign(np.diff(predicted_prices))
+        directional_accuracy = (direction_actual == direction_pred).mean()
+        mape = np.mean(np.abs((actual_prices - predicted_prices) / actual_prices)) * 100
 
-            # Save results to CSV
-            with open(results_csv, 'a', newline='') as f:
-                writer = csv.writer(f)
-                writer.writerow([left_out, "|".join(used), rmse, mae, r2, mape, directional_accuracy])
-            print(f'Left out {left_out}: RMSE={rmse:.4f}, MAE={mae:.4f}, R2={r2:.4f}, MAPE={mape:.2f}%, DirAcc={directional_accuracy*100:.2f}%')
+        # Save results to CSV: one row per feature used in this run, with its importance
+        feature_importance_dict = model.get_feature_importance(feature_cols)
+        with open(results_csv, 'a', newline='') as f:
+            writer = csv.writer(f)
+            for feature in feature_cols:
+                importance = feature_importance_dict.get(feature, 0.0)
+                fi_str = format(importance, ".6f")
+                row = [feature]
+                for val in [rmse, mae, r2, mape, directional_accuracy]:
+                    if isinstance(val, float):
+                        row.append(format(val, '.10f'))
+                    else:
+                        row.append(val)
+                row.append(fi_str)
+                writer.writerow(row)
+        print('Feature importances and results saved for all features used in this run.')
 
-        # Plotting for this run
-        plot_prefix = f'loo_{left_out}'
-        print('Plotting distribution of absolute prediction errors...')
-        plot_prediction_error_distribution(predicted_prices, actual_prices, prefix=plot_prefix)
+        # Plotting for this run
+        # plot_prefix = f'cumulative_{n}_features'
+        # plot_prediction_error_distribution(predicted_prices, actual_prices, prefix=plot_prefix)
+        # plot_direction_transition_heatmap(actual_prices, predicted_prices, prefix=plot_prefix)
+
+        # Plot inside the try block so predicted_prices/actual_prices are guaranteed to exist
+        plot_prefix = 'all_features'
+        plot_prediction_error_distribution(predicted_prices, actual_prices, prefix=plot_prefix)
+    except Exception as e:
+        print(f'Cumulative feature run failed: {e}')
+    print(f'All cumulative feature runs completed. Results saved to {results_csv}')
 
-        print('Plotting directional accuracy...')
-        plot_direction_transition_heatmap(actual_prices, predicted_prices, prefix=plot_prefix)
-    except Exception as e:
-        print(f'Leave-one-out failed for {left_out}: {e}')
-    print(f'All leave-one-out runs completed. 
Results saved to {results_csv}') sys.exit(0) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 860ce36..93855bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,6 +8,7 @@ dependencies = [ "dash>=3.0.4", "numba>=0.61.2", "pandas>=2.2.3", + "pandas-ta>=0.3.14b0", "scikit-learn>=1.6.1", "ta>=0.11.0", "xgboost>=3.0.2", diff --git a/technical_indicator_functions.py b/technical_indicator_functions.py new file mode 100644 index 0000000..061dba1 --- /dev/null +++ b/technical_indicator_functions.py @@ -0,0 +1,251 @@ +from numba import njit +import pandas as pd +import numpy as np + +def calc_rsi(close): + from ta.momentum import RSIIndicator + return ('rsi', RSIIndicator(close, window=14).rsi()) + +def calc_macd(close): + from ta.trend import MACD + return ('macd', MACD(close).macd()) + +def calc_bollinger(close): + from ta.volatility import BollingerBands + bb = BollingerBands(close=close, window=20, window_dev=2) + return [ + ('bb_bbm', bb.bollinger_mavg()), + ('bb_bbh', bb.bollinger_hband()), + ('bb_bbl', bb.bollinger_lband()), + ('bb_bb_width', bb.bollinger_hband() - bb.bollinger_lband()) + ] + +def calc_stochastic(high, low, close): + from ta.momentum import StochasticOscillator + stoch = StochasticOscillator(high=high, low=low, close=close, window=14, smooth_window=3) + return [ + ('stoch_k', stoch.stoch()), + ('stoch_d', stoch.stoch_signal()) + ] + +def calc_atr(high, low, close): + from ta.volatility import AverageTrueRange + atr = AverageTrueRange(high=high, low=low, close=close, window=14) + return ('atr', atr.average_true_range()) + +def calc_cci(high, low, close): + from ta.trend import CCIIndicator + cci = CCIIndicator(high=high, low=low, close=close, window=20) + return ('cci', cci.cci()) + +def calc_williamsr(high, low, close): + from ta.momentum import WilliamsRIndicator + willr = WilliamsRIndicator(high=high, low=low, close=close, lbp=14) + return ('williams_r', willr.williams_r()) + +def calc_ema(close): + from ta.trend import EMAIndicator + ema = EMAIndicator(close=close, window=14) + return ('ema_14', ema.ema_indicator()) + +def calc_obv(close, volume): + from ta.volume import OnBalanceVolumeIndicator + obv = OnBalanceVolumeIndicator(close=close, volume=volume) + return ('obv', obv.on_balance_volume()) + +def calc_cmf(high, low, close, volume): + from ta.volume import ChaikinMoneyFlowIndicator + cmf = ChaikinMoneyFlowIndicator(high=high, low=low, close=close, volume=volume, window=20) + return ('cmf', cmf.chaikin_money_flow()) + +def calc_sma(close): + from ta.trend import SMAIndicator + return [ + ('sma_50', SMAIndicator(close, window=50).sma_indicator()), + ('sma_200', SMAIndicator(close, window=200).sma_indicator()) + ] + +def calc_roc(close): + from ta.momentum import ROCIndicator + return ('roc_10', ROCIndicator(close, window=10).roc()) + +def calc_momentum(close): + return ('momentum_10', close - close.shift(10)) + +def calc_psar(high, low, close): + # Use the Numba-accelerated fast_psar function for speed + psar_values = fast_psar(np.array(high), np.array(low), np.array(close)) + return [('psar', pd.Series(psar_values, index=close.index))] + +def calc_donchian(high, low, close): + from ta.volatility import DonchianChannel + donchian = DonchianChannel(high, low, close, window=20) + return [ + ('donchian_hband', donchian.donchian_channel_hband()), + ('donchian_lband', donchian.donchian_channel_lband()), + ('donchian_mband', donchian.donchian_channel_mband()) + ] + +def calc_keltner(high, low, close): + from ta.volatility import 
KeltnerChannel + keltner = KeltnerChannel(high, low, close, window=20) + return [ + ('keltner_hband', keltner.keltner_channel_hband()), + ('keltner_lband', keltner.keltner_channel_lband()), + ('keltner_mband', keltner.keltner_channel_mband()) + ] + +def calc_dpo(close): + from ta.trend import DPOIndicator + return ('dpo_20', DPOIndicator(close, window=20).dpo()) + +def calc_ultimate(high, low, close): + from ta.momentum import UltimateOscillator + return ('ultimate_osc', UltimateOscillator(high, low, close).ultimate_oscillator()) + +def calc_ichimoku(high, low): + from ta.trend import IchimokuIndicator + ichimoku = IchimokuIndicator(high, low, window1=9, window2=26, window3=52) + return [ + ('ichimoku_a', ichimoku.ichimoku_a()), + ('ichimoku_b', ichimoku.ichimoku_b()), + ('ichimoku_base_line', ichimoku.ichimoku_base_line()), + ('ichimoku_conversion_line', ichimoku.ichimoku_conversion_line()) + ] + +def calc_elder_ray(close, low, high): + from ta.trend import EMAIndicator + ema = EMAIndicator(close, window=13).ema_indicator() + return [ + ('elder_ray_bull', ema - low), + ('elder_ray_bear', ema - high) + ] + +def calc_daily_return(close): + from ta.others import DailyReturnIndicator + return ('daily_return', DailyReturnIndicator(close).daily_return()) + +@njit +def fast_psar(high, low, close, af=0.02, max_af=0.2): + length = len(close) + psar = np.zeros(length) + bull = True + af_step = af + ep = low[0] + psar[0] = low[0] + for i in range(1, length): + prev_psar = psar[i-1] + if bull: + psar[i] = prev_psar + af_step * (ep - prev_psar) + if low[i] < psar[i]: + bull = False + psar[i] = ep + af_step = af + ep = low[i] + else: + if high[i] > ep: + ep = high[i] + af_step = min(af_step + af, max_af) + else: + psar[i] = prev_psar + af_step * (ep - prev_psar) + if high[i] > psar[i]: + bull = True + psar[i] = ep + af_step = af + ep = high[i] + else: + if low[i] < ep: + ep = low[i] + af_step = min(af_step + af, max_af) + return psar + +def compute_lag(df, col, lag): + return df[col].shift(lag) + +def compute_rolling(df, col, stat, window): + if stat == 'mean': + return df[col].rolling(window).mean() + elif stat == 'std': + return df[col].rolling(window).std() + elif stat == 'min': + return df[col].rolling(window).min() + elif stat == 'max': + return df[col].rolling(window).max() + +def compute_log_return(df, horizon): + return np.log(df['Close'] / df['Close'].shift(horizon)) + +def compute_volatility(df, window): + return df['log_return'].rolling(window).std() + +def run_feature_job(job, df): + feature_name, func, *args = job + print(f'Computing feature: {feature_name}') + result = func(df, *args) + return feature_name, result + +def calc_adx(high, low, close): + from ta.trend import ADXIndicator + adx = ADXIndicator(high=high, low=low, close=close, window=14) + return [ + ('adx', adx.adx()), + ('adx_pos', adx.adx_pos()), + ('adx_neg', adx.adx_neg()) + ] + +def calc_trix(close): + from ta.trend import TRIXIndicator + trix = TRIXIndicator(close=close, window=15) + return ('trix', trix.trix()) + +def calc_vortex(high, low, close): + from ta.trend import VortexIndicator + vortex = VortexIndicator(high=high, low=low, close=close, window=14) + return [ + ('vortex_pos', vortex.vortex_indicator_pos()), + ('vortex_neg', vortex.vortex_indicator_neg()) + ] + +def calc_kama(close): + import pandas_ta as ta + kama = ta.kama(close, length=10) + return ('kama', kama) + +def calc_force_index(close, volume): + from ta.volume import ForceIndexIndicator + fi = ForceIndexIndicator(close=close, volume=volume, window=13) 
+ return ('force_index', fi.force_index()) + +def calc_eom(high, low, volume): + from ta.volume import EaseOfMovementIndicator + eom = EaseOfMovementIndicator(high=high, low=low, volume=volume, window=14) + return ('eom', eom.ease_of_movement()) + +def calc_mfi(high, low, close, volume): + from ta.volume import MFIIndicator + mfi = MFIIndicator(high=high, low=low, close=close, volume=volume, window=14) + return ('mfi', mfi.money_flow_index()) + +def calc_adi(high, low, close, volume): + from ta.volume import AccDistIndexIndicator + adi = AccDistIndexIndicator(high=high, low=low, close=close, volume=volume) + return ('adi', adi.acc_dist_index()) + +def calc_tema(close): + import pandas_ta as ta + tema = ta.tema(close, length=10) + return ('tema', tema) + +def calc_stochrsi(close): + from ta.momentum import StochRSIIndicator + stochrsi = StochRSIIndicator(close=close, window=14, smooth1=3, smooth2=3) + return [ + ('stochrsi', stochrsi.stochrsi()), + ('stochrsi_k', stochrsi.stochrsi_k()), + ('stochrsi_d', stochrsi.stochrsi_d()) + ] + +def calc_awesome_oscillator(high, low): + from ta.momentum import AwesomeOscillatorIndicator + ao = AwesomeOscillatorIndicator(high=high, low=low, window1=5, window2=34) + return ('awesome_osc', ao.awesome_oscillator()) \ No newline at end of file diff --git a/uv.lock b/uv.lock index 8d25bef..d071312 100644 --- a/uv.lock +++ b/uv.lock @@ -314,6 +314,7 @@ dependencies = [ { name = "dash" }, { name = "numba" }, { name = "pandas" }, + { name = "pandas-ta" }, { name = "scikit-learn" }, { name = "ta" }, { name = "xgboost" }, @@ -324,6 +325,7 @@ requires-dist = [ { name = "dash", specifier = ">=3.0.4" }, { name = "numba", specifier = ">=0.61.2" }, { name = "pandas", specifier = ">=2.2.3" }, + { name = "pandas-ta", specifier = ">=0.3.14b0" }, { name = "scikit-learn", specifier = ">=1.6.1" }, { name = "ta", specifier = ">=0.11.0" }, { name = "xgboost", specifier = ">=3.0.2" }, @@ -372,6 +374,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ab/5f/b38085618b950b79d2d9164a711c52b10aefc0ae6833b96f626b7021b2ed/pandas-2.2.3-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:ad5b65698ab28ed8d7f18790a0dc58005c7629f227be9ecc1072aa74c0c1d43a", size = 13098436, upload-time = "2024-09-20T13:09:48.112Z" }, ] +[[package]] +name = "pandas-ta" +version = "0.3.14b0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pandas" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/f7/0b/1666f0a185d4f08215f53cc088122a73c92421447b04028f0464fabe1ce6/pandas_ta-0.3.14b.tar.gz", hash = "sha256:0fa35aec831d2815ea30b871688a8d20a76b288a7be2d26cc00c35cd8c09a993", size = 115089, upload-time = "2021-07-28T20:51:17.456Z" } + [[package]] name = "plotly" version = "6.1.2"