# OHLCVPredictor/feature_engineering.py

import os

import numpy as np
import pandas as pd
from ta.volatility import AverageTrueRange  # used by the supertrend block below

try:
    from .technical_indicator_functions import *
except ImportError:
    from technical_indicator_functions import *


def feature_engineering(df, csv_prefix, ohlcv_cols, lags, window_sizes):
    """
    Compute and/or load features for the given DataFrame.

    If csv_prefix is provided, features are cached to disk under '../data/';
    otherwise they are only computed in memory.

    Args:
        df (pd.DataFrame): Input OHLCV data.
        csv_prefix (str or None): Prefix for cached feature files. If None or '',
            caching is disabled.
        ohlcv_cols (list): OHLCV column names to build lag/rolling features for.
        lags (int): Maximum lag (in bars) for the per-column lag features.
        window_sizes (list): Window sizes for rolling features.

    Returns:
        dict: Mapping of feature name to pd.Series (indexed like df).
    """
    features_dict = {}
    if csv_prefix:
        # Make sure the cache directory exists before any np.save below.
        os.makedirs('../data', exist_ok=True)
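    # Caching layout: each feature is stored as '../data/<csv_prefix>_<name>.npy'
    # holding the raw values; on reload it is re-wrapped as a pd.Series on df.index,
    # so cached runs assume df covers the same rows as the run that wrote the cache.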
    # --- Single-series indicators ---
    # Each calc_* helper returns (name, values); the returned name is ignored and
    # the key in the table below is used instead.
    single_indicators = [
        ('rsi', calc_rsi, (df['Close'],)),
        ('macd', calc_macd, (df['Close'],)),
        ('atr', calc_atr, (df['High'], df['Low'], df['Close'])),
        ('cci', calc_cci, (df['High'], df['Low'], df['Close'])),
        ('williams_r', calc_williamsr, (df['High'], df['Low'], df['Close'])),
        ('ema_14', calc_ema, (df['Close'],)),
        ('obv', calc_obv, (df['Close'], df['Volume'])),
        ('cmf', calc_cmf, (df['High'], df['Low'], df['Close'], df['Volume'])),
        ('roc_10', calc_roc, (df['Close'],)),
        ('dpo_20', calc_dpo, (df['Close'],)),
        ('ultimate_osc', calc_ultimate, (df['High'], df['Low'], df['Close'])),
        ('daily_return', calc_daily_return, (df['Close'],)),
    ]
    for name, func, args in single_indicators:
        if csv_prefix:
            feature_file = f'../data/{csv_prefix}_{name}.npy'
            if os.path.exists(feature_file):
                arr = np.load(feature_file)
                features_dict[name] = pd.Series(arr, index=df.index)
            else:
                _, values = func(*args)
                features_dict[name] = values
                np.save(feature_file, values.values)
        else:
            _, values = func(*args)
            features_dict[name] = values
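    # Each multi-column helper below returns a list of (subname, values) pairs
    # (e.g. Bollinger upper/middle/lower); every sub-series is cached under its
    # own name. Note the helper is always called, even on a full cache hit.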
    # Multi-column indicators
    multi_indicators = [
        (calc_bollinger, (df['Close'],)),                         # Bollinger Bands
        (calc_stochastic, (df['High'], df['Low'], df['Close'])),  # Stochastic Oscillator
        (calc_sma, (df['Close'],)),                               # SMA
        (calc_psar, (df['High'], df['Low'], df['Close'])),        # PSAR
        (calc_donchian, (df['High'], df['Low'], df['Close'])),    # Donchian Channel
        (calc_keltner, (df['High'], df['Low'], df['Close'])),     # Keltner Channel
        (calc_ichimoku, (df['High'], df['Low'])),                 # Ichimoku
        (calc_elder_ray, (df['Close'], df['Low'], df['High'])),   # Elder Ray
    ]
    for func, args in multi_indicators:
        for subname, values in func(*args):
            if csv_prefix:
                sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
                if os.path.exists(sub_feature_file):
                    arr = np.load(sub_feature_file)
                    features_dict[subname] = pd.Series(arr, index=df.index)
                else:
                    features_dict[subname] = values
                    np.save(sub_feature_file, values.values)
            else:
                features_dict[subname] = values
    # Prepare lags, rolling stats, log returns, and volatility features sequentially
    # Lags
    for col in ohlcv_cols:
        for lag in range(1, lags + 1):
            feature_name = f'{col}_lag{lag}'
            if csv_prefix:
                feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
                if os.path.exists(feature_file):
                    arr = np.load(feature_file)
                    features_dict[feature_name] = pd.Series(arr, index=df.index)
                else:
                    result = compute_lag(df, col, lag)
                    features_dict[feature_name] = result
                    np.save(feature_file, result.values)
            else:
                result = compute_lag(df, col, lag)
                features_dict[feature_name] = result
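    # For example, with lags=3 the 'Close' column yields Close_lag1..Close_lag3;
    # assuming compute_lag shifts by `lag` bars, Close_lag1[t] equals Close[t-1].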
    # Rolling statistics
    # A few (column, window) pairs are deliberately excluded.
    skip_pairs = {('Open', 5), ('High', 5), ('High', 30), ('Low', 15)}
    for col in ohlcv_cols:
        for window in window_sizes:
            if (col, window) in skip_pairs:
                continue
            for stat in ['mean', 'std', 'min', 'max']:
                feature_name = f'{col}_roll_{stat}_{window}'
                if csv_prefix:
                    feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
                    if os.path.exists(feature_file):
                        arr = np.load(feature_file)
                        features_dict[feature_name] = pd.Series(arr, index=df.index)
                    else:
                        result = compute_rolling(df, col, stat, window)
                        features_dict[feature_name] = result
                        np.save(feature_file, result.values)
                else:
                    result = compute_rolling(df, col, stat, window)
                    features_dict[feature_name] = result
    # Log returns for different horizons
    for horizon in [5, 15, 30]:
        feature_name = f'log_return_{horizon}'
        if csv_prefix:
            feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
            if os.path.exists(feature_file):
                arr = np.load(feature_file)
                features_dict[feature_name] = pd.Series(arr, index=df.index)
            else:
                result = compute_log_return(df, horizon)
                features_dict[feature_name] = result
                np.save(feature_file, result.values)
        else:
            result = compute_log_return(df, horizon)
            features_dict[feature_name] = result
    # Volatility
    for window in window_sizes:
        feature_name = f'volatility_{window}'
        if csv_prefix:
            feature_file = f'../data/{csv_prefix}_{feature_name}.npy'
            if os.path.exists(feature_file):
                arr = np.load(feature_file)
                features_dict[feature_name] = pd.Series(arr, index=df.index)
            else:
                result = compute_volatility(df, window)
                features_dict[feature_name] = result
                np.save(feature_file, result.values)
        else:
            result = compute_volatility(df, window)
            features_dict[feature_name] = result
    # --- Additional Technical Indicator Features ---
    # ADX (served from cache only when all three sub-series files exist)
    adx_names = ['adx', 'adx_pos', 'adx_neg']
    adx_files = [f'../data/{csv_prefix}_{name}.npy' for name in adx_names]
    if csv_prefix and all(os.path.exists(f) for f in adx_files):
        for name, f in zip(adx_names, adx_files):
            arr = np.load(f)
            features_dict[name] = pd.Series(arr, index=df.index)
    else:
        result = calc_adx(df['High'], df['Low'], df['Close'])
        for subname, values in result:
            features_dict[subname] = values
            if csv_prefix:
                np.save(f'../data/{csv_prefix}_{subname}.npy', values.values)
    # Force Index
    if csv_prefix:
        feature_file = f'../data/{csv_prefix}_force_index.npy'
        if os.path.exists(feature_file):
            arr = np.load(feature_file)
            features_dict['force_index'] = pd.Series(arr, index=df.index)
        else:
            _, values = calc_force_index(df['Close'], df['Volume'])
            features_dict['force_index'] = values
            np.save(feature_file, values.values)
    else:
        _, values = calc_force_index(df['Close'], df['Volume'])
        features_dict['force_index'] = values
    # Supertrend indicators (simplified implementation)
    for period, multiplier in [(12, 3.0), (10, 1.0), (11, 2.0)]:
        st_name = f'supertrend_{period}_{multiplier}'
        st_trend_name = f'supertrend_trend_{period}_{multiplier}'
        st_file = f'../data/{csv_prefix}_{st_name}.npy'
        st_trend_file = f'../data/{csv_prefix}_{st_trend_name}.npy'
        if csv_prefix and os.path.exists(st_file) and os.path.exists(st_trend_file):
            features_dict[st_name] = pd.Series(np.load(st_file), index=df.index)
            features_dict[st_trend_name] = pd.Series(np.load(st_trend_file), index=df.index)
        else:
            # Simple supertrend alternative using ATR and moving averages
            atr = AverageTrueRange(df['High'], df['Low'], df['Close'], window=period).average_true_range()
            hl_avg = (df['High'] + df['Low']) / 2
            basic_ub = hl_avg + (multiplier * atr)  # basic upper band
            basic_lb = hl_avg - (multiplier * atr)  # basic lower band
            # Placeholder: the bands above are computed but not yet used; the
            # feature falls back to the HL midpoint with a constant uptrend flag.
            supertrend = hl_avg.copy()
            trend = pd.Series(1, index=df.index)  # 1 for uptrend, -1 for downtrend
            features_dict[st_name] = supertrend
            features_dict[st_trend_name] = trend
            if csv_prefix:
                np.save(st_file, features_dict[st_name].values)
                np.save(st_trend_file, features_dict[st_trend_name].values)
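    # A full supertrend would refine the basic bands recursively and flip the
    # trend on band crossings. A minimal sketch of that rule, for reference only
    # (not wired in here):
    #
    #   final_ub[i] = basic_ub[i] if basic_ub[i] < final_ub[i-1] or close[i-1] > final_ub[i-1] else final_ub[i-1]
    #   final_lb[i] = basic_lb[i] if basic_lb[i] > final_lb[i-1] or close[i-1] < final_lb[i-1] else final_lb[i-1]
    #   trend[i]    = 1 if close[i] > final_ub[i-1] else (-1 if close[i] < final_lb[i-1] else trend[i-1])
    #   supertrend[i] = final_lb[i] if trend[i] == 1 else final_ub[i]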
    # --- OHLCV-only additional features ---
    # Helper for caching single-series features using the same pattern as above
    def _save_or_load_feature(name, series):
        if csv_prefix:
            feature_file = f'../data/{csv_prefix}_{name}.npy'
            if os.path.exists(feature_file):
                arr = np.load(feature_file)
                features_dict[name] = pd.Series(arr, index=df.index)
            else:
                # Ensure pandas Series with correct index
                series = pd.Series(series, index=df.index)
                features_dict[name] = series
                np.save(feature_file, series.values)
        else:
            series = pd.Series(series, index=df.index)
            features_dict[name] = series
    eps = 1e-9
    # Candle shape/position
    body = (df['Close'] - df['Open']).abs()
    rng = (df['High'] - df['Low'])
    upper_wick = df['High'] - df[['Open', 'Close']].max(axis=1)
    lower_wick = df[['Open', 'Close']].min(axis=1) - df['Low']
    _save_or_load_feature('candle_body', body)
    _save_or_load_feature('candle_upper_wick', upper_wick)
    _save_or_load_feature('candle_lower_wick', lower_wick)
    _save_or_load_feature('candle_body_to_range', body / (rng + eps))
    _save_or_load_feature('candle_upper_wick_to_range', upper_wick / (rng + eps))
    _save_or_load_feature('candle_lower_wick_to_range', lower_wick / (rng + eps))
    _save_or_load_feature('close_pos_in_bar', (df['Close'] - df['Low']) / (rng + eps))
    for w in window_sizes:
        roll_max = df['High'].rolling(w).max()
        roll_min = df['Low'].rolling(w).min()
        close_pos_roll = (df['Close'] - roll_min) / ((roll_max - roll_min) + eps)
        _save_or_load_feature(f'close_pos_in_roll_{w}', close_pos_roll)
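    # close_pos_in_bar / close_pos_in_roll_w are 0-1 locations of the close inside
    # the bar range and the rolling High/Low range (a stochastic-%K-style signal);
    # eps keeps the ratios defined on zero-range bars.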
    # Range-based volatility (Parkinson, Garman-Klass, Rogers-Satchell, Yang-Zhang)
    log_hl = np.log((df['High'] / df['Low']).replace(0, np.nan))
    log_co = np.log((df['Close'] / df['Open']).replace(0, np.nan))
    log_close = np.log(df['Close'].replace(0, np.nan))
    ret1 = log_close.diff()
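    # Estimator formulas implemented below (per rolling window of n bars):
    #   Parkinson:       sigma^2 = mean(log(H/L)^2) / (4 ln 2)
    #   Garman-Klass:    sigma^2 = 0.5 mean(log(H/L)^2) - (2 ln 2 - 1) mean(log(C/O)^2)
    #   Rogers-Satchell: sigma^2 = mean(log(H/C) log(H/O) + log(L/C) log(L/O))
    #   Yang-Zhang:      sigma^2 = sigma_g^2 + k sigma_c^2 + (1 - k) sigma_rs^2,
    #                    k = 0.34 / (1.34 + (n + 1) / (n - 1)),
    #                    with g the overnight log return and c = log(C/O).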
    for w in window_sizes:
        # Parkinson
        parkinson_var = (log_hl.pow(2)).rolling(w).mean() / (4.0 * np.log(2.0))
        _save_or_load_feature(f'park_vol_{w}', np.sqrt(parkinson_var.clip(lower=0)))
        # Garman-Klass
        gk_var = 0.5 * (log_hl.pow(2)).rolling(w).mean() - (2.0 * np.log(2.0) - 1.0) * (log_co.pow(2)).rolling(w).mean()
        _save_or_load_feature(f'gk_vol_{w}', np.sqrt(gk_var.clip(lower=0)))
        # Rogers-Satchell
        u = np.log((df['High'] / df['Close']).replace(0, np.nan))
        d = np.log((df['Low'] / df['Close']).replace(0, np.nan))
        uo = np.log((df['High'] / df['Open']).replace(0, np.nan))
        do = np.log((df['Low'] / df['Open']).replace(0, np.nan))
        rs_term = u * uo + d * do
        rs_var = rs_term.rolling(w).mean()
        _save_or_load_feature(f'rs_vol_{w}', np.sqrt(rs_var.clip(lower=0)))
        # Yang-Zhang
        g = np.log((df['Open'] / df['Close'].shift(1)).replace(0, np.nan))
        u_yz = np.log((df['High'] / df['Open']).replace(0, np.nan))
        d_yz = np.log((df['Low'] / df['Open']).replace(0, np.nan))
        c_yz = np.log((df['Close'] / df['Open']).replace(0, np.nan))
        sigma_g2 = g.rolling(w).var()
        sigma_c2 = c_yz.rolling(w).var()
        sigma_rs = (u_yz * (u_yz - c_yz) + d_yz * (d_yz - c_yz)).rolling(w).mean()
        k = 0.34 / (1.34 + (w + 1.0) / max(w - 1.0, 1.0))
        yz_var = sigma_g2 + k * sigma_c2 + (1.0 - k) * sigma_rs
        _save_or_load_feature(f'yz_vol_{w}', np.sqrt(yz_var.clip(lower=0)))
    # Trend strength: rolling linear-regression slope and R² of log price
    def _linreg_slope(arr):
        y = np.asarray(arr, dtype=float)
        n = y.size
        x = np.arange(n, dtype=float)
        xmean = (n - 1.0) / 2.0
        ymean = np.nanmean(y)
        xm = x - xmean
        ym = y - ymean
        cov = np.nansum(xm * ym)
        varx = np.nansum(xm * xm) + eps
        return cov / varx

    def _linreg_r2(arr):
        y = np.asarray(arr, dtype=float)
        n = y.size
        x = np.arange(n, dtype=float)
        xmean = (n - 1.0) / 2.0
        ymean = np.nanmean(y)
        slope = _linreg_slope(arr)
        intercept = ymean - slope * xmean
        yhat = slope * x + intercept
        ss_tot = np.nansum((y - ymean) ** 2)
        ss_res = np.nansum((y - yhat) ** 2)
        return 1.0 - ss_res / (ss_tot + eps)
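    # Both helpers fit OLS of log price on t = 0..n-1 inside each window: the
    # slope is the average per-bar log-price drift, and R² measures how line-like
    # (trending vs. choppy) the window is.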
    for w in window_sizes:
        _save_or_load_feature(f'lr_slope_log_close_{w}', log_close.rolling(w).apply(_linreg_slope, raw=True))
        _save_or_load_feature(f'lr_r2_log_close_{w}', log_close.rolling(w).apply(_linreg_r2, raw=True))
    # EMA(7), EMA(21), their slopes and spread
    ema_7 = df['Close'].ewm(span=7, adjust=False).mean()
    ema_21 = df['Close'].ewm(span=21, adjust=False).mean()
    _save_or_load_feature('ema_7', ema_7)
    _save_or_load_feature('ema_21', ema_21)
    _save_or_load_feature('ema_7_slope', ema_7.pct_change())
    _save_or_load_feature('ema_21_slope', ema_21.pct_change())
    _save_or_load_feature('ema_7_21_spread', ema_7 - ema_21)
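    # pct_change of an EMA serves as a cheap slope proxy; note the raw spread
    # ema_7 - ema_21 is in price units, not normalized by price level.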
    # VWAP over windows and distance of Close from VWAP
    tp = (df['High'] + df['Low'] + df['Close']) / 3.0
    for w in window_sizes:
        vwap_w = (tp * df['Volume']).rolling(w).sum() / (df['Volume'].rolling(w).sum() + eps)
        _save_or_load_feature(f'vwap_{w}', vwap_w)
        _save_or_load_feature(f'vwap_dist_{w}', (df['Close'] - vwap_w) / (vwap_w + eps))
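    # vwap_w is the typical-price VWAP over the trailing w bars; vwap_dist_w is
    # the signed fractional distance of Close from it (positive = above VWAP).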
    # Autocorrelation of log returns at lags 1-5 (rolling window 30)
    for lag in range(1, 6):
        ac = ret1.rolling(30).corr(ret1.shift(lag))
        _save_or_load_feature(f'ret_autocorr_lag{lag}_30', ac)
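    # rolling(30).corr(ret1.shift(lag)) estimates the lag-k autocorrelation of
    # one-bar log returns over the last 30 bars; values near 0 indicate little
    # short-horizon momentum or mean-reversion structure.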
    # Rolling skewness and kurtosis of returns (15, 30)
    for w in [15, 30]:
        _save_or_load_feature(f'ret_skew_{w}', ret1.rolling(w).skew())
        _save_or_load_feature(f'ret_kurt_{w}', ret1.rolling(w).kurt())
    # Volume z-score and return-volume rolling correlation (15, 30)
    for w in [15, 30]:
        vol_mean = df['Volume'].rolling(w).mean()
        vol_std = df['Volume'].rolling(w).std()
        _save_or_load_feature(f'volume_zscore_{w}', (df['Volume'] - vol_mean) / (vol_std + eps))
        _save_or_load_feature(f'ret_vol_corr_{w}', ret1.rolling(w).corr(df['Volume']))
    # Cyclical time features and relative volume vs hour-of-day average
    try:
        hours = pd.to_datetime(df['Timestamp']).dt.hour
    except Exception:
        try:
            hours = pd.to_datetime(df['Timestamp'], unit='s', errors='coerce').dt.hour
        except Exception:
            hours = pd.Series(np.nan, index=df.index)
    _save_or_load_feature('sin_hour', np.sin(2.0 * np.pi * (hours.fillna(0)) / 24.0))
    _save_or_load_feature('cos_hour', np.cos(2.0 * np.pi * (hours.fillna(0)) / 24.0))
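    # The sin/cos encoding places hour-of-day on the unit circle so hour 23 and
    # hour 0 end up adjacent, which a plain 0-23 integer feature would not capture.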
    hourly_mean_vol = df['Volume'].groupby(hours).transform('mean')
    _save_or_load_feature('relative_volume_hour', df['Volume'] / (hourly_mean_vol + eps))
    return features_dict
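

if __name__ == '__main__':
    # Minimal smoke test on synthetic OHLCV data. Illustrative only: it assumes
    # the calc_*/compute_* helpers in technical_indicator_functions are importable
    # and accept the arguments used above; csv_prefix=None keeps it off-disk.
    n = 200
    idx = pd.date_range('2024-01-01', periods=n, freq='h')
    gen = np.random.default_rng(0)
    close = 100.0 + np.cumsum(gen.normal(0.0, 1.0, n))
    opn = close + gen.normal(0.0, 0.5, n)
    high = np.maximum(opn, close) + np.abs(gen.normal(0.0, 0.5, n))
    low = np.minimum(opn, close) - np.abs(gen.normal(0.0, 0.5, n))
    demo = pd.DataFrame({
        'Open': opn, 'High': high, 'Low': low, 'Close': close,
        'Volume': gen.integers(1, 1000, n).astype(float),
        'Timestamp': idx,
    }, index=idx)
    feats = feature_engineering(
        demo, csv_prefix=None,
        ohlcv_cols=['Open', 'High', 'Low', 'Close', 'Volume'],
        lags=3, window_sizes=[5, 15, 30],
    )
    print(f'computed {len(feats)} features')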