OHLCVPredictor/feature_engineering.py

590 lines
24 KiB
Python
Raw Permalink Normal View History

import os
import numpy as np
import pandas as pd
import ta
try:
from .technical_indicator_functions import *
except ImportError:
from technical_indicator_functions import *
def feature_engineering(df, csv_prefix, ohlcv_cols, lags, window_sizes):
"""
Compute and/or load features for the given DataFrame.
If csv_prefix is provided, features are cached to disk; otherwise, features are only computed in memory.
Args:
df (pd.DataFrame): Input OHLCV data.
csv_prefix (str or None): Prefix for feature files (for caching). If None or '', disables caching.
ohlcv_cols (list): List of OHLCV column names.
lags (int): Number of lag features.
window_sizes (list): List of window sizes for rolling features.
Returns:
dict: Dictionary of computed features.
"""
features_dict = {}
# RSI
if csv_prefix:
feature_file = f'../data/{csv_prefix}_rsi.npy'
if os.path.exists(feature_file):
arr = np.load(feature_file)
features_dict['rsi'] = pd.Series(arr, index=df.index)
else:
_, values = calc_rsi(df['Close'])
features_dict['rsi'] = values
np.save(feature_file, values.values)
else:
_, values = calc_rsi(df['Close'])
features_dict['rsi'] = values
# MACD
if csv_prefix:
feature_file = f'../data/{csv_prefix}_macd.npy'
if os.path.exists(feature_file):
arr = np.load(feature_file)
features_dict['macd'] = pd.Series(arr, index=df.index)
else:
_, values = calc_macd(df['Close'])
features_dict['macd'] = values
np.save(feature_file, values.values)
else:
_, values = calc_macd(df['Close'])
features_dict['macd'] = values
# ATR
if csv_prefix:
feature_file = f'../data/{csv_prefix}_atr.npy'
if os.path.exists(feature_file):
arr = np.load(feature_file)
features_dict['atr'] = pd.Series(arr, index=df.index)
else:
_, values = calc_atr(df['High'], df['Low'], df['Close'])
features_dict['atr'] = values
np.save(feature_file, values.values)
else:
_, values = calc_atr(df['High'], df['Low'], df['Close'])
features_dict['atr'] = values
# CCI
if csv_prefix:
feature_file = f'../data/{csv_prefix}_cci.npy'
if os.path.exists(feature_file):
arr = np.load(feature_file)
features_dict['cci'] = pd.Series(arr, index=df.index)
else:
_, values = calc_cci(df['High'], df['Low'], df['Close'])
features_dict['cci'] = values
np.save(feature_file, values.values)
else:
_, values = calc_cci(df['High'], df['Low'], df['Close'])
features_dict['cci'] = values
# Williams %R
if csv_prefix:
feature_file = f'../data/{csv_prefix}_williams_r.npy'
if os.path.exists(feature_file):
arr = np.load(feature_file)
features_dict['williams_r'] = pd.Series(arr, index=df.index)
else:
_, values = calc_williamsr(df['High'], df['Low'], df['Close'])
features_dict['williams_r'] = values
np.save(feature_file, values.values)
else:
_, values = calc_williamsr(df['High'], df['Low'], df['Close'])
features_dict['williams_r'] = values
# EMA 14
if csv_prefix:
feature_file = f'../data/{csv_prefix}_ema_14.npy'
if os.path.exists(feature_file):
arr = np.load(feature_file)
features_dict['ema_14'] = pd.Series(arr, index=df.index)
else:
_, values = calc_ema(df['Close'])
features_dict['ema_14'] = values
np.save(feature_file, values.values)
else:
_, values = calc_ema(df['Close'])
features_dict['ema_14'] = values
# OBV
if csv_prefix:
feature_file = f'../data/{csv_prefix}_obv.npy'
if os.path.exists(feature_file):
arr = np.load(feature_file)
features_dict['obv'] = pd.Series(arr, index=df.index)
else:
_, values = calc_obv(df['Close'], df['Volume'])
features_dict['obv'] = values
np.save(feature_file, values.values)
else:
_, values = calc_obv(df['Close'], df['Volume'])
features_dict['obv'] = values
# CMF
if csv_prefix:
feature_file = f'../data/{csv_prefix}_cmf.npy'
if os.path.exists(feature_file):
arr = np.load(feature_file)
features_dict['cmf'] = pd.Series(arr, index=df.index)
else:
_, values = calc_cmf(df['High'], df['Low'], df['Close'], df['Volume'])
features_dict['cmf'] = values
np.save(feature_file, values.values)
else:
_, values = calc_cmf(df['High'], df['Low'], df['Close'], df['Volume'])
features_dict['cmf'] = values
# ROC 10
if csv_prefix:
feature_file = f'../data/{csv_prefix}_roc_10.npy'
if os.path.exists(feature_file):
arr = np.load(feature_file)
features_dict['roc_10'] = pd.Series(arr, index=df.index)
else:
_, values = calc_roc(df['Close'])
features_dict['roc_10'] = values
np.save(feature_file, values.values)
else:
_, values = calc_roc(df['Close'])
features_dict['roc_10'] = values
# DPO 20
if csv_prefix:
feature_file = f'../data/{csv_prefix}_dpo_20.npy'
if os.path.exists(feature_file):
arr = np.load(feature_file)
features_dict['dpo_20'] = pd.Series(arr, index=df.index)
else:
_, values = calc_dpo(df['Close'])
features_dict['dpo_20'] = values
np.save(feature_file, values.values)
else:
_, values = calc_dpo(df['Close'])
features_dict['dpo_20'] = values
# Ultimate Oscillator
if csv_prefix:
feature_file = f'../data/{csv_prefix}_ultimate_osc.npy'
if os.path.exists(feature_file):
arr = np.load(feature_file)
features_dict['ultimate_osc'] = pd.Series(arr, index=df.index)
else:
_, values = calc_ultimate(df['High'], df['Low'], df['Close'])
features_dict['ultimate_osc'] = values
np.save(feature_file, values.values)
else:
_, values = calc_ultimate(df['High'], df['Low'], df['Close'])
features_dict['ultimate_osc'] = values
# Daily Return
if csv_prefix:
feature_file = f'../data/{csv_prefix}_daily_return.npy'
if os.path.exists(feature_file):
arr = np.load(feature_file)
features_dict['daily_return'] = pd.Series(arr, index=df.index)
else:
_, values = calc_daily_return(df['Close'])
features_dict['daily_return'] = values
np.save(feature_file, values.values)
else:
_, values = calc_daily_return(df['Close'])
features_dict['daily_return'] = values
# Multi-column indicators
# Bollinger Bands
result = calc_bollinger(df['Close'])
for subname, values in result:
if csv_prefix:
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
else:
features_dict[subname] = values
# Stochastic Oscillator
result = calc_stochastic(df['High'], df['Low'], df['Close'])
for subname, values in result:
if csv_prefix:
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
else:
features_dict[subname] = values
# SMA
result = calc_sma(df['Close'])
for subname, values in result:
if csv_prefix:
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
else:
features_dict[subname] = values
# PSAR
result = calc_psar(df['High'], df['Low'], df['Close'])
for subname, values in result:
if csv_prefix:
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
else:
features_dict[subname] = values
# Donchian Channel
result = calc_donchian(df['High'], df['Low'], df['Close'])
for subname, values in result:
if csv_prefix:
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
else:
features_dict[subname] = values
# Keltner Channel
result = calc_keltner(df['High'], df['Low'], df['Close'])
for subname, values in result:
if csv_prefix:
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
else:
features_dict[subname] = values
# Ichimoku
result = calc_ichimoku(df['High'], df['Low'])
for subname, values in result:
if csv_prefix:
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
else:
features_dict[subname] = values
# Elder Ray
result = calc_elder_ray(df['Close'], df['Low'], df['High'])
for subname, values in result:
if csv_prefix:
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
if os.path.exists(sub_feature_file):
arr = np.load(sub_feature_file)
features_dict[subname] = pd.Series(arr, index=df.index)
else:
features_dict[subname] = values
np.save(sub_feature_file, values.values)
else:
features_dict[subname] = values
# Prepare lags, rolling stats, log returns, and volatility features sequentially
# Lags
for col in ohlcv_cols:
for lag in range(1, lags + 1):
feature_name = f'{col}_lag{lag}'
feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
if csv_prefix:
if os.path.exists(feature_file):
features_dict[feature_name] = np.load(feature_file)
else:
result = compute_lag(df, col, lag)
features_dict[feature_name] = result
np.save(feature_file, result.values)
else:
result = compute_lag(df, col, lag)
features_dict[feature_name] = result
# Rolling statistics
for col in ohlcv_cols:
for window in window_sizes:
if (col == 'Open' and window == 5):
continue
if (col == 'High' and window == 5):
continue
if (col == 'High' and window == 30):
continue
if (col == 'Low' and window == 15):
continue
for stat in ['mean', 'std', 'min', 'max']:
feature_name = f'{col}_roll_{stat}_{window}'
feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
if csv_prefix:
if os.path.exists(feature_file):
features_dict[feature_name] = np.load(feature_file)
else:
result = compute_rolling(df, col, stat, window)
features_dict[feature_name] = result
np.save(feature_file, result.values)
else:
result = compute_rolling(df, col, stat, window)
features_dict[feature_name] = result
# Log returns for different horizons
for horizon in [5, 15, 30]:
feature_name = f'log_return_{horizon}'
feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
if csv_prefix:
if os.path.exists(feature_file):
features_dict[feature_name] = np.load(feature_file)
else:
result = compute_log_return(df, horizon)
features_dict[feature_name] = result
np.save(feature_file, result.values)
else:
result = compute_log_return(df, horizon)
features_dict[feature_name] = result
# Volatility
for window in window_sizes:
feature_name = f'volatility_{window}'
feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
if csv_prefix:
if os.path.exists(feature_file):
features_dict[feature_name] = np.load(feature_file)
else:
result = compute_volatility(df, window)
features_dict[feature_name] = result
np.save(feature_file, result.values)
else:
result = compute_volatility(df, window)
features_dict[feature_name] = result
# --- Additional Technical Indicator Features ---
# ADX
adx_names = ['adx', 'adx_pos', 'adx_neg']
adx_files = [f'../data/{csv_prefix}_{name}.npy' for name in adx_names]
if csv_prefix and all(os.path.exists(f) for f in adx_files):
for name, f in zip(adx_names, adx_files):
arr = np.load(f)
features_dict[name] = pd.Series(arr, index=df.index)
else:
result = calc_adx(df['High'], df['Low'], df['Close'])
for subname, values in result:
sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
features_dict[subname] = values
if csv_prefix:
np.save(sub_feature_file, values.values)
# Force Index
feature_file = f'../data/{csv_prefix}_force_index.npy'
if csv_prefix:
if os.path.exists(feature_file):
arr = np.load(feature_file)
features_dict['force_index'] = pd.Series(arr, index=df.index)
else:
_, values = calc_force_index(df['Close'], df['Volume'])
features_dict['force_index'] = values
np.save(feature_file, values.values)
else:
_, values = calc_force_index(df['Close'], df['Volume'])
features_dict['force_index'] = values
# Supertrend indicators (simplified implementation)
for period, multiplier in [(12, 3.0), (10, 1.0), (11, 2.0)]:
st_name = f'supertrend_{period}_{multiplier}'
st_trend_name = f'supertrend_trend_{period}_{multiplier}'
st_file = f'../data/{csv_prefix}_{st_name}.npy'
st_trend_file = f'../data/{csv_prefix}_{st_trend_name}.npy'
if csv_prefix and os.path.exists(st_file) and os.path.exists(st_trend_file):
features_dict[st_name] = pd.Series(np.load(st_file), index=df.index)
features_dict[st_trend_name] = pd.Series(np.load(st_trend_file), index=df.index)
else:
# Simple supertrend alternative using ATR and moving averages
from ta.volatility import AverageTrueRange
atr = AverageTrueRange(df['High'], df['Low'], df['Close'], window=period).average_true_range()
hl_avg = (df['High'] + df['Low']) / 2
basic_ub = hl_avg + (multiplier * atr)
basic_lb = hl_avg - (multiplier * atr)
# Simplified supertrend calculation
supertrend = hl_avg.copy()
trend = pd.Series(1, index=df.index) # 1 for uptrend, -1 for downtrend
features_dict[st_name] = supertrend
features_dict[st_trend_name] = trend
if csv_prefix:
np.save(st_file, features_dict[st_name].values)
np.save(st_trend_file, features_dict[st_trend_name].values)
# --- OHLCV-only additional features ---
# Helper for caching single-series features using the same pattern as above
def _save_or_load_feature(name, series):
if csv_prefix:
feature_file = f'../data/{csv_prefix}_{name}.npy'
if os.path.exists(feature_file):
arr = np.load(feature_file)
features_dict[name] = pd.Series(arr, index=df.index)
else:
# Ensure pandas Series with correct index
series = pd.Series(series, index=df.index)
features_dict[name] = series
np.save(feature_file, series.values)
else:
series = pd.Series(series, index=df.index)
features_dict[name] = series
eps = 1e-9
# Candle shape/position
body = (df['Close'] - df['Open']).abs()
rng = (df['High'] - df['Low'])
upper_wick = df['High'] - df[['Open', 'Close']].max(axis=1)
lower_wick = df[['Open', 'Close']].min(axis=1) - df['Low']
_save_or_load_feature('candle_body', body)
_save_or_load_feature('candle_upper_wick', upper_wick)
_save_or_load_feature('candle_lower_wick', lower_wick)
_save_or_load_feature('candle_body_to_range', body / (rng + eps))
_save_or_load_feature('candle_upper_wick_to_range', upper_wick / (rng + eps))
_save_or_load_feature('candle_lower_wick_to_range', lower_wick / (rng + eps))
_save_or_load_feature('close_pos_in_bar', (df['Close'] - df['Low']) / (rng + eps))
for w in window_sizes:
roll_max = df['High'].rolling(w).max()
roll_min = df['Low'].rolling(w).min()
close_pos_roll = (df['Close'] - roll_min) / ((roll_max - roll_min) + eps)
_save_or_load_feature(f'close_pos_in_roll_{w}', close_pos_roll)
# Range-based volatility (Parkinson, GarmanKlass, RogersSatchell, YangZhang)
log_hl = np.log((df['High'] / df['Low']).replace(0, np.nan))
log_co = np.log((df['Close'] / df['Open']).replace(0, np.nan))
log_close = np.log(df['Close'].replace(0, np.nan))
ret1 = log_close.diff()
for w in window_sizes:
# Parkinson
parkinson_var = (log_hl.pow(2)).rolling(w).mean() / (4.0 * np.log(2.0))
_save_or_load_feature(f'park_vol_{w}', np.sqrt(parkinson_var.clip(lower=0)))
# GarmanKlass
gk_var = 0.5 * (log_hl.pow(2)).rolling(w).mean() - (2.0 * np.log(2.0) - 1.0) * (log_co.pow(2)).rolling(w).mean()
_save_or_load_feature(f'gk_vol_{w}', np.sqrt(gk_var.clip(lower=0)))
# RogersSatchell
u = np.log((df['High'] / df['Close']).replace(0, np.nan))
d = np.log((df['Low'] / df['Close']).replace(0, np.nan))
uo = np.log((df['High'] / df['Open']).replace(0, np.nan))
do = np.log((df['Low'] / df['Open']).replace(0, np.nan))
rs_term = u * uo + d * do
rs_var = rs_term.rolling(w).mean()
_save_or_load_feature(f'rs_vol_{w}', np.sqrt(rs_var.clip(lower=0)))
# YangZhang
g = np.log((df['Open'] / df['Close'].shift(1)).replace(0, np.nan))
u_yz = np.log((df['High'] / df['Open']).replace(0, np.nan))
d_yz = np.log((df['Low'] / df['Open']).replace(0, np.nan))
c_yz = np.log((df['Close'] / df['Open']).replace(0, np.nan))
sigma_g2 = g.rolling(w).var()
sigma_c2 = c_yz.rolling(w).var()
sigma_rs = (u_yz * (u_yz - c_yz) + d_yz * (d_yz - c_yz)).rolling(w).mean()
k = 0.34 / (1.34 + (w + 1.0) / max(w - 1.0, 1.0))
yz_var = sigma_g2 + k * sigma_c2 + (1.0 - k) * sigma_rs
_save_or_load_feature(f'yz_vol_{w}', np.sqrt(yz_var.clip(lower=0)))
# Trend strength: rolling linear-regression slope and R² of log price
def _linreg_slope(arr):
y = np.asarray(arr, dtype=float)
n = y.size
x = np.arange(n, dtype=float)
xmean = (n - 1.0) / 2.0
ymean = np.nanmean(y)
xm = x - xmean
ym = y - ymean
cov = np.nansum(xm * ym)
varx = np.nansum(xm * xm) + eps
return cov / varx
def _linreg_r2(arr):
y = np.asarray(arr, dtype=float)
n = y.size
x = np.arange(n, dtype=float)
xmean = (n - 1.0) / 2.0
ymean = np.nanmean(y)
slope = _linreg_slope(arr)
intercept = ymean - slope * xmean
yhat = slope * x + intercept
ss_tot = np.nansum((y - ymean) ** 2)
ss_res = np.nansum((y - yhat) ** 2)
return 1.0 - ss_res / (ss_tot + eps)
for w in window_sizes:
_save_or_load_feature(f'lr_slope_log_close_{w}', log_close.rolling(w).apply(_linreg_slope, raw=True))
_save_or_load_feature(f'lr_r2_log_close_{w}', log_close.rolling(w).apply(_linreg_r2, raw=True))
# EMA(7), EMA(21), their slopes and spread
ema_7 = df['Close'].ewm(span=7, adjust=False).mean()
ema_21 = df['Close'].ewm(span=21, adjust=False).mean()
_save_or_load_feature('ema_7', ema_7)
_save_or_load_feature('ema_21', ema_21)
_save_or_load_feature('ema_7_slope', ema_7.pct_change())
_save_or_load_feature('ema_21_slope', ema_21.pct_change())
_save_or_load_feature('ema_7_21_spread', ema_7 - ema_21)
# VWAP over windows and distance of Close from VWAP
tp = (df['High'] + df['Low'] + df['Close']) / 3.0
for w in window_sizes:
vwap_w = (tp * df['Volume']).rolling(w).sum() / (df['Volume'].rolling(w).sum() + eps)
_save_or_load_feature(f'vwap_{w}', vwap_w)
_save_or_load_feature(f'vwap_dist_{w}', (df['Close'] - vwap_w) / (vwap_w + eps))
# Autocorrelation of log returns at lags 15 (rolling window 30)
for lag in range(1, 6):
ac = ret1.rolling(30).corr(ret1.shift(lag))
_save_or_load_feature(f'ret_autocorr_lag{lag}_30', ac)
# Rolling skewness and kurtosis of returns (15, 30)
for w in [15, 30]:
_save_or_load_feature(f'ret_skew_{w}', ret1.rolling(w).skew())
_save_or_load_feature(f'ret_kurt_{w}', ret1.rolling(w).kurt())
# Volume z-score and return-volume rolling correlation (15, 30)
for w in [15, 30]:
vol_mean = df['Volume'].rolling(w).mean()
vol_std = df['Volume'].rolling(w).std()
_save_or_load_feature(f'volume_zscore_{w}', (df['Volume'] - vol_mean) / (vol_std + eps))
_save_or_load_feature(f'ret_vol_corr_{w}', ret1.rolling(w).corr(df['Volume']))
# Cyclical time features and relative volume vs hour-of-day average
try:
hours = pd.to_datetime(df['Timestamp']).dt.hour
except Exception:
try:
hours = pd.to_datetime(df['Timestamp'], unit='s', errors='coerce').dt.hour
except Exception:
hours = pd.Series(np.nan, index=df.index)
_save_or_load_feature('sin_hour', np.sin(2.0 * np.pi * (hours.fillna(0)) / 24.0))
_save_or_load_feature('cos_hour', np.cos(2.0 * np.pi * (hours.fillna(0)) / 24.0))
hourly_mean_vol = df['Volume'].groupby(hours).transform('mean')
_save_or_load_feature('relative_volume_hour', df['Volume'] / (hourly_mean_vol + eps))
return features_dict