import os from datetime import datetime import time import numpy as np import pandas as pd import tensorflow as tf from sklearn.metrics import confusion_matrix, classification_report from sqlalchemy import create_engine from tensorflow.keras.models import Sequential from tensorflow.keras.regularizers import l1_l2 from tensorflow.keras.layers import LSTM, Dense, Dropout from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint from sklearn.model_selection import train_test_split from sklearn.preprocessing import RobustScaler import gc import matplotlib.pyplot as plt from scipy.signal import find_peaks from matplotlib.backends.backend_agg import FigureCanvasAgg from matplotlib.figure import Figure import matplotlib from trend_detector_simple import TrendDetectorSimple class BitcoinPricePredictor: def __init__(self, db_path, timeframe, model=None, timesteps=10, batch_size=32, learning_rate=0.001, epochs=50): self.db_path = db_path self.engine = create_engine(f'sqlite:///{self.db_path}') self.timesteps = timesteps self.batch_size = batch_size self.learning_rate = learning_rate self.epochs = epochs self.model = model self.X_train = None self.X_test = None self.y_train = None self.y_test = None self.history = None self.scaler = None self.timeframe = timeframe self.feature_columns = ['open', 'high', 'low', 'close', 'volume', 'hl_ratio', 'sma_7', 'sma_21', 'price_change'] self.df = None @staticmethod def reduce_mem_usage(df): """Optimize memory usage of the dataframe by downcasting numeric types.""" numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64'] start_mem = df.memory_usage().sum() / 1024**2 for col in df.columns: col_type = df[col].dtypes if col_type in numerics: # Only process numeric columns c_min = df[col].min() c_max = df[col].max() if str(col_type)[:3] == 'int': if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max: df[col] = df[col].astype(np.int8) elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max: df[col] = df[col].astype(np.int16) elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max: df[col] = df[col].astype(np.int32) elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max: df[col] = df[col].astype(np.int64) else: if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max: df[col] = df[col].astype(np.float16) elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max: df[col] = df[col].astype(np.float32) else: df[col] = df[col].astype(np.float64) end_mem = df.memory_usage().sum() / 1024**2 print(f'Memory usage reduced from {start_mem:.2f} MB to {end_mem:.2f} MB ({(1 - end_mem/start_mem)*100:.1f}% reduction)') return df def add_essential_features(self, df): """Add technical indicators and features to the dataframe.""" print("Adding technical indicators and features...") df = df.copy() # Price ratio features df['hl_ratio'] = (df['high'] / df['low']).clip(lower=0.8, upper=1.2) # Moving averages df['sma_7'] = df['close'].rolling(window=7, min_periods=1).mean() df['sma_21'] = df['close'].rolling(window=21, min_periods=1).mean() df['sma_50'] = df['close'].rolling(window=50, min_periods=1).mean() # Exponential moving averages df['ema_12'] = df['close'].ewm(span=12, adjust=False).mean() df['ema_26'] = df['close'].ewm(span=26, adjust=False).mean() # MACD df['macd'] = df['ema_12'] - df['ema_26'] df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean() # RSI delta = df['close'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=14).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() rs = gain / loss df['rsi'] = 100 - (100 / (1 + rs)) # Bollinger Bands df['bb_middle'] = df['close'].rolling(window=20).mean() df['bb_std'] = df['close'].rolling(window=20).std() df['bb_upper'] = df['bb_middle'] + 2 * df['bb_std'] df['bb_lower'] = df['bb_middle'] - 2 * df['bb_std'] df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['bb_middle'] # Price changes df['price_change_1d'] = df['close'].pct_change(periods=1).clip(lower=-0.5, upper=0.5) df['price_change_3d'] = df['close'].pct_change(periods=3).clip(lower=-0.5, upper=0.5) df['price_change_7d'] = df['close'].pct_change(periods=7).clip(lower=-0.5, upper=0.5) # Volatility df['volatility'] = df['close'].rolling(window=14).std() / df['close'].rolling(window=14).mean() # Get trend indicators from TrendDetector # TrendDetector already expects lowercase columns, so no need for conversion if 'datetime' not in df.columns: df['datetime'] = df.index trend_detector = TrendDetectorSimple(df) trend_data, trend_analysis = trend_detector.detect_trends() # Add supertrend signals for i, st in enumerate(trend_analysis['supertrend']): df[f'supertrend_{i+1}'] = st['results']['trend'] # Add meta-supertrend consensus meta_results = trend_detector.calculate_metasupertrend(df, trend_analysis['supertrend']) df['metasupertrend'] = meta_results['meta_trends'] # Add SMA crossover signals df['sma_cross'] = np.where(trend_analysis['sma']['7'] > trend_analysis['sma']['15'], 1, -1) # Clean up NaN or infinite values df = df.fillna(0) df = df.replace([np.inf, -np.inf], 0) # Update feature columns list - exclude non-numeric columns self.feature_columns = [col for col in df.columns if col not in ['next_period_return', 'next_period_up', 'datetime']] print(f"Shape after adding features: {df.shape}") return df def create_sequences(self, data, target): """Create sequences of data for LSTM input with corresponding targets.""" x, y = [], [] for i in range(len(data) - self.timesteps): x.append(data[i:i + self.timesteps]) y.append(target[i + self.timesteps]) return np.array(x, dtype=np.float32), np.array(y, dtype=np.float32) def create_sequences_for_prediction(self, data): """Create sequences of data for prediction without targets.""" return np.array([data[i:i + self.timesteps] for i in range(len(data) - self.timesteps)], dtype=np.float32) def create_model(self, input_shape): """Create and compile the LSTM model architecture.""" model = Sequential([ LSTM(64, return_sequences=True, input_shape=input_shape, recurrent_dropout=0.2, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)), Dropout(0.3), LSTM(32, return_sequences=True, recurrent_dropout=0.1, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)), Dropout(0.2), LSTM(16), Dropout(0.2), Dense(8, activation='relu'), Dense(1, activation='sigmoid') ]) optimizer = tf.keras.optimizers.Adam(learning_rate=self.learning_rate) model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy']) print(model.summary()) return model def load_data_csv(self, file_path): """Load Bitcoin price data from a CSV file.""" try: # Read the CSV file self.df = pd.read_csv(file_path) # Convert column names to lowercase self.df.columns = self.df.columns.str.lower() # Convert timestamp to datetime self.df['timestamp'] = pd.to_datetime(self.df['timestamp']) self.df.set_index('timestamp', inplace=True) if self.df is not None and not self.df.empty: print(f"Data loaded successfully from CSV. Shape: {self.df.shape}") else: print("Failed to load data. DataFrame is empty or None.") except Exception as e: print(f"Error loading CSV data: {str(e)}") self.df = None def load_data(self): """Load data from SQLite database.""" try: import sqlite3 conn = sqlite3.connect(self.db_path) self.df = pd.read_sql_query("SELECT * FROM bitcoin_data", conn) # Convert column names to lowercase self.df.columns = self.df.columns.str.lower() if self.df is not None and not self.df.empty: print(f"Data loaded successfully. Shape: {self.df.shape}") else: print("Failed to load data. DataFrame is empty or None.") conn.close() except Exception as e: print(f"Error loading database data: {str(e)}") self.df = None def prepare_data(self): """Prepare data for model training.""" start_time = time.time() self.df = self.add_essential_features(self.df) # Define target variable - binary classification for price movement self.df['next_period_return'] = self.df['close'].pct_change(periods=1).shift(-1).clip(lower=-0.5, upper=0.5) self.df['next_period_up'] = (self.df['next_period_return'] > 0).astype(np.int8) self.df = self.df.dropna() # Scale features self.scaler = RobustScaler() # Ensure we're only scaling numeric features numeric_features = [col for col in self.feature_columns if col != 'datetime' and pd.api.types.is_numeric_dtype(self.df[col])] self.df[numeric_features] = self.scaler.fit_transform(self.df[numeric_features]) # Create sequences for LSTM x, y = self.create_sequences(self.df[numeric_features].values, self.df['next_period_up'].values) print(f"Sequence shape: {x.shape}, Target shape: {y.shape}") # Class balance check class_distribution = np.bincount(y.astype(int)) print(f"Class distribution - 0: {class_distribution[0]}, 1: {class_distribution[1]}") print(f"Positive class ratio: {class_distribution[1]/len(y):.2f}") # Train-test split (chronological) split_idx = int(len(x) * 0.8) self.X_train, self.X_test = x[:split_idx], x[split_idx:] self.y_train, self.y_test = y[:split_idx], y[split_idx:] print(f"Training data shape: {self.X_train.shape}, Test data shape: {self.X_test.shape}") print(f"Data preparation completed in {time.time() - start_time:.2f} seconds") def resample_data(self, df): """Resample data to specified timeframe.""" print(f"Resampling data to {self.timeframe} timeframe...") df = df.resample(self.timeframe).agg({ 'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum' }) print(f"Shape after resampling: {df.shape}") return df def load_new_data_from_model(self): """Load new data and identify missing entries compared to the database.""" new_data = pd.read_csv("./data/btcusd_1-min_data.csv") # Convert column names to lowercase new_data.columns = new_data.columns.str.lower() new_data['timestamp'] = pd.to_datetime(new_data['timestamp'], unit='s') existing_data = pd.read_sql('SELECT * FROM bitcoin_data', self.engine, index_col='timestamp', parse_dates=['timestamp']) # Convert column names to lowercase existing_data.columns = existing_data.columns.str.lower() # Show the most recent entries in the database last_entries = existing_data.sort_index(ascending=False).head(10) print("Most recent entries in database:") print(last_entries) # Find missing data latest_timestamp = existing_data.index.max() missing_data = new_data[new_data['timestamp'] > latest_timestamp] print(f"New data total length: {len(new_data)}") print(f"Missing data entries: {len(missing_data)}") print(f"Existing data entries: {len(existing_data)}") return missing_data def preprocess_data(self, data): """Preprocess new data with feature engineering and scaling.""" # Add technical indicators data = self.add_essential_features(data) # Scale the features using the same scaler as training data if self.scaler is not None: data[self.feature_columns] = self.scaler.transform(data[self.feature_columns]) else: # If no scaler exists, fit a new one scaler = RobustScaler() data[self.feature_columns] = scaler.fit_transform(data[self.feature_columns]) return data def _prepare_prediction_data(self, new_data): """Helper method to prepare data for prediction.""" # Ensure the 'timestamp' column is present if 'timestamp' not in new_data.columns: raise ValueError("Input data must contain a 'timestamp' column.") # Convert 'timestamp' to datetime and set as index new_data['timestamp'] = pd.to_datetime(new_data['timestamp'], errors='coerce') new_data = new_data.dropna(subset=['timestamp']) # Drop rows where Timestamp is NaT new_data.set_index('timestamp', inplace=True) # Resample and aggregate data to the specified timeframe grouped_data = new_data.resample(self.timeframe).agg({ 'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum' }).reset_index() # Reset index to preserve 'timestamp' as a column if grouped_data.empty: print("No new data found.") return None # Preprocess the data grouped_data = self.preprocess_data(grouped_data) if grouped_data.empty: print("No new data after preprocessing.") return None # Create sequences for the model X = self.create_sequences_for_prediction(grouped_data[self.feature_columns].values) if len(X) == 0: print("Not enough data to create sequences.") return None, None return X, grouped_data def make_predictions_w_reality(self, new_data): """Make predictions and compare with actual outcomes.""" # Convert column names to lowercase if needed new_data.columns = new_data.columns.str.lower() prepared_data = self._prepare_prediction_data(new_data) if prepared_data is None: return None, None X, grouped_data = prepared_data # Generate predictions predictions = self.model.predict(X) # Trim 'grouped_data' to align with sequence length grouped_data = grouped_data.iloc[self.timesteps:] # Align with sequence length # Add predictions to the grouped_data DataFrame grouped_data['predictions'] = (predictions > 0.5).astype(int) grouped_data['prediction_probability'] = predictions # Calculate reality (actual price movement) grouped_data['reality'] = (grouped_data['close'].pct_change() > 0.005).astype(int) # Calculate accuracy grouped_data['correct'] = (grouped_data['predictions'] == grouped_data['reality']).astype(int) accuracy = grouped_data['correct'].mean() print(f"Prediction accuracy: {accuracy:.2f}") # Return predictions and reality return grouped_data[['timestamp', 'predictions', 'prediction_probability']], grouped_data['reality'] def make_predictions(self, new_data): """Make predictions on new data.""" # Convert column names to lowercase if needed new_data.columns = new_data.columns.str.lower() prepared_data = self._prepare_prediction_data(new_data) if prepared_data is None: return None X, grouped_data = prepared_data # Generate predictions predictions = self.model.predict(X) # Trim 'grouped_data' to align with sequence length grouped_data = grouped_data.iloc[self.timesteps:] # Add predictions to the grouped_data DataFrame grouped_data['predictions'] = (predictions > 0.5).astype(int) grouped_data['prediction_probability'] = predictions.flatten() # Return prediction results return grouped_data[['timestamp', 'predictions', 'prediction_probability']] def update_database(self, missing_data): """Update the database with the missing data.""" if missing_data.empty: print("No new data to add to the database.") return missing_data.to_sql('bitcoin_data', self.engine, if_exists='append', index=False) print(f"Database updated with {len(missing_data)} new rows.") def train_model(self): """Train the LSTM model with early stopping and checkpointing.""" if self.X_train is None or self.y_train is None: raise ValueError("Data not loaded. Call prepare_data() first.") # Create model directory if it doesn't exist os.makedirs("./models", exist_ok=True) # Configure TensorFlow for GPU memory self._configure_gpu_memory() # Create the model self.model = self.create_model(input_shape=(self.timesteps, len(self.feature_columns))) # Setup callbacks current_date = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') checkpoint_path = f"./models/model_checkpoint_{current_date}.keras" callbacks = [ EarlyStopping( monitor='val_loss', patience=10, restore_best_weights=True, verbose=1 ), ModelCheckpoint( filepath=checkpoint_path, save_best_only=True, monitor='val_loss', verbose=1 ) ] # Train the model print(f"Training model with {self.epochs} epochs and batch size {self.batch_size}...") self.history = self.model.fit( self.X_train, self.y_train, validation_data=(self.X_test, self.y_test), epochs=self.epochs, batch_size=self.batch_size, callbacks=callbacks, verbose=1 ) # Save the final model final_model_path = f"./models/model_final_{current_date}.keras" self.model.save(final_model_path) print(f"Model saved to {final_model_path}") def _configure_gpu_memory(self): """Configure TensorFlow to use GPU memory efficiently.""" gpus = tf.config.experimental.list_physical_devices('GPU') if gpus: try: # Limit TensorFlow to use only 80% of GPU memory for gpu in gpus: tf.config.experimental.set_virtual_device_configuration( gpu, [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=2048)] ) print("GPU memory limit set") except RuntimeError as e: print(f"GPU memory limit setting failed: {e}") def evaluate_model(self): """Evaluate the trained model on test data.""" if self.model is None: raise ValueError("Model not trained. Call train_model() first.") # Basic evaluation test_loss, test_accuracy = self.model.evaluate(self.X_test, self.y_test, verbose=1) print(f"\nTest Accuracy: {test_accuracy:.4f}") print(f"Test Loss: {test_loss:.4f}") # Make predictions on test data y_pred = (self.model.predict(self.X_test) > 0.5).astype(int) # Calculate confusion matrix cm = confusion_matrix(self.y_test, y_pred) print("\nConfusion Matrix:") print(cm) # Print classification report print("\nClassification Report:") print(classification_report(self.y_test, y_pred)) def plot_history(self): """Plot the training history metrics.""" if self.history is None: raise ValueError("No training history available. Train the model first.") plt.figure(figsize=(15, 6)) # Plot accuracy plt.subplot(1, 2, 1) plt.plot(self.history.history['accuracy'], label='Training Accuracy') plt.plot(self.history.history['val_accuracy'], label='Validation Accuracy') plt.title('Model Accuracy') plt.xlabel('Epoch') plt.ylabel('Accuracy') plt.legend(loc='lower right') plt.grid(True) # Plot loss plt.subplot(1, 2, 2) plt.plot(self.history.history['loss'], label='Training Loss') plt.plot(self.history.history['val_loss'], label='Validation Loss') plt.title('Model Loss') plt.xlabel('Epoch') plt.ylabel('Loss') plt.legend(loc='upper right') plt.grid(True) plt.tight_layout() # Save the plot os.makedirs("./plots", exist_ok=True) current_date = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') plt.savefig(f"./plots/training_history_{current_date}.png") plt.show() def analyze_market_trends(self, window_size=100, prominence=0.01, height=None, threshold=0.0, distance=None): """Analyze market trends by finding local minima and maxima in the price data.""" matplotlib.use('TkAgg') # Use TkAgg backend for interactive plotting # Make sure data is loaded if not hasattr(self, 'df') or self.df is None: print("Data not loaded. Call load_and_prepare_data() first.") return # Get the closing prices prices = self.df['close'].values # Calculate prominence as a percentage of price range if provided as a relative value price_range = np.max(prices) - np.min(prices) if prominence < 1: # If prominence is provided as a relative value prominence_abs = prominence * price_range else: prominence_abs = prominence # Use provided distance or default to window_size if distance is None: distance = window_size # Find local maxima (peaks) peaks, peaks_props = find_peaks( prices, height=height, threshold=threshold, distance=distance, prominence=prominence_abs ) # Find local minima (valleys) valleys, valleys_props = find_peaks( -prices, height=-height if height is not None else None, threshold=threshold, distance=distance, prominence=prominence_abs ) # Create figure for trend analysis self._plot_trend_analysis(prices, peaks, valleys) # Print trend statistics self._print_trend_statistics(prices, peaks, valleys) return peaks, valleys def _plot_trend_analysis(self, prices, peaks, valleys): """Helper method to plot trend analysis.""" plt.figure(figsize=(14, 7)) # Plot the price data plt.plot(self.df.index, prices, label='Bitcoin Price') # Highlight the peaks and valleys plt.scatter(self.df.index[peaks], prices[peaks], color='green', s=100, marker='^', label='Local Maxima') plt.scatter(self.df.index[valleys], prices[valleys], color='red', s=100, marker='v', label='Local Minima') # Identify trends by connecting consecutive extrema all_points = np.sort(np.concatenate([peaks, valleys])) self.up_trends = [] self.down_trends = [] for i in range(len(all_points) - 1): start_idx = all_points[i] end_idx = all_points[i+1] # Determine if it's an uptrend or downtrend if start_idx in valleys and end_idx in peaks: # Uptrend plt.plot([self.df.index[start_idx], self.df.index[end_idx]], [prices[start_idx], prices[end_idx]], 'g-', linewidth=2, alpha=0.7) duration = end_idx - start_idx magnitude = prices[end_idx] - prices[start_idx] percent_change = 100 * magnitude / prices[start_idx] self.up_trends.append((duration, magnitude, percent_change)) elif start_idx in peaks and end_idx in valleys: # Downtrend plt.plot([self.df.index[start_idx], self.df.index[end_idx]], [prices[start_idx], prices[end_idx]], 'r-', linewidth=2, alpha=0.7) duration = end_idx - start_idx magnitude = prices[start_idx] - prices[end_idx] percent_change = 100 * magnitude / prices[start_idx] self.down_trends.append((duration, magnitude, percent_change)) plt.title('Bitcoin Price Trends Analysis') plt.xlabel('Date') plt.ylabel('Price') plt.legend() plt.grid(True) plt.tight_layout() plt.savefig('bitcoin_trends_analysis.png') plt.show(block=True) def _print_trend_statistics(self, prices, peaks, valleys): """Helper method to print trend statistics.""" print(f"Found {len(peaks)} local maxima and {len(valleys)} local minima") # Calculate average trend durations and magnitudes if hasattr(self, 'up_trends') and self.up_trends: avg_up_duration = sum(t[0] for t in self.up_trends) / len(self.up_trends) avg_up_magnitude = sum(t[1] for t in self.up_trends) / len(self.up_trends) avg_up_percent = sum(t[2] for t in self.up_trends) / len(self.up_trends) print(f"Average uptrend: {avg_up_duration:.1f} periods, {avg_up_magnitude:.2f} price change ({avg_up_percent:.2f}%)") if hasattr(self, 'down_trends') and self.down_trends: avg_down_duration = sum(t[0] for t in self.down_trends) / len(self.down_trends) avg_down_magnitude = sum(t[1] for t in self.down_trends) / len(self.down_trends) avg_down_percent = sum(t[2] for t in self.down_trends) / len(self.down_trends) print(f"Average downtrend: {avg_down_duration:.1f} periods, {avg_down_magnitude:.2f} price change ({avg_down_percent:.2f}%)")