import os

import numpy as np
import pandas as pd
import ta

from technical_indicator_functions import *


def _feature_path(csv_prefix, feature_name):
    """Return the on-disk cache path used for one feature array."""
    return f'../data/{csv_prefix}_{feature_name}.npy'


def _load_series(path, index):
    """Load a cached ``.npy`` array and re-attach *index* to it."""
    return pd.Series(np.load(path), index=index)


def _save_series(path, values):
    """Persist the raw values of *values* to *path* and report it."""
    np.save(path, values.values)
    print(f'Saved feature: {path}')


def _add_single_indicator(features, df, csv_prefix, name, func, columns):
    """Load or compute an indicator whose helper returns ``(label, values)``."""
    path = _feature_path(csv_prefix, name)
    if os.path.exists(path):
        features[name] = _load_series(path, df.index)
        return
    print(f'Calculating feature: {name}')
    _, values = func(*[df[column] for column in columns])
    features[name] = values
    _save_series(path, values)


def _add_multi_indicator(features, df, csv_prefix, func, columns):
    """Load or store every ``(subname, values)`` pair produced by *func*.

    The helper is always invoked because the sub-feature names are only
    known from its result; cached arrays still take precedence over the
    freshly computed values.
    """
    for subname, values in func(*[df[column] for column in columns]):
        path = _feature_path(csv_prefix, subname)
        if os.path.exists(path):
            features[subname] = _load_series(path, df.index)
        else:
            features[subname] = values
            _save_series(path, values)


def _add_derived_feature(features, index, csv_prefix, name, label,
                         compute, *args):
    """Load or compute a feature whose helper returns the values directly."""
    path = _feature_path(csv_prefix, name)
    if os.path.exists(path):
        # Wrap the cached array so cache hits and cache misses both yield
        # an index-aligned object (a bare ndarray was returned before).
        features[name] = _load_series(path, index)
        return
    print(f'Computing {label} feature: {name}')
    result = compute(*args)
    features[name] = result
    _save_series(path, result)


def feature_engineering(df, csv_prefix, ohlcv_cols, lags, window_sizes):
    """Build (or load from cache) the feature set for one price DataFrame.

    Every feature is cached as ``../data/{csv_prefix}_{feature}.npy``. When
    the file exists it is loaded and wrapped in a ``pd.Series`` indexed by
    ``df.index``; otherwise the feature is computed with the helpers from
    ``technical_indicator_functions``, stored in the result and saved.

    Args:
        df: DataFrame providing the 'High', 'Low', 'Close' and 'Volume'
            columns, plus every column named in *ohlcv_cols*.
        csv_prefix: Prefix that namespaces the cache files of one dataset.
        ohlcv_cols: Column names for which lag and rolling features are
            built.
        lags: Largest lag; lags ``1..lags`` are generated per column.
        window_sizes: Window lengths for rolling statistics and volatility.

    Returns:
        dict mapping feature name to its values, in insertion order:
        single-output indicators, multi-output indicators, lags, rolling
        statistics, log returns, volatility, ADX, force index, supertrend.

    Note:
        A cached array is not validated beyond what ``pd.Series`` enforces,
        so a cache whose length differs from ``len(df)`` raises
        ``ValueError``.
    """
    features_dict = {}
    index = df.index

    # (feature name, helper, input columns); helper returns (label, values).
    single_indicators = [
        ('rsi', calc_rsi, ('Close',)),
        ('macd', calc_macd, ('Close',)),
        ('atr', calc_atr, ('High', 'Low', 'Close')),
        ('cci', calc_cci, ('High', 'Low', 'Close')),
        ('williams_r', calc_williamsr, ('High', 'Low', 'Close')),
        ('ema_14', calc_ema, ('Close',)),
        ('obv', calc_obv, ('Close', 'Volume')),
        ('cmf', calc_cmf, ('High', 'Low', 'Close', 'Volume')),
        ('roc_10', calc_roc, ('Close',)),
        ('dpo_20', calc_dpo, ('Close',)),
        ('ultimate_osc', calc_ultimate, ('High', 'Low', 'Close')),
        ('daily_return', calc_daily_return, ('Close',)),
    ]
    for name, func, columns in single_indicators:
        _add_single_indicator(features_dict, df, csv_prefix, name, func,
                              columns)

    # (helper, input columns); helper yields (subname, values) pairs.
    multi_indicators = [
        (calc_bollinger, ('Close',)),
        (calc_stochastic, ('High', 'Low', 'Close')),
        (calc_sma, ('Close',)),
        (calc_psar, ('High', 'Low', 'Close')),
        (calc_donchian, ('High', 'Low', 'Close')),
        (calc_keltner, ('High', 'Low', 'Close')),
        (calc_ichimoku, ('High', 'Low')),
        # Argument order differs from the others: Close, Low, High.
        (calc_elder_ray, ('Close', 'Low', 'High')),
    ]
    for func, columns in multi_indicators:
        _add_multi_indicator(features_dict, df, csv_prefix, func, columns)

    # Lags
    for col in ohlcv_cols:
        for lag in range(1, lags + 1):
            _add_derived_feature(
                features_dict, index, csv_prefix, f'{col}_lag{lag}', 'lag',
                compute_lag, df, col, lag,
            )

    # Rolling statistics; these (column, window) pairs are excluded.
    skipped_rolling = {('Open', 5), ('High', 5), ('High', 30), ('Low', 15)}
    for col in ohlcv_cols:
        for window in window_sizes:
            if (col, window) in skipped_rolling:
                continue
            for stat in ('mean', 'std', 'min', 'max'):
                _add_derived_feature(
                    features_dict, index, csv_prefix,
                    f'{col}_roll_{stat}_{window}', 'rolling stat',
                    compute_rolling, df, col, stat, window,
                )

    # Log returns for fixed horizons
    for horizon in (5, 15, 30):
        _add_derived_feature(
            features_dict, index, csv_prefix, f'log_return_{horizon}',
            'log return', compute_log_return, df, horizon,
        )

    # Volatility
    for window in window_sizes:
        _add_derived_feature(
            features_dict, index, csv_prefix, f'volatility_{window}',
            'volatility', compute_volatility, df, window,
        )

    # ADX: the cache is used only when all three arrays are present;
    # otherwise everything calc_adx returns is recomputed and re-saved.
    adx_names = ['adx', 'adx_pos', 'adx_neg']
    adx_files = [_feature_path(csv_prefix, name) for name in adx_names]
    if all(os.path.exists(path) for path in adx_files):
        for name, path in zip(adx_names, adx_files):
            features_dict[name] = _load_series(path, index)
    else:
        for subname, values in calc_adx(df['High'], df['Low'], df['Close']):
            features_dict[subname] = values
            _save_series(_feature_path(csv_prefix, subname), values)

    # Force Index
    _add_single_indicator(features_dict, df, csv_prefix, 'force_index',
                          calc_force_index, ('Close', 'Volume'))

    # Supertrend indicators (simplified implementation).
    # NOTE(review): this is a placeholder, not a real Supertrend. The line
    # is the High/Low midpoint and the trend is a constant 1 (uptrend), so
    # period and multiplier only affect the feature name. No ATR bands are
    # computed because nothing consumes them.
    for period, multiplier in [(12, 3.0), (10, 1.0), (11, 2.0)]:
        st_name = f'supertrend_{period}_{multiplier}'
        st_trend_name = f'supertrend_trend_{period}_{multiplier}'
        st_file = _feature_path(csv_prefix, st_name)
        st_trend_file = _feature_path(csv_prefix, st_trend_name)
        if os.path.exists(st_file) and os.path.exists(st_trend_file):
            features_dict[st_name] = _load_series(st_file, index)
            features_dict[st_trend_name] = _load_series(st_trend_file, index)
            continue
        print(f'Calculating Supertrend indicator: {st_name}')
        features_dict[st_name] = (df['High'] + df['Low']) / 2
        features_dict[st_trend_name] = pd.Series(1, index=index)
        np.save(st_file, features_dict[st_name].values)
        np.save(st_trend_file, features_dict[st_trend_name].values)
        print(f'Saved features: {st_file}, {st_trend_file}')

    return features_dict