WIP trend analysis

This commit is contained in:
Simon Moisy 2025-03-18 10:13:37 +08:00
parent 302be95ce7
commit 6d9189d0be
6 changed files with 318 additions and 15 deletions

View File

@ -15,6 +15,10 @@ from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler
import gc
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
from matplotlib.backends.backend_agg import FigureCanvasAgg
from matplotlib.figure import Figure
import matplotlib
class BitcoinPricePredictor:
@ -158,30 +162,39 @@ class BitcoinPricePredictor:
print(model.summary())
return model
def load_data(self):
import pandas as pd
import sqlite3
def load_and_prepare_data(self):
conn = sqlite3.connect(self.db_path)
self.df = pd.read_sql_query("SELECT * FROM bitcoin_data", conn)
if self.df is not None and not self.df.empty:
print(f"Data loaded successfully. Shape: {self.df.shape}")
else:
print("Failed to load data. DataFrame is empty or None.")
conn.close()
def prepare_data(self):
start_time = time.time()
print("Loading data from database...")
df = pd.read_sql('SELECT * FROM bitcoin_data', self.engine, index_col='Timestamp', parse_dates=['Timestamp'])
print(f"Initial dataset shape: {df.shape}")
print(f"Timeframe: {self.timeframe}")
#df = self.resample_data(df)
df = self.resample_data(df)
df = self.add_essential_features(df)
self.df = self.add_essential_features(self.df)
# Define target variable - binary classification for price movement
df['Next_Period_Return'] = df['Close'].pct_change(periods=1).shift(-1).clip(lower=-0.5, upper=0.5)
df['Next_Period_Up'] = (df['Next_Period_Return'] > 0).astype(np.int8)
df = df.dropna()
self.df['Next_Period_Return'] = self.df['Close'].pct_change(periods=1).shift(-1).clip(lower=-0.5, upper=0.5)
self.df['Next_Period_Up'] = (self.df['Next_Period_Return'] > 0).astype(np.int8)
self.df = self.df.dropna()
# Scale features
self.scaler = RobustScaler()
df[self.feature_columns] = self.scaler.fit_transform(df[self.feature_columns])
self.df[self.feature_columns] = self.scaler.fit_transform(self.df[self.feature_columns])
# Create sequences for LSTM
x, y = self.create_sequences(df[self.feature_columns].values, df['Next_Period_Up'].values)
x, y = self.create_sequences(self.df[self.feature_columns].values, self.df['Next_Period_Up'].values)
print(f"Sequence shape: {x.shape}, Target shape: {y.shape}")
# Class balance check
@ -195,8 +208,8 @@ class BitcoinPricePredictor:
y_train, y_test = y[:split_idx], y[split_idx:]
# Free memory
del df
gc.collect()
# del self.df
# gc.collect()
self.X_train, self.X_test = x_train, x_test
self.y_train, self.y_test = y_train, y_test
@ -494,3 +507,125 @@ class BitcoinPricePredictor:
plt.savefig(f"./plots/training_history_{current_date}.png")
plt.show()
def analyze_market_trends(self, window_size=100, prominence=0.01, height=None, threshold=0.0, distance=None):
"""
Analyze market trends by finding local minima and maxima in the price data.
Args:
window_size (int): Default distance between peaks if distance is not provided
prominence (float): Minimum prominence of peaks (relative to price range)
height (float): Minimum height of peaks (absolute value)
threshold (float): Required threshold of peaks relative to neighbors
distance (int): Minimum distance between peaks in number of data points
"""
matplotlib.use('TkAgg') # Use TkAgg backend for interactive plotting
# Make sure data is loaded
if not hasattr(self, 'df') or self.df is None:
print("Data not loaded. Call load_and_prepare_data() first.")
return
# Get the closing prices
prices = self.df['Close'].values
# Calculate prominence as a percentage of price range if provided as a relative value
price_range = np.max(prices) - np.min(prices)
if prominence < 1: # If prominence is provided as a relative value
prominence_abs = prominence * price_range
else:
prominence_abs = prominence
# Use provided distance or default to window_size
if distance is None:
distance = window_size
# Find local maxima (peaks) with adjustable parameters
peaks, peaks_props = find_peaks(
prices,
height=height,
threshold=threshold,
distance=distance,
prominence=prominence_abs
)
# Find local minima (valleys) by inverting the signal
valleys, valleys_props = find_peaks(
-prices,
height=-height if height is not None else None,
threshold=threshold,
distance=distance,
prominence=prominence_abs
)
# Create a new figure for trend analysis
plt.figure(figsize=(14, 7))
# Plot the price data
plt.plot(self.df.index, prices, label='Bitcoin Price')
# Highlight the peaks and valleys
plt.scatter(self.df.index[peaks], prices[peaks], color='green', s=100, marker='^', label='Local Maxima')
plt.scatter(self.df.index[valleys], prices[valleys], color='red', s=100, marker='v', label='Local Minima')
# Identify trends by connecting consecutive extrema
all_points = np.sort(np.concatenate([peaks, valleys]))
up_trends = []
down_trends = []
for i in range(len(all_points) - 1):
start_idx = all_points[i]
end_idx = all_points[i+1]
# Determine if it's an uptrend or downtrend
if start_idx in valleys and end_idx in peaks:
# Uptrend
plt.plot([self.df.index[start_idx], self.df.index[end_idx]],
[prices[start_idx], prices[end_idx]],
'g-', linewidth=2, alpha=0.7)
duration = end_idx - start_idx
magnitude = prices[end_idx] - prices[start_idx]
percent_change = 100 * magnitude / prices[start_idx]
up_trends.append((duration, magnitude, percent_change))
elif start_idx in peaks and end_idx in valleys:
# Downtrend
plt.plot([self.df.index[start_idx], self.df.index[end_idx]],
[prices[start_idx], prices[end_idx]],
'r-', linewidth=2, alpha=0.7)
duration = end_idx - start_idx
magnitude = prices[start_idx] - prices[end_idx]
percent_change = 100 * magnitude / prices[start_idx]
down_trends.append((duration, magnitude, percent_change))
plt.title(f'Bitcoin Price Trends Analysis\nParameters: prominence={prominence}, distance={distance}')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig('bitcoin_trends_analysis.png')
# Print some statistics about the trends
print(f"Found {len(peaks)} local maxima and {len(valleys)} local minima")
# Calculate average trend durations and magnitudes
if up_trends:
avg_up_duration = sum(t[0] for t in up_trends) / len(up_trends)
avg_up_magnitude = sum(t[1] for t in up_trends) / len(up_trends)
avg_up_percent = sum(t[2] for t in up_trends) / len(up_trends)
print(f"Average uptrend: {avg_up_duration:.1f} periods, {avg_up_magnitude:.2f} price change ({avg_up_percent:.2f}%)")
if down_trends:
avg_down_duration = sum(t[0] for t in down_trends) / len(down_trends)
avg_down_magnitude = sum(t[1] for t in down_trends) / len(down_trends)
avg_down_percent = sum(t[2] for t in down_trends) / len(down_trends)
print(f"Average downtrend: {avg_down_duration:.1f} periods, {avg_down_magnitude:.2f} price change ({avg_down_percent:.2f}%)")
# Show the plot interactively
plt.show(block=True) # block=True ensures the plot window stays open
return peaks, valleys

168
bitcoin_trend_analysis.py Normal file
View File

@ -0,0 +1,168 @@
import numpy as np
import pandas as pd
from sqlalchemy import create_engine
from scipy.signal import find_peaks
import matplotlib.pyplot as plt
import matplotlib
from sklearn.linear_model import LinearRegression
from matplotlib.widgets import Slider
class BitcoinTrendAnalysis:
def __init__(self, db_path):
self.df = None
self.db_path = db_path
self.engine = create_engine(f'sqlite:///{self.db_path}')
def load_data(self):
self.df = pd.read_sql(
"SELECT Timestamp, Close FROM bitcoin_data WHERE strftime('%Y', Timestamp) >= '2019'",
self.engine,
index_col='Timestamp',
parse_dates=['Timestamp']
)
if self.df is not None and not self.df.empty:
print(f"Data loaded successfully. Shape: {self.df.shape}")
else:
print("Failed to load data. DataFrame is empty or None.")
def adaptive_find_peaks(self, smooth_prices, window, factor, distance):
print(factor)
prominences = np.zeros_like(smooth_prices)
for i in range(len(smooth_prices)):
start = max(0, i - window // 2)
end = min(len(smooth_prices), i + window // 2)
local_max = np.max(smooth_prices[start:end])
local_min = np.min(smooth_prices[start:end])
prominences[i] = (local_max - local_min) * factor
print(prominences)
peaks, _ = find_peaks(smooth_prices, prominence=prominences, distance=distance)
valleys, _ = find_peaks(-smooth_prices, prominence=prominences, distance=distance)
return peaks, valleys, prominences
def analyze_trends_peaks(self, resample_window='D', smoothing_window=10, prominence_factor=0.5, window=30,
distance=None):
matplotlib.use('TkAgg')
if not hasattr(self, 'df') or self.df is None:
print("Data not loaded. Call load_and_prepare_data() first.")
return
self.df = self.df.resample(resample_window).agg({'Close': 'last'})
prices = self.df['Close'].values
smooth_prices = pd.Series(prices).rolling(window=smoothing_window).mean()
fig, ax = plt.subplots(figsize=(14, 7))
plt.subplots_adjust(bottom=0.25) # Space for widgets
ax2 = ax.twinx() # Secondary axis for prominence
# Initial peaks and prominences
peaks, valleys, prominences = self.adaptive_find_peaks(smooth_prices, window=window, factor=prominence_factor,
distance=distance)
# Plot main price curve
price_line, = ax.plot(self.df.index, smooth_prices, label='Bitcoin Smooth Price')
# Scatter plots for peaks/valleys
peaks_plot = ax.scatter(self.df.index[peaks], smooth_prices[peaks], color='green', s=100, marker='^',
label='Local Maxima')
valleys_plot = ax.scatter(self.df.index[valleys], smooth_prices[valleys], color='red', s=100, marker='v',
label='Local Minima')
# Prominence line on secondary y-axis
prominence_line, = ax2.plot(self.df.index, prominences, color="purple", linestyle="dashed", alpha=0.7,
label="Prominence")
ax2.set_ylabel("Prominence")
ax.set_title(f'Bitcoin Price Trends Analysis\nfactor={prominence_factor}')
ax.set_xlabel('Date')
ax.set_ylabel('Price')
ax.legend()
ax2.legend(loc="upper right")
ax.grid(True)
# Slider setup
ax_slider = plt.axes([0.2, 0.05, 0.65, 0.03]) # Positioning of slider
slider = Slider(ax_slider, 'Prom Factor', 0.1, 2.0, valinit=prominence_factor, valstep=0.05)
# Update function for slider
def update_plot(factor):
# Recalculate peaks and prominences
peaks, valleys, prominences = self.adaptive_find_peaks(smooth_prices.to_numpy(), window=window,
factor=factor, distance=distance)
print(len(peaks))
# Update scatter points for peaks
peaks_plot.set_offsets(np.column_stack([
(self.df.index[peaks] - np.datetime64('1970-01-01')) / np.timedelta64(1, 's'),
smooth_prices[peaks]
]))
# Update scatter points for valleys
valleys_plot.set_offsets(np.column_stack([
(self.df.index[valleys] - np.datetime64('1970-01-01')) / np.timedelta64(1, 's'),
smooth_prices[valleys]
]))
# Update prominence line
prominence_line.set_ydata(prominences)
# Update the title to reflect the current prominence factor
ax.set_title(f'Bitcoin Price Trends Analysis\nfactor={factor}')
# Redraw the figure
fig.canvas.draw_idle()
slider.on_changed(update_plot) # Update plot when slider changes
plt.show()
def analyze_trends_linear_regression(self):
if self.df is None or self.df.empty:
print("No data loaded.")
return
self.df['Timestamp_num'] = (self.df.index - self.df.index[0]).total_seconds()
x = self.df['Timestamp_num'].values.reshape(-1, 1)
y = self.df['Close'].values
model = LinearRegression()
model.fit(x, y)
trend_line = model.predict(x)
matplotlib.use('TkAgg')
fig, ax = plt.subplots(figsize=(14, 7))
plt.subplots_adjust(bottom=0.2)
ax.plot(self.df.index, self.df['Close'], label='Bitcoin Price', color='blue')
ax.plot(self.df.index, trend_line, label='Linear Trend', color='red', linestyle='dashed')
ax.set_title("Bitcoin Price Linear Trend")
ax.set_xlabel("Date")
ax.set_ylabel("Price")
ax.legend()
ax.grid(True)
def zoom(event):
scale_factor = 1.2 if event.button == 'up' else 0.8
xlim = ax.get_xlim()
x_range = (xlim[1] - xlim[0]) * scale_factor
x_mid = (xlim[0] + xlim[1]) / 2
ax.set_xlim(x_mid - x_range / 2, x_mid + x_range / 2)
ax.figure.canvas.draw()
def pan(event):
step = (ax.get_xlim()[1] - ax.get_xlim()[0]) * 0.1
if event.key == 'right':
ax.set_xlim(ax.get_xlim()[0] + step, ax.get_xlim()[1] + step)
elif event.key == 'left':
ax.set_xlim(ax.get_xlim()[0] - step, ax.get_xlim()[1] - step)
ax.figure.canvas.draw()
fig.canvas.mpl_connect('scroll_event', zoom)
fig.canvas.mpl_connect('key_press_event', pan)
plt.show()
slope = model.coef_[0]
print(f"Trend Slope: {slope:.6f} (positive = uptrend, negative = downtrend)")