Added multiple technical indicators for feature engineering, including ADX, TRIX, Vortex, KAMA, Force Index, EOM, MFI, ADI, TEMA, StochRSI, and Awesome Oscillator. Improved NaN handling and implemented leave-one-out feature evaluation with results saved to CSV.

This commit is contained in:
Simon Moisy 2025-05-30 17:59:09 +08:00
parent ced64825bd
commit ada6150413
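
In essence, the new leave-one-out evaluation retrains the model once per feature with that feature withheld, so each feature's value shows up as the change in test error when it is absent. A minimal, self-contained sketch of the pattern (toy data and a plain sklearn model as stand-ins, not the repo's CustomXGBoostGPU pipeline):

import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

rng = np.random.default_rng(0)
X = rng.normal(size=(200, 3))                 # three toy features
y = 2.0 * X[:, 0] + rng.normal(size=200)      # only the first one matters
names = ['f_a', 'f_b', 'f_c']
split = int(len(X) * 0.8)                     # chronological split, no shuffle
for i, left_out in enumerate(names):
    cols = [j for j in range(X.shape[1]) if j != i]
    model = LinearRegression().fit(X[:split][:, cols], y[:split])
    preds = model.predict(X[split:][:, cols])
    rmse = np.sqrt(mean_squared_error(y[split:], preds))
    print(f'left out {left_out}: RMSE={rmse:.3f}')  # error jumps only for f_a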


@@ -3,13 +3,14 @@ import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from custom_xgboost import CustomXGBoostGPU
from sklearn.metrics import mean_squared_error
from plot_results import plot_predicted_vs_actual_prices, plot_prediction_error_distribution
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from plot_results import plot_prediction_error_distribution, plot_direction_transition_heatmap
from cycles.supertrend import Supertrends
import time
from numba import njit
import itertools
import csv
def run_indicator(func, *args):
    return func(*args)
@@ -206,7 +207,74 @@ def run_feature_job(job, df):
    result = func(df, *args)
    return feature_name, result
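
# Illustrative note (not part of this commit): judging by the body above, a job
# is assumed to be a (feature_name, func, args) tuple, so a worker call looks
# roughly like
#   name, series = run_feature_job(('rsi_14', calc_rsi, (14,)), df)
# where calc_rsi is a hypothetical helper in the same style as those below.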
def calc_adx(high, low, close):
    from ta.trend import ADXIndicator
    adx = ADXIndicator(high=high, low=low, close=close, window=14)
    return [
        ('adx', adx.adx()),
        ('adx_pos', adx.adx_pos()),
        ('adx_neg', adx.adx_neg())
    ]

def calc_trix(close):
    from ta.trend import TRIXIndicator
    trix = TRIXIndicator(close=close, window=15)
    return ('trix', trix.trix())

def calc_vortex(high, low, close):
    from ta.trend import VortexIndicator
    vortex = VortexIndicator(high=high, low=low, close=close, window=14)
    return [
        ('vortex_pos', vortex.vortex_indicator_pos()),
        ('vortex_neg', vortex.vortex_indicator_neg())
    ]

def calc_kama(close):
    import pandas_ta as ta
    kama = ta.kama(close, length=10)
    return ('kama', kama)

def calc_force_index(close, volume):
    from ta.volume import ForceIndexIndicator
    fi = ForceIndexIndicator(close=close, volume=volume, window=13)
    return ('force_index', fi.force_index())

def calc_eom(high, low, volume):
    from ta.volume import EaseOfMovementIndicator
    eom = EaseOfMovementIndicator(high=high, low=low, volume=volume, window=14)
    return ('eom', eom.ease_of_movement())

def calc_mfi(high, low, close, volume):
    from ta.volume import MFIIndicator
    mfi = MFIIndicator(high=high, low=low, close=close, volume=volume, window=14)
    return ('mfi', mfi.money_flow_index())

def calc_adi(high, low, close, volume):
    from ta.volume import AccDistIndexIndicator
    adi = AccDistIndexIndicator(high=high, low=low, close=close, volume=volume)
    return ('adi', adi.acc_dist_index())

def calc_tema(close):
    import pandas_ta as ta
    tema = ta.tema(close, length=10)
    return ('tema', tema)

def calc_stochrsi(close):
    from ta.momentum import StochRSIIndicator
    stochrsi = StochRSIIndicator(close=close, window=14, smooth1=3, smooth2=3)
    return [
        ('stochrsi', stochrsi.stochrsi()),
        ('stochrsi_k', stochrsi.stochrsi_k()),
        ('stochrsi_d', stochrsi.stochrsi_d())
    ]

def calc_awesome_oscillator(high, low):
    from ta.momentum import AwesomeOscillatorIndicator
    ao = AwesomeOscillatorIndicator(high=high, low=low, window1=5, window2=34)
    return ('awesome_osc', ao.awesome_oscillator())
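
# Illustrative usage (not part of this commit): each calc_* helper returns
# either one (name, Series) pair or a list of pairs, so a caller can
# normalize both shapes before storing:
#
#   result = calc_adx(df['High'], df['Low'], df['Close'])
#   pairs = result if isinstance(result, list) else [result]
#   for name, series in pairs:
#       features_dict[name] = series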
if __name__ == '__main__':
    IMPUTE_NANS = True  # Set to True to impute NaNs, False to drop rows with NaNs
    csv_path = './data/btcusd_1-min_data.csv'
    csv_prefix = os.path.splitext(os.path.basename(csv_path))[0]
@@ -576,6 +644,37 @@ if __name__ == '__main__':
        np.save(feature_file, result.values)
        print(f'Saved feature: {feature_file}')
    # --- Additional Technical Indicator Features ---
    # ADX
    adx_names = ['adx', 'adx_pos', 'adx_neg']
    adx_files = [f'./data/{csv_prefix}_{name}.npy' for name in adx_names]
    if all(os.path.exists(f) for f in adx_files):
        print('Loading cached features: ADX')
        for name, f in zip(adx_names, adx_files):
            arr = np.load(f)
            features_dict[name] = pd.Series(arr, index=df.index)
    else:
        print('Calculating multi-column indicator: adx')
        result = calc_adx(df['High'], df['Low'], df['Close'])
        for subname, values in result:
            sub_feature_file = f'./data/{csv_prefix}_{subname}.npy'
            features_dict[subname] = values
            np.save(sub_feature_file, values.values)
            print(f'Saved feature: {sub_feature_file}')
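    # The cache-or-compute pattern above repeats once per indicator; a
    # hypothetical helper (sketch only, not in this commit) could centralize it:
    #
    #   def cached_feature(name, compute, index):
    #       path = f'./data/{csv_prefix}_{name}.npy'
    #       if os.path.exists(path):
    #           return pd.Series(np.load(path), index=index)
    #       series = compute()
    #       np.save(path, series.values)
    #       return series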
    # Force Index
    feature_file = f'./data/{csv_prefix}_force_index.npy'
    if os.path.exists(feature_file):
        print(f'Loading cached feature: {feature_file}')
        arr = np.load(feature_file)
        features_dict['force_index'] = pd.Series(arr, index=df.index)
    else:
        print('Calculating feature: force_index')
        _, values = calc_force_index(df['Close'], df['Volume'])
        features_dict['force_index'] = values
        np.save(feature_file, values.values)
        print(f'Saved feature: {feature_file}')
    # Concatenate all new features at once
    print('Concatenating all new features to DataFrame...')
    features_df = pd.DataFrame(features_dict)
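    # Building the frame from the dict in one step (rather than inserting
    # columns one by one) should also sidestep pandas' fragmentation
    # PerformanceWarning on wide frames.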
@@ -625,96 +724,79 @@ if __name__ == '__main__':
    df['Timestamp'] = pd.to_datetime(df['Timestamp'], errors='coerce')
    df['hour'] = df['Timestamp'].dt.hour
    # Impute NaNs after all feature engineering
    print('Imputing NaNs after feature engineering (using column means)...')
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].mean())
    # If you want to impute non-numeric columns differently, add logic here
    # Drop excluded columns to save memory
    print('Dropping excluded columns to save memory...')
    df = df[feature_cols + ['log_return', 'Timestamp']]
    print('Preparing X and y...')
    X = df[feature_cols].values.astype(np.float32)
    y = df['log_return'].values.astype(np.float32)
    split_idx = int(len(X) * 0.8)
    print(f'Splitting data: {split_idx} train, {len(X) - split_idx} test')
    X_train, X_test = X[:split_idx], X[split_idx:]
    y_train, y_test = y[:split_idx], y[split_idx:]
    test_timestamps = df['Timestamp'].values[split_idx:]
    print('Initializing model...')
    model = CustomXGBoostGPU(X_train, X_test, y_train, y_test)
    print('Training model...')
    booster = model.train()
    print('Training complete.')
    # Save the trained model
    model.save_model('./data/xgboost_model.json')
    print('Model saved to ./data/xgboost_model.json')
    if hasattr(model, 'params'):
        print("Model hyperparameters:", model.params)
    if hasattr(model, 'model') and hasattr(model.model, 'get_score'):
        import operator
        importances = model.model.get_score(importance_type='weight')
        # Map f0, f1, ... to actual feature names
        feature_map = {f"f{idx}": name for idx, name in enumerate(feature_cols)}
        sorted_importances = sorted(importances.items(), key=operator.itemgetter(1), reverse=True)
        print('Feature importances (sorted, with names):')
        for feat, score in sorted_importances:
            print(f'{feature_map.get(feat, feat)}: {score}')
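        # Illustrative mapping (not in the commit): XGBoost's get_score() keys
        # importances as 'f0', 'f1', ... when no feature names are attached;
        # with feature_cols = ['rsi', 'macd'], feature_map becomes
        # {'f0': 'rsi', 'f1': 'macd'}, so scores print under real names.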
    print('Making predictions for first 5 test samples...')
    preds = model.predict(X_test[:5])
    print('Predictions for first 5 test samples:', preds)
    print('Actual values for first 5 test samples:', y_test[:5])
    print('Making predictions for all test samples...')
    test_preds = model.predict(X_test)
    rmse = np.sqrt(mean_squared_error(y_test, test_preds))
    print(f'RMSE on test set: {rmse:.4f}')
    print('Saving y_test and test_preds to disk...')
    np.save('./data/y_test.npy', y_test)
    np.save('./data/test_preds.npy', test_preds)
    # Reconstruct price series from log returns
    print('Reconstructing price series from log returns...')
    # Get the last available Close price before the test set
    # The DataFrame df has been reset, so use split_idx to get the right row
    if 'Close' in df.columns:
        close_prices = df['Close'].values
    # Handle NaNs after all feature engineering
    if IMPUTE_NANS:
        print('Imputing NaNs after feature engineering (using mean imputation)...')
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            df[col] = df[col].fillna(df[col].mean())
        # If you want to impute non-numeric columns differently, add logic here
    else:
        # Reload original CSV to get Close prices if not present
        close_prices = pd.read_csv(csv_path)['Close'].values
    start_price = close_prices[split_idx]  # This is the price at the split point
    # Actual prices
    actual_prices = [start_price]
    for r in y_test:
        actual_prices.append(actual_prices[-1] * np.exp(r))
    actual_prices = np.array(actual_prices[1:])
    # Predicted prices
    predicted_prices = [start_price]
    for r in test_preds:
        predicted_prices.append(predicted_prices[-1] * np.exp(r))
    predicted_prices = np.array(predicted_prices[1:])
        print('Dropping NaNs after feature engineering...')
        df = df.dropna().reset_index(drop=True)
    print('Plotting predicted vs actual prices...')
    plot_predicted_vs_actual_prices(actual_prices, predicted_prices, test_timestamps)
    # Exclude 'Timestamp', 'Close', 'log_return', and any future target columns from features
    print('Selecting feature columns...')
    exclude_cols = ['Timestamp', 'Close', 'log_return', 'log_return_5', 'log_return_15', 'log_return_30']
    feature_cols = [col for col in df.columns if col not in exclude_cols]
    print('Features used for training:', feature_cols)
    print('Plotting distribution of absolute prediction errors...')
    plot_prediction_error_distribution(predicted_prices, actual_prices)
    # Prepare CSV for results
    results_csv = './data/leave_one_out_results.csv'
    if not os.path.exists(results_csv):
        with open(results_csv, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['left_out_feature', 'used_features', 'rmse', 'mae', 'r2', 'mape', 'directional_accuracy'])
print("Final features used for training:", feature_cols)
total_features = len(feature_cols)
for idx, left_out in enumerate(feature_cols):
used = [f for f in feature_cols if f != left_out]
print(f'\n=== Leave-one-out {idx+1}/{total_features}: left out {left_out} ===')
try:
# Prepare X and y for this combination
X = df[used].values.astype(np.float32)
y = df["log_return"].values.astype(np.float32)
split_idx = int(len(X) * 0.8)
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]
test_timestamps = df['Timestamp'].values[split_idx:]
print("Shape of X:", X.shape)
print("First row of X:", X[0])
print("stoch_k in feature_cols?", "stoch_k" in feature_cols)
if "stoch_k" in feature_cols:
idx = feature_cols.index("stoch_k")
print("First 10 values of stoch_k:", X[:10, idx])
            model = CustomXGBoostGPU(X_train, X_test, y_train, y_test)
            booster = model.train()
            unique_prefix = str(int(time.time() * 1000))
            # model.save_model(f'./data/xgboost_model_{unique_prefix}.json')
            test_preds = model.predict(X_test)
            rmse = np.sqrt(mean_squared_error(y_test, test_preds))
            # Reconstruct price series from log returns
            if 'Close' in df.columns:
                close_prices = df['Close'].values
            else:
                close_prices = pd.read_csv(csv_path)['Close'].values
            start_price = close_prices[split_idx]
            actual_prices = [start_price]
            for r_ in y_test:
                actual_prices.append(actual_prices[-1] * np.exp(r_))
            actual_prices = np.array(actual_prices[1:])
            predicted_prices = [start_price]
            for r_ in test_preds:
                predicted_prices.append(predicted_prices[-1] * np.exp(r_))
            predicted_prices = np.array(predicted_prices[1:])
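            # Worked mini-example of the identity used above (illustrative):
            # with start_price = 100 and log returns ln(101/100), ln(99/101),
            # compounding price_t = price_{t-1} * exp(r_t) yields 101 then 99,
            # recovering the price path exactly from the return series.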
            mae = mean_absolute_error(actual_prices, predicted_prices)
            r2 = r2_score(actual_prices, predicted_prices)
            direction_actual = np.sign(np.diff(actual_prices))
            direction_pred = np.sign(np.diff(predicted_prices))
            directional_accuracy = (direction_actual == direction_pred).mean()
            mape = np.mean(np.abs((actual_prices - predicted_prices) / actual_prices)) * 100
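            # Toy check (illustrative): actual prices [1, 2, 1, 3] vs predicted
            # [1, 3, 2, 1] give sign(diff) of [1, -1, 1] vs [1, -1, -1]; 2 of 3
            # directions agree, so directional_accuracy = 0.667.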
            # Save results to CSV
            with open(results_csv, 'a', newline='') as f:
                writer = csv.writer(f)
                writer.writerow([left_out, "|".join(used), rmse, mae, r2, mape, directional_accuracy])
            print(f'Left out {left_out}: RMSE={rmse:.4f}, MAE={mae:.4f}, R2={r2:.4f}, MAPE={mape:.2f}%, DirAcc={directional_accuracy*100:.2f}%')
        except Exception as e:
            print(f'Leave-one-out failed for {left_out}: {e}')
    print(f'All leave-one-out runs completed. Results saved to {results_csv}')
    sys.exit(0)