# Cycles/xgboost/main.py

import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from custom_xgboost import CustomXGBoostGPU
from sklearn.metrics import mean_squared_error
from plot_results import display_actual_vs_predicted, plot_target_distribution, plot_predicted_vs_actual_log_returns
import ta
from cycles.supertrend import Supertrends
from ta.trend import SMAIndicator, DPOIndicator, IchimokuIndicator, PSARIndicator
from ta.momentum import ROCIndicator, KAMAIndicator, UltimateOscillator, StochasticOscillator, WilliamsRIndicator
from ta.volatility import KeltnerChannel, DonchianChannel
from ta.others import DailyReturnIndicator
import time
import concurrent.futures
from numba import njit
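
# NOTE: the calc_* helpers below re-import their indicator classes inside the
# function body, which keeps each job self-contained when handed off to worker
# processes (the equivalent top-level imports above cover interactive use).
# run_indicator and run_indicator_job are generic timing wrappers that the
# pipeline below does not currently call.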
def run_indicator(func, *args):
    return func(*args)

def run_indicator_job(job):
    import time
    func, *args = job
    indicator_name = func.__name__
    start = time.time()
    result = func(*args)
    elapsed = time.time() - start
    print(f'Indicator {indicator_name} computed in {elapsed:.4f} seconds')
    return result
def calc_rsi(close):
    from ta.momentum import RSIIndicator
    return ('rsi', RSIIndicator(close, window=14).rsi())

def calc_macd(close):
    from ta.trend import MACD
    return ('macd', MACD(close).macd())

def calc_bollinger(close):
    from ta.volatility import BollingerBands
    bb = BollingerBands(close=close, window=20, window_dev=2)
    return [
        ('bb_bbm', bb.bollinger_mavg()),
        ('bb_bbh', bb.bollinger_hband()),
        ('bb_bbl', bb.bollinger_lband()),
        ('bb_bb_width', bb.bollinger_hband() - bb.bollinger_lband())
    ]

def calc_stochastic(high, low, close):
    from ta.momentum import StochasticOscillator
    stoch = StochasticOscillator(high=high, low=low, close=close, window=14, smooth_window=3)
    return [
        ('stoch_k', stoch.stoch()),
        ('stoch_d', stoch.stoch_signal())
    ]

def calc_atr(high, low, close):
    from ta.volatility import AverageTrueRange
    atr = AverageTrueRange(high=high, low=low, close=close, window=14)
    return ('atr', atr.average_true_range())

def calc_cci(high, low, close):
    from ta.trend import CCIIndicator
    cci = CCIIndicator(high=high, low=low, close=close, window=20)
    return ('cci', cci.cci())

def calc_williamsr(high, low, close):
    from ta.momentum import WilliamsRIndicator
    willr = WilliamsRIndicator(high=high, low=low, close=close, lbp=14)
    return ('williams_r', willr.williams_r())

def calc_ema(close):
    from ta.trend import EMAIndicator
    ema = EMAIndicator(close=close, window=14)
    return ('ema_14', ema.ema_indicator())

def calc_obv(close, volume):
    from ta.volume import OnBalanceVolumeIndicator
    obv = OnBalanceVolumeIndicator(close=close, volume=volume)
    return ('obv', obv.on_balance_volume())

def calc_cmf(high, low, close, volume):
    from ta.volume import ChaikinMoneyFlowIndicator
    cmf = ChaikinMoneyFlowIndicator(high=high, low=low, close=close, volume=volume, window=20)
    return ('cmf', cmf.chaikin_money_flow())

def calc_sma(close):
    from ta.trend import SMAIndicator
    return [
        ('sma_50', SMAIndicator(close, window=50).sma_indicator()),
        ('sma_200', SMAIndicator(close, window=200).sma_indicator())
    ]

def calc_roc(close):
    from ta.momentum import ROCIndicator
    return ('roc_10', ROCIndicator(close, window=10).roc())

def calc_momentum(close):
    return ('momentum_10', close - close.shift(10))

def calc_psar(high, low, close):
    from ta.trend import PSARIndicator
    psar = PSARIndicator(high, low, close)
    return [
        ('psar', psar.psar()),
        ('psar_up', psar.psar_up()),
        ('psar_down', psar.psar_down())
    ]

def calc_donchian(high, low, close):
    from ta.volatility import DonchianChannel
    donchian = DonchianChannel(high, low, close, window=20)
    return [
        ('donchian_hband', donchian.donchian_channel_hband()),
        ('donchian_lband', donchian.donchian_channel_lband()),
        ('donchian_mband', donchian.donchian_channel_mband())
    ]

def calc_keltner(high, low, close):
    from ta.volatility import KeltnerChannel
    keltner = KeltnerChannel(high, low, close, window=20)
    return [
        ('keltner_hband', keltner.keltner_channel_hband()),
        ('keltner_lband', keltner.keltner_channel_lband()),
        ('keltner_mband', keltner.keltner_channel_mband())
    ]

def calc_dpo(close):
    from ta.trend import DPOIndicator
    return ('dpo_20', DPOIndicator(close, window=20).dpo())

def calc_ultimate(high, low, close):
    from ta.momentum import UltimateOscillator
    return ('ultimate_osc', UltimateOscillator(high, low, close).ultimate_oscillator())

def calc_ichimoku(high, low):
    from ta.trend import IchimokuIndicator
    ichimoku = IchimokuIndicator(high, low, window1=9, window2=26, window3=52)
    return [
        ('ichimoku_a', ichimoku.ichimoku_a()),
        ('ichimoku_b', ichimoku.ichimoku_b()),
        ('ichimoku_base_line', ichimoku.ichimoku_base_line()),
        ('ichimoku_conversion_line', ichimoku.ichimoku_conversion_line())
    ]
def calc_elder_ray(close, low, high):
    # Note: the conventional Elder Ray definition is bull = high - EMA and
    # bear = low - EMA; the names and signs here differ, which only relabels
    # and negates the features (harmless for tree-based models).
    from ta.trend import EMAIndicator
    ema = EMAIndicator(close, window=13).ema_indicator()
    return [
        ('elder_ray_bull', ema - low),
        ('elder_ray_bear', ema - high)
    ]

def calc_daily_return(close):
    from ta.others import DailyReturnIndicator
    return ('daily_return', DailyReturnIndicator(close).daily_return())
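
# A simplified Parabolic SAR, compiled with Numba. Each step applies
#   SAR[i] = SAR[i-1] + AF * (EP - SAR[i-1])
# where EP is the extreme point of the current trend and AF (the acceleration
# factor) grows by `af` each time a new extreme is set, capped at `max_af`.
# The trend flips, with AF and EP reset, whenever price crosses the current
# SAR. Note: this helper is not called by the pipeline below, which uses ta's
# PSARIndicator instead.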
@njit
def fast_psar(high, low, close, af=0.02, max_af=0.2):
    length = len(close)
    psar = np.zeros(length)
    bull = True
    af_step = af
    ep = low[0]
    psar[0] = low[0]
    for i in range(1, length):
        prev_psar = psar[i - 1]
        if bull:
            psar[i] = prev_psar + af_step * (ep - prev_psar)
            if low[i] < psar[i]:
                # Price pierced the SAR: flip to a bear trend, reset AF/EP.
                bull = False
                psar[i] = ep
                af_step = af
                ep = low[i]
            elif high[i] > ep:
                # New extreme point: accelerate, capped at max_af.
                ep = high[i]
                af_step = min(af_step + af, max_af)
        else:
            psar[i] = prev_psar + af_step * (ep - prev_psar)
            if high[i] > psar[i]:
                # Price pierced the SAR: flip to a bull trend, reset AF/EP.
                bull = True
                psar[i] = ep
                af_step = af
                ep = high[i]
            elif low[i] < ep:
                ep = low[i]
                af_step = min(af_step + af, max_af)
    return psar
def compute_lag(df, col, lag):
    return df[col].shift(lag)

def compute_rolling(df, col, stat, window):
    if stat == 'mean':
        return df[col].rolling(window).mean()
    elif stat == 'std':
        return df[col].rolling(window).std()
    elif stat == 'min':
        return df[col].rolling(window).min()
    elif stat == 'max':
        return df[col].rolling(window).max()

def compute_log_return(df, horizon):
    return np.log(df['Close'] / df['Close'].shift(horizon))

def compute_volatility(df, window):
    return df['log_return'].rolling(window).std()

def run_feature_job(job, df):
    feature_name, func, *args = job
    print(f'Computing feature: {feature_name}')
    result = func(df, *args)
    return feature_name, result
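
# Feature jobs are (feature_name, func, *args) tuples: run_feature_job applies
# func(df, *args) in a worker process and returns the named result. Every
# computed feature is persisted to ./data/{csv_prefix}_{feature_name}.npy so
# subsequent runs can load it from cache instead of recomputing.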
if __name__ == '__main__':
    csv_path = './data/btcusd_1-min_data.csv'
    csv_prefix = os.path.splitext(os.path.basename(csv_path))[0]
    df = pd.read_csv(csv_path)
    df = df[df['Volume'] != 0]
    min_date = '2017-06-01'
    df['Timestamp'] = pd.to_datetime(df['Timestamp'], unit='s')
    # Reset the index after filtering so that cached features (plain numpy
    # arrays) and freshly computed Series line up positionally when the
    # feature DataFrame is built and concatenated below.
    df = df[df['Timestamp'] >= min_date].reset_index(drop=True)
    lags = 3
    print('Calculating log returns as the new target...')
    df['log_return'] = np.log(df['Close'] / df['Close'].shift(1))
    ohlcv_cols = ['Open', 'High', 'Low', 'Close', 'Volume']
    window_sizes = [5, 15, 30]  # in minutes, adjust as needed
    features_dict = {}
    print('Starting feature computation...')
    feature_start_time = time.time()
    # --- Technical Indicator Features: Calculate or Load from Cache ---
    print('Calculating or loading technical indicator features...')
    indicator_jobs = [
        ('rsi', calc_rsi, [df['Close']]),
        ('macd', calc_macd, [df['Close']]),
        ('atr', calc_atr, [df['High'], df['Low'], df['Close']]),
        ('cci', calc_cci, [df['High'], df['Low'], df['Close']]),
        ('williams_r', calc_williamsr, [df['High'], df['Low'], df['Close']]),
        ('ema_14', calc_ema, [df['Close']]),
        ('obv', calc_obv, [df['Close'], df['Volume']]),
        ('cmf', calc_cmf, [df['High'], df['Low'], df['Close'], df['Volume']]),
        ('roc_10', calc_roc, [df['Close']]),
        ('dpo_20', calc_dpo, [df['Close']]),
        ('ultimate_osc', calc_ultimate, [df['High'], df['Low'], df['Close']]),
        ('daily_return', calc_daily_return, [df['Close']]),
    ]
    # Multi-column indicators
    multi_indicator_jobs = [
        ('bollinger', calc_bollinger, [df['Close']]),
        ('stochastic', calc_stochastic, [df['High'], df['Low'], df['Close']]),
        ('sma', calc_sma, [df['Close']]),
        ('psar', calc_psar, [df['High'], df['Low'], df['Close']]),
        ('donchian', calc_donchian, [df['High'], df['Low'], df['Close']]),
        ('keltner', calc_keltner, [df['High'], df['Low'], df['Close']]),
        ('ichimoku', calc_ichimoku, [df['High'], df['Low']]),
        ('elder_ray', calc_elder_ray, [df['Close'], df['Low'], df['High']]),
    ]
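    # Single-column indicators can be skipped entirely when a cached .npy
    # exists. Multi-column indicators are always computed first, because their
    # sub-feature names are only known from the returned (name, values) pairs;
    # only the per-sub-feature save/load is cached.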
    for feature_name, func, args in indicator_jobs:
        feature_file = f'./data/{csv_prefix}_{feature_name}.npy'
        if os.path.exists(feature_file):
            print(f'Loading cached feature: {feature_file}')
            features_dict[feature_name] = np.load(feature_file)
        else:
            result = func(*args)
            if isinstance(result, tuple):
                _, values = result
                features_dict[feature_name] = values
                np.save(feature_file, values.values)
            else:
                raise ValueError(f'Unexpected result for {feature_name}')
    for feature_name, func, args in multi_indicator_jobs:
        # These return a list of (name, values) pairs
        result = func(*args)
        for subname, values in result:
            sub_feature_file = f'./data/{csv_prefix}_{subname}.npy'
            if os.path.exists(sub_feature_file):
                print(f'Loading cached feature: {sub_feature_file}')
                features_dict[subname] = np.load(sub_feature_file)
            else:
                features_dict[subname] = values
                np.save(sub_feature_file, values.values)
    # Prepare jobs for lags, rolling stats, log returns, and volatility
    feature_jobs = []
    # Lags
    for col in ohlcv_cols:
        for lag in range(1, lags + 1):
            feature_name = f'{col}_lag{lag}'
            feature_file = f'./data/{csv_prefix}_{feature_name}.npy'
            if os.path.exists(feature_file):
                print(f'Loading cached feature: {feature_file}')
                features_dict[feature_name] = np.load(feature_file)
            else:
                feature_jobs.append((feature_name, compute_lag, col, lag))
    # Rolling statistics (a few column/window combinations are skipped)
    skip_combos = {('Open', 5), ('High', 5), ('High', 30), ('Low', 15)}
    for col in ohlcv_cols:
        for window in window_sizes:
            if (col, window) in skip_combos:
                continue
            for stat in ['mean', 'std', 'min', 'max']:
                feature_name = f'{col}_roll_{stat}_{window}'
                feature_file = f'./data/{csv_prefix}_{feature_name}.npy'
                if os.path.exists(feature_file):
                    print(f'Loading cached feature: {feature_file}')
                    features_dict[feature_name] = np.load(feature_file)
                else:
                    feature_jobs.append((feature_name, compute_rolling, col, stat, window))
    # Log returns for different horizons
    for horizon in [5, 15, 30]:
        feature_name = f'log_return_{horizon}'
        feature_file = f'./data/{csv_prefix}_{feature_name}.npy'
        if os.path.exists(feature_file):
            print(f'Loading cached feature: {feature_file}')
            features_dict[feature_name] = np.load(feature_file)
        else:
            feature_jobs.append((feature_name, compute_log_return, horizon))
    # Volatility
    for window in window_sizes:
        feature_name = f'volatility_{window}'
        feature_file = f'./data/{csv_prefix}_{feature_name}.npy'
        if os.path.exists(feature_file):
            print(f'Loading cached feature: {feature_file}')
            features_dict[feature_name] = np.load(feature_file)
        else:
            feature_jobs.append((feature_name, compute_volatility, window))
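    # Note: executor.submit pickles its arguments per task, so the full df is
    # serialized once for every job below; with many jobs on a wide frame this
    # dominates dispatch cost (shared memory would avoid it, but is not used
    # here).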
    # Parallel computation for all non-cached features
    if feature_jobs:
        print(f'Computing {len(feature_jobs)} features in parallel...')
        with concurrent.futures.ProcessPoolExecutor() as executor:
            futures = [executor.submit(run_feature_job, job, df) for job in feature_jobs]
            for future in concurrent.futures.as_completed(futures):
                feature_name, result = future.result()
                features_dict[feature_name] = result
                feature_file = f'./data/{csv_prefix}_{feature_name}.npy'
                np.save(feature_file, result.values)
        print('All parallel features computed.')
    else:
        print('All features loaded from cache.')
    # Concatenate all new features at once
    print('Concatenating all new features to DataFrame...')
    features_df = pd.DataFrame(features_dict)
    df = pd.concat([df, features_df], axis=1)
    # Downcast all float columns to save memory
    print('Downcasting float columns to save memory...')
    for col in df.columns:
        try:
            df[col] = pd.to_numeric(df[col], downcast='float')
        except Exception:
            pass  # leave columns that cannot be downcast unchanged
    # Drop intermediate features_df to free memory
    del features_df
    import gc
    gc.collect()
    feature_end_time = time.time()
    print(f'Feature computation completed in {feature_end_time - feature_start_time:.2f} seconds.')
    # Add Supertrend indicators (custom)
    print('Preparing data for Supertrend calculation...')
    st_df = df.rename(columns={'High': 'high', 'Low': 'low', 'Close': 'close'})
    print('Calculating Supertrend indicators...')
    supertrend = Supertrends(st_df)
    st_results = supertrend.calculate_supertrend_indicators()
    for st in st_results:
        period = st['params']['period']
        multiplier = st['params']['multiplier']
        # Skip Supertrend parameter combinations that add no predictive value
        if (period == 10 and multiplier == 1.0) or (period == 11 and multiplier == 2.0):
            continue
        print(f'Adding Supertrend features: supertrend_{period}_{multiplier} and supertrend_trend_{period}_{multiplier}')
        df[f'supertrend_{period}_{multiplier}'] = st['results']['supertrend']
        df[f'supertrend_trend_{period}_{multiplier}'] = st['results']['trend']
    # Add time features (exclude 'dayofweek')
    print('Adding hour feature...')
    df['Timestamp'] = pd.to_datetime(df['Timestamp'], errors='coerce')
    df['hour'] = df['Timestamp'].dt.hour
    # Drop NaNs after all feature engineering
    print('Dropping NaNs after feature engineering...')
    df = df.dropna().reset_index(drop=True)
    # Exclude 'Timestamp', 'Close', 'log_return', and the future-horizon target columns from features
    print('Selecting feature columns...')
    exclude_cols = ['Timestamp', 'Close', 'log_return', 'log_return_5', 'log_return_15', 'log_return_30']
    feature_cols = [col for col in df.columns if col not in exclude_cols]
    # Drop excluded columns to save memory
    print('Dropping excluded columns to save memory...')
    df = df[feature_cols + ['log_return', 'Timestamp']]
    print('Preparing X and y...')
    X = df[feature_cols].values.astype(np.float32)
    y = df['log_return'].values.astype(np.float32)
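    # Chronological 80/20 split (no shuffling): the test set is strictly later
    # in time than the training data, avoiding look-ahead leakage. This is why
    # the train_test_split import at the top goes unused.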
    split_idx = int(len(X) * 0.8)
    X_train, X_test = X[:split_idx], X[split_idx:]
    y_train, y_test = y[:split_idx], y[split_idx:]
    test_timestamps = df['Timestamp'].values[split_idx:]
    print('Initializing model...')
    model = CustomXGBoostGPU(X_train, X_test, y_train, y_test)
    print('Training model...')
    booster = model.train()
    print('Training complete.')
    if hasattr(model, 'params'):
        print('Model hyperparameters:', model.params)
    if hasattr(model, 'model') and hasattr(model.model, 'get_score'):
        import operator
        importances = model.model.get_score(importance_type='weight')
        # Map XGBoost's generic f0, f1, ... labels back to actual feature names
        feature_map = {f'f{idx}': name for idx, name in enumerate(feature_cols)}
        sorted_importances = sorted(importances.items(), key=operator.itemgetter(1), reverse=True)
        print('Feature importances (sorted, with names):')
        for feat, score in sorted_importances:
            print(f'{feature_map.get(feat, feat)}: {score}')
    preds = model.predict(X_test[:5])
    print('Predictions for first 5 test samples:', preds)
    print('Actual values for first 5 test samples:', y_test[:5])
    test_preds = model.predict(X_test)
    rmse = np.sqrt(mean_squared_error(y_test, test_preds))
    print(f'RMSE on test set: {rmse:.4f}')
    np.save('./data/y_test.npy', y_test)
    np.save('./data/test_preds.npy', test_preds)
    # display_actual_vs_predicted(y_test, test_preds, test_timestamps)
    # plot_target_distribution(y_train, y_test)
    plot_predicted_vs_actual_log_returns(y_test, test_preds, test_timestamps)