# CryptoMarketParser/bitcoin_trend_analysis.py
# 2025-03-25 08:16:13 +08:00
#
# 187 lines
# 7.3 KiB
# Python

import numpy as np
import pandas as pd
from sqlalchemy import create_engine
from scipy.signal import find_peaks
import matplotlib.pyplot as plt
import matplotlib
from sklearn.linear_model import LinearRegression
from sqlalchemy import create_engine, Column, Integer, String, Float, MetaData, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import OperationalError
# Declarative base shared by every ORM model in this module.
# NOTE(review): `sqlalchemy.ext.declarative.declarative_base` is the legacy
# import location; newer SQLAlchemy releases expose it from `sqlalchemy.orm`.
Base = declarative_base()


class PriceExtreme(Base):
    """ORM row describing one detected local price extreme.

    Rows are written by ``BitcoinTrendAnalysis.analyze_trends_peaks`` into
    the ``price_extremes`` table.
    """

    __tablename__ = 'price_extremes'

    # Surrogate key, assigned by the database.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Time of the extreme, stored as text ('%Y-%m-%d %H:%M:%S' as written by
    # analyze_trends_peaks).
    timestamp = Column(String)
    # Smoothed price at the extreme.
    price = Column(Float)
    # Kind of extreme: 'peak' or 'valley'.
    type = Column(String)
    # Prominence threshold that was in effect at this sample.
    prominence = Column(Float)

    def __repr__(self):
        """Return a debugging representation listing every column value."""
        return (
            f"{self.__class__.__name__}(id={self.id!r}, "
            f"timestamp={self.timestamp!r}, price={self.price!r}, "
            f"type={self.type!r}, prominence={self.prominence!r})"
        )
class BitcoinTrendAnalysis:
    """Load Bitcoin close prices from SQLite and analyse their trend.

    Two analyses are offered: local peak/valley detection with a locally
    adaptive prominence threshold, and a straight-line (least squares) trend.
    Both draw interactive matplotlib figures.
    """

    def __init__(self, db_path):
        """Remember the database path and create its SQLAlchemy engine.

        Args:
            db_path: Path of the SQLite database file; it is embedded in a
                ``sqlite:///`` URL.
        """
        self.df = None  # filled in by load_data()
        self.db_path = db_path
        self.engine = create_engine(f'sqlite:///{self.db_path}')

    def load_data(self):
        """Read ``Timestamp``/``Close`` rows from ``bitcoin_data`` into ``self.df``.

        Only rows whose ``Timestamp`` year is ``'2019'`` or later (string
        comparison on SQLite's ``strftime('%Y', ...)``) are selected. The
        frame is indexed by the parsed ``Timestamp`` column. A status line is
        printed either way.
        """
        self.df = pd.read_sql(
            "SELECT Timestamp, Close FROM bitcoin_data WHERE strftime('%Y', Timestamp) >= '2019'",
            self.engine,
            index_col='Timestamp',
            parse_dates=['Timestamp'],
        )
        if self.df is not None and not self.df.empty:
            print(f"Data loaded successfully. Shape: {self.df.shape}")
        else:
            print("Failed to load data. DataFrame is empty or None.")

    def adaptive_find_peaks(self, smooth_prices, window, factor, distance):
        """Find peaks and valleys using a per-sample prominence threshold.

        For every sample ``i`` the threshold is
        ``(local_max - local_min) * factor`` over the slice
        ``[i - window // 2, i + window // 2)`` clipped to the series bounds.

        Args:
            smooth_prices: 1-D NumPy array or pandas Series of prices. It must
                support slicing and unary minus.
            window: Width, in samples, of the neighbourhood used for the local
                range. Values below 2 fall back to a one-sample neighbourhood
                (threshold 0) instead of an empty slice.
            factor: Multiplier applied to the local range.
            distance: Forwarded to ``scipy.signal.find_peaks`` as ``distance``.

        Returns:
            ``(peaks, valleys, prominences)``: index arrays of the detected
            maxima and minima, and the float threshold array (one entry per
            sample).
        """
        n = len(smooth_prices)
        half = max(window // 2, 0)
        # Always float: inheriting an integer dtype from the input would
        # silently truncate thresholds such as 1.5 down to 1.
        prominences = np.zeros(n, dtype=float)
        for i in range(n):
            start = max(0, i - half)
            # Keep at least sample i in the slice so tiny windows don't
            # produce an empty reduction; identical to i + half when half >= 1.
            end = min(n, max(i + half, i + 1))
            segment = smooth_prices[start:end]
            prominences[i] = (np.max(segment) - np.min(segment)) * factor
        peaks, _ = find_peaks(smooth_prices, prominence=prominences, distance=distance)
        # Valleys are the peaks of the negated signal.
        valleys, _ = find_peaks(-smooth_prices, prominence=prominences, distance=distance)
        return peaks, valleys, prominences

    def _save_extremes(self, rows, report_path):
        """Replace the stored extremes with ``rows`` and write the text report.

        Args:
            rows: Iterable-safe list of ``(kind, timestamp, price, prominence)``
                tuples where ``kind`` is ``'peak'`` or ``'valley'``.
            report_path: Path of the text file to (over)write.

        Raises:
            Exception: Anything raised while writing the file or the database
                is re-raised after the transaction has been rolled back.
        """
        engine = create_engine('sqlite:///databases/bitcoin_trends.db')
        Base.metadata.create_all(engine)
        session = sessionmaker(bind=engine)()
        try:
            try:
                # Start from a clean table on every run.
                session.query(PriceExtreme).delete()
            except OperationalError as e:
                print(f"Error occurred: {e}. The table may not exist.")
                session.rollback()
            with open(report_path, 'w', encoding='utf-8') as file:
                file.writelines(
                    f"{kind.capitalize()}: {timestamp}, Price: {price}, Prominence: {prominence}\n"
                    for kind, timestamp, price, prominence in rows
                )
            session.bulk_save_objects([
                PriceExtreme(timestamp=timestamp, price=price, type=kind, prominence=prominence)
                for kind, timestamp, price, prominence in rows
            ])
            session.commit()
        except Exception:
            # Don't leave a half-applied delete/insert behind.
            session.rollback()
            raise
        finally:
            session.close()
            engine.dispose()

    def analyze_trends_peaks(self, resample_window='D', smoothing_window=1, prominence_factor=0.5, window=30,
                             distance=None):
        """Detect, plot and persist local maxima/minima of the close price.

        The loaded data is resampled (last close per bin), smoothed with a
        rolling mean, and scanned with :meth:`adaptive_find_peaks`. Results
        are plotted, written to a parameter-named text file in the current
        directory, and stored in ``databases/bitcoin_trends.db`` (replacing
        previous rows).

        NOTE: ``self.df`` is replaced by the resampled frame, so later calls
        operate on the resampled data.

        Args:
            resample_window: pandas offset alias used for resampling.
            smoothing_window: Rolling-mean window in samples. With the pandas
                default ``min_periods`` the first ``smoothing_window - 1``
                smoothed values are NaN.
            prominence_factor: ``factor`` passed to ``adaptive_find_peaks``.
            window: ``window`` passed to ``adaptive_find_peaks``.
            distance: ``distance`` passed to ``adaptive_find_peaks``.
        """
        # Forces the Tk GUI backend for the interactive window.
        matplotlib.use('TkAgg')
        if getattr(self, 'df', None) is None:
            print("Data not loaded. Call load_data() first.")
            return

        self.df = self.df.resample(resample_window).agg({'Close': 'last'})
        prices = self.df['Close'].values
        smooth_prices = pd.Series(prices).rolling(window=smoothing_window).mean()
        print(f"Smooth prices: {len(smooth_prices)} vs prices {len(prices)}")

        _, ax = plt.subplots(figsize=(14, 7))
        plt.subplots_adjust(bottom=0.25)
        peaks, valleys, prominences = self.adaptive_find_peaks(
            smooth_prices, window=window, factor=prominence_factor, distance=distance
        )

        ax.plot(self.df.index, smooth_prices, label='Bitcoin Smooth Price')
        ax.plot(self.df.index, prices, label='Bitcoin Price')
        ax.scatter(self.df.index[peaks], smooth_prices.iloc[peaks], color='green', s=100, marker='^',
                   label='Local Maxima')
        ax.scatter(self.df.index[valleys], smooth_prices.iloc[valleys], color='red', s=100, marker='v',
                   label='Local Minima')
        ax.set_title(f'Bitcoin Price Trends Analysis\nfactor={prominence_factor}')
        ax.set_xlabel('Date')
        ax.set_ylabel('Price')
        ax.legend()
        ax.grid(True)

        # Peaks first, then valleys: this is the order used in the report.
        rows = [
            (
                kind,
                self.df.index[pos].strftime('%Y-%m-%d %H:%M:%S'),
                float(smooth_prices.iloc[pos]),
                float(prominences[pos]),
            )
            for kind, positions in (('peak', peaks), ('valley', valleys))
            for pos in positions
        ]
        report_path = (
            f'peaks_and_valleys_{resample_window}_{smoothing_window}_{prominence_factor}_{window}_{distance}.txt'
        )
        self._save_extremes(rows, report_path)

        print(f"Saved {len(peaks)} peaks and {len(valleys)} valleys to bitcoin_trends.db")
        print(f"Peaks and valleys written to {report_path}")
        plt.show()

    def analyze_trends_linear_regression(self):
        """Fit and plot a straight-line trend through the close prices.

        The regressor is the number of seconds elapsed since the first index
        entry. Rows whose close is NaN or infinite are ignored for fitting
        (the estimator rejects them) but the trend line is drawn over the
        whole index. The figure supports scroll-wheel zoom and left/right
        arrow-key panning along the x axis. The slope (price units per
        second) is printed after the figure window is closed.
        """
        if self.df is None or self.df.empty:
            print("No data loaded.")
            return

        # Kept local so the loaded frame is not polluted with a scratch column.
        elapsed_seconds = (self.df.index - self.df.index[0]).total_seconds()
        x = np.asarray(elapsed_seconds, dtype=float).reshape(-1, 1)
        y = self.df['Close'].to_numpy(dtype=float)
        usable = np.isfinite(y)
        if not usable.any():
            print("No valid Close prices to fit.")
            return

        model = LinearRegression()
        model.fit(x[usable], y[usable])
        trend_line = model.predict(x)

        matplotlib.use('TkAgg')
        fig, ax = plt.subplots(figsize=(14, 7))
        plt.subplots_adjust(bottom=0.2)
        ax.plot(self.df.index, self.df['Close'], label='Bitcoin Price', color='blue')
        ax.plot(self.df.index, trend_line, label='Linear Trend', color='red', linestyle='dashed')
        ax.set_title("Bitcoin Price Linear Trend")
        ax.set_xlabel("Date")
        ax.set_ylabel("Price")
        ax.legend()
        ax.grid(True)

        def zoom(event):
            """Rescale the x range around its midpoint on mouse scroll."""
            scale_factor = 1.2 if event.button == 'up' else 0.8
            left, right = ax.get_xlim()
            x_range = (right - left) * scale_factor
            x_mid = (left + right) / 2
            ax.set_xlim(x_mid - x_range / 2, x_mid + x_range / 2)
            ax.figure.canvas.draw()

        def pan(event):
            """Shift the x range by 10% on the left/right arrow keys."""
            left, right = ax.get_xlim()
            step = (right - left) * 0.1
            if event.key == 'right':
                ax.set_xlim(left + step, right + step)
            elif event.key == 'left':
                ax.set_xlim(left - step, right - step)
            ax.figure.canvas.draw()

        fig.canvas.mpl_connect('scroll_event', zoom)
        fig.canvas.mpl_connect('key_press_event', pan)
        plt.show()

        slope = model.coef_[0]
        print(f"Trend Slope: {slope:.6f} (positive = uptrend, negative = downtrend)")