import os
import time
from datetime import datetime

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from scipy.signal import find_peaks
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.preprocessing import RobustScaler
from sqlalchemy import create_engine
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.regularizers import l1_l2

from trend_detector_simple import TrendDetectorSimple

class BitcoinPricePredictor:

    def __init__(self, db_path, timeframe, model=None, timesteps=10, batch_size=32, learning_rate=0.001, epochs=50):
        self.db_path = db_path
        self.engine = create_engine(f'sqlite:///{self.db_path}')
        self.timesteps = timesteps
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.epochs = epochs
        self.model = model
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None
        self.history = None
        self.scaler = None
        self.timeframe = timeframe
        # Default feature set; this list is rebuilt in add_essential_features()
        # once the engineered columns actually exist.
        self.feature_columns = ['open', 'high', 'low', 'close', 'volume',
                                'hl_ratio', 'sma_7', 'sma_21', 'price_change']
        self.df = None

    @staticmethod
    def reduce_mem_usage(df):
        """Optimize memory usage of the dataframe by downcasting numeric types."""
        numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
        start_mem = df.memory_usage().sum() / 1024**2

        for col in df.columns:
            col_type = df[col].dtype
            if col_type in numerics:  # Only process numeric columns
                c_min = df[col].min()
                c_max = df[col].max()
                if str(col_type)[:3] == 'int':
                    if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                        df[col] = df[col].astype(np.int8)
                    elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                        df[col] = df[col].astype(np.int16)
                    elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                        df[col] = df[col].astype(np.int32)
                    elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                        df[col] = df[col].astype(np.int64)
                else:
                    # Note: float16 carries only ~3 significant decimal digits,
                    # which can distort large price values; keep that in mind
                    # if precision matters downstream.
                    if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                        df[col] = df[col].astype(np.float16)
                    elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                        df[col] = df[col].astype(np.float32)
                    else:
                        df[col] = df[col].astype(np.float64)

        end_mem = df.memory_usage().sum() / 1024**2
        print(f'Memory usage reduced from {start_mem:.2f} MB to {end_mem:.2f} MB ({(1 - end_mem/start_mem)*100:.1f}% reduction)')
        return df

    def add_essential_features(self, df):
        """Add technical indicators and features to the dataframe."""
        print("Adding technical indicators and features...")
        df = df.copy()

        # Price ratio features
        df['hl_ratio'] = (df['high'] / df['low']).clip(lower=0.8, upper=1.2)

        # Moving averages
        df['sma_7'] = df['close'].rolling(window=7, min_periods=1).mean()
        df['sma_21'] = df['close'].rolling(window=21, min_periods=1).mean()
        df['sma_50'] = df['close'].rolling(window=50, min_periods=1).mean()

        # Exponential moving averages
        df['ema_12'] = df['close'].ewm(span=12, adjust=False).mean()
        df['ema_26'] = df['close'].ewm(span=26, adjust=False).mean()

        # MACD
        df['macd'] = df['ema_12'] - df['ema_26']
        df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean()

        # RSI (a window with no losses produces inf here; zeroed out below)
        delta = df['close'].diff()
        gain = delta.where(delta > 0, 0).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        df['rsi'] = 100 - (100 / (1 + rs))

        # Bollinger Bands
        df['bb_middle'] = df['close'].rolling(window=20).mean()
        df['bb_std'] = df['close'].rolling(window=20).std()
        df['bb_upper'] = df['bb_middle'] + 2 * df['bb_std']
        df['bb_lower'] = df['bb_middle'] - 2 * df['bb_std']
        df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['bb_middle']

        # Price changes
        df['price_change_1d'] = df['close'].pct_change(periods=1).clip(lower=-0.5, upper=0.5)
        df['price_change_3d'] = df['close'].pct_change(periods=3).clip(lower=-0.5, upper=0.5)
        df['price_change_7d'] = df['close'].pct_change(periods=7).clip(lower=-0.5, upper=0.5)

        # Volatility
        df['volatility'] = df['close'].rolling(window=14).std() / df['close'].rolling(window=14).mean()

        # Get trend indicators from TrendDetectorSimple
        # (it already expects lowercase columns, so no conversion is needed)
        if 'datetime' not in df.columns:
            df['datetime'] = df.index

        trend_detector = TrendDetectorSimple(df)
        _, trend_analysis = trend_detector.detect_trends()

        # Add supertrend signals
        for i, st in enumerate(trend_analysis['supertrend']):
            df[f'supertrend_{i+1}'] = st['results']['trend']

        # Add meta-supertrend consensus
        meta_results = trend_detector.calculate_metasupertrend(df, trend_analysis['supertrend'])
        df['metasupertrend'] = meta_results['meta_trends']

        # Add SMA crossover signals
        df['sma_cross'] = np.where(trend_analysis['sma']['7'] > trend_analysis['sma']['15'], 1, -1)

        # Clean up NaN or infinite values
        df = df.fillna(0)
        df = df.replace([np.inf, -np.inf], 0)

        # Update feature columns list - exclude target and non-numeric columns
        self.feature_columns = [col for col in df.columns if col not in ['next_period_return', 'next_period_up', 'datetime']]

        print(f"Shape after adding features: {df.shape}")
        return df

    def create_sequences(self, data, target):
        """Create sequences of data for LSTM input with corresponding targets."""
        x, y = [], []
        for i in range(len(data) - self.timesteps):
            x.append(data[i:i + self.timesteps])
            y.append(target[i + self.timesteps])
        return np.array(x, dtype=np.float32), np.array(y, dtype=np.float32)

    def create_sequences_for_prediction(self, data):
        """Create sequences of data for prediction without targets."""
        return np.array([data[i:i + self.timesteps] for i in range(len(data) - self.timesteps)], dtype=np.float32)
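
    # Worked example of the windowing above: with timesteps=3 and 5 rows of
    # 2 features, create_sequences yields x of shape (2, 3, 2) and y of shape
    # (2,) -- sequence i covers rows [i, i+3) and its target is row i+3.
    # The first `timesteps` rows never receive a prediction, which is why the
    # prediction methods below trim with .iloc[self.timesteps:]; note also
    # that no window extends past the last row, so there is no forecast for
    # the still-unseen next period.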

    def create_model(self, input_shape):
        """Create and compile the LSTM model architecture."""
        model = Sequential([
            LSTM(64, return_sequences=True, input_shape=input_shape,
                 recurrent_dropout=0.2, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
            Dropout(0.3),
            LSTM(32, return_sequences=True, recurrent_dropout=0.1, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
            Dropout(0.2),
            LSTM(16),
            Dropout(0.2),
            Dense(8, activation='relu'),
            Dense(1, activation='sigmoid')
        ])

        optimizer = tf.keras.optimizers.Adam(learning_rate=self.learning_rate)
        model.compile(optimizer=optimizer,
                      loss='binary_crossentropy',
                      metrics=['accuracy'])

        # summary() prints the table itself and returns None, so don't wrap it in print()
        model.summary()
        return model
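
    # Shape flow through create_model(): input (batch, timesteps, n_features)
    # -> LSTM(64, return_sequences=True) -> (batch, timesteps, 64)
    # -> LSTM(32, return_sequences=True) -> (batch, timesteps, 32)
    # -> LSTM(16)                        -> (batch, 16)
    # -> Dense(8, relu) -> Dense(1, sigmoid) -> a single probability that the
    # next period closes up, matching the binary next_period_up target.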

    def load_data_csv(self, file_path):
        """Load Bitcoin price data from a CSV file."""
        try:
            # Read the CSV file
            self.df = pd.read_csv(file_path)

            # Convert column names to lowercase
            self.df.columns = self.df.columns.str.lower()

            # Convert timestamp to datetime and use it as the index
            self.df['timestamp'] = pd.to_datetime(self.df['timestamp'])
            self.df.set_index('timestamp', inplace=True)

            if self.df is not None and not self.df.empty:
                print(f"Data loaded successfully from CSV. Shape: {self.df.shape}")
            else:
                print("Failed to load data. DataFrame is empty or None.")

        except Exception as e:
            print(f"Error loading CSV data: {str(e)}")
            self.df = None

    def load_data(self):
        """Load data from the SQLite database."""
        try:
            import sqlite3
            conn = sqlite3.connect(self.db_path)

            self.df = pd.read_sql_query("SELECT * FROM bitcoin_data", conn)

            # Convert column names to lowercase
            self.df.columns = self.df.columns.str.lower()

            if self.df is not None and not self.df.empty:
                print(f"Data loaded successfully. Shape: {self.df.shape}")
            else:
                print("Failed to load data. DataFrame is empty or None.")

            conn.close()
        except Exception as e:
            print(f"Error loading database data: {str(e)}")
            self.df = None

    def prepare_data(self):
        """Prepare data for model training."""
        start_time = time.time()

        self.df = self.add_essential_features(self.df)

        # Define target variable - binary classification for price movement
        self.df['next_period_return'] = self.df['close'].pct_change(periods=1).shift(-1).clip(lower=-0.5, upper=0.5)
        self.df['next_period_up'] = (self.df['next_period_return'] > 0).astype(np.int8)
        self.df = self.df.dropna()

        # Scale features. Note: fitting the scaler on the full series leaks
        # test-set statistics; fitting on the training slice only would be stricter.
        self.scaler = RobustScaler()
        # Ensure we're only scaling numeric features
        numeric_features = [col for col in self.feature_columns if col != 'datetime' and pd.api.types.is_numeric_dtype(self.df[col])]
        self.df[numeric_features] = self.scaler.fit_transform(self.df[numeric_features])
        # Keep the canonical feature list in sync with what was actually scaled
        self.feature_columns = numeric_features

        # Create sequences for LSTM
        x, y = self.create_sequences(self.df[self.feature_columns].values, self.df['next_period_up'].values)
        print(f"Sequence shape: {x.shape}, Target shape: {y.shape}")

        # Class balance check
        class_distribution = np.bincount(y.astype(int))
        print(f"Class distribution - 0: {class_distribution[0]}, 1: {class_distribution[1]}")
        print(f"Positive class ratio: {class_distribution[1]/len(y):.2f}")

        # Train-test split (chronological, no shuffling, to avoid look-ahead leakage)
        split_idx = int(len(x) * 0.8)
        self.X_train, self.X_test = x[:split_idx], x[split_idx:]
        self.y_train, self.y_test = y[:split_idx], y[split_idx:]

        print(f"Training data shape: {self.X_train.shape}, Test data shape: {self.X_test.shape}")
        print(f"Data preparation completed in {time.time() - start_time:.2f} seconds")

    def resample_data(self, df):
        """Resample data to the specified timeframe."""
        print(f"Resampling data to {self.timeframe} timeframe...")
        df = df.resample(self.timeframe).agg({
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last',
            'volume': 'sum'
        })
        print(f"Shape after resampling: {df.shape}")
        return df
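
    # self.timeframe is passed straight to pandas' resample(), so any pandas
    # offset alias should work -- e.g. '1h' for hourly bars or '1D' for daily
    # bars built from the 1-minute source data (older pandas spells hourly 'H').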

    def load_new_data_from_model(self):
        """Load new data and identify entries missing from the database."""
        new_data = pd.read_csv("./data/btcusd_1-min_data.csv")
        # Convert column names to lowercase
        new_data.columns = new_data.columns.str.lower()
        new_data['timestamp'] = pd.to_datetime(new_data['timestamp'], unit='s')

        existing_data = pd.read_sql('SELECT * FROM bitcoin_data', self.engine, index_col='timestamp',
                                    parse_dates=['timestamp'])
        # Convert column names to lowercase
        existing_data.columns = existing_data.columns.str.lower()

        # Show the most recent entries in the database
        last_entries = existing_data.sort_index(ascending=False).head(10)
        print("Most recent entries in database:")
        print(last_entries)

        # Find rows newer than anything already stored
        latest_timestamp = existing_data.index.max()
        missing_data = new_data[new_data['timestamp'] > latest_timestamp]

        print(f"New data total length: {len(new_data)}")
        print(f"Missing data entries: {len(missing_data)}")
        print(f"Existing data entries: {len(existing_data)}")

        return missing_data

    def preprocess_data(self, data):
        """Preprocess new data with feature engineering and scaling."""
        # Add technical indicators
        data = self.add_essential_features(data)

        # Scale the features with the scaler fitted on the training data
        if self.scaler is not None:
            data[self.feature_columns] = self.scaler.transform(data[self.feature_columns])
        else:
            # No fitted scaler available (e.g. a model loaded without training);
            # fall back to fitting a fresh one on the new data, which will not
            # exactly match the training-time scaling.
            scaler = RobustScaler()
            data[self.feature_columns] = scaler.fit_transform(data[self.feature_columns])

        return data

    def _prepare_prediction_data(self, new_data):
        """Helper method to prepare data for prediction.

        Returns an (X, grouped_data) tuple, or None when there is not enough data.
        """
        # Ensure the 'timestamp' column is present
        if 'timestamp' not in new_data.columns:
            raise ValueError("Input data must contain a 'timestamp' column.")

        # Convert 'timestamp' to datetime and set as index
        new_data['timestamp'] = pd.to_datetime(new_data['timestamp'], errors='coerce')
        new_data = new_data.dropna(subset=['timestamp'])  # Drop rows where timestamp is NaT
        new_data.set_index('timestamp', inplace=True)

        # Resample and aggregate data to the specified timeframe
        grouped_data = new_data.resample(self.timeframe).agg({
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last',
            'volume': 'sum'
        }).reset_index()  # Reset index to preserve 'timestamp' as a column

        if grouped_data.empty:
            print("No new data found.")
            return None

        # Preprocess the data
        grouped_data = self.preprocess_data(grouped_data)

        if grouped_data.empty:
            print("No new data after preprocessing.")
            return None

        # Create sequences for the model
        X = self.create_sequences_for_prediction(grouped_data[self.feature_columns].values)

        if len(X) == 0:
            print("Not enough data to create sequences.")
            # Return None (not a tuple) so callers' `is None` checks work
            return None

        return X, grouped_data

    def make_predictions_w_reality(self, new_data):
        """Make predictions and compare them with actual outcomes."""
        # Convert column names to lowercase if needed
        new_data.columns = new_data.columns.str.lower()

        prepared_data = self._prepare_prediction_data(new_data)
        if prepared_data is None:
            return None, None

        X, grouped_data = prepared_data

        # Generate predictions
        predictions = self.model.predict(X)

        # Trim 'grouped_data' so row j lines up with the prediction made from
        # the sequence ending just before it (see create_sequences)
        grouped_data = grouped_data.iloc[self.timesteps:].copy()

        # Add predictions to the grouped_data DataFrame
        grouped_data['predictions'] = (predictions > 0.5).astype(int)
        grouped_data['prediction_probability'] = predictions.flatten()

        # Calculate reality (actual price movement; note the 0.5% threshold
        # here is stricter than the > 0 rule used for the training target)
        grouped_data['reality'] = (grouped_data['close'].pct_change() > 0.005).astype(int)

        # Calculate accuracy
        grouped_data['correct'] = (grouped_data['predictions'] == grouped_data['reality']).astype(int)
        accuracy = grouped_data['correct'].mean()
        print(f"Prediction accuracy: {accuracy:.2f}")

        # Return predictions and reality
        return grouped_data[['timestamp', 'predictions', 'prediction_probability']], grouped_data['reality']

    def make_predictions(self, new_data):
        """Make predictions on new data."""
        # Convert column names to lowercase if needed
        new_data.columns = new_data.columns.str.lower()

        prepared_data = self._prepare_prediction_data(new_data)
        if prepared_data is None:
            return None

        X, grouped_data = prepared_data

        # Generate predictions
        predictions = self.model.predict(X)

        # Trim 'grouped_data' to align rows with their predictions
        grouped_data = grouped_data.iloc[self.timesteps:].copy()

        # Add predictions to the grouped_data DataFrame
        grouped_data['predictions'] = (predictions > 0.5).astype(int)
        grouped_data['prediction_probability'] = predictions.flatten()

        # Return prediction results
        return grouped_data[['timestamp', 'predictions', 'prediction_probability']]

    def update_database(self, missing_data):
        """Update the database with the missing data."""
        if missing_data.empty:
            print("No new data to add to the database.")
            return

        missing_data.to_sql('bitcoin_data', self.engine, if_exists='append', index=False)
        print(f"Database updated with {len(missing_data)} new rows.")

    def train_model(self):
        """Train the LSTM model with early stopping and checkpointing."""
        if self.X_train is None or self.y_train is None:
            raise ValueError("Data not loaded. Call prepare_data() first.")

        # Create model directory if it doesn't exist
        os.makedirs("./models", exist_ok=True)

        # Configure TensorFlow's GPU memory usage
        self._configure_gpu_memory()

        # Create the model; derive the feature width from the training data
        # so the input shape always matches the prepared sequences
        self.model = self.create_model(input_shape=(self.timesteps, self.X_train.shape[2]))

        # Setup callbacks
        current_date = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
        checkpoint_path = f"./models/model_checkpoint_{current_date}.keras"

        callbacks = [
            EarlyStopping(
                monitor='val_loss',
                patience=10,
                restore_best_weights=True,
                verbose=1
            ),
            ModelCheckpoint(
                filepath=checkpoint_path,
                save_best_only=True,
                monitor='val_loss',
                verbose=1
            )
        ]

        # Train the model
        print(f"Training model with {self.epochs} epochs and batch size {self.batch_size}...")
        self.history = self.model.fit(
            self.X_train, self.y_train,
            validation_data=(self.X_test, self.y_test),
            epochs=self.epochs,
            batch_size=self.batch_size,
            callbacks=callbacks,
            verbose=1
        )

        # Save the final model
        final_model_path = f"./models/model_final_{current_date}.keras"
        self.model.save(final_model_path)
        print(f"Model saved to {final_model_path}")

    def _configure_gpu_memory(self):
        """Configure TensorFlow to use GPU memory efficiently."""
        gpus = tf.config.experimental.list_physical_devices('GPU')
        if gpus:
            try:
                # Cap TensorFlow at 2048 MB per GPU so other processes keep headroom
                for gpu in gpus:
                    tf.config.experimental.set_virtual_device_configuration(
                        gpu,
                        [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=2048)]
                    )
                print("GPU memory limit set")
            except RuntimeError as e:
                print(f"GPU memory limit setting failed: {e}")

    def evaluate_model(self):
        """Evaluate the trained model on test data."""
        if self.model is None:
            raise ValueError("Model not trained. Call train_model() first.")

        # Basic evaluation
        test_loss, test_accuracy = self.model.evaluate(self.X_test, self.y_test, verbose=1)
        print(f"\nTest Accuracy: {test_accuracy:.4f}")
        print(f"Test Loss: {test_loss:.4f}")

        # Make predictions on test data
        y_pred = (self.model.predict(self.X_test) > 0.5).astype(int)

        # Calculate confusion matrix
        cm = confusion_matrix(self.y_test, y_pred)
        print("\nConfusion Matrix:")
        print(cm)

        # Print classification report
        print("\nClassification Report:")
        print(classification_report(self.y_test, y_pred))

    def plot_history(self):
        """Plot the training history metrics."""
        if self.history is None:
            raise ValueError("No training history available. Train the model first.")

        plt.figure(figsize=(15, 6))

        # Plot accuracy
        plt.subplot(1, 2, 1)
        plt.plot(self.history.history['accuracy'], label='Training Accuracy')
        plt.plot(self.history.history['val_accuracy'], label='Validation Accuracy')
        plt.title('Model Accuracy')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend(loc='lower right')
        plt.grid(True)

        # Plot loss
        plt.subplot(1, 2, 2)
        plt.plot(self.history.history['loss'], label='Training Loss')
        plt.plot(self.history.history['val_loss'], label='Validation Loss')
        plt.title('Model Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend(loc='upper right')
        plt.grid(True)

        plt.tight_layout()

        # Save the plot
        os.makedirs("./plots", exist_ok=True)
        current_date = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
        plt.savefig(f"./plots/training_history_{current_date}.png")

        plt.show()

    def analyze_market_trends(self, window_size=100, prominence=0.01, height=None, threshold=0.0, distance=None):
        """Analyze market trends by finding local minima and maxima in the price data."""
        matplotlib.use('TkAgg')  # Use TkAgg backend for interactive plotting

        # Make sure data is loaded
        if not hasattr(self, 'df') or self.df is None:
            print("Data not loaded. Call load_data() and prepare_data() first.")
            return

        # Get the closing prices
        prices = self.df['close'].values

        # Interpret prominence < 1 as a fraction of the overall price range,
        # otherwise treat it as an absolute price distance
        price_range = np.max(prices) - np.min(prices)
        if prominence < 1:
            prominence_abs = prominence * price_range
        else:
            prominence_abs = prominence

        # Use provided distance or default to window_size
        if distance is None:
            distance = window_size

        # Find local maxima (peaks)
        peaks, _ = find_peaks(
            prices,
            height=height,
            threshold=threshold,
            distance=distance,
            prominence=prominence_abs
        )

        # Find local minima (valleys) by searching the negated series,
        # mirroring the height bound accordingly
        valleys, _ = find_peaks(
            -prices,
            height=-height if height is not None else None,
            threshold=threshold,
            distance=distance,
            prominence=prominence_abs
        )

        # Create figure for trend analysis
        self._plot_trend_analysis(prices, peaks, valleys)

        # Print trend statistics
        self._print_trend_statistics(prices, peaks, valleys)

        return peaks, valleys

    def _plot_trend_analysis(self, prices, peaks, valleys):
        """Helper method to plot trend analysis."""
        plt.figure(figsize=(14, 7))

        # Plot the price data
        plt.plot(self.df.index, prices, label='Bitcoin Price')

        # Highlight the peaks and valleys
        plt.scatter(self.df.index[peaks], prices[peaks], color='green', s=100, marker='^', label='Local Maxima')
        plt.scatter(self.df.index[valleys], prices[valleys], color='red', s=100, marker='v', label='Local Minima')

        # Identify trends by connecting consecutive extrema
        all_points = np.sort(np.concatenate([peaks, valleys]))

        self.up_trends = []
        self.down_trends = []

        for i in range(len(all_points) - 1):
            start_idx = all_points[i]
            end_idx = all_points[i + 1]

            # Determine whether the segment is an uptrend or a downtrend
            if start_idx in valleys and end_idx in peaks:
                # Uptrend: valley to peak
                plt.plot([self.df.index[start_idx], self.df.index[end_idx]],
                         [prices[start_idx], prices[end_idx]],
                         'g-', linewidth=2, alpha=0.7)

                duration = end_idx - start_idx
                magnitude = prices[end_idx] - prices[start_idx]
                percent_change = 100 * magnitude / prices[start_idx]
                self.up_trends.append((duration, magnitude, percent_change))

            elif start_idx in peaks and end_idx in valleys:
                # Downtrend: peak to valley
                plt.plot([self.df.index[start_idx], self.df.index[end_idx]],
                         [prices[start_idx], prices[end_idx]],
                         'r-', linewidth=2, alpha=0.7)

                duration = end_idx - start_idx
                magnitude = prices[start_idx] - prices[end_idx]
                percent_change = 100 * magnitude / prices[start_idx]
                self.down_trends.append((duration, magnitude, percent_change))

        plt.title('Bitcoin Price Trends Analysis')
        plt.xlabel('Date')
        plt.ylabel('Price')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.savefig('bitcoin_trends_analysis.png')
        plt.show(block=True)

    def _print_trend_statistics(self, prices, peaks, valleys):
        """Helper method to print trend statistics."""
        print(f"Found {len(peaks)} local maxima and {len(valleys)} local minima")

        # Calculate average trend durations and magnitudes
        if hasattr(self, 'up_trends') and self.up_trends:
            avg_up_duration = sum(t[0] for t in self.up_trends) / len(self.up_trends)
            avg_up_magnitude = sum(t[1] for t in self.up_trends) / len(self.up_trends)
            avg_up_percent = sum(t[2] for t in self.up_trends) / len(self.up_trends)
            print(f"Average uptrend: {avg_up_duration:.1f} periods, {avg_up_magnitude:.2f} price change ({avg_up_percent:.2f}%)")

        if hasattr(self, 'down_trends') and self.down_trends:
            avg_down_duration = sum(t[0] for t in self.down_trends) / len(self.down_trends)
            avg_down_magnitude = sum(t[1] for t in self.down_trends) / len(self.down_trends)
            avg_down_percent = sum(t[2] for t in self.down_trends) / len(self.down_trends)
            print(f"Average downtrend: {avg_down_duration:.1f} periods, {avg_down_magnitude:.2f} price change ({avg_down_percent:.2f}%)")