From 2dba88b6202af8667636847fcc213c053ec1ed15 Mon Sep 17 00:00:00 2001 From: Simon Moisy Date: Thu, 29 May 2025 12:45:45 +0800 Subject: [PATCH] Added mode indicators, still WIP --- .gitignore | 1 + xgboost/main.py | 456 ++++++++++++++++++++++++++++++++++-------------- 2 files changed, 330 insertions(+), 127 deletions(-) diff --git a/.gitignore b/.gitignore index 4e08be1..25f2fd8 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ __pycache__/ *.py[cod] *$py.class +/data/*.npy # C extensions *.so diff --git a/xgboost/main.py b/xgboost/main.py index 8d6f7d1..4110d96 100644 --- a/xgboost/main.py +++ b/xgboost/main.py @@ -10,12 +10,216 @@ from plot_results import display_actual_vs_predicted, plot_target_distribution, import ta from cycles.supertrend import Supertrends from ta.trend import SMAIndicator, DPOIndicator, IchimokuIndicator, PSARIndicator -from ta.momentum import ROCIndicator, KAMAIndicator, UltimateOscillatorIndicator, StochasticOscillator, WilliamsRIndicator +from ta.momentum import ROCIndicator, KAMAIndicator, UltimateOscillator, StochasticOscillator, WilliamsRIndicator from ta.volatility import KeltnerChannel, DonchianChannel from ta.others import DailyReturnIndicator +import time +import concurrent.futures +from numba import njit + +def run_indicator(func, *args): + return func(*args) + +def run_indicator_job(job): + import time + func, *args = job + indicator_name = func.__name__ + start = time.time() + result = func(*args) + elapsed = time.time() - start + print(f'Indicator {indicator_name} computed in {elapsed:.4f} seconds') + return result + +def calc_rsi(close): + from ta.momentum import RSIIndicator + return ('rsi', RSIIndicator(close, window=14).rsi()) + +def calc_macd(close): + from ta.trend import MACD + return ('macd', MACD(close).macd()) + +def calc_bollinger(close): + from ta.volatility import BollingerBands + bb = BollingerBands(close=close, window=20, window_dev=2) + return [ + ('bb_bbm', bb.bollinger_mavg()), + ('bb_bbh', bb.bollinger_hband()), + ('bb_bbl', bb.bollinger_lband()), + ('bb_bb_width', bb.bollinger_hband() - bb.bollinger_lband()) + ] + +def calc_stochastic(high, low, close): + from ta.momentum import StochasticOscillator + stoch = StochasticOscillator(high=high, low=low, close=close, window=14, smooth_window=3) + return [ + ('stoch_k', stoch.stoch()), + ('stoch_d', stoch.stoch_signal()) + ] + +def calc_atr(high, low, close): + from ta.volatility import AverageTrueRange + atr = AverageTrueRange(high=high, low=low, close=close, window=14) + return ('atr', atr.average_true_range()) + +def calc_cci(high, low, close): + from ta.trend import CCIIndicator + cci = CCIIndicator(high=high, low=low, close=close, window=20) + return ('cci', cci.cci()) + +def calc_williamsr(high, low, close): + from ta.momentum import WilliamsRIndicator + willr = WilliamsRIndicator(high=high, low=low, close=close, lbp=14) + return ('williams_r', willr.williams_r()) + +def calc_ema(close): + from ta.trend import EMAIndicator + ema = EMAIndicator(close=close, window=14) + return ('ema_14', ema.ema_indicator()) + +def calc_obv(close, volume): + from ta.volume import OnBalanceVolumeIndicator + obv = OnBalanceVolumeIndicator(close=close, volume=volume) + return ('obv', obv.on_balance_volume()) + +def calc_cmf(high, low, close, volume): + from ta.volume import ChaikinMoneyFlowIndicator + cmf = ChaikinMoneyFlowIndicator(high=high, low=low, close=close, volume=volume, window=20) + return ('cmf', cmf.chaikin_money_flow()) + +def calc_sma(close): + from ta.trend import SMAIndicator + return [ + ('sma_50', SMAIndicator(close, window=50).sma_indicator()), + ('sma_200', SMAIndicator(close, window=200).sma_indicator()) + ] + +def calc_roc(close): + from ta.momentum import ROCIndicator + return ('roc_10', ROCIndicator(close, window=10).roc()) + +def calc_momentum(close): + return ('momentum_10', close - close.shift(10)) + +def calc_psar(high, low, close): + from ta.trend import PSARIndicator + psar = PSARIndicator(high, low, close) + return [ + ('psar', psar.psar()), + ('psar_up', psar.psar_up()), + ('psar_down', psar.psar_down()) + ] + +def calc_donchian(high, low, close): + from ta.volatility import DonchianChannel + donchian = DonchianChannel(high, low, close, window=20) + return [ + ('donchian_hband', donchian.donchian_channel_hband()), + ('donchian_lband', donchian.donchian_channel_lband()), + ('donchian_mband', donchian.donchian_channel_mband()) + ] + +def calc_keltner(high, low, close): + from ta.volatility import KeltnerChannel + keltner = KeltnerChannel(high, low, close, window=20) + return [ + ('keltner_hband', keltner.keltner_channel_hband()), + ('keltner_lband', keltner.keltner_channel_lband()), + ('keltner_mband', keltner.keltner_channel_mband()) + ] + +def calc_dpo(close): + from ta.trend import DPOIndicator + return ('dpo_20', DPOIndicator(close, window=20).dpo()) + +def calc_ultimate(high, low, close): + from ta.momentum import UltimateOscillator + return ('ultimate_osc', UltimateOscillator(high, low, close).ultimate_oscillator()) + +def calc_ichimoku(high, low): + from ta.trend import IchimokuIndicator + ichimoku = IchimokuIndicator(high, low, window1=9, window2=26, window3=52) + return [ + ('ichimoku_a', ichimoku.ichimoku_a()), + ('ichimoku_b', ichimoku.ichimoku_b()), + ('ichimoku_base_line', ichimoku.ichimoku_base_line()), + ('ichimoku_conversion_line', ichimoku.ichimoku_conversion_line()) + ] + +def calc_elder_ray(close, low, high): + from ta.trend import EMAIndicator + ema = EMAIndicator(close, window=13).ema_indicator() + return [ + ('elder_ray_bull', ema - low), + ('elder_ray_bear', ema - high) + ] + +def calc_daily_return(close): + from ta.others import DailyReturnIndicator + return ('daily_return', DailyReturnIndicator(close).daily_return()) + +@njit +def fast_psar(high, low, close, af=0.02, max_af=0.2): + length = len(close) + psar = np.zeros(length) + bull = True + af_step = af + ep = low[0] + psar[0] = low[0] + for i in range(1, length): + prev_psar = psar[i-1] + if bull: + psar[i] = prev_psar + af_step * (ep - prev_psar) + if low[i] < psar[i]: + bull = False + psar[i] = ep + af_step = af + ep = low[i] + else: + if high[i] > ep: + ep = high[i] + af_step = min(af_step + af, max_af) + else: + psar[i] = prev_psar + af_step * (ep - prev_psar) + if high[i] > psar[i]: + bull = True + psar[i] = ep + af_step = af + ep = high[i] + else: + if low[i] < ep: + ep = low[i] + af_step = min(af_step + af, max_af) + return psar + +def compute_lag(df, col, lag): + return df[col].shift(lag) + +def compute_rolling(df, col, stat, window): + if stat == 'mean': + return df[col].rolling(window).mean() + elif stat == 'std': + return df[col].rolling(window).std() + elif stat == 'min': + return df[col].rolling(window).min() + elif stat == 'max': + return df[col].rolling(window).max() + +def compute_log_return(df, horizon): + return np.log(df['Close'] / df['Close'].shift(horizon)) + +def compute_volatility(df, window): + return df['log_return'].rolling(window).std() + +def run_feature_job(job, df): + feature_name, func, *args = job + print(f'Computing feature: {feature_name}') + result = func(df, *args) + return feature_name, result if __name__ == '__main__': csv_path = './data/btcusd_1-min_data.csv' + csv_prefix = os.path.splitext(os.path.basename(csv_path))[0] + df = pd.read_csv(csv_path) df = df[df['Volume'] != 0] @@ -26,160 +230,154 @@ if __name__ == '__main__': lags = 3 print('Calculating log returns as the new target...') - # Calculate log returns as the new target + df['log_return'] = np.log(df['Close'] / df['Close'].shift(1)) ohlcv_cols = ['Open', 'High', 'Low', 'Close', 'Volume'] window_sizes = [5, 15, 30] # in minutes, adjust as needed - # Collect new features in a dictionary features_dict = {} + print('Starting feature computation...') + feature_start_time = time.time() + + # --- Technical Indicator Features: Calculate or Load from Cache --- + print('Calculating or loading technical indicator features...') + indicator_jobs = [ + ('rsi', calc_rsi, [df['Close']]), + ('macd', calc_macd, [df['Close']]), + ('atr', calc_atr, [df['High'], df['Low'], df['Close']]), + ('cci', calc_cci, [df['High'], df['Low'], df['Close']]), + ('williams_r', calc_williamsr, [df['High'], df['Low'], df['Close']]), + ('ema_14', calc_ema, [df['Close']]), + ('obv', calc_obv, [df['Close'], df['Volume']]), + ('cmf', calc_cmf, [df['High'], df['Low'], df['Close'], df['Volume']]), + ('roc_10', calc_roc, [df['Close']]), + ('dpo_20', calc_dpo, [df['Close']]), + ('ultimate_osc', calc_ultimate, [df['High'], df['Low'], df['Close']]), + ('daily_return', calc_daily_return, [df['Close']]), + ] + # Multi-column indicators + multi_indicator_jobs = [ + ('bollinger', calc_bollinger, [df['Close']]), + ('stochastic', calc_stochastic, [df['High'], df['Low'], df['Close']]), + ('sma', calc_sma, [df['Close']]), + ('psar', calc_psar, [df['High'], df['Low'], df['Close']]), + ('donchian', calc_donchian, [df['High'], df['Low'], df['Close']]), + ('keltner', calc_keltner, [df['High'], df['Low'], df['Close']]), + ('ichimoku', calc_ichimoku, [df['High'], df['Low']]), + ('elder_ray', calc_elder_ray, [df['Close'], df['Low'], df['High']]), + ] + for feature_name, func, args in indicator_jobs: + feature_file = f'./data/{csv_prefix}_{feature_name}.npy' + if os.path.exists(feature_file): + print(f'Loading cached feature: {feature_file}') + features_dict[feature_name] = np.load(feature_file) + else: + result = func(*args) + if isinstance(result, tuple): + _, values = result + features_dict[feature_name] = values + np.save(feature_file, values.values) + else: + raise ValueError(f"Unexpected result for {feature_name}") + for feature_name, func, args in multi_indicator_jobs: + # These return a list of (name, values) + result = func(*args) + for subname, values in result: + sub_feature_file = f'./data/{csv_prefix}_{subname}.npy' + if os.path.exists(sub_feature_file): + print(f'Loading cached feature: {sub_feature_file}') + features_dict[subname] = np.load(sub_feature_file) + else: + features_dict[subname] = values + np.save(sub_feature_file, values.values) + + # Prepare jobs for lags, rolling stats, log returns, and volatility + feature_jobs = [] # Lags for col in ohlcv_cols: for lag in range(1, lags + 1): - print(f'Calculating lag feature: {col}_lag{lag}') - features_dict[f'{col}_lag{lag}'] = df[col].shift(lag) - + feature_name = f'{col}_lag{lag}' + feature_file = f'./data/{csv_prefix}_{feature_name}.npy' + if os.path.exists(feature_file): + print(f'Loading cached feature: {feature_file}') + features_dict[feature_name] = np.load(feature_file) + else: + feature_jobs.append((feature_name, compute_lag, col, lag)) # Rolling statistics for col in ohlcv_cols: for window in window_sizes: - # Skip useless features if (col == 'Open' and window == 5): - continue # Open_roll_min_5 + continue if (col == 'High' and window == 5): - continue # High_roll_mean_5, High_roll_min_5, High_roll_max_5 + continue if (col == 'High' and window == 30): - continue # High_roll_max_30 + continue if (col == 'Low' and window == 15): - continue # Low_roll_max_15 - print(f'Calculating rolling mean: {col}_roll_mean_{window}') - features_dict[f'{col}_roll_mean_{window}'] = df[col].rolling(window).mean() - print(f'Calculating rolling std: {col}_roll_std_{window}') - features_dict[f'{col}_roll_std_{window}'] = df[col].rolling(window).std() - print(f'Calculating rolling min: {col}_roll_min_{window}') - features_dict[f'{col}_roll_min_{window}'] = df[col].rolling(window).min() - print(f'Calculating rolling max: {col}_roll_max_{window}') - features_dict[f'{col}_roll_max_{window}'] = df[col].rolling(window).max() - + continue + for stat in ['mean', 'std', 'min', 'max']: + feature_name = f'{col}_roll_{stat}_{window}' + feature_file = f'./data/{csv_prefix}_{feature_name}.npy' + if os.path.exists(feature_file): + print(f'Loading cached feature: {feature_file}') + features_dict[feature_name] = np.load(feature_file) + else: + feature_jobs.append((feature_name, compute_rolling, col, stat, window)) # Log returns for different horizons for horizon in [5, 15, 30]: - print(f'Calculating log return for horizon {horizon}...') - features_dict[f'log_return_{horizon}'] = np.log(df['Close'] / df['Close'].shift(horizon)) - + feature_name = f'log_return_{horizon}' + feature_file = f'./data/{csv_prefix}_{feature_name}.npy' + if os.path.exists(feature_file): + print(f'Loading cached feature: {feature_file}') + features_dict[feature_name] = np.load(feature_file) + else: + feature_jobs.append((feature_name, compute_log_return, horizon)) # Volatility for window in window_sizes: - print(f'Calculating volatility for window {window}...') - features_dict[f'volatility_{window}'] = df['log_return'].rolling(window).std() + feature_name = f'volatility_{window}' + feature_file = f'./data/{csv_prefix}_{feature_name}.npy' + if os.path.exists(feature_file): + print(f'Loading cached feature: {feature_file}') + features_dict[feature_name] = np.load(feature_file) + else: + feature_jobs.append((feature_name, compute_volatility, window)) - # Technical indicators (except Supertrend) - print('Calculating RSI...') - features_dict['rsi'] = ta.momentum.RSIIndicator(df['Close'], window=14).rsi() - - print('Calculating MACD...') - features_dict['macd'] = ta.trend.MACD(df['Close']).macd() - - print('Calculating Bollinger Bands...') - bb = ta.volatility.BollingerBands(close=df['Close'], window=20, window_dev=2) - features_dict['bb_bbm'] = bb.bollinger_mavg() - features_dict['bb_bbh'] = bb.bollinger_hband() - features_dict['bb_bbl'] = bb.bollinger_lband() - features_dict['bb_bb_width'] = features_dict['bb_bbh'] - features_dict['bb_bbl'] - - print('Calculating Stochastic Oscillator...') - stoch = ta.momentum.StochasticOscillator(high=df['High'], low=df['Low'], close=df['Close'], window=14, smooth_window=3) - features_dict['stoch_k'] = stoch.stoch() - features_dict['stoch_d'] = stoch.stoch_signal() - - print('Calculating Average True Range (ATR)...') - atr = ta.volatility.AverageTrueRange(high=df['High'], low=df['Low'], close=df['Close'], window=14) - features_dict['atr'] = atr.average_true_range() - - print('Calculating Commodity Channel Index (CCI)...') - cci = ta.trend.CCIIndicator(high=df['High'], low=df['Low'], close=df['Close'], window=20) - features_dict['cci'] = cci.cci() - - print('Calculating Williams %R...') - willr = ta.momentum.WilliamsRIndicator(high=df['High'], low=df['Low'], close=df['Close'], lbp=14) - features_dict['williams_r'] = willr.williams_r() - - print('Calculating Exponential Moving Average (EMA)...') - ema = ta.trend.EMAIndicator(close=df['Close'], window=14) - features_dict['ema_14'] = ema.ema_indicator() - - print('Calculating On-Balance Volume (OBV)...') - obv = ta.volume.OnBalanceVolumeIndicator(close=df['Close'], volume=df['Volume']) - features_dict['obv'] = obv.on_balance_volume() - - print('Calculating Chaikin Money Flow (CMF)...') - cmf = ta.volume.ChaikinMoneyFlowIndicator(high=df['High'], low=df['Low'], close=df['Close'], volume=df['Volume'], window=20) - features_dict['cmf'] = cmf.chaikin_money_flow() - - # Additional TA indicators - # SMA - print('Calculating SMA 50 and 200...') - features_dict['sma_50'] = SMAIndicator(df['Close'], window=50).sma_indicator() - features_dict['sma_200'] = SMAIndicator(df['Close'], window=200).sma_indicator() - - # Rate of Change - print('Calculating ROC 10...') - features_dict['roc_10'] = ROCIndicator(df['Close'], window=10).roc() - - # Momentum - print('Calculating Momentum 10...') - features_dict['momentum_10'] = ta.momentum.MomentumIndicator(df['Close'], window=10).momentum() - - # Parabolic SAR - print('Calculating Parabolic SAR...') - psar = PSARIndicator(df['High'], df['Low'], df['Close']) - features_dict['psar'] = psar.psar() - features_dict['psar_up'] = psar.psar_up() - features_dict['psar_down'] = psar.psar_down() - - # Donchian Channel - print('Calculating Donchian Channel 20...') - donchian = DonchianChannel(df['High'], df['Low'], df['Close'], window=20) - features_dict['donchian_hband'] = donchian.donchian_channel_hband() - features_dict['donchian_lband'] = donchian.donchian_channel_lband() - features_dict['donchian_mband'] = donchian.donchian_channel_mband() - - # Keltner Channel - print('Calculating Keltner Channel 20...') - keltner = KeltnerChannel(df['High'], df['Low'], df['Close'], window=20) - features_dict['keltner_hband'] = keltner.keltner_channel_hband() - features_dict['keltner_lband'] = keltner.keltner_channel_lband() - features_dict['keltner_mband'] = keltner.keltner_channel_mband() - - # Detrended Price Oscillator - print('Calculating DPO 20...') - features_dict['dpo_20'] = DPOIndicator(df['Close'], window=20).dpo() - - # Ultimate Oscillator - print('Calculating Ultimate Oscillator...') - features_dict['ultimate_osc'] = UltimateOscillatorIndicator(df['High'], df['Low'], df['Close']).ultimate_oscillator() - - # Ichimoku - print('Calculating Ichimoku...') - ichimoku = IchimokuIndicator(df['High'], df['Low'], window1=9, window2=26, window3=52) - features_dict['ichimoku_a'] = ichimoku.ichimoku_a() - features_dict['ichimoku_b'] = ichimoku.ichimoku_b() - features_dict['ichimoku_base_line'] = ichimoku.ichimoku_base_line() - features_dict['ichimoku_conversion_line'] = ichimoku.ichimoku_conversion_line() - - # Elder Ray Index (Bull Power, Bear Power) - print('Calculating Elder Ray Index...') - features_dict['elder_ray_bull'] = ta.trend.EMAIndicator(df['Close'], window=13).ema_indicator() - df['Low'] - features_dict['elder_ray_bear'] = ta.trend.EMAIndicator(df['Close'], window=13).ema_indicator() - df['High'] - - # Pivot Points (Daily) - print('Calculating Daily Pivot Points...') - features_dict['daily_return'] = DailyReturnIndicator(df['Close']).daily_return() + # Parallel computation for all non-cached features + if feature_jobs: + print(f'Computing {len(feature_jobs)} features in parallel...') + with concurrent.futures.ProcessPoolExecutor() as executor: + futures = [executor.submit(run_feature_job, job, df) for job in feature_jobs] + for future in concurrent.futures.as_completed(futures): + feature_name, result = future.result() + features_dict[feature_name] = result + feature_file = f'./data/{csv_prefix}_{feature_name}.npy' + np.save(feature_file, result.values) + print('All parallel features computed.') + else: + print('All features loaded from cache.') # Concatenate all new features at once print('Concatenating all new features to DataFrame...') features_df = pd.DataFrame(features_dict) df = pd.concat([df, features_df], axis=1) + # Downcast all float columns to save memory + print('Downcasting float columns to save memory...') + for col in df.columns: + try: + df[col] = pd.to_numeric(df[col], downcast='float') + except Exception: + pass + + # Drop intermediate features_df to free memory + del features_df + import gc + gc.collect() + + feature_end_time = time.time() + print(f'Feature computation completed in {feature_end_time - feature_start_time:.2f} seconds.') + # Add Supertrend indicators (custom) print('Preparing data for Supertrend calculation...') st_df = df.rename(columns={'High': 'high', 'Low': 'low', 'Close': 'close'}) @@ -199,6 +397,7 @@ if __name__ == '__main__': # Add time features (exclude 'dayofweek') print('Adding hour feature...') + df['Timestamp'] = pd.to_datetime(df['Timestamp'], errors='coerce') df['hour'] = df['Timestamp'].dt.hour # Drop NaNs after all feature engineering @@ -209,6 +408,9 @@ if __name__ == '__main__': print('Selecting feature columns...') exclude_cols = ['Timestamp', 'Close', 'log_return', 'log_return_5', 'log_return_15', 'log_return_30'] feature_cols = [col for col in df.columns if col not in exclude_cols] + # Drop excluded columns to save memory + print('Dropping excluded columns to save memory...') + df = df[feature_cols + ['log_return', 'Timestamp']] print('Preparing X and y...') X = df[feature_cols].values.astype(np.float32)