CryptoMarketParser/bitcoin_trend_analysis.py

168 lines
6.7 KiB
Python
Raw Normal View History

2025-03-18 10:13:37 +08:00
import numpy as np
import pandas as pd
from sqlalchemy import create_engine
from scipy.signal import find_peaks
import matplotlib.pyplot as plt
import matplotlib
from sklearn.linear_model import LinearRegression
from matplotlib.widgets import Slider
class BitcoinTrendAnalysis:
def __init__(self, db_path):
self.df = None
self.db_path = db_path
self.engine = create_engine(f'sqlite:///{self.db_path}')
def load_data(self):
self.df = pd.read_sql(
"SELECT Timestamp, Close FROM bitcoin_data WHERE strftime('%Y', Timestamp) >= '2019'",
self.engine,
index_col='Timestamp',
parse_dates=['Timestamp']
)
if self.df is not None and not self.df.empty:
print(f"Data loaded successfully. Shape: {self.df.shape}")
else:
print("Failed to load data. DataFrame is empty or None.")
def adaptive_find_peaks(self, smooth_prices, window, factor, distance):
print(factor)
prominences = np.zeros_like(smooth_prices)
for i in range(len(smooth_prices)):
start = max(0, i - window // 2)
end = min(len(smooth_prices), i + window // 2)
local_max = np.max(smooth_prices[start:end])
local_min = np.min(smooth_prices[start:end])
prominences[i] = (local_max - local_min) * factor
print(prominences)
peaks, _ = find_peaks(smooth_prices, prominence=prominences, distance=distance)
valleys, _ = find_peaks(-smooth_prices, prominence=prominences, distance=distance)
return peaks, valleys, prominences
def analyze_trends_peaks(self, resample_window='D', smoothing_window=10, prominence_factor=0.5, window=30,
distance=None):
matplotlib.use('TkAgg')
if not hasattr(self, 'df') or self.df is None:
print("Data not loaded. Call load_and_prepare_data() first.")
return
self.df = self.df.resample(resample_window).agg({'Close': 'last'})
prices = self.df['Close'].values
smooth_prices = pd.Series(prices).rolling(window=smoothing_window).mean()
fig, ax = plt.subplots(figsize=(14, 7))
plt.subplots_adjust(bottom=0.25) # Space for widgets
ax2 = ax.twinx() # Secondary axis for prominence
# Initial peaks and prominences
peaks, valleys, prominences = self.adaptive_find_peaks(smooth_prices, window=window, factor=prominence_factor,
distance=distance)
# Plot main price curve
price_line, = ax.plot(self.df.index, smooth_prices, label='Bitcoin Smooth Price')
# Scatter plots for peaks/valleys
peaks_plot = ax.scatter(self.df.index[peaks], smooth_prices[peaks], color='green', s=100, marker='^',
label='Local Maxima')
valleys_plot = ax.scatter(self.df.index[valleys], smooth_prices[valleys], color='red', s=100, marker='v',
label='Local Minima')
# Prominence line on secondary y-axis
prominence_line, = ax2.plot(self.df.index, prominences, color="purple", linestyle="dashed", alpha=0.7,
label="Prominence")
ax2.set_ylabel("Prominence")
ax.set_title(f'Bitcoin Price Trends Analysis\nfactor={prominence_factor}')
ax.set_xlabel('Date')
ax.set_ylabel('Price')
ax.legend()
ax2.legend(loc="upper right")
ax.grid(True)
# Slider setup
ax_slider = plt.axes([0.2, 0.05, 0.65, 0.03]) # Positioning of slider
slider = Slider(ax_slider, 'Prom Factor', 0.1, 2.0, valinit=prominence_factor, valstep=0.05)
# Update function for slider
def update_plot(factor):
# Recalculate peaks and prominences
peaks, valleys, prominences = self.adaptive_find_peaks(smooth_prices.to_numpy(), window=window,
factor=factor, distance=distance)
print(len(peaks))
# Update scatter points for peaks
peaks_plot.set_offsets(np.column_stack([
(self.df.index[peaks] - np.datetime64('1970-01-01')) / np.timedelta64(1, 's'),
smooth_prices[peaks]
]))
# Update scatter points for valleys
valleys_plot.set_offsets(np.column_stack([
(self.df.index[valleys] - np.datetime64('1970-01-01')) / np.timedelta64(1, 's'),
smooth_prices[valleys]
]))
# Update prominence line
prominence_line.set_ydata(prominences)
# Update the title to reflect the current prominence factor
ax.set_title(f'Bitcoin Price Trends Analysis\nfactor={factor}')
# Redraw the figure
fig.canvas.draw_idle()
slider.on_changed(update_plot) # Update plot when slider changes
plt.show()
def analyze_trends_linear_regression(self):
if self.df is None or self.df.empty:
print("No data loaded.")
return
self.df['Timestamp_num'] = (self.df.index - self.df.index[0]).total_seconds()
x = self.df['Timestamp_num'].values.reshape(-1, 1)
y = self.df['Close'].values
model = LinearRegression()
model.fit(x, y)
trend_line = model.predict(x)
matplotlib.use('TkAgg')
fig, ax = plt.subplots(figsize=(14, 7))
plt.subplots_adjust(bottom=0.2)
ax.plot(self.df.index, self.df['Close'], label='Bitcoin Price', color='blue')
ax.plot(self.df.index, trend_line, label='Linear Trend', color='red', linestyle='dashed')
ax.set_title("Bitcoin Price Linear Trend")
ax.set_xlabel("Date")
ax.set_ylabel("Price")
ax.legend()
ax.grid(True)
def zoom(event):
scale_factor = 1.2 if event.button == 'up' else 0.8
xlim = ax.get_xlim()
x_range = (xlim[1] - xlim[0]) * scale_factor
x_mid = (xlim[0] + xlim[1]) / 2
ax.set_xlim(x_mid - x_range / 2, x_mid + x_range / 2)
ax.figure.canvas.draw()
def pan(event):
step = (ax.get_xlim()[1] - ax.get_xlim()[0]) * 0.1
if event.key == 'right':
ax.set_xlim(ax.get_xlim()[0] + step, ax.get_xlim()[1] + step)
elif event.key == 'left':
ax.set_xlim(ax.get_xlim()[0] - step, ax.get_xlim()[1] - step)
ax.figure.canvas.draw()
fig.canvas.mpl_connect('scroll_event', zoom)
fig.canvas.mpl_connect('key_press_event', pan)
plt.show()
slope = model.coef_[0]
print(f"Trend Slope: {slope:.6f} (positive = uptrend, negative = downtrend)")