# OHLCVPredictor/feature_engineering.py

import os

import numpy as np
import pandas as pd
from ta.volatility import AverageTrueRange  # used by the simplified supertrend below

try:
    # Relative import when used as part of the OHLCVPredictor package
    from .technical_indicator_functions import *
except ImportError:
    # Plain import when the module is run as a standalone script
    from technical_indicator_functions import *
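

# The same cache-or-compute pattern recurs for every feature built below. A
# minimal sketch of how it could be factored out; the helper name
# `_load_or_compute` is illustrative and nothing in this module depends on it:
def _load_or_compute(name, compute, csv_prefix, index):
    """Return the cached series for `name` if present, else compute and cache it."""
    feature_file = f'../data/{csv_prefix}_{name}.npy'
    if csv_prefix and os.path.exists(feature_file):
        return pd.Series(np.load(feature_file), index=index)
    values = compute()  # e.g. lambda: calc_rsi(df['Close'])[1]
    if csv_prefix:
        np.save(feature_file, values.values)
    return values
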
def feature_engineering(df, csv_prefix, ohlcv_cols, lags, window_sizes):
    """
    Compute and/or load features for the given DataFrame.

    If csv_prefix is provided, each feature is cached to disk as a .npy file
    under ../data/ and reloaded on later calls; otherwise features are only
    computed in memory.

    Args:
        df (pd.DataFrame): Input OHLCV data.
        csv_prefix (str or None): Prefix for feature cache files. None or ''
            disables caching.
        ohlcv_cols (list): OHLCV column names used for lag and rolling features.
        lags (int): Number of lag features per column.
        window_sizes (list): Window sizes for rolling features.

    Returns:
        dict: Mapping of feature name to pd.Series aligned to df.index.
    """
    features_dict = {}
    # Ensure the cache directory exists before any np.save call below
    if csv_prefix:
        os.makedirs('../data', exist_ok=True)
    # --- Single-series indicators: (feature name, calc function, input columns) ---
    single_indicators = [
        ('rsi',          calc_rsi,          ('Close',)),
        ('macd',         calc_macd,         ('Close',)),
        ('atr',          calc_atr,          ('High', 'Low', 'Close')),
        ('cci',          calc_cci,          ('High', 'Low', 'Close')),
        ('williams_r',   calc_williamsr,    ('High', 'Low', 'Close')),
        ('ema_14',       calc_ema,          ('Close',)),
        ('obv',          calc_obv,          ('Close', 'Volume')),
        ('cmf',          calc_cmf,          ('High', 'Low', 'Close', 'Volume')),
        ('roc_10',       calc_roc,          ('Close',)),
        ('dpo_20',       calc_dpo,          ('Close',)),
        ('ultimate_osc', calc_ultimate,     ('High', 'Low', 'Close')),
        ('daily_return', calc_daily_return, ('Close',)),
    ]
    for name, func, cols in single_indicators:
        feature_file = f'../data/{csv_prefix}_{name}.npy'
        if csv_prefix and os.path.exists(feature_file):
            features_dict[name] = pd.Series(np.load(feature_file), index=df.index)
        else:
            _, values = func(*(df[c] for c in cols))
            features_dict[name] = values
            if csv_prefix:
                np.save(feature_file, values.values)
    # --- Multi-series indicators: each calc_* yields (subname, series) pairs ---
    multi_indicators = [
        (calc_bollinger,  ('Close',)),                # Bollinger Bands
        (calc_stochastic, ('High', 'Low', 'Close')),  # Stochastic Oscillator
        (calc_sma,        ('Close',)),                # SMA
        (calc_psar,       ('High', 'Low', 'Close')),  # PSAR
        (calc_donchian,   ('High', 'Low', 'Close')),  # Donchian Channel
        (calc_keltner,    ('High', 'Low', 'Close')),  # Keltner Channel
        (calc_ichimoku,   ('High', 'Low')),           # Ichimoku
        (calc_elder_ray,  ('Close', 'Low', 'High')),  # Elder Ray
    ]
    for func, cols in multi_indicators:
        for subname, values in func(*(df[c] for c in cols)):
            sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
            if csv_prefix and os.path.exists(sub_feature_file):
                features_dict[subname] = pd.Series(np.load(sub_feature_file), index=df.index)
            else:
                features_dict[subname] = values
                if csv_prefix:
                    np.save(sub_feature_file, values.values)
    # --- Lags, rolling stats, log returns, and volatility features ---
    # Lag features for each raw OHLCV column
    for col in ohlcv_cols:
        for lag in range(1, lags + 1):
            feature_name = f'{col}_lag{lag}'
            feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
            if csv_prefix and os.path.exists(feature_file):
                features_dict[feature_name] = pd.Series(np.load(feature_file), index=df.index)
            else:
                result = compute_lag(df, col, lag)
                features_dict[feature_name] = result
                if csv_prefix:
                    np.save(feature_file, result.values)
    # Rolling statistics for each column/window combination
    for col in ohlcv_cols:
        for window in window_sizes:
            # Skip combinations excluded from the feature set
            if (col, window) in {('Open', 5), ('High', 5), ('High', 30), ('Low', 15)}:
                continue
            for stat in ['mean', 'std', 'min', 'max']:
                feature_name = f'{col}_roll_{stat}_{window}'
                feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
                if csv_prefix and os.path.exists(feature_file):
                    features_dict[feature_name] = pd.Series(np.load(feature_file), index=df.index)
                else:
                    result = compute_rolling(df, col, stat, window)
                    features_dict[feature_name] = result
                    if csv_prefix:
                        np.save(feature_file, result.values)
    # Log returns over several horizons
    for horizon in [5, 15, 30]:
        feature_name = f'log_return_{horizon}'
        feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
        if csv_prefix and os.path.exists(feature_file):
            features_dict[feature_name] = pd.Series(np.load(feature_file), index=df.index)
        else:
            result = compute_log_return(df, horizon)
            features_dict[feature_name] = result
            if csv_prefix:
                np.save(feature_file, result.values)
    # Realized volatility over each window size
    for window in window_sizes:
        feature_name = f'volatility_{window}'
        feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
        if csv_prefix and os.path.exists(feature_file):
            features_dict[feature_name] = pd.Series(np.load(feature_file), index=df.index)
        else:
            result = compute_volatility(df, window)
            features_dict[feature_name] = result
            if csv_prefix:
                np.save(feature_file, result.values)
    # --- Additional Technical Indicator Features ---
    # ADX and its directional components (cached as a group)
    adx_names = ['adx', 'adx_pos', 'adx_neg']
    adx_files = [f'../data/{csv_prefix}_{name}.npy' for name in adx_names]
    if csv_prefix and all(os.path.exists(f) for f in adx_files):
        for name, f in zip(adx_names, adx_files):
            features_dict[name] = pd.Series(np.load(f), index=df.index)
    else:
        for subname, values in calc_adx(df['High'], df['Low'], df['Close']):
            features_dict[subname] = values
            if csv_prefix:
                np.save(f'../data/{csv_prefix}_{subname}.npy', values.values)
    # Force Index
    feature_file = f'../data/{csv_prefix}_force_index.npy'
    if csv_prefix and os.path.exists(feature_file):
        features_dict['force_index'] = pd.Series(np.load(feature_file), index=df.index)
    else:
        _, values = calc_force_index(df['Close'], df['Volume'])
        features_dict['force_index'] = values
        if csv_prefix:
            np.save(feature_file, values.values)
    # Supertrend indicators (simplified implementation)
    for period, multiplier in [(12, 3.0), (10, 1.0), (11, 2.0)]:
        st_name = f'supertrend_{period}_{multiplier}'
        st_trend_name = f'supertrend_trend_{period}_{multiplier}'
        st_file = f'../data/{csv_prefix}_{st_name}.npy'
        st_trend_file = f'../data/{csv_prefix}_{st_trend_name}.npy'
        if csv_prefix and os.path.exists(st_file) and os.path.exists(st_trend_file):
            features_dict[st_name] = pd.Series(np.load(st_file), index=df.index)
            features_dict[st_trend_name] = pd.Series(np.load(st_trend_file), index=df.index)
        else:
            # Simplified supertrend: ATR bands around the high/low midpoint.
            # This is a band-crossing approximation, not the full supertrend
            # recursion: the trend flips when the close crosses the opposite
            # band, and the line tracks the band on the active side of price.
            atr = AverageTrueRange(df['High'], df['Low'], df['Close'], window=period).average_true_range()
            hl_avg = (df['High'] + df['Low']) / 2
            basic_ub = hl_avg + (multiplier * atr)
            basic_lb = hl_avg - (multiplier * atr)
            trend = pd.Series(1, index=df.index)  # 1 for uptrend, -1 for downtrend
            supertrend = basic_lb.copy()
            for i in range(1, len(df)):
                if df['Close'].iloc[i] > basic_ub.iloc[i - 1]:
                    trend.iloc[i] = 1
                elif df['Close'].iloc[i] < basic_lb.iloc[i - 1]:
                    trend.iloc[i] = -1
                else:
                    trend.iloc[i] = trend.iloc[i - 1]
                supertrend.iloc[i] = basic_lb.iloc[i] if trend.iloc[i] == 1 else basic_ub.iloc[i]
            features_dict[st_name] = supertrend
            features_dict[st_trend_name] = trend
            if csv_prefix:
                np.save(st_file, features_dict[st_name].values)
                np.save(st_trend_file, features_dict[st_trend_name].values)
return features_dict
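

if __name__ == '__main__':
    # Minimal smoke test on synthetic OHLCV data. The lag count and window
    # sizes below are illustrative values, not defaults mandated by the module;
    # csv_prefix=None keeps everything in memory so nothing is written to disk.
    rng = np.random.default_rng(0)
    n = 200
    base = 100 + np.cumsum(rng.normal(0, 1, n))
    open_ = base + rng.normal(0, 0.5, n)
    close = base
    high = np.maximum(open_, close) + np.abs(rng.normal(0, 0.5, n))
    low = np.minimum(open_, close) - np.abs(rng.normal(0, 0.5, n))
    demo = pd.DataFrame(
        {
            'Open': open_,
            'High': high,
            'Low': low,
            'Close': close,
            'Volume': rng.integers(1_000, 10_000, n).astype(float),
        },
        index=pd.date_range('2020-01-01', periods=n, freq='D'),
    )
    feats = feature_engineering(
        demo,
        csv_prefix=None,
        ohlcv_cols=['Open', 'High', 'Low', 'Close', 'Volume'],
        lags=3,
        window_sizes=[5, 15, 30],
    )
    print(f'{len(feats)} features computed')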