"""Trend analysis helpers for Bitcoin closing prices stored in SQLite."""

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.widgets import Slider
from scipy.signal import find_peaks
from sklearn.linear_model import LinearRegression
from sqlalchemy import create_engine


class BitcoinTrendAnalysis:
    """Load Bitcoin close prices from a SQLite file and plot trend analyses.

    Attributes:
        df: DataFrame indexed by ``Timestamp`` with a ``Close`` column, or
            ``None`` until :meth:`load_data` has been called.
        db_path: Path of the SQLite database file.
        engine: SQLAlchemy engine bound to ``db_path``.
    """

    def __init__(self, db_path):
        """Create the SQLAlchemy engine for ``db_path``; no data is read yet."""
        self.df = None
        self.db_path = db_path
        self.engine = create_engine(f'sqlite:///{self.db_path}')

    def load_data(self):
        """Read ``Timestamp``/``Close`` rows from 2019 onwards into ``self.df``.

        Rows come from the ``bitcoin_data`` table; ``Timestamp`` is parsed as
        a datetime and used as the index. A status line is printed either way.
        """
        self.df = pd.read_sql(
            "SELECT Timestamp, Close FROM bitcoin_data "
            "WHERE strftime('%Y', Timestamp) >= '2019'",
            self.engine,
            index_col='Timestamp',
            parse_dates=['Timestamp'],
        )
        if self.df is not None and not self.df.empty:
            print(f"Data loaded successfully. Shape: {self.df.shape}")
        else:
            print("Failed to load data. DataFrame is empty or None.")

    def adaptive_find_peaks(self, smooth_prices, window, factor, distance):
        """Detect peaks and valleys using a locally scaled prominence threshold.

        For every sample the threshold is ``factor`` times the range
        (max - min) of the samples in ``[i - window // 2, i + window // 2)``,
        clipped to the series bounds.

        Args:
            smooth_prices: 1-D sequence of prices; NaNs (e.g. rolling-mean
                warm-up) are tolerated.
            window: Width, in samples, of the neighbourhood used to measure
                the local price range.
            factor: Fraction of the local range a peak's prominence must
                reach.
            distance: Minimum spacing between detected peaks, forwarded to
                ``scipy.signal.find_peaks`` (``None`` disables it).

        Returns:
            Tuple ``(peaks, valleys, prominences)``: two integer index arrays
            and the per-sample float threshold array.
        """
        # Work on a float ndarray: avoids pandas label/position ambiguity and
        # prevents integer inputs from truncating the scaled thresholds.
        values = np.asarray(smooth_prices, dtype=float)
        count = values.size
        half = max(window // 2, 0)

        # NaN marks samples whose whole neighbourhood is NaN; find_peaks can
        # never report a peak there, so the threshold value is irrelevant.
        prominences = np.full(count, np.nan)
        for i in range(count):
            start = max(0, i - half)
            # Always include sample i itself so window < 2 cannot produce an
            # empty slice (np.max on an empty slice raises ValueError).
            stop = min(count, max(i + half, i + 1))
            segment = values[start:stop]
            # Ignore NaNs so the rolling-mean warm-up does not poison the
            # thresholds of the first valid samples.
            finite = segment[~np.isnan(segment)]
            if finite.size:
                prominences[i] = (finite.max() - finite.min()) * factor

        peaks, _ = find_peaks(values, prominence=prominences, distance=distance)
        valleys, _ = find_peaks(-values, prominence=prominences, distance=distance)
        return peaks, valleys, prominences

    def analyze_trends_peaks(self, resample_window='D', smoothing_window=10,
                             prominence_factor=0.5, window=30, distance=None):
        """Plot the smoothed close price with its local maxima and minima.

        The close price is resampled (last value per bin), smoothed with a
        rolling mean, and passed to :meth:`adaptive_find_peaks`. ``self.df``
        is left untouched so the method can be called repeatedly with
        different settings.

        Args:
            resample_window: pandas offset alias used for resampling.
            smoothing_window: Rolling-mean window, in resampled samples.
            prominence_factor: ``factor`` given to :meth:`adaptive_find_peaks`.
            window: ``window`` given to :meth:`adaptive_find_peaks`.
            distance: ``distance`` given to :meth:`adaptive_find_peaks`.
        """
        if getattr(self, 'df', None) is None:
            print("Data not loaded. Call load_data() first.")
            return

        matplotlib.use('TkAgg')

        # Keep the resampled frame local: overwriting self.df would make
        # every later analysis operate on already-downsampled data.
        resampled = self.df.resample(resample_window).agg({'Close': 'last'})
        dates = resampled.index
        smooth_prices = (
            resampled['Close'].rolling(window=smoothing_window).mean().to_numpy()
        )

        peaks, valleys, _ = self.adaptive_find_peaks(
            smooth_prices,
            window=window,
            factor=prominence_factor,
            distance=distance,
        )

        fig, ax = plt.subplots(figsize=(14, 7))
        fig.subplots_adjust(bottom=0.25)

        ax.plot(dates, smooth_prices, label='Bitcoin Smooth Price')
        ax.scatter(dates[peaks], smooth_prices[peaks], color='green', s=100,
                   marker='^', label='Local Maxima')
        ax.scatter(dates[valleys], smooth_prices[valleys], color='red', s=100,
                   marker='v', label='Local Minima')

        # Join the k-th maximum to the k-th minimum with a thin segment.
        for peak, valley in zip(peaks, valleys):
            ax.plot([dates[peak], dates[valley]],
                    [smooth_prices[peak], smooth_prices[valley]],
                    color='orange', lw=1)

        ax.set_title(f'Bitcoin Price Trends Analysis\nfactor={prominence_factor}')
        ax.set_xlabel('Date')
        ax.set_ylabel('Price')
        ax.legend()
        ax.grid(True)
        plt.show()

    def analyze_trends_linear_regression(self):
        """Fit and plot a linear trend of the close price over time.

        Time is expressed as seconds elapsed since the first index entry.
        The plot supports scroll-wheel zoom and left/right arrow-key panning
        along the x axis. After the plot window returns, the fitted slope
        (price units per second) is printed.
        """
        if self.df is None or self.df.empty:
            print("No data loaded.")
            return

        timestamps = self.df.index
        close = self.df['Close']

        # Local feature array instead of a helper column on self.df, so the
        # loaded frame is not modified as a side effect.
        elapsed = np.asarray(
            (timestamps - timestamps[0]).total_seconds(), dtype=float
        ).reshape(-1, 1)

        # LinearRegression rejects NaN targets; fit on valid rows only.
        valid = close.notna().to_numpy()
        if not valid.any():
            print("No valid Close prices to fit.")
            return

        model = LinearRegression()
        model.fit(elapsed[valid], close.to_numpy()[valid])
        trend_line = model.predict(elapsed)

        matplotlib.use('TkAgg')
        fig, ax = plt.subplots(figsize=(14, 7))
        fig.subplots_adjust(bottom=0.2)

        ax.plot(timestamps, close, label='Bitcoin Price', color='blue')
        ax.plot(timestamps, trend_line, label='Linear Trend', color='red',
                linestyle='dashed')
        ax.set_title("Bitcoin Price Linear Trend")
        ax.set_xlabel("Date")
        ax.set_ylabel("Price")
        ax.legend()
        ax.grid(True)

        def zoom(event):
            """Scale the visible x range about its centre on scroll events."""
            # Scroll 'up' widens the range by 20 %; anything else narrows it.
            scale_factor = 1.2 if event.button == 'up' else 0.8
            low, high = ax.get_xlim()
            half_span = (high - low) * scale_factor / 2
            centre = (low + high) / 2
            ax.set_xlim(centre - half_span, centre + half_span)
            ax.figure.canvas.draw()

        def pan(event):
            """Shift the visible x range by 10 % on left/right key presses."""
            low, high = ax.get_xlim()
            step = (high - low) * 0.1
            if event.key == 'right':
                ax.set_xlim(low + step, high + step)
            elif event.key == 'left':
                ax.set_xlim(low - step, high - step)
            ax.figure.canvas.draw()

        fig.canvas.mpl_connect('scroll_event', zoom)
        fig.canvas.mpl_connect('key_press_event', pan)
        plt.show()

        slope = model.coef_[0]
        print(f"Trend Slope: {slope:.6f} (positive = uptrend, negative = downtrend)")