# Listing metadata from the code viewer: 187 lines, 7.3 KiB, Python.
import numpy as np
|
|
import pandas as pd
|
|
from sqlalchemy import create_engine
|
|
from scipy.signal import find_peaks
|
|
import matplotlib.pyplot as plt
|
|
import matplotlib
|
|
from sklearn.linear_model import LinearRegression
|
|
from sqlalchemy import create_engine, Column, Integer, String, Float, MetaData, Table
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from sqlalchemy.orm import sessionmaker
|
|
from sqlalchemy.exc import OperationalError
|
|
|
|
|
|
# Declarative base class that the ORM models defined in this module inherit from.
Base = declarative_base()
|
|
|
|
class PriceExtreme(Base):
    """ORM model for one row of the ``price_extremes`` table.

    Each row describes a single detected local extreme of the price series.
    """

    __tablename__ = 'price_extremes'

    # Surrogate key generated by the database.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Stored as text; analyze_trends_peaks() writes '%Y-%m-%d %H:%M:%S' strings.
    timestamp = Column(String)
    # Price value recorded at the extreme.
    price = Column(Float)
    # Kind of extreme; analyze_trends_peaks() writes 'peak' or 'valley'.
    # NOTE: the name shadows the builtin inside the class body but is kept
    # because it is both the column name and the constructor keyword.
    type = Column(String)
    # Prominence value recorded alongside the extreme.
    prominence = Column(Float)

    def __repr__(self):
        """Return a compact, debugging-friendly description of the row."""
        return (
            f"{self.__class__.__name__}(id={self.id!r}, timestamp={self.timestamp!r}, "
            f"price={self.price!r}, type={self.type!r}, prominence={self.prominence!r})"
        )
|
|
|
|
|
|
class BitcoinTrendAnalysis:
    """Load Bitcoin close prices from a SQLite database and analyse their trend.

    Two analyses are offered: detection of local maxima/minima with a
    locally adaptive prominence threshold, and a least-squares linear trend.
    Both analyses open an interactive matplotlib window.
    """

    def __init__(self, db_path):
        """Remember the database location and create the SQLAlchemy engine.

        Args:
            db_path: Path of the SQLite file; it is interpolated into a
                ``sqlite:///`` URL.
        """
        self.df = None
        self.db_path = db_path
        self.engine = create_engine(f'sqlite:///{self.db_path}')

    def load_data(self):
        """Read ``Timestamp``/``Close`` rows from 2019 onwards into ``self.df``.

        The frame is indexed by the parsed ``Timestamp`` column. A short
        status message is printed depending on whether any rows came back.
        """
        self.df = pd.read_sql(
            "SELECT Timestamp, Close FROM bitcoin_data WHERE strftime('%Y', Timestamp) >= '2019'",
            self.engine,
            index_col='Timestamp',
            parse_dates=['Timestamp'],
        )

        if self.df is not None and not self.df.empty:
            print(f"Data loaded successfully. Shape: {self.df.shape}")
        else:
            print("Failed to load data. DataFrame is empty or None.")

    def adaptive_find_peaks(self, smooth_prices, window, factor, distance):
        """Find peaks and valleys using a per-sample prominence threshold.

        For every sample the threshold is ``factor`` times the price range
        (max - min) inside the slice ``[i - window // 2, i + window // 2)``.

        Args:
            smooth_prices: One-dimensional sequence of prices (array, list or
                pandas Series). It is converted to a float array, so integer
                input no longer truncates the thresholds.
            window: Width, in samples, of the neighbourhood used for the
                local range.
            factor: Multiplier applied to the local range.
            distance: Forwarded unchanged to ``scipy.signal.find_peaks``.

        Returns:
            Tuple ``(peaks, valleys, prominences)``: two integer index arrays
            and the float array of per-sample thresholds.
        """
        values = np.asarray(smooth_prices, dtype=float)
        count = len(values)
        half = window // 2
        prominences = np.zeros(count, dtype=float)

        for i in range(count):
            start = max(0, i - half)
            # Same extent as before for window >= 2, but always include the
            # sample itself so window < 2 no longer produces an empty slice.
            end = min(count, max(i + half, i + 1))
            segment = values[start:end]
            # Ignore NaNs (e.g. the warm-up of a rolling mean) so they do not
            # turn neighbouring thresholds into NaN, which rejects every peak.
            finite = segment[~np.isnan(segment)]
            if finite.size:
                prominences[i] = (finite.max() - finite.min()) * factor

        peaks, _ = find_peaks(values, prominence=prominences, distance=distance)
        valleys, _ = find_peaks(-values, prominence=prominences, distance=distance)
        return peaks, valleys, prominences

    @staticmethod
    def _extreme_records(kind, dates, smooth_prices, prominences, positions):
        """Build ``(kind, timestamp, price, prominence)`` tuples for the given positions."""
        return [
            (
                kind,
                dates[pos].strftime('%Y-%m-%d %H:%M:%S'),
                float(smooth_prices[pos]),
                float(prominences[pos]),
            )
            for pos in positions
        ]

    @staticmethod
    def _store_extremes(records):
        """Replace the contents of ``price_extremes`` in bitcoin_trends.db with *records*."""
        engine = create_engine('sqlite:///databases/bitcoin_trends.db')
        try:
            Base.metadata.create_all(engine)
            session = sessionmaker(bind=engine)()
            try:
                try:
                    session.query(PriceExtreme).delete()
                except OperationalError as e:
                    # Leave the failed transaction before continuing.
                    session.rollback()
                    print(f"Error occurred: {e}. The table may not exist.")

                session.bulk_save_objects([
                    PriceExtreme(timestamp=stamp, price=price, type=kind, prominence=prominence)
                    for kind, stamp, price, prominence in records
                ])
                session.commit()
            except Exception:
                session.rollback()
                raise
            finally:
                session.close()
        finally:
            engine.dispose()

    @staticmethod
    def _write_report(path, records):
        """Write one ``Peak: ...`` / ``Valley: ...`` line per record to *path*."""
        with open(path, 'w', encoding='utf-8') as file:
            file.writelines(
                f"{kind.capitalize()}: {stamp}, Price: {price}, Prominence: {prominence}\n"
                for kind, stamp, price, prominence in records
            )

    def analyze_trends_peaks(self, resample_window='D', smoothing_window=1, prominence_factor=0.5, window=30,
                             distance=None):
        """Detect local extremes of the close price, persist them and plot them.

        The loaded data is resampled (last close per bin), smoothed with a
        rolling mean, and passed to :meth:`adaptive_find_peaks`. Results are
        stored in ``databases/bitcoin_trends.db`` (replacing earlier rows),
        written to a text report named after the parameters, and plotted.
        ``self.df`` is left untouched so the method can be called repeatedly
        with different settings.

        Args:
            resample_window: Pandas resampling rule.
            smoothing_window: Window of the rolling mean.
            prominence_factor: ``factor`` passed to :meth:`adaptive_find_peaks`.
            window: ``window`` passed to :meth:`adaptive_find_peaks`.
            distance: ``distance`` passed to :meth:`adaptive_find_peaks`.
        """
        matplotlib.use('TkAgg')

        if getattr(self, 'df', None) is None or self.df.empty:
            print("Data not loaded. Call load_data() first.")
            return

        # Work on a local frame: overwriting self.df would make later calls
        # operate on already-resampled data.
        resampled = self.df.resample(resample_window).agg({'Close': 'last'})
        dates = resampled.index
        prices = resampled['Close'].to_numpy()
        smooth_prices = pd.Series(prices).rolling(window=smoothing_window).mean().to_numpy()
        print(f"Smooth prices: {len(smooth_prices)} vs prices {len(prices)}")

        peaks, valleys, prominences = self.adaptive_find_peaks(
            smooth_prices, window=window, factor=prominence_factor, distance=distance
        )

        _fig, ax = plt.subplots(figsize=(14, 7))
        plt.subplots_adjust(bottom=0.25)
        ax.plot(dates, smooth_prices, label='Bitcoin Smooth Price')
        ax.plot(dates, prices, label='Bitcoin Price')
        ax.scatter(dates[peaks], smooth_prices[peaks], color='green', s=100, marker='^', label='Local Maxima')
        ax.scatter(dates[valleys], smooth_prices[valleys], color='red', s=100, marker='v', label='Local Minima')
        ax.set_title(f'Bitcoin Price Trends Analysis\nfactor={prominence_factor}')
        ax.set_xlabel('Date')
        ax.set_ylabel('Price')
        ax.legend()
        ax.grid(True)

        # Peaks first, then valleys: the order used for both outputs.
        records = (
            self._extreme_records('peak', dates, smooth_prices, prominences, peaks)
            + self._extreme_records('valley', dates, smooth_prices, prominences, valleys)
        )
        self._store_extremes(records)

        report_path = (
            f'peaks_and_valleys_{resample_window}_{smoothing_window}_{prominence_factor}_{window}_{distance}.txt'
        )
        self._write_report(report_path, records)

        print(f"Saved {len(peaks)} peaks and {len(valleys)} valleys to bitcoin_trends.db")
        print(f"Peaks and valleys written to {report_path}")

        plt.show()

    def analyze_trends_linear_regression(self):
        """Fit and plot a straight-line trend through the close prices.

        The regressor is the number of seconds elapsed since the first
        sample, so the printed slope is in price units per second. Rows
        whose close is NaN are ignored, and ``self.df`` is not modified.
        Scrolling zooms the x axis and the left/right keys pan it.
        """
        if self.df is None or self.df.empty:
            print("No data loaded.")
            return

        # LinearRegression rejects NaN targets, so drop them up front.
        closes = self.df['Close'].dropna()
        if closes.empty:
            print("No data loaded.")
            return

        elapsed = (closes.index - closes.index[0]).total_seconds()
        x = np.asarray(elapsed, dtype=float).reshape(-1, 1)
        y = closes.values

        model = LinearRegression()
        model.fit(x, y)
        trend_line = model.predict(x)

        matplotlib.use('TkAgg')
        fig, ax = plt.subplots(figsize=(14, 7))
        plt.subplots_adjust(bottom=0.2)

        ax.plot(closes.index, closes, label='Bitcoin Price', color='blue')
        ax.plot(closes.index, trend_line, label='Linear Trend', color='red', linestyle='dashed')
        ax.set_title("Bitcoin Price Linear Trend")
        ax.set_xlabel("Date")
        ax.set_ylabel("Price")
        ax.legend()
        ax.grid(True)

        def zoom(event):
            """Scale the visible x range around its midpoint on mouse scroll."""
            scale_factor = 1.2 if event.button == 'up' else 0.8
            low, high = ax.get_xlim()
            x_range = (high - low) * scale_factor
            x_mid = (low + high) / 2
            ax.set_xlim(x_mid - x_range / 2, x_mid + x_range / 2)
            ax.figure.canvas.draw()

        def pan(event):
            """Shift the visible x range by 10% on the left/right arrow keys."""
            low, high = ax.get_xlim()
            step = (high - low) * 0.1
            if event.key == 'right':
                ax.set_xlim(low + step, high + step)
            elif event.key == 'left':
                ax.set_xlim(low - step, high - step)
            ax.figure.canvas.draw()

        fig.canvas.mpl_connect('scroll_event', zoom)
        fig.canvas.mpl_connect('key_press_event', pan)
        plt.show()

        slope = model.coef_[0]
        print(f"Trend Slope: {slope:.6f} (positive = uptrend, negative = downtrend)")