import os

import numpy as np
import pandas as pd
import ta

try:
    from .technical_indicator_functions import *
except ImportError:
    from technical_indicator_functions import *


def feature_engineering(df, csv_prefix, ohlcv_cols, lags, window_sizes):
    """
    Compute and/or load features for the given DataFrame.

    If csv_prefix is provided, features are cached to disk; otherwise,
    features are only computed in memory.

    Args:
        df (pd.DataFrame): Input OHLCV data.
        csv_prefix (str or None): Prefix for feature files (for caching).
            If None or '', disables caching.
        ohlcv_cols (list): List of OHLCV column names.
        lags (int): Number of lag features.
        window_sizes (list): List of window sizes for rolling features.

    Returns:
        dict: Dictionary of computed features.
    """
    features_dict = {}

    def _cache_series(name, compute):
        """Load `name` from its .npy cache if present; otherwise call the
        zero-argument `compute` (which returns a (label, series) pair),
        store the series, and write it to the cache."""
        if csv_prefix:
            feature_file = f'../data/{csv_prefix}_{name}.npy'
            if os.path.exists(feature_file):
                arr = np.load(feature_file)
                features_dict[name] = pd.Series(arr, index=df.index)
            else:
                _, values = compute()
                features_dict[name] = values
                np.save(feature_file, values.values)
        else:
            _, values = compute()
            features_dict[name] = values

    # Single-column indicators, all cached with the same load-or-compute pattern.
    _cache_series('rsi', lambda: calc_rsi(df['Close']))
    _cache_series('macd', lambda: calc_macd(df['Close']))
    _cache_series('atr', lambda: calc_atr(df['High'], df['Low'], df['Close']))
    _cache_series('cci', lambda: calc_cci(df['High'], df['Low'], df['Close']))
    _cache_series('williams_r', lambda: calc_williamsr(df['High'], df['Low'], df['Close']))
    _cache_series('ema_14', lambda: calc_ema(df['Close']))
    _cache_series('obv', lambda: calc_obv(df['Close'], df['Volume']))
    _cache_series('cmf', lambda: calc_cmf(df['High'], df['Low'], df['Close'], df['Volume']))
    _cache_series('roc_10', lambda: calc_roc(df['Close']))
    _cache_series('dpo_20', lambda: calc_dpo(df['Close']))
    _cache_series('ultimate_osc', lambda: calc_ultimate(df['High'], df['Low'], df['Close']))
    _cache_series('daily_return', lambda: calc_daily_return(df['Close']))
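    # For reference: the calc_* helpers star-imported above are assumed to
    # follow the contract _cache_series relies on, i.e. each returns a
    # (label, pd.Series) pair aligned to df.index. A hypothetical sketch
    # (not the real implementation) might look like:
    #
    #     def calc_rsi(close, window=14):
    #         return 'rsi', ta.momentum.RSIIndicator(close, window=window).rsi()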
    # Multi-column indicators: each calc_* helper returns an iterable of
    # (name, series) pairs, cached with the same pattern per sub-feature.
    def _cache_multi(result):
        for subname, values in result:
            if csv_prefix:
                sub_feature_file = f'../data/{csv_prefix}_{subname}.npy'
                if os.path.exists(sub_feature_file):
                    arr = np.load(sub_feature_file)
                    features_dict[subname] = pd.Series(arr, index=df.index)
                else:
                    features_dict[subname] = values
                    np.save(sub_feature_file, values.values)
            else:
                features_dict[subname] = values

    _cache_multi(calc_bollinger(df['Close']))                          # Bollinger Bands
    _cache_multi(calc_stochastic(df['High'], df['Low'], df['Close']))  # Stochastic Oscillator
    _cache_multi(calc_sma(df['Close']))                                # SMA
    _cache_multi(calc_psar(df['High'], df['Low'], df['Close']))        # PSAR
    _cache_multi(calc_donchian(df['High'], df['Low'], df['Close']))    # Donchian Channel
    _cache_multi(calc_keltner(df['High'], df['Low'], df['Close']))     # Keltner Channel
    _cache_multi(calc_ichimoku(df['High'], df['Low']))                 # Ichimoku
    _cache_multi(calc_elder_ray(df['Close'], df['Low'], df['High']))   # Elder Ray
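    # Note on caching: cache files are keyed only by csv_prefix and feature
    # name, so they are never invalidated automatically. If df (or an
    # indicator's parameters) changes, the stale .npy files under ../data/
    # must be deleted by hand before new values will be computed.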
    # Prepare lags, rolling stats, log returns, and volatility features sequentially.
    def _cache_computed(name, compute):
        """Same load-or-compute pattern for derived series. Cached arrays are
        wrapped back into a pd.Series on df.index so cached and freshly
        computed features share the same type."""
        if csv_prefix:
            feature_file = f'../data/{csv_prefix}_{name}.npy'
            if os.path.exists(feature_file):
                arr = np.load(feature_file)
                features_dict[name] = pd.Series(arr, index=df.index)
            else:
                result = compute()
                features_dict[name] = result
                np.save(feature_file, result.values)
        else:
            features_dict[name] = compute()

    # Lags
    for col in ohlcv_cols:
        for lag in range(1, lags + 1):
            _cache_computed(f'{col}_lag{lag}',
                            lambda col=col, lag=lag: compute_lag(df, col, lag))

    # Rolling statistics; a handful of column/window pairs are deliberately skipped.
    skip_pairs = {('Open', 5), ('High', 5), ('High', 30), ('Low', 15)}
    for col in ohlcv_cols:
        for window in window_sizes:
            if (col, window) in skip_pairs:
                continue
            for stat in ['mean', 'std', 'min', 'max']:
                _cache_computed(f'{col}_roll_{stat}_{window}',
                                lambda col=col, stat=stat, window=window:
                                    compute_rolling(df, col, stat, window))

    # Log returns for different horizons
    for horizon in [5, 15, 30]:
        _cache_computed(f'log_return_{horizon}',
                        lambda horizon=horizon: compute_log_return(df, horizon))

    # Volatility
    for window in window_sizes:
        _cache_computed(f'volatility_{window}',
                        lambda window=window: compute_volatility(df, window))
    # --- Additional Technical Indicator Features ---

    # ADX produces three outputs; recompute all of them unless every cache file exists.
    adx_names = ['adx', 'adx_pos', 'adx_neg']
    adx_files = [f'../data/{csv_prefix}_{name}.npy' for name in adx_names]
    if csv_prefix and all(os.path.exists(f) for f in adx_files):
        for name, f in zip(adx_names, adx_files):
            arr = np.load(f)
            features_dict[name] = pd.Series(arr, index=df.index)
    else:
        result = calc_adx(df['High'], df['Low'], df['Close'])
        for subname, values in result:
            features_dict[subname] = values
            if csv_prefix:
                np.save(f'../data/{csv_prefix}_{subname}.npy', values.values)

    # Force Index
    _cache_series('force_index', lambda: calc_force_index(df['Close'], df['Volume']))

    # Supertrend indicators (simplified implementation)
    for period, multiplier in [(12, 3.0), (10, 1.0), (11, 2.0)]:
        st_name = f'supertrend_{period}_{multiplier}'
        st_trend_name = f'supertrend_trend_{period}_{multiplier}'
        st_file = f'../data/{csv_prefix}_{st_name}.npy'
        st_trend_file = f'../data/{csv_prefix}_{st_trend_name}.npy'
        if csv_prefix and os.path.exists(st_file) and os.path.exists(st_trend_file):
            features_dict[st_name] = pd.Series(np.load(st_file), index=df.index)
            features_dict[st_trend_name] = pd.Series(np.load(st_trend_file), index=df.index)
        else:
            # Simple supertrend alternative using ATR and moving averages.
            from ta.volatility import AverageTrueRange
            atr = AverageTrueRange(df['High'], df['Low'], df['Close'],
                                   window=period).average_true_range()
            hl_avg = (df['High'] + df['Low']) / 2
            # Basic upper/lower bands. Note: this placeholder skips the usual
            # band-flipping recursion, so the bands are computed but unused,
            # the supertrend line is just the high/low midpoint, and the
            # trend is constant.
            basic_ub = hl_avg + (multiplier * atr)
            basic_lb = hl_avg - (multiplier * atr)
            supertrend = hl_avg.copy()
            trend = pd.Series(1, index=df.index)  # 1 for uptrend, -1 for downtrend
            features_dict[st_name] = supertrend
            features_dict[st_trend_name] = trend
            if csv_prefix:
                np.save(st_file, features_dict[st_name].values)
                np.save(st_trend_file, features_dict[st_trend_name].values)

    # --- OHLCV-only additional features ---

    # Helper for caching single-series features using the same pattern as
    # above, taking an already-computed series rather than a callable.
    def _save_or_load_feature(name, series):
        if csv_prefix:
            feature_file = f'../data/{csv_prefix}_{name}.npy'
            if os.path.exists(feature_file):
                arr = np.load(feature_file)
                features_dict[name] = pd.Series(arr, index=df.index)
            else:
                # Ensure a pandas Series with the correct index.
                series = pd.Series(series, index=df.index)
                features_dict[name] = series
                np.save(feature_file, series.values)
        else:
            series = pd.Series(series, index=df.index)
            features_dict[name] = series

    eps = 1e-9  # guards against division by zero in the ratios below

    # Candle shape/position
    body = (df['Close'] - df['Open']).abs()
    rng = df['High'] - df['Low']
    upper_wick = df['High'] - df[['Open', 'Close']].max(axis=1)
    lower_wick = df[['Open', 'Close']].min(axis=1) - df['Low']
    _save_or_load_feature('candle_body', body)
    _save_or_load_feature('candle_upper_wick', upper_wick)
    _save_or_load_feature('candle_lower_wick', lower_wick)
    _save_or_load_feature('candle_body_to_range', body / (rng + eps))
    _save_or_load_feature('candle_upper_wick_to_range', upper_wick / (rng + eps))
    _save_or_load_feature('candle_lower_wick_to_range', lower_wick / (rng + eps))
    _save_or_load_feature('close_pos_in_bar', (df['Close'] - df['Low']) / (rng + eps))

    # Position of the close within the rolling high/low range.
    for w in window_sizes:
        roll_max = df['High'].rolling(w).max()
        roll_min = df['Low'].rolling(w).min()
        close_pos_roll = (df['Close'] - roll_min) / ((roll_max - roll_min) + eps)
        _save_or_load_feature(f'close_pos_in_roll_{w}', close_pos_roll)
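    # The four estimators below are the standard range-based volatility
    # estimators, each a variance over a rolling window of n bars (formulas
    # for reference; all logs are natural logs of the O/H/L/C ratios):
    #   Parkinson:        sigma^2 = mean(ln(H/L)^2) / (4 ln 2)
    #   Garman–Klass:     sigma^2 = mean(0.5 ln(H/L)^2 - (2 ln 2 - 1) ln(C/O)^2)
    #   Rogers–Satchell:  sigma^2 = mean(ln(H/C) ln(H/O) + ln(L/C) ln(L/O))
    #   Yang–Zhang:       sigma^2 = sigma_o^2 + k sigma_c^2 + (1 - k) sigma_rs^2,
    #                     k = 0.34 / (1.34 + (n + 1)/(n - 1)), where sigma_o^2 is
    #                     the variance of overnight (prev-close-to-open) returns
    #                     and sigma_c^2 the variance of open-to-close returns.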
    # Range-based volatility (Parkinson, Garman–Klass, Rogers–Satchell, Yang–Zhang)
    log_hl = np.log((df['High'] / df['Low']).replace(0, np.nan))
    log_co = np.log((df['Close'] / df['Open']).replace(0, np.nan))
    log_close = np.log(df['Close'].replace(0, np.nan))
    ret1 = log_close.diff()
    for w in window_sizes:
        # Parkinson
        parkinson_var = (log_hl.pow(2)).rolling(w).mean() / (4.0 * np.log(2.0))
        _save_or_load_feature(f'park_vol_{w}', np.sqrt(parkinson_var.clip(lower=0)))
        # Garman–Klass
        gk_var = (0.5 * (log_hl.pow(2)).rolling(w).mean()
                  - (2.0 * np.log(2.0) - 1.0) * (log_co.pow(2)).rolling(w).mean())
        _save_or_load_feature(f'gk_vol_{w}', np.sqrt(gk_var.clip(lower=0)))
        # Rogers–Satchell
        u = np.log((df['High'] / df['Close']).replace(0, np.nan))
        d = np.log((df['Low'] / df['Close']).replace(0, np.nan))
        uo = np.log((df['High'] / df['Open']).replace(0, np.nan))
        do = np.log((df['Low'] / df['Open']).replace(0, np.nan))
        rs_term = u * uo + d * do
        rs_var = rs_term.rolling(w).mean()
        _save_or_load_feature(f'rs_vol_{w}', np.sqrt(rs_var.clip(lower=0)))
        # Yang–Zhang
        g = np.log((df['Open'] / df['Close'].shift(1)).replace(0, np.nan))
        u_yz = np.log((df['High'] / df['Open']).replace(0, np.nan))
        d_yz = np.log((df['Low'] / df['Open']).replace(0, np.nan))
        c_yz = np.log((df['Close'] / df['Open']).replace(0, np.nan))
        sigma_g2 = g.rolling(w).var()
        sigma_c2 = c_yz.rolling(w).var()
        sigma_rs = (u_yz * (u_yz - c_yz) + d_yz * (d_yz - c_yz)).rolling(w).mean()
        k = 0.34 / (1.34 + (w + 1.0) / max(w - 1.0, 1.0))
        yz_var = sigma_g2 + k * sigma_c2 + (1.0 - k) * sigma_rs
        _save_or_load_feature(f'yz_vol_{w}', np.sqrt(yz_var.clip(lower=0)))

    # Trend strength: rolling linear-regression slope and R² of log price.
    def _linreg_slope(arr):
        y = np.asarray(arr, dtype=float)
        n = y.size
        x = np.arange(n, dtype=float)
        xmean = (n - 1.0) / 2.0
        ymean = np.nanmean(y)
        xm = x - xmean
        ym = y - ymean
        cov = np.nansum(xm * ym)
        varx = np.nansum(xm * xm) + eps
        return cov / varx

    def _linreg_r2(arr):
        y = np.asarray(arr, dtype=float)
        n = y.size
        x = np.arange(n, dtype=float)
        xmean = (n - 1.0) / 2.0
        ymean = np.nanmean(y)
        slope = _linreg_slope(arr)
        intercept = ymean - slope * xmean
        yhat = slope * x + intercept
        ss_tot = np.nansum((y - ymean) ** 2)
        ss_res = np.nansum((y - yhat) ** 2)
        return 1.0 - ss_res / (ss_tot + eps)

    for w in window_sizes:
        _save_or_load_feature(f'lr_slope_log_close_{w}',
                              log_close.rolling(w).apply(_linreg_slope, raw=True))
        _save_or_load_feature(f'lr_r2_log_close_{w}',
                              log_close.rolling(w).apply(_linreg_r2, raw=True))

    # EMA(7), EMA(21), their slopes and spread
    ema_7 = df['Close'].ewm(span=7, adjust=False).mean()
    ema_21 = df['Close'].ewm(span=21, adjust=False).mean()
    _save_or_load_feature('ema_7', ema_7)
    _save_or_load_feature('ema_21', ema_21)
    _save_or_load_feature('ema_7_slope', ema_7.pct_change())
    _save_or_load_feature('ema_21_slope', ema_21.pct_change())
    _save_or_load_feature('ema_7_21_spread', ema_7 - ema_21)
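    # Rolling VWAP over a window of n bars uses the typical price
    # TP = (H + L + C) / 3:  VWAP_n = sum(TP * V) / sum(V); the distance
    # feature is the close's relative deviation, (C - VWAP_n) / VWAP_n.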
    # VWAP over windows and distance of Close from VWAP
    tp = (df['High'] + df['Low'] + df['Close']) / 3.0
    for w in window_sizes:
        vwap_w = (tp * df['Volume']).rolling(w).sum() / (df['Volume'].rolling(w).sum() + eps)
        _save_or_load_feature(f'vwap_{w}', vwap_w)
        _save_or_load_feature(f'vwap_dist_{w}', (df['Close'] - vwap_w) / (vwap_w + eps))

    # Autocorrelation of log returns at lags 1–5 (rolling window 30)
    for lag in range(1, 6):
        ac = ret1.rolling(30).corr(ret1.shift(lag))
        _save_or_load_feature(f'ret_autocorr_lag{lag}_30', ac)

    # Rolling skewness and kurtosis of returns (windows 15 and 30)
    for w in [15, 30]:
        _save_or_load_feature(f'ret_skew_{w}', ret1.rolling(w).skew())
        _save_or_load_feature(f'ret_kurt_{w}', ret1.rolling(w).kurt())

    # Volume z-score and return-volume rolling correlation (windows 15 and 30)
    for w in [15, 30]:
        vol_mean = df['Volume'].rolling(w).mean()
        vol_std = df['Volume'].rolling(w).std()
        _save_or_load_feature(f'volume_zscore_{w}', (df['Volume'] - vol_mean) / (vol_std + eps))
        _save_or_load_feature(f'ret_vol_corr_{w}', ret1.rolling(w).corr(df['Volume']))

    # Cyclical time features and relative volume vs hour-of-day average
    try:
        hours = pd.to_datetime(df['Timestamp']).dt.hour
    except Exception:
        try:
            # Fall back to epoch-seconds timestamps.
            hours = pd.to_datetime(df['Timestamp'], unit='s', errors='coerce').dt.hour
        except Exception:
            hours = pd.Series(np.nan, index=df.index)
    _save_or_load_feature('sin_hour', np.sin(2.0 * np.pi * hours.fillna(0) / 24.0))
    _save_or_load_feature('cos_hour', np.cos(2.0 * np.pi * hours.fillna(0) / 24.0))
    hourly_mean_vol = df['Volume'].groupby(hours).transform('mean')
    _save_or_load_feature('relative_volume_hour', df['Volume'] / (hourly_mean_vol + eps))

    return features_dict
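

if __name__ == '__main__':
    # Minimal smoke test: a sketch only, run against synthetic OHLCV data with
    # caching disabled (csv_prefix=None), so nothing is written to ../data/.
    # It assumes the calc_*/compute_* helpers from technical_indicator_functions
    # are importable; the column names match those the function accesses above.
    gen = np.random.default_rng(0)
    n = 200
    close = 100.0 + np.cumsum(gen.normal(0, 1, n))
    open_ = close + gen.normal(0, 0.5, n)
    demo_df = pd.DataFrame({
        'Open': open_,
        'High': np.maximum(open_, close) + np.abs(gen.normal(0, 0.5, n)),
        'Low': np.minimum(open_, close) - np.abs(gen.normal(0, 0.5, n)),
        'Close': close,
        'Volume': gen.integers(1_000, 10_000, n).astype(float),
        'Timestamp': pd.date_range('2024-01-01', periods=n, freq='h'),
    })
    feats = feature_engineering(
        demo_df,
        csv_prefix=None,
        ohlcv_cols=['Open', 'High', 'Low', 'Close', 'Volume'],
        lags=3,
        window_sizes=[5, 15, 30],
    )
    print(f'computed {len(feats)} features')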