Implemented Supertrend indicators for feature engineering in main.py, including caching of computed features. Updated plotting functions in plot_results.py to save charts in a dedicated directory and added new functions for directional accuracy and prediction transition heatmaps.

This commit is contained in:
Simon Moisy 2025-05-30 18:14:42 +08:00
parent ada6150413
commit 082a2835b6
2 changed files with 156 additions and 40 deletions

View File

@ -11,6 +11,7 @@ import time
from numba import njit
import itertools
import csv
import pandas_ta as ta
def run_indicator(func, *args):
return func(*args)
@ -675,6 +676,25 @@ if __name__ == '__main__':
np.save(feature_file, values.values)
print(f'Saved feature: {feature_file}')
# Supertrend indicators
for period, multiplier in [(12, 3.0), (10, 1.0), (11, 2.0)]:
st_name = f'supertrend_{period}_{multiplier}'
st_trend_name = f'supertrend_trend_{period}_{multiplier}'
st_file = f'./data/{csv_prefix}_{st_name}.npy'
st_trend_file = f'./data/{csv_prefix}_{st_trend_name}.npy'
if os.path.exists(st_file) and os.path.exists(st_trend_file):
print(f'L Loading cached features: {st_file}, {st_trend_file}')
features_dict[st_name] = pd.Series(np.load(st_file), index=df.index)
features_dict[st_trend_name] = pd.Series(np.load(st_trend_file), index=df.index)
else:
print(f'Calculating Supertrend indicator: {st_name}')
st = ta.supertrend(df['High'], df['Low'], df['Close'], length=period, multiplier=multiplier)
features_dict[st_name] = st[f'SUPERT_{period}_{multiplier}']
features_dict[st_trend_name] = st[f'SUPERTd_{period}_{multiplier}']
np.save(st_file, features_dict[st_name].values)
np.save(st_trend_file, features_dict[st_trend_name].values)
print(f'Saved features: {st_file}, {st_trend_file}')
# Concatenate all new features at once
print('Concatenating all new features to DataFrame...')
features_df = pd.DataFrame(features_dict)
@ -693,32 +713,6 @@ if __name__ == '__main__':
except Exception:
pass
# Drop intermediate features_df to free memory
print('Dropping intermediate features_df to free memory...')
del features_df
import gc
gc.collect()
feature_end_time = time.time()
print(f'Feature computation completed in {feature_end_time - feature_start_time:.2f} seconds.')
# Add Supertrend indicators (custom)
print('Preparing data for Supertrend calculation...')
st_df = df.rename(columns={'High': 'high', 'Low': 'low', 'Close': 'close'})
print('Calculating Supertrend indicators...')
supertrend = Supertrends(st_df)
st_results = supertrend.calculate_supertrend_indicators()
for idx, st in enumerate(st_results):
period = st['params']['period']
multiplier = st['params']['multiplier']
# Skip useless supertrend features
if (period == 10 and multiplier == 1.0) or (period == 11 and multiplier == 2.0):
continue
print(f'Adding Supertrend features: supertrend_{period}_{multiplier} and supertrend_trend_{period}_{multiplier}')
df[f'supertrend_{period}_{multiplier}'] = st['results']['supertrend']
df[f'supertrend_trend_{period}_{multiplier}'] = st['results']['trend']
# Add time features (exclude 'dayofweek')
print('Adding hour feature...')
df['Timestamp'] = pd.to_datetime(df['Timestamp'], errors='coerce')
@ -749,7 +743,10 @@ if __name__ == '__main__':
writer.writerow(['left_out_feature', 'used_features', 'rmse', 'mae', 'r2', 'mape', 'directional_accuracy'])
total_features = len(feature_cols)
never_leave_out = {'Open', 'High', 'Low', 'Close', 'Volume'}
for idx, left_out in enumerate(feature_cols):
if left_out in never_leave_out:
continue
used = [f for f in feature_cols if f != left_out]
print(f'\n=== Leave-one-out {idx+1}/{total_features}: left out {left_out} ===')
try:
@ -763,8 +760,7 @@ if __name__ == '__main__':
model = CustomXGBoostGPU(X_train, X_test, y_train, y_test)
booster = model.train()
unique_prefix = str(int(time.time() * 1000))
# model.save_model(f'./data/xgboost_model_{unique_prefix}.json')
model.save_model(f'./data/xgboost_model_wo_{left_out}.json')
test_preds = model.predict(X_test)
rmse = np.sqrt(mean_squared_error(y_test, test_preds))
@ -796,6 +792,14 @@ if __name__ == '__main__':
writer = csv.writer(f)
writer.writerow([left_out, "|".join(used), rmse, mae, r2, mape, directional_accuracy])
print(f'Left out {left_out}: RMSE={rmse:.4f}, MAE={mae:.4f}, R2={r2:.4f}, MAPE={mape:.2f}%, DirAcc={directional_accuracy*100:.2f}%')
# Plotting for this run
plot_prefix = f'loo_{left_out}'
print('Plotting distribution of absolute prediction errors...')
plot_prediction_error_distribution(predicted_prices, actual_prices, prefix=plot_prefix)
print('Plotting directional accuracy...')
plot_direction_transition_heatmap(actual_prices, predicted_prices, prefix=plot_prefix)
except Exception as e:
print(f'Leave-one-out failed for {left_out}: {e}')
print(f'All leave-one-out runs completed. Results saved to {results_csv}')

View File

@ -24,7 +24,7 @@ def display_actual_vs_predicted(y_test, test_preds, timestamps, n_plot=200):
hovermode='closest'
)
fig = go.Figure(data=data, layout=layout)
pyo.plot(fig)
pyo.plot(fig, auto_open=False)
def plot_target_distribution(y_train, y_test):
import plotly.offline as pyo
@ -50,7 +50,7 @@ def plot_target_distribution(y_train, y_test):
barmode='overlay'
)
fig = go.Figure(data=data, layout=layout)
pyo.plot(fig)
pyo.plot(fig, auto_open=False)
def plot_predicted_vs_actual_log_returns(y_test, test_preds, timestamps=None, n_plot=200):
import plotly.offline as pyo
@ -78,7 +78,7 @@ def plot_predicted_vs_actual_log_returns(y_test, test_preds, timestamps=None, n_
hovermode='closest'
)
fig_line = go.Figure(data=data_line, layout=layout_line)
pyo.plot(fig_line, filename='log_return_line_plot.html')
pyo.plot(fig_line, filename='charts/log_return_line_plot.html', auto_open=False)
# Scatter plot: Predicted vs Actual
trace_scatter = go.Scatter(
@ -108,7 +108,7 @@ def plot_predicted_vs_actual_log_returns(y_test, test_preds, timestamps=None, n_
hovermode='closest'
)
fig_scatter = go.Figure(data=data_scatter, layout=layout_scatter)
pyo.plot(fig_scatter, filename='log_return_scatter_plot.html')
pyo.plot(fig_scatter, filename='charts/log_return_scatter_plot.html', auto_open=False)
def plot_predicted_vs_actual_prices(actual_prices, predicted_prices, timestamps=None, n_plot=200):
import plotly.offline as pyo
@ -136,7 +136,7 @@ def plot_predicted_vs_actual_prices(actual_prices, predicted_prices, timestamps=
hovermode='closest'
)
fig_line = go.Figure(data=data_line, layout=layout_line)
pyo.plot(fig_line, filename='price_line_plot.html')
pyo.plot(fig_line, filename='charts/price_line_plot.html', auto_open=False)
# Scatter plot: Predicted vs Actual
trace_scatter = go.Scatter(
@ -166,9 +166,9 @@ def plot_predicted_vs_actual_prices(actual_prices, predicted_prices, timestamps=
hovermode='closest'
)
fig_scatter = go.Figure(data=data_scatter, layout=layout_scatter)
pyo.plot(fig_scatter, filename='price_scatter_plot.html')
pyo.plot(fig_scatter, filename='charts/price_scatter_plot.html', auto_open=False)
def plot_prediction_error_distribution(predicted_prices, actual_prices, nbins=100):
def plot_prediction_error_distribution(predicted_prices, actual_prices, nbins=100, prefix=""):
"""
Plots the distribution of signed prediction errors between predicted and actual prices,
coloring negative errors (under-prediction) and positive errors (over-prediction) differently.
@ -181,19 +181,25 @@ def plot_prediction_error_distribution(predicted_prices, actual_prices, nbins=10
neg_errors = errors[errors < 0]
pos_errors = errors[errors >= 0]
# Calculate common bin edges
min_error = np.min(errors)
max_error = np.max(errors)
bin_edges = np.linspace(min_error, max_error, nbins + 1)
xbins = dict(start=min_error, end=max_error, size=(max_error - min_error) / nbins)
trace_neg = go.Histogram(
x=neg_errors,
nbinsx=nbins,
opacity=0.75,
marker=dict(color='blue'),
name='Negative Error (Under-prediction)'
name='Negative Error (Under-prediction)',
xbins=xbins
)
trace_pos = go.Histogram(
x=pos_errors,
nbinsx=nbins,
opacity=0.75,
marker=dict(color='orange'),
name='Positive Error (Over-prediction)'
name='Positive Error (Over-prediction)',
xbins=xbins
)
layout = go.Layout(
title='Distribution of Prediction Errors (Signed)',
@ -203,4 +209,110 @@ def plot_prediction_error_distribution(predicted_prices, actual_prices, nbins=10
bargap=0.05
)
fig = go.Figure(data=[trace_neg, trace_pos], layout=layout)
pyo.plot(fig, filename='prediction_error_distribution.html')
filename = f'charts/{prefix}_prediction_error_distribution.html'
pyo.plot(fig, filename=filename, auto_open=False)
def plot_directional_accuracy(actual_prices, predicted_prices, timestamps=None, n_plot=200):
"""
Plots the directional accuracy of predictions compared to actual price movements.
Shows whether the predicted direction matches the actual direction of price movement.
Args:
actual_prices: Array of actual price values
predicted_prices: Array of predicted price values
timestamps: Optional array of timestamps for x-axis
n_plot: Number of points to plot (default 200, plots last n_plot points)
"""
import plotly.graph_objs as go
import plotly.offline as pyo
import numpy as np
# Calculate price changes
actual_changes = np.diff(actual_prices)
predicted_changes = np.diff(predicted_prices)
# Determine if directions match
actual_direction = np.sign(actual_changes)
predicted_direction = np.sign(predicted_changes)
correct_direction = actual_direction == predicted_direction
# Get last n_plot points
actual_changes = actual_changes[-n_plot:]
predicted_changes = predicted_changes[-n_plot:]
correct_direction = correct_direction[-n_plot:]
if timestamps is not None:
x_values = timestamps[1:] # Skip first since we took diff
x_values = x_values[-n_plot:] # Get last n_plot points
else:
x_values = list(range(len(actual_changes)))
# Create traces for correct and incorrect predictions
correct_trace = go.Scatter(
x=np.array(x_values)[correct_direction],
y=actual_changes[correct_direction],
mode='markers',
name='Correct Direction',
marker=dict(color='green', size=8)
)
incorrect_trace = go.Scatter(
x=np.array(x_values)[~correct_direction],
y=actual_changes[~correct_direction],
mode='markers',
name='Incorrect Direction',
marker=dict(color='red', size=8)
)
# Calculate accuracy percentage
accuracy = np.mean(correct_direction) * 100
layout = go.Layout(
title=f'Directional Accuracy (Overall: {accuracy:.1f}%)',
xaxis=dict(title='Time' if timestamps is not None else 'Sample'),
yaxis=dict(title='Price Change'),
showlegend=True
)
fig = go.Figure(data=[correct_trace, incorrect_trace], layout=layout)
pyo.plot(fig, filename='charts/directional_accuracy.html', auto_open=False)
def plot_direction_transition_heatmap(actual_prices, predicted_prices, prefix=""):
"""
Plots a heatmap showing the frequency of each (actual, predicted) direction pair.
"""
import numpy as np
import plotly.graph_objs as go
import plotly.offline as pyo
# Calculate directions
actual_direction = np.sign(np.diff(actual_prices))
predicted_direction = np.sign(np.diff(predicted_prices))
# Build 3x3 matrix: rows=actual, cols=predicted, values=counts
# Map -1 -> 0, 0 -> 1, 1 -> 2 for indexing
mapping = {-1: 0, 0: 1, 1: 2}
matrix = np.zeros((3, 3), dtype=int)
for a, p in zip(actual_direction, predicted_direction):
matrix[mapping[a], mapping[p]] += 1
# Axis labels
directions = ['Down (-1)', 'No Change (0)', 'Up (+1)']
# Plot heatmap
heatmap = go.Heatmap(
z=matrix,
x=directions, # predicted
y=directions, # actual
colorscale='Viridis',
colorbar=dict(title='Count')
)
layout = go.Layout(
title='Direction Prediction Transition Matrix',
xaxis=dict(title='Predicted Direction'),
yaxis=dict(title='Actual Direction')
)
fig = go.Figure(data=[heatmap], layout=layout)
filename = f'charts/{prefix}_direction_transition_heatmap.html'
pyo.plot(fig, filename=filename, auto_open=False)