Did feature selection, moved from RMSE to MAE for accuracy testing (maybe not a good idea after all), fine tuned hyperparameters a bit (need to do more)

This commit is contained in:
Simon Moisy 2025-06-03 15:40:43 +08:00
parent 8d3f045a92
commit 0bbb0e52af
6 changed files with 848 additions and 709 deletions

View File

@ -15,8 +15,8 @@ class CustomXGBoostGPU:
'tree_method': 'hist',
'device': 'cuda',
'objective': 'reg:squarederror',
'eval_metric': 'rmse',
'verbosity': 1,
'eval_metric': 'mae',
'verbosity': 0,
}
params.update(xgb_params)
self.params = params # Store params for later access
@ -37,3 +37,12 @@ class CustomXGBoostGPU:
if self.model is None:
raise ValueError('Model not trained yet.')
self.model.save_model(file_path)
def get_feature_importance(self, feature_names):
if self.model is None:
raise ValueError('Model not trained yet.')
# get_score returns a dict with keys like 'f0', 'f1', ...
score_dict = self.model.get_score(importance_type='weight')
# Map to feature names
importances = [score_dict.get(f'f{i}', 0.0) for i in range(len(feature_names))]
return dict(zip(feature_names, importances))

401
feature_engineering.py Normal file
View File

@ -0,0 +1,401 @@
import os
import numpy as np
import pandas as pd
import ta
from technical_indicator_functions import *
def feature_engineering(df, csv_prefix, ohlcv_cols, lags, window_sizes):
feature_file = f'../data/{csv_prefix}_rsi.npy'
features_dict = {}
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['rsi'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: rsi')
_, values = calc_rsi(df['Close'])
features_dict['rsi'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# MACD
feature_file = f'../data/{csv_prefix}_macd.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['macd'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: macd')
_, values = calc_macd(df['Close'])
features_dict['macd'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# ATR
feature_file = f'../data/{csv_prefix}_atr.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['atr'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: atr')
_, values = calc_atr(df['High'], df['Low'], df['Close'])
features_dict['atr'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# CCI
feature_file = f'../data/{csv_prefix}_cci.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['cci'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: cci')
_, values = calc_cci(df['High'], df['Low'], df['Close'])
features_dict['cci'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# Williams %R
feature_file = f'../data/{csv_prefix}_williams_r.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['williams_r'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: williams_r')
_, values = calc_williamsr(df['High'], df['Low'], df['Close'])
features_dict['williams_r'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# EMA 14
feature_file = f'../data/{csv_prefix}_ema_14.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['ema_14'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: ema_14')
_, values = calc_ema(df['Close'])
features_dict['ema_14'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# OBV
feature_file = f'../data/{csv_prefix}_obv.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['obv'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: obv')
_, values = calc_obv(df['Close'], df['Volume'])
features_dict['obv'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# CMF
feature_file = f'../data/{csv_prefix}_cmf.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['cmf'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: cmf')
_, values = calc_cmf(df['High'], df['Low'], df['Close'], df['Volume'])
features_dict['cmf'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# ROC 10
feature_file = f'../data/{csv_prefix}_roc_10.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['roc_10'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: roc_10')
_, values = calc_roc(df['Close'])
features_dict['roc_10'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# DPO 20
feature_file = f'../data/{csv_prefix}_dpo_20.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['dpo_20'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: dpo_20')
_, values = calc_dpo(df['Close'])
features_dict['dpo_20'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# Ultimate Oscillator
feature_file = f'../data/{csv_prefix}_ultimate_osc.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['ultimate_osc'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: ultimate_osc')
_, values = calc_ultimate(df['High'], df['Low'], df['Close'])
features_dict['ultimate_osc'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# Daily Return
feature_file = f'../data/{csv_prefix}_daily_return.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['daily_return'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: daily_return')
_, values = calc_daily_return(df['Close'])
features_dict['daily_return'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# Multi-column indicators
# Bollinger Bands
print('Calculating multi-column indicator: bollinger')
result = calc_bollinger(df['Close'])
for subname, values in result:
print(f"Adding subfeature: {subname}")
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
print(f'B Loading cached feature: {sub_feature_file}')
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
print(f'Saved feature: {sub_feature_file}')
# Stochastic Oscillator
print('Calculating multi-column indicator: stochastic')
result = calc_stochastic(df['High'], df['Low'], df['Close'])
for subname, values in result:
print(f"Adding subfeature: {subname}")
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
print(f'B Loading cached feature: {sub_feature_file}')
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
print(f'Saved feature: {sub_feature_file}')
# SMA
print('Calculating multi-column indicator: sma')
result = calc_sma(df['Close'])
for subname, values in result:
print(f"Adding subfeature: {subname}")
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
print(f'B Loading cached feature: {sub_feature_file}')
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
print(f'Saved feature: {sub_feature_file}')
# PSAR
print('Calculating multi-column indicator: psar')
result = calc_psar(df['High'], df['Low'], df['Close'])
for subname, values in result:
print(f"Adding subfeature: {subname}")
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
print(f'B Loading cached feature: {sub_feature_file}')
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
print(f'Saved feature: {sub_feature_file}')
# Donchian Channel
print('Calculating multi-column indicator: donchian')
result = calc_donchian(df['High'], df['Low'], df['Close'])
for subname, values in result:
print(f"Adding subfeature: {subname}")
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
print(f'B Loading cached feature: {sub_feature_file}')
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
print(f'Saved feature: {sub_feature_file}')
# Keltner Channel
print('Calculating multi-column indicator: keltner')
result = calc_keltner(df['High'], df['Low'], df['Close'])
for subname, values in result:
print(f"Adding subfeature: {subname}")
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
print(f'B Loading cached feature: {sub_feature_file}')
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
print(f'Saved feature: {sub_feature_file}')
# Ichimoku
print('Calculating multi-column indicator: ichimoku')
result = calc_ichimoku(df['High'], df['Low'])
for subname, values in result:
print(f"Adding subfeature: {subname}")
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
print(f'B Loading cached feature: {sub_feature_file}')
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
print(f'Saved feature: {sub_feature_file}')
# Elder Ray
print('Calculating multi-column indicator: elder_ray')
result = calc_elder_ray(df['Close'], df['Low'], df['High'])
for subname, values in result:
print(f"Adding subfeature: {subname}")
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
print(f'B Loading cached feature: {sub_feature_file}')
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
print(f'Saved feature: {sub_feature_file}')
# Prepare lags, rolling stats, log returns, and volatility features sequentially
# Lags
for col in ohlcv_cols:
for lag in range(1, lags + 1):
feature_name = f'{col}_lag{lag}'
feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
if os.path.exists(feature_file):
print(f'C Loading cached feature: {feature_file}')
features_dict[feature_name] = np.load(feature_file)
else:
print(f'Computing lag feature: {feature_name}')
result = compute_lag(df, col, lag)
features_dict[feature_name] = result
np.save(feature_file, result.values)
print(f'Saved feature: {feature_file}')
# Rolling statistics
for col in ohlcv_cols:
for window in window_sizes:
if (col == 'Open' and window == 5):
continue
if (col == 'High' and window == 5):
continue
if (col == 'High' and window == 30):
continue
if (col == 'Low' and window == 15):
continue
for stat in ['mean', 'std', 'min', 'max']:
feature_name = f'{col}_roll_{stat}_{window}'
feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
if os.path.exists(feature_file):
print(f'D Loading cached feature: {feature_file}')
features_dict[feature_name] = np.load(feature_file)
else:
print(f'Computing rolling stat feature: {feature_name}')
result = compute_rolling(df, col, stat, window)
features_dict[feature_name] = result
np.save(feature_file, result.values)
print(f'Saved feature: {feature_file}')
# Log returns for different horizons
for horizon in [5, 15, 30]:
feature_name = f'log_return_{horizon}'
feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
if os.path.exists(feature_file):
print(f'E Loading cached feature: {feature_file}')
features_dict[feature_name] = np.load(feature_file)
else:
print(f'Computing log return feature: {feature_name}')
result = compute_log_return(df, horizon)
features_dict[feature_name] = result
np.save(feature_file, result.values)
print(f'Saved feature: {feature_file}')
# Volatility
for window in window_sizes:
feature_name = f'volatility_{window}'
feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
if os.path.exists(feature_file):
print(f'F Loading cached feature: {feature_file}')
features_dict[feature_name] = np.load(feature_file)
else:
print(f'Computing volatility feature: {feature_name}')
result = compute_volatility(df, window)
features_dict[feature_name] = result
np.save(feature_file, result.values)
print(f'Saved feature: {feature_file}')
# --- Additional Technical Indicator Features ---
# ADX
adx_names = ['adx', 'adx_pos', 'adx_neg']
adx_files = [f'../data/{csv_prefix}_{name}.npy' for name in adx_names]
if all(os.path.exists(f) for f in adx_files):
print('G Loading cached features: ADX')
for name, f in zip(adx_names, adx_files):
arr = np.load(f)
features_dict[name] = pd.Series(arr, index=df.index)
else:
print('Calculating multi-column indicator: adx')
result = calc_adx(df['High'], df['Low'], df['Close'])
for subname, values in result:
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
features_dict[subname] = values
np.save(sub_feature_file, values.values)
print(f'Saved feature: {sub_feature_file}')
# Force Index
feature_file = f'../data/{csv_prefix}_force_index.npy'
if os.path.exists(feature_file):
print(f'K Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['force_index'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: force_index')
_, values = calc_force_index(df['Close'], df['Volume'])
features_dict['force_index'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# Supertrend indicators
for period, multiplier in [(12, 3.0), (10, 1.0), (11, 2.0)]:
st_name = f'supertrend_{period}_{multiplier}'
st_trend_name = f'supertrend_trend_{period}_{multiplier}'
st_file = f'../data/{csv_prefix}_{st_name}.npy'
st_trend_file = f'../data/{csv_prefix}_{st_trend_name}.npy'
if os.path.exists(st_file) and os.path.exists(st_trend_file):
print(f'L Loading cached features: {st_file}, {st_trend_file}')
features_dict[st_name] = pd.Series(np.load(st_file), index=df.index)
features_dict[st_trend_name] = pd.Series(np.load(st_trend_file), index=df.index)
else:
print(f'Calculating Supertrend indicator: {st_name}')
st = ta.supertrend(df['High'], df['Low'], df['Close'], length=period, multiplier=multiplier)
features_dict[st_name] = st[f'SUPERT_{period}_{multiplier}']
features_dict[st_trend_name] = st[f'SUPERTd_{period}_{multiplier}']
np.save(st_file, features_dict[st_name].values)
np.save(st_trend_file, features_dict[st_trend_name].values)
print(f'Saved features: {st_file}, {st_trend_file}')
return features_dict

880
main.py
View File

@ -9,7 +9,13 @@ from plot_results import plot_prediction_error_distribution, plot_direction_tran
import time
from numba import njit
import csv
import ta
import pandas_ta as ta
from feature_engineering import feature_engineering
from sklearn.feature_selection import VarianceThreshold
charts_dir = 'charts'
if not os.path.exists(charts_dir):
os.makedirs(charts_dir)
def run_indicator(func, *args):
return func(*args)
@ -24,254 +30,6 @@ def run_indicator_job(job):
print(f'Indicator {indicator_name} computed in {elapsed:.4f} seconds')
return result
def calc_rsi(close):
from ta.momentum import RSIIndicator
return ('rsi', RSIIndicator(close, window=14).rsi())
def calc_macd(close):
from ta.trend import MACD
return ('macd', MACD(close).macd())
def calc_bollinger(close):
from ta.volatility import BollingerBands
bb = BollingerBands(close=close, window=20, window_dev=2)
return [
('bb_bbm', bb.bollinger_mavg()),
('bb_bbh', bb.bollinger_hband()),
('bb_bbl', bb.bollinger_lband()),
('bb_bb_width', bb.bollinger_hband() - bb.bollinger_lband())
]
def calc_stochastic(high, low, close):
from ta.momentum import StochasticOscillator
stoch = StochasticOscillator(high=high, low=low, close=close, window=14, smooth_window=3)
return [
('stoch_k', stoch.stoch()),
('stoch_d', stoch.stoch_signal())
]
def calc_atr(high, low, close):
from ta.volatility import AverageTrueRange
atr = AverageTrueRange(high=high, low=low, close=close, window=14)
return ('atr', atr.average_true_range())
def calc_cci(high, low, close):
from ta.trend import CCIIndicator
cci = CCIIndicator(high=high, low=low, close=close, window=20)
return ('cci', cci.cci())
def calc_williamsr(high, low, close):
from ta.momentum import WilliamsRIndicator
willr = WilliamsRIndicator(high=high, low=low, close=close, lbp=14)
return ('williams_r', willr.williams_r())
def calc_ema(close):
from ta.trend import EMAIndicator
ema = EMAIndicator(close=close, window=14)
return ('ema_14', ema.ema_indicator())
def calc_obv(close, volume):
from ta.volume import OnBalanceVolumeIndicator
obv = OnBalanceVolumeIndicator(close=close, volume=volume)
return ('obv', obv.on_balance_volume())
def calc_cmf(high, low, close, volume):
from ta.volume import ChaikinMoneyFlowIndicator
cmf = ChaikinMoneyFlowIndicator(high=high, low=low, close=close, volume=volume, window=20)
return ('cmf', cmf.chaikin_money_flow())
def calc_sma(close):
from ta.trend import SMAIndicator
return [
('sma_50', SMAIndicator(close, window=50).sma_indicator()),
('sma_200', SMAIndicator(close, window=200).sma_indicator())
]
def calc_roc(close):
from ta.momentum import ROCIndicator
return ('roc_10', ROCIndicator(close, window=10).roc())
def calc_momentum(close):
return ('momentum_10', close - close.shift(10))
def calc_psar(high, low, close):
# Use the Numba-accelerated fast_psar function for speed
psar_values = fast_psar(np.array(high), np.array(low), np.array(close))
return [('psar', pd.Series(psar_values, index=close.index))]
def calc_donchian(high, low, close):
from ta.volatility import DonchianChannel
donchian = DonchianChannel(high, low, close, window=20)
return [
('donchian_hband', donchian.donchian_channel_hband()),
('donchian_lband', donchian.donchian_channel_lband()),
('donchian_mband', donchian.donchian_channel_mband())
]
def calc_keltner(high, low, close):
from ta.volatility import KeltnerChannel
keltner = KeltnerChannel(high, low, close, window=20)
return [
('keltner_hband', keltner.keltner_channel_hband()),
('keltner_lband', keltner.keltner_channel_lband()),
('keltner_mband', keltner.keltner_channel_mband())
]
def calc_dpo(close):
from ta.trend import DPOIndicator
return ('dpo_20', DPOIndicator(close, window=20).dpo())
def calc_ultimate(high, low, close):
from ta.momentum import UltimateOscillator
return ('ultimate_osc', UltimateOscillator(high, low, close).ultimate_oscillator())
def calc_ichimoku(high, low):
from ta.trend import IchimokuIndicator
ichimoku = IchimokuIndicator(high, low, window1=9, window2=26, window3=52)
return [
('ichimoku_a', ichimoku.ichimoku_a()),
('ichimoku_b', ichimoku.ichimoku_b()),
('ichimoku_base_line', ichimoku.ichimoku_base_line()),
('ichimoku_conversion_line', ichimoku.ichimoku_conversion_line())
]
def calc_elder_ray(close, low, high):
from ta.trend import EMAIndicator
ema = EMAIndicator(close, window=13).ema_indicator()
return [
('elder_ray_bull', ema - low),
('elder_ray_bear', ema - high)
]
def calc_daily_return(close):
from ta.others import DailyReturnIndicator
return ('daily_return', DailyReturnIndicator(close).daily_return())
@njit
def fast_psar(high, low, close, af=0.02, max_af=0.2):
length = len(close)
psar = np.zeros(length)
bull = True
af_step = af
ep = low[0]
psar[0] = low[0]
for i in range(1, length):
prev_psar = psar[i-1]
if bull:
psar[i] = prev_psar + af_step * (ep - prev_psar)
if low[i] < psar[i]:
bull = False
psar[i] = ep
af_step = af
ep = low[i]
else:
if high[i] > ep:
ep = high[i]
af_step = min(af_step + af, max_af)
else:
psar[i] = prev_psar + af_step * (ep - prev_psar)
if high[i] > psar[i]:
bull = True
psar[i] = ep
af_step = af
ep = high[i]
else:
if low[i] < ep:
ep = low[i]
af_step = min(af_step + af, max_af)
return psar
def compute_lag(df, col, lag):
return df[col].shift(lag)
def compute_rolling(df, col, stat, window):
if stat == 'mean':
return df[col].rolling(window).mean()
elif stat == 'std':
return df[col].rolling(window).std()
elif stat == 'min':
return df[col].rolling(window).min()
elif stat == 'max':
return df[col].rolling(window).max()
def compute_log_return(df, horizon):
return np.log(df['Close'] / df['Close'].shift(horizon))
def compute_volatility(df, window):
return df['log_return'].rolling(window).std()
def run_feature_job(job, df):
feature_name, func, *args = job
print(f'Computing feature: {feature_name}')
result = func(df, *args)
return feature_name, result
def calc_adx(high, low, close):
from ta.trend import ADXIndicator
adx = ADXIndicator(high=high, low=low, close=close, window=14)
return [
('adx', adx.adx()),
('adx_pos', adx.adx_pos()),
('adx_neg', adx.adx_neg())
]
def calc_trix(close):
from ta.trend import TRIXIndicator
trix = TRIXIndicator(close=close, window=15)
return ('trix', trix.trix())
def calc_vortex(high, low, close):
from ta.trend import VortexIndicator
vortex = VortexIndicator(high=high, low=low, close=close, window=14)
return [
('vortex_pos', vortex.vortex_indicator_pos()),
('vortex_neg', vortex.vortex_indicator_neg())
]
def calc_kama(close):
import pandas_ta as ta
kama = ta.kama(close, length=10)
return ('kama', kama)
def calc_force_index(close, volume):
from ta.volume import ForceIndexIndicator
fi = ForceIndexIndicator(close=close, volume=volume, window=13)
return ('force_index', fi.force_index())
def calc_eom(high, low, volume):
from ta.volume import EaseOfMovementIndicator
eom = EaseOfMovementIndicator(high=high, low=low, volume=volume, window=14)
return ('eom', eom.ease_of_movement())
def calc_mfi(high, low, close, volume):
from ta.volume import MFIIndicator
mfi = MFIIndicator(high=high, low=low, close=close, volume=volume, window=14)
return ('mfi', mfi.money_flow_index())
def calc_adi(high, low, close, volume):
from ta.volume import AccDistIndexIndicator
adi = AccDistIndexIndicator(high=high, low=low, close=close, volume=volume)
return ('adi', adi.acc_dist_index())
def calc_tema(close):
import pandas_ta as ta
tema = ta.tema(close, length=10)
return ('tema', tema)
def calc_stochrsi(close):
from ta.momentum import StochRSIIndicator
stochrsi = StochRSIIndicator(close=close, window=14, smooth1=3, smooth2=3)
return [
('stochrsi', stochrsi.stochrsi()),
('stochrsi_k', stochrsi.stochrsi_k()),
('stochrsi_d', stochrsi.stochrsi_d())
]
def calc_awesome_oscillator(high, low):
from ta.momentum import AwesomeOscillatorIndicator
ao = AwesomeOscillatorIndicator(high=high, low=low, window1=5, window2=34)
return ('awesome_osc', ao.awesome_oscillator())
if __name__ == '__main__':
IMPUTE_NANS = True # Set to True to impute NaNs, False to drop rows with NaNs
csv_path = '../data/btcusd_1-min_data.csv'
@ -298,412 +56,38 @@ if __name__ == '__main__':
print('Starting feature computation...')
feature_start_time = time.time()
# --- Technical Indicator Features: Calculate or Load from Cache ---
print('Calculating or loading technical indicator features...')
# RSI
feature_file = f'../data/{csv_prefix}_rsi.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['rsi'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: rsi')
_, values = calc_rsi(df['Close'])
features_dict['rsi'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# MACD
feature_file = f'../data/{csv_prefix}_macd.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['macd'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: macd')
_, values = calc_macd(df['Close'])
features_dict['macd'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# ATR
feature_file = f'../data/{csv_prefix}_atr.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['atr'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: atr')
_, values = calc_atr(df['High'], df['Low'], df['Close'])
features_dict['atr'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# CCI
feature_file = f'../data/{csv_prefix}_cci.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['cci'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: cci')
_, values = calc_cci(df['High'], df['Low'], df['Close'])
features_dict['cci'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# Williams %R
feature_file = f'../data/{csv_prefix}_williams_r.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['williams_r'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: williams_r')
_, values = calc_williamsr(df['High'], df['Low'], df['Close'])
features_dict['williams_r'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# EMA 14
feature_file = f'../data/{csv_prefix}_ema_14.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['ema_14'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: ema_14')
_, values = calc_ema(df['Close'])
features_dict['ema_14'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# OBV
feature_file = f'../data/{csv_prefix}_obv.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['obv'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: obv')
_, values = calc_obv(df['Close'], df['Volume'])
features_dict['obv'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# CMF
feature_file = f'../data/{csv_prefix}_cmf.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['cmf'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: cmf')
_, values = calc_cmf(df['High'], df['Low'], df['Close'], df['Volume'])
features_dict['cmf'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# ROC 10
feature_file = f'../data/{csv_prefix}_roc_10.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['roc_10'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: roc_10')
_, values = calc_roc(df['Close'])
features_dict['roc_10'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# DPO 20
feature_file = f'../data/{csv_prefix}_dpo_20.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['dpo_20'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: dpo_20')
_, values = calc_dpo(df['Close'])
features_dict['dpo_20'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# Ultimate Oscillator
feature_file = f'../data/{csv_prefix}_ultimate_osc.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['ultimate_osc'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: ultimate_osc')
_, values = calc_ultimate(df['High'], df['Low'], df['Close'])
features_dict['ultimate_osc'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# Daily Return
feature_file = f'../data/{csv_prefix}_daily_return.npy'
if os.path.exists(feature_file):
print(f'A Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['daily_return'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: daily_return')
_, values = calc_daily_return(df['Close'])
features_dict['daily_return'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# Multi-column indicators
# Bollinger Bands
print('Calculating multi-column indicator: bollinger')
result = calc_bollinger(df['Close'])
for subname, values in result:
print(f"Adding subfeature: {subname}")
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
print(f'B Loading cached feature: {sub_feature_file}')
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
print(f'Saved feature: {sub_feature_file}')
# Stochastic Oscillator
print('Calculating multi-column indicator: stochastic')
result = calc_stochastic(df['High'], df['Low'], df['Close'])
for subname, values in result:
print(f"Adding subfeature: {subname}")
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
print(f'B Loading cached feature: {sub_feature_file}')
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
print(f'Saved feature: {sub_feature_file}')
# SMA
print('Calculating multi-column indicator: sma')
result = calc_sma(df['Close'])
for subname, values in result:
print(f"Adding subfeature: {subname}")
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
print(f'B Loading cached feature: {sub_feature_file}')
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
print(f'Saved feature: {sub_feature_file}')
# PSAR
print('Calculating multi-column indicator: psar')
result = calc_psar(df['High'], df['Low'], df['Close'])
for subname, values in result:
print(f"Adding subfeature: {subname}")
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
print(f'B Loading cached feature: {sub_feature_file}')
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
print(f'Saved feature: {sub_feature_file}')
# Donchian Channel
print('Calculating multi-column indicator: donchian')
result = calc_donchian(df['High'], df['Low'], df['Close'])
for subname, values in result:
print(f"Adding subfeature: {subname}")
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
print(f'B Loading cached feature: {sub_feature_file}')
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
print(f'Saved feature: {sub_feature_file}')
# Keltner Channel
print('Calculating multi-column indicator: keltner')
result = calc_keltner(df['High'], df['Low'], df['Close'])
for subname, values in result:
print(f"Adding subfeature: {subname}")
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
print(f'B Loading cached feature: {sub_feature_file}')
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
print(f'Saved feature: {sub_feature_file}')
# Ichimoku
print('Calculating multi-column indicator: ichimoku')
result = calc_ichimoku(df['High'], df['Low'])
for subname, values in result:
print(f"Adding subfeature: {subname}")
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
print(f'B Loading cached feature: {sub_feature_file}')
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
print(f'Saved feature: {sub_feature_file}')
# Elder Ray
print('Calculating multi-column indicator: elder_ray')
result = calc_elder_ray(df['Close'], df['Low'], df['High'])
for subname, values in result:
print(f"Adding subfeature: {subname}")
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
print(f'B Loading cached feature: {sub_feature_file}')
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
print(f'Saved feature: {sub_feature_file}')
# Prepare lags, rolling stats, log returns, and volatility features sequentially
# Lags
for col in ohlcv_cols:
for lag in range(1, lags + 1):
feature_name = f'{col}_lag{lag}'
feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
if os.path.exists(feature_file):
print(f'C Loading cached feature: {feature_file}')
features_dict[feature_name] = np.load(feature_file)
else:
print(f'Computing lag feature: {feature_name}')
result = compute_lag(df, col, lag)
features_dict[feature_name] = result
np.save(feature_file, result.values)
print(f'Saved feature: {feature_file}')
# Rolling statistics
for col in ohlcv_cols:
for window in window_sizes:
if (col == 'Open' and window == 5):
continue
if (col == 'High' and window == 5):
continue
if (col == 'High' and window == 30):
continue
if (col == 'Low' and window == 15):
continue
for stat in ['mean', 'std', 'min', 'max']:
feature_name = f'{col}_roll_{stat}_{window}'
feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
if os.path.exists(feature_file):
print(f'D Loading cached feature: {feature_file}')
features_dict[feature_name] = np.load(feature_file)
else:
print(f'Computing rolling stat feature: {feature_name}')
result = compute_rolling(df, col, stat, window)
features_dict[feature_name] = result
np.save(feature_file, result.values)
print(f'Saved feature: {feature_file}')
# Log returns for different horizons
for horizon in [5, 15, 30]:
feature_name = f'log_return_{horizon}'
feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
if os.path.exists(feature_file):
print(f'E Loading cached feature: {feature_file}')
features_dict[feature_name] = np.load(feature_file)
else:
print(f'Computing log return feature: {feature_name}')
result = compute_log_return(df, horizon)
features_dict[feature_name] = result
np.save(feature_file, result.values)
print(f'Saved feature: {feature_file}')
# Volatility
for window in window_sizes:
feature_name = f'volatility_{window}'
feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
if os.path.exists(feature_file):
print(f'F Loading cached feature: {feature_file}')
features_dict[feature_name] = np.load(feature_file)
else:
print(f'Computing volatility feature: {feature_name}')
result = compute_volatility(df, window)
features_dict[feature_name] = result
np.save(feature_file, result.values)
print(f'Saved feature: {feature_file}')
# --- Additional Technical Indicator Features ---
# ADX
adx_names = ['adx', 'adx_pos', 'adx_neg']
adx_files = [f'../data/{csv_prefix}_{name}.npy' for name in adx_names]
if all(os.path.exists(f) for f in adx_files):
print('G Loading cached features: ADX')
for name, f in zip(adx_names, adx_files):
arr = np.load(f)
features_dict[name] = pd.Series(arr, index=df.index)
else:
print('Calculating multi-column indicator: adx')
result = calc_adx(df['High'], df['Low'], df['Close'])
for subname, values in result:
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
features_dict[subname] = values
np.save(sub_feature_file, values.values)
print(f'Saved feature: {sub_feature_file}')
# Force Index
feature_file = f'../data/{csv_prefix}_force_index.npy'
if os.path.exists(feature_file):
print(f'K Loading cached feature: {feature_file}')
arr = np.load(feature_file)
features_dict['force_index'] = pd.Series(arr, index=df.index)
else:
print('Calculating feature: force_index')
_, values = calc_force_index(df['Close'], df['Volume'])
features_dict['force_index'] = values
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# Supertrend indicators
for period, multiplier in [(12, 3.0), (10, 1.0), (11, 2.0)]:
st_name = f'supertrend_{period}_{multiplier}'
st_trend_name = f'supertrend_trend_{period}_{multiplier}'
st_file = f'../data/{csv_prefix}_{st_name}.npy'
st_trend_file = f'../data/{csv_prefix}_{st_trend_name}.npy'
if os.path.exists(st_file) and os.path.exists(st_trend_file):
print(f'L Loading cached features: {st_file}, {st_trend_file}')
features_dict[st_name] = pd.Series(np.load(st_file), index=df.index)
features_dict[st_trend_name] = pd.Series(np.load(st_trend_file), index=df.index)
else:
print(f'Calculating Supertrend indicator: {st_name}')
st = ta.supertrend(df['High'], df['Low'], df['Close'], length=period, multiplier=multiplier)
features_dict[st_name] = st[f'SUPERT_{period}_{multiplier}']
features_dict[st_trend_name] = st[f'SUPERTd_{period}_{multiplier}']
np.save(st_file, features_dict[st_name].values)
np.save(st_trend_file, features_dict[st_trend_name].values)
print(f'Saved features: {st_file}, {st_trend_file}')
# Concatenate all new features at once
features_dict = feature_engineering(df, csv_prefix, ohlcv_cols, lags, window_sizes)
print('Concatenating all new features to DataFrame...')
features_df = pd.DataFrame(features_dict)
print("Columns in features_df:", features_df.columns.tolist())
print("All-NaN columns in features_df:", features_df.columns[features_df.isna().all()].tolist())
df = pd.concat([df, features_df], axis=1)
# Print all columns after concatenation
print("All columns in df after concat:", df.columns.tolist())
# feature_cols_for_variance = [col for col in features_df.columns if features_df[col].dtype in [np.float32, np.float64, float, int, np.int32, np.int64]]
# if feature_cols_for_variance:
# selector = VarianceThreshold(threshold=1e-5)
# filtered_features = selector.fit_transform(features_df[feature_cols_for_variance])
# kept_mask = selector.get_support()
# kept_feature_names = [col for col, keep in zip(feature_cols_for_variance, kept_mask) if keep]
# print(f"Features removed by low variance: {[col for col, keep in zip(feature_cols_for_variance, kept_mask) if not keep]}")
# # Only keep the selected features in features_df and df
# features_df = features_df[kept_feature_names]
# for col in feature_cols_for_variance:
# if col not in kept_feature_names:
# df.drop(col, axis=1, inplace=True)
# else:
# print("No numeric features found for variance thresholding.")
# Remove highly correlated features (keep only one from each correlated group)
# corr_matrix = features_df.corr().abs()
# upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
# to_drop = [column for column in upper.columns if any(upper[column] > 0.95)]
# if to_drop:
# print(f"Features removed due to high correlation: {to_drop}")
# features_df = features_df.drop(columns=to_drop)
# df = df.drop(columns=to_drop)
# else:
# print("No highly correlated features found for removal.")
# Downcast all float columns to save memory
print('Downcasting float columns to save memory...')
for col in df.columns:
try:
@ -729,76 +113,158 @@ if __name__ == '__main__':
# Exclude 'Timestamp', 'Close', 'log_return', and any future target columns from features
print('Selecting feature columns...')
exclude_cols = ['Timestamp', 'Close', 'log_return', 'log_return_5', 'log_return_15', 'log_return_30']
exclude_cols = ['Timestamp', 'Close']
exclude_cols += ['log_return_5', 'volatility_5', 'volatility_15', 'volatility_30']
exclude_cols += ['bb_bbm', 'bb_bbh', 'bb_bbl', 'stoch_k', 'sma_50', 'sma_200', 'psar',
'donchian_hband', 'donchian_lband', 'donchian_mband', 'keltner_hband', 'keltner_lband',
'keltner_mband', 'ichimoku_a', 'ichimoku_b', 'ichimoku_base_line', 'ichimoku_conversion_line',
'Open_lag1', 'Open_lag2', 'Open_lag3', 'High_lag1', 'High_lag2', 'High_lag3', 'Low_lag1', 'Low_lag2',
'Low_lag3', 'Close_lag1', 'Close_lag2', 'Close_lag3', 'Open_roll_mean_15', 'Open_roll_std_15', 'Open_roll_min_15',
'Open_roll_max_15', 'Open_roll_mean_30', 'Open_roll_min_30', 'Open_roll_max_30', 'High_roll_mean_15', 'High_roll_std_15',
'High_roll_min_15', 'High_roll_max_15', 'Low_roll_mean_5', 'Low_roll_min_5', 'Low_roll_max_5', 'Low_roll_mean_30',
'Low_roll_std_30', 'Low_roll_min_30', 'Low_roll_max_30', 'Close_roll_mean_5', 'Close_roll_min_5', 'Close_roll_max_5',
'Close_roll_mean_15', 'Close_roll_std_15', 'Close_roll_min_15', 'Close_roll_max_15', 'Close_roll_mean_30',
'Close_roll_std_30', 'Close_roll_min_30', 'Close_roll_max_30', 'Volume_roll_max_5', 'Volume_roll_max_15',
'Volume_roll_max_30', 'supertrend_12_3.0', 'supertrend_10_1.0', 'supertrend_11_2.0']
feature_cols = [col for col in df.columns if col not in exclude_cols]
print('Features used for training:', feature_cols)
# from xgboost import XGBRegressor
# from sklearn.model_selection import GridSearchCV
# # Prepare data for grid search
# X = df[feature_cols].values.astype(np.float32)
# y = df["log_return"].values.astype(np.float32)
# split_idx = int(len(X) * 0.8)
# X_train, X_test = X[:split_idx], X[split_idx:]
# y_train, y_test = y[:split_idx], y[split_idx:]
# # Define parameter grid
# param_grid = {
# 'learning_rate': [0.01, 0.05, 0.1],
# 'max_depth': [3, 5, 7],
# 'n_estimators': [100, 200],
# 'subsample': [0.8, 1.0],
# 'colsample_bytree': [0.8, 1.0],
# }
# print('Starting grid search for XGBoost hyperparameters...')
# xgb_model = XGBRegressor(objective='reg:squarederror', tree_method='hist', device='cuda', eval_metric='mae', verbosity=0)
# grid_search = GridSearchCV(xgb_model, param_grid, cv=3, scoring='neg_mean_absolute_error', verbose=2, n_jobs=-1)
# grid_search.fit(X_train, y_train)
# print('Best parameters found:', grid_search.best_params_)
# # Use best estimator for predictions
# best_model = grid_search.best_estimator_
# test_preds = best_model.predict(X_test)
# rmse = np.sqrt(mean_squared_error(y_test, test_preds))
# # Reconstruct price series from log returns
# if 'Close' in df.columns:
# close_prices = df['Close'].values
# else:
# close_prices = pd.read_csv(csv_path)['Close'].values
# start_price = close_prices[split_idx]
# actual_prices = [start_price]
# for r_ in y_test:
# actual_prices.append(actual_prices[-1] * np.exp(r_))
# actual_prices = np.array(actual_prices[1:])
# predicted_prices = [start_price]
# for r_ in test_preds:
# predicted_prices.append(predicted_prices[-1] * np.exp(r_))
# predicted_prices = np.array(predicted_prices[1:])
# mae = mean_absolute_error(actual_prices, predicted_prices)
# r2 = r2_score(actual_prices, predicted_prices)
# direction_actual = np.sign(np.diff(actual_prices))
# direction_pred = np.sign(np.diff(predicted_prices))
# directional_accuracy = (direction_actual == direction_pred).mean()
# mape = np.mean(np.abs((actual_prices - predicted_prices) / actual_prices)) * 100
# print(f'Grid search results: RMSE={rmse:.4f}, MAE={mae:.4f}, R2={r2:.4f}, MAPE={mape:.2f}%, DirAcc={directional_accuracy*100:.2f}%')
# plot_prefix = f'all_features_gridsearch'
# plot_prediction_error_distribution(predicted_prices, actual_prices, prefix=plot_prefix)
# sys.exit(0)
# Prepare CSV for results
results_csv = '../data/leave_one_out_results.csv'
results_csv = '../data/cumulative_feature_results.csv'
if not os.path.exists(results_csv):
with open(results_csv, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['left_out_feature', 'used_features', 'rmse', 'mae', 'r2', 'mape', 'directional_accuracy'])
writer.writerow(['num_features', 'added feature', 'rmse', 'mae', 'r2', 'mape', 'directional_accuracy', 'feature_importance'])
total_features = len(feature_cols)
never_leave_out = {'Open', 'High', 'Low', 'Close', 'Volume'}
for idx, left_out in enumerate(feature_cols):
if left_out in never_leave_out:
continue
used = [f for f in feature_cols if f != left_out]
print(f'\n=== Leave-one-out {idx+1}/{total_features}: left out {left_out} ===')
try:
# Prepare X and y for this combination
X = df[used].values.astype(np.float32)
y = df["log_return"].values.astype(np.float32)
split_idx = int(len(X) * 0.8)
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]
test_timestamps = df['Timestamp'].values[split_idx:]
try:
X = df[feature_cols].values.astype(np.float32)
y = df["log_return"].values.astype(np.float32)
split_idx = int(len(X) * 0.8)
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]
test_timestamps = df['Timestamp'].values[split_idx:]
model = CustomXGBoostGPU(X_train, X_test, y_train, y_test)
booster = model.train()
model.save_model(f'../data/xgboost_model_wo_{left_out}.json')
model = CustomXGBoostGPU(X_train, X_test, y_train, y_test)
booster = model.train(
colsample_bytree=1.0,
learning_rate=0.05,
max_depth=7,
n_estimators=200,
subsample=0.8
)
model.save_model(f'../data/xgboost_model_all_features.json')
test_preds = model.predict(X_test)
rmse = np.sqrt(mean_squared_error(y_test, test_preds))
test_preds = model.predict(X_test)
rmse = np.sqrt(mean_squared_error(y_test, test_preds))
# Reconstruct price series from log returns
if 'Close' in df.columns:
close_prices = df['Close'].values
else:
close_prices = pd.read_csv(csv_path)['Close'].values
start_price = close_prices[split_idx]
actual_prices = [start_price]
for r_ in y_test:
actual_prices.append(actual_prices[-1] * np.exp(r_))
actual_prices = np.array(actual_prices[1:])
predicted_prices = [start_price]
for r_ in test_preds:
predicted_prices.append(predicted_prices[-1] * np.exp(r_))
predicted_prices = np.array(predicted_prices[1:])
# Reconstruct price series from log returns
if 'Close' in df.columns:
close_prices = df['Close'].values
else:
close_prices = pd.read_csv(csv_path)['Close'].values
start_price = close_prices[split_idx]
actual_prices = [start_price]
for r_ in y_test:
actual_prices.append(actual_prices[-1] * np.exp(r_))
actual_prices = np.array(actual_prices[1:])
predicted_prices = [start_price]
for r_ in test_preds:
predicted_prices.append(predicted_prices[-1] * np.exp(r_))
predicted_prices = np.array(predicted_prices[1:])
mae = mean_absolute_error(actual_prices, predicted_prices)
r2 = r2_score(actual_prices, predicted_prices)
direction_actual = np.sign(np.diff(actual_prices))
direction_pred = np.sign(np.diff(predicted_prices))
directional_accuracy = (direction_actual == direction_pred).mean()
mape = np.mean(np.abs((actual_prices - predicted_prices) / actual_prices)) * 100
mae = mean_absolute_error(actual_prices, predicted_prices)
r2 = r2_score(actual_prices, predicted_prices)
direction_actual = np.sign(np.diff(actual_prices))
direction_pred = np.sign(np.diff(predicted_prices))
directional_accuracy = (direction_actual == direction_pred).mean()
mape = np.mean(np.abs((actual_prices - predicted_prices) / actual_prices)) * 100
# Save results to CSV
with open(results_csv, 'a', newline='') as f:
writer = csv.writer(f)
writer.writerow([left_out, "|".join(used), rmse, mae, r2, mape, directional_accuracy])
print(f'Left out {left_out}: RMSE={rmse:.4f}, MAE={mae:.4f}, R2={r2:.4f}, MAPE={mape:.2f}%, DirAcc={directional_accuracy*100:.2f}%')
# Save results to CSV for all features used in this run
feature_importance_dict = model.get_feature_importance(feature_cols)
with open(results_csv, 'a', newline='') as f:
writer = csv.writer(f)
for feature in feature_cols:
importance = feature_importance_dict.get(feature, 0.0)
fi_str = format(importance, ".6f")
row = [feature]
for val in [rmse, mae, r2, mape, directional_accuracy]:
if isinstance(val, float):
row.append(format(val, '.10f'))
else:
row.append(val)
row.append(fi_str)
writer.writerow(row)
print('Feature importances and results saved for all features used in this run.')
# Plotting for this run
plot_prefix = f'loo_{left_out}'
print('Plotting distribution of absolute prediction errors...')
plot_prediction_error_distribution(predicted_prices, actual_prices, prefix=plot_prefix)
# Plotting for this run
# plot_prefix = f'cumulative_{n}_features'
# plot_prediction_error_distribution(predicted_prices, actual_prices, prefix=plot_prefix)
# plot_direction_transition_heatmap(actual_prices, predicted_prices, prefix=plot_prefix)
except Exception as e:
print(f'Cumulative feature run failed: {e}')
print(f'All cumulative feature runs completed. Results saved to {results_csv}')
plot_prefix = f'all_features'
plot_prediction_error_distribution(predicted_prices, actual_prices, prefix=plot_prefix)
print('Plotting directional accuracy...')
plot_direction_transition_heatmap(actual_prices, predicted_prices, prefix=plot_prefix)
except Exception as e:
print(f'Leave-one-out failed for {left_out}: {e}')
print(f'All leave-one-out runs completed. Results saved to {results_csv}')
sys.exit(0)

View File

@ -8,6 +8,7 @@ dependencies = [
"dash>=3.0.4",
"numba>=0.61.2",
"pandas>=2.2.3",
"pandas-ta>=0.3.14b0",
"scikit-learn>=1.6.1",
"ta>=0.11.0",
"xgboost>=3.0.2",

View File

@ -0,0 +1,251 @@
from numba import njit
import pandas as pd
import numpy as np
def calc_rsi(close):
from ta.momentum import RSIIndicator
return ('rsi', RSIIndicator(close, window=14).rsi())
def calc_macd(close):
from ta.trend import MACD
return ('macd', MACD(close).macd())
def calc_bollinger(close):
from ta.volatility import BollingerBands
bb = BollingerBands(close=close, window=20, window_dev=2)
return [
('bb_bbm', bb.bollinger_mavg()),
('bb_bbh', bb.bollinger_hband()),
('bb_bbl', bb.bollinger_lband()),
('bb_bb_width', bb.bollinger_hband() - bb.bollinger_lband())
]
def calc_stochastic(high, low, close):
from ta.momentum import StochasticOscillator
stoch = StochasticOscillator(high=high, low=low, close=close, window=14, smooth_window=3)
return [
('stoch_k', stoch.stoch()),
('stoch_d', stoch.stoch_signal())
]
def calc_atr(high, low, close):
from ta.volatility import AverageTrueRange
atr = AverageTrueRange(high=high, low=low, close=close, window=14)
return ('atr', atr.average_true_range())
def calc_cci(high, low, close):
from ta.trend import CCIIndicator
cci = CCIIndicator(high=high, low=low, close=close, window=20)
return ('cci', cci.cci())
def calc_williamsr(high, low, close):
from ta.momentum import WilliamsRIndicator
willr = WilliamsRIndicator(high=high, low=low, close=close, lbp=14)
return ('williams_r', willr.williams_r())
def calc_ema(close):
from ta.trend import EMAIndicator
ema = EMAIndicator(close=close, window=14)
return ('ema_14', ema.ema_indicator())
def calc_obv(close, volume):
from ta.volume import OnBalanceVolumeIndicator
obv = OnBalanceVolumeIndicator(close=close, volume=volume)
return ('obv', obv.on_balance_volume())
def calc_cmf(high, low, close, volume):
from ta.volume import ChaikinMoneyFlowIndicator
cmf = ChaikinMoneyFlowIndicator(high=high, low=low, close=close, volume=volume, window=20)
return ('cmf', cmf.chaikin_money_flow())
def calc_sma(close):
from ta.trend import SMAIndicator
return [
('sma_50', SMAIndicator(close, window=50).sma_indicator()),
('sma_200', SMAIndicator(close, window=200).sma_indicator())
]
def calc_roc(close):
from ta.momentum import ROCIndicator
return ('roc_10', ROCIndicator(close, window=10).roc())
def calc_momentum(close):
return ('momentum_10', close - close.shift(10))
def calc_psar(high, low, close):
# Use the Numba-accelerated fast_psar function for speed
psar_values = fast_psar(np.array(high), np.array(low), np.array(close))
return [('psar', pd.Series(psar_values, index=close.index))]
def calc_donchian(high, low, close):
from ta.volatility import DonchianChannel
donchian = DonchianChannel(high, low, close, window=20)
return [
('donchian_hband', donchian.donchian_channel_hband()),
('donchian_lband', donchian.donchian_channel_lband()),
('donchian_mband', donchian.donchian_channel_mband())
]
def calc_keltner(high, low, close):
from ta.volatility import KeltnerChannel
keltner = KeltnerChannel(high, low, close, window=20)
return [
('keltner_hband', keltner.keltner_channel_hband()),
('keltner_lband', keltner.keltner_channel_lband()),
('keltner_mband', keltner.keltner_channel_mband())
]
def calc_dpo(close):
from ta.trend import DPOIndicator
return ('dpo_20', DPOIndicator(close, window=20).dpo())
def calc_ultimate(high, low, close):
from ta.momentum import UltimateOscillator
return ('ultimate_osc', UltimateOscillator(high, low, close).ultimate_oscillator())
def calc_ichimoku(high, low):
from ta.trend import IchimokuIndicator
ichimoku = IchimokuIndicator(high, low, window1=9, window2=26, window3=52)
return [
('ichimoku_a', ichimoku.ichimoku_a()),
('ichimoku_b', ichimoku.ichimoku_b()),
('ichimoku_base_line', ichimoku.ichimoku_base_line()),
('ichimoku_conversion_line', ichimoku.ichimoku_conversion_line())
]
def calc_elder_ray(close, low, high):
from ta.trend import EMAIndicator
ema = EMAIndicator(close, window=13).ema_indicator()
return [
('elder_ray_bull', ema - low),
('elder_ray_bear', ema - high)
]
def calc_daily_return(close):
from ta.others import DailyReturnIndicator
return ('daily_return', DailyReturnIndicator(close).daily_return())
@njit
def fast_psar(high, low, close, af=0.02, max_af=0.2):
length = len(close)
psar = np.zeros(length)
bull = True
af_step = af
ep = low[0]
psar[0] = low[0]
for i in range(1, length):
prev_psar = psar[i-1]
if bull:
psar[i] = prev_psar + af_step * (ep - prev_psar)
if low[i] < psar[i]:
bull = False
psar[i] = ep
af_step = af
ep = low[i]
else:
if high[i] > ep:
ep = high[i]
af_step = min(af_step + af, max_af)
else:
psar[i] = prev_psar + af_step * (ep - prev_psar)
if high[i] > psar[i]:
bull = True
psar[i] = ep
af_step = af
ep = high[i]
else:
if low[i] < ep:
ep = low[i]
af_step = min(af_step + af, max_af)
return psar
def compute_lag(df, col, lag):
return df[col].shift(lag)
def compute_rolling(df, col, stat, window):
if stat == 'mean':
return df[col].rolling(window).mean()
elif stat == 'std':
return df[col].rolling(window).std()
elif stat == 'min':
return df[col].rolling(window).min()
elif stat == 'max':
return df[col].rolling(window).max()
def compute_log_return(df, horizon):
return np.log(df['Close'] / df['Close'].shift(horizon))
def compute_volatility(df, window):
return df['log_return'].rolling(window).std()
def run_feature_job(job, df):
feature_name, func, *args = job
print(f'Computing feature: {feature_name}')
result = func(df, *args)
return feature_name, result
def calc_adx(high, low, close):
from ta.trend import ADXIndicator
adx = ADXIndicator(high=high, low=low, close=close, window=14)
return [
('adx', adx.adx()),
('adx_pos', adx.adx_pos()),
('adx_neg', adx.adx_neg())
]
def calc_trix(close):
from ta.trend import TRIXIndicator
trix = TRIXIndicator(close=close, window=15)
return ('trix', trix.trix())
def calc_vortex(high, low, close):
from ta.trend import VortexIndicator
vortex = VortexIndicator(high=high, low=low, close=close, window=14)
return [
('vortex_pos', vortex.vortex_indicator_pos()),
('vortex_neg', vortex.vortex_indicator_neg())
]
def calc_kama(close):
import pandas_ta as ta
kama = ta.kama(close, length=10)
return ('kama', kama)
def calc_force_index(close, volume):
from ta.volume import ForceIndexIndicator
fi = ForceIndexIndicator(close=close, volume=volume, window=13)
return ('force_index', fi.force_index())
def calc_eom(high, low, volume):
from ta.volume import EaseOfMovementIndicator
eom = EaseOfMovementIndicator(high=high, low=low, volume=volume, window=14)
return ('eom', eom.ease_of_movement())
def calc_mfi(high, low, close, volume):
from ta.volume import MFIIndicator
mfi = MFIIndicator(high=high, low=low, close=close, volume=volume, window=14)
return ('mfi', mfi.money_flow_index())
def calc_adi(high, low, close, volume):
from ta.volume import AccDistIndexIndicator
adi = AccDistIndexIndicator(high=high, low=low, close=close, volume=volume)
return ('adi', adi.acc_dist_index())
def calc_tema(close):
import pandas_ta as ta
tema = ta.tema(close, length=10)
return ('tema', tema)
def calc_stochrsi(close):
from ta.momentum import StochRSIIndicator
stochrsi = StochRSIIndicator(close=close, window=14, smooth1=3, smooth2=3)
return [
('stochrsi', stochrsi.stochrsi()),
('stochrsi_k', stochrsi.stochrsi_k()),
('stochrsi_d', stochrsi.stochrsi_d())
]
def calc_awesome_oscillator(high, low):
from ta.momentum import AwesomeOscillatorIndicator
ao = AwesomeOscillatorIndicator(high=high, low=low, window1=5, window2=34)
return ('awesome_osc', ao.awesome_oscillator())

11
uv.lock generated
View File

@ -314,6 +314,7 @@ dependencies = [
{ name = "dash" },
{ name = "numba" },
{ name = "pandas" },
{ name = "pandas-ta" },
{ name = "scikit-learn" },
{ name = "ta" },
{ name = "xgboost" },
@ -324,6 +325,7 @@ requires-dist = [
{ name = "dash", specifier = ">=3.0.4" },
{ name = "numba", specifier = ">=0.61.2" },
{ name = "pandas", specifier = ">=2.2.3" },
{ name = "pandas-ta", specifier = ">=0.3.14b0" },
{ name = "scikit-learn", specifier = ">=1.6.1" },
{ name = "ta", specifier = ">=0.11.0" },
{ name = "xgboost", specifier = ">=3.0.2" },
@ -372,6 +374,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/ab/5f/b38085618b950b79d2d9164a711c52b10aefc0ae6833b96f626b7021b2ed/pandas-2.2.3-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:ad5b65698ab28ed8d7f18790a0dc58005c7629f227be9ecc1072aa74c0c1d43a", size = 13098436, upload-time = "2024-09-20T13:09:48.112Z" },
]
[[package]]
name = "pandas-ta"
version = "0.3.14b0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pandas" },
]
sdist = { url = "https://files.pythonhosted.org/packages/f7/0b/1666f0a185d4f08215f53cc088122a73c92421447b04028f0464fabe1ce6/pandas_ta-0.3.14b.tar.gz", hash = "sha256:0fa35aec831d2815ea30b871688a8d20a76b288a7be2d26cc00c35cd8c09a993", size = 115089, upload-time = "2021-07-28T20:51:17.456Z" }
[[package]]
name = "plotly"
version = "6.1.2"