import os from datetime import datetime import time import numpy as np import pandas as pd import tensorflow as tf from sklearn.metrics import confusion_matrix, classification_report from sqlalchemy import create_engine from tensorflow.keras.models import Sequential from tensorflow.keras.regularizers import l1_l2 from tensorflow.keras.layers import LSTM, Dense, Dropout from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint from sklearn.model_selection import train_test_split from sklearn.preprocessing import RobustScaler import gc import matplotlib.pyplot as plt class BitcoinPricePredictor: def __init__(self, db_path, timeframe, model=None, timesteps=10, batch_size=8, learning_rate=0.001, epochs=50): self.db_path = db_path self.engine = create_engine(f'sqlite:///{self.db_path}') self.timesteps = timesteps self.batch_size = batch_size self.learning_rate = learning_rate self.epochs = epochs self.model = model self.X_train = None self.X_test = None self.y_train = None self.y_test = None self.history = None self.scaler = None self.timeframe = timeframe self.feature_columns = ['Open', 'High', 'Low', 'Close', 'Volume', 'HL_Ratio', 'SMA_7', 'SMA_21', 'Price_Change'] @staticmethod def reduce_mem_usage(df): """Optimize memory usage of the dataframe by downcasting numeric types.""" numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64'] start_mem = df.memory_usage().sum() / 1024**2 for col in df.columns: col_type = df[col].dtypes if col_type in numerics: # Only process numeric columns c_min = df[col].min() c_max = df[col].max() if str(col_type)[:3] == 'int': if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max: df[col] = df[col].astype(np.int8) elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max: df[col] = df[col].astype(np.int16) elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max: df[col] = df[col].astype(np.int32) elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max: df[col] = df[col].astype(np.int64) else: if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max: df[col] = df[col].astype(np.float16) elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max: df[col] = df[col].astype(np.float32) else: df[col] = df[col].astype(np.float64) end_mem = df.memory_usage().sum() / 1024**2 print(f'Memory usage reduced from {start_mem:.2f} MB to {end_mem:.2f} MB ({(1 - end_mem/start_mem)*100:.1f}% reduction)') return df def add_essential_features(self, df): print("Adding technical indicators and features...") df = df.copy() # Price ratio features df['HL_Ratio'] = (df['High'] / df['Low']).clip(lower=0.8, upper=1.2) # Moving averages with different timeframes df['SMA_7'] = df['Close'].rolling(window=7, min_periods=1).mean() df['SMA_21'] = df['Close'].rolling(window=21, min_periods=1).mean() df['SMA_50'] = df['Close'].rolling(window=50, min_periods=1).mean() # Exponential moving averages df['EMA_12'] = df['Close'].ewm(span=12, adjust=False).mean() df['EMA_26'] = df['Close'].ewm(span=26, adjust=False).mean() # MACD df['MACD'] = df['EMA_12'] - df['EMA_26'] df['MACD_Signal'] = df['MACD'].ewm(span=9, adjust=False).mean() # Relative Strength Index (RSI) delta = df['Close'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=14).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() rs = gain / loss df['RSI'] = 100 - (100 / (1 + rs)) # Bollinger Bands df['BB_Middle'] = df['Close'].rolling(window=20).mean() df['BB_Std'] = df['Close'].rolling(window=20).std() df['BB_Upper'] = df['BB_Middle'] + 2 * df['BB_Std'] df['BB_Lower'] = df['BB_Middle'] - 2 * df['BB_Std'] df['BB_Width'] = (df['BB_Upper'] - df['BB_Lower']) / df['BB_Middle'] # Price changes at different timeframes df['Price_Change_1d'] = df['Close'].pct_change(periods=1).clip(lower=-0.5, upper=0.5) df['Price_Change_3d'] = df['Close'].pct_change(periods=3).clip(lower=-0.5, upper=0.5) df['Price_Change_7d'] = df['Close'].pct_change(periods=7).clip(lower=-0.5, upper=0.5) # Volatility df['Volatility'] = df['Close'].rolling(window=14).std() / df['Close'].rolling(window=14).mean() # Clean up any NaN or infinite values df = df.fillna(0) df = df.replace([np.inf, -np.inf], 0) # Update feature columns list self.feature_columns = [col for col in df.columns if col not in ['Next_Period_Return', 'Next_Period_Up']] print(f"Shape after adding features: {df.shape}") return df def create_sequences(self, data, target): """Create sequences of data for LSTM input with corresponding targets.""" x, y = [], [] for i in range(len(data) - self.timesteps): x.append(data[i:i + self.timesteps]) y.append(target[i + self.timesteps]) return np.array(x, dtype=np.float32), np.array(y, dtype=np.float32) def create_sequences_for_prediction(self, data): """Create sequences of data for prediction without targets.""" x = [] for i in range(len(data) - self.timesteps): x.append(data[i:i + self.timesteps]) return np.array(x, dtype=np.float32) def create_model(self, input_shape): """Create and compile the LSTM model architecture.""" model = Sequential([ LSTM(64, return_sequences=True, input_shape=input_shape, recurrent_dropout=0.2, kernel_regularizer=tf.keras.regularizers.l1_l2(l1=1e-5, l2=1e-4)), Dropout(0.3), LSTM(32, return_sequences=True, recurrent_dropout=0.1, kernel_regularizer=tf.keras.regularizers.l1_l2(l1=1e-5, l2=1e-4)), Dropout(0.2), LSTM(16), Dropout(0.2), Dense(8, activation='relu'), Dense(1, activation='sigmoid') ]) optimizer = tf.keras.optimizers.Adam(learning_rate=self.learning_rate) model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy']) print(model.summary()) return model def load_and_prepare_data(self): start_time = time.time() print("Loading data from database...") df = pd.read_sql('SELECT * FROM bitcoin_data', self.engine, index_col='Timestamp', parse_dates=['Timestamp']) print(f"Initial dataset shape: {df.shape}") print(f"Timeframe: {self.timeframe}") df = self.resample_data(df) df = self.add_essential_features(df) # Define target variable - binary classification for price movement df['Next_Period_Return'] = df['Close'].pct_change(periods=1).shift(-1).clip(lower=-0.5, upper=0.5) df['Next_Period_Up'] = (df['Next_Period_Return'] > 0).astype(np.int8) df = df.dropna() # Scale features self.scaler = RobustScaler() df[self.feature_columns] = self.scaler.fit_transform(df[self.feature_columns]) # Create sequences for LSTM x, y = self.create_sequences(df[self.feature_columns].values, df['Next_Period_Up'].values) print(f"Sequence shape: {x.shape}, Target shape: {y.shape}") # Class balance check class_distribution = np.bincount(y.astype(int)) print(f"Class distribution - 0: {class_distribution[0]}, 1: {class_distribution[1]}") print(f"Positive class ratio: {class_distribution[1]/len(y):.2f}") # Train-test split (chronological) split_idx = int(len(x) * 0.8) x_train, x_test = x[:split_idx], x[split_idx:] y_train, y_test = y[:split_idx], y[split_idx:] # Free memory del df gc.collect() self.X_train, self.X_test = x_train, x_test self.y_train, self.y_test = y_train, y_test print(f"Training data shape: {self.X_train.shape}, Test data shape: {self.X_test.shape}") class_counts = np.bincount(self.y_train.astype(int)) print(f"Class distribution in training data: 0={class_counts[0]}, 1={class_counts[1]}") print(f"Data preparation completed in {time.time() - start_time:.2f} seconds") def resample_data(self, df): print(f"Resampling data to {self.timeframe} timeframe...") df = df.resample(self.timeframe).agg({ 'Open': 'first', 'High': 'max', 'Low': 'min', 'Close': 'last', 'Volume': 'sum' }) print(f"Shape after resampling: {df.shape}") return df def load_new_data_from_model(self): """Load new data and identify missing entries compared to the database.""" new_data = pd.read_csv("./data/btcusd_1-min_data.csv") new_data['Timestamp'] = pd.to_datetime(new_data['Timestamp'], unit='s') existing_data = pd.read_sql('SELECT * FROM bitcoin_data', self.engine, index_col='Timestamp', parse_dates=['Timestamp']) # Show the most recent entries in the database last_entries = existing_data.sort_index(ascending=False).head(10) print("Most recent entries in database:") print(last_entries) # Find missing data latest_timestamp = existing_data.index.max() missing_data = new_data[new_data['Timestamp'] > latest_timestamp] print(f"New data total length: {len(new_data)}") print(f"Missing data entries: {len(missing_data)}") print(f"Existing data entries: {len(existing_data)}") return missing_data def preprocess_data(self, data): """Preprocess new data with feature engineering and scaling.""" # Add technical indicators data = self.add_essential_features(data) # Scale the features using the same scaler as training data if self.scaler is not None: data[self.feature_columns] = self.scaler.transform(data[self.feature_columns]) else: # If no scaler exists, fit a new one scaler = RobustScaler() data[self.feature_columns] = scaler.fit_transform(data[self.feature_columns]) return data def make_predictions_w_reality(self, new_data): """Make predictions and compare with actual outcomes.""" # Ensure the 'Timestamp' column is present if 'Timestamp' not in new_data.columns: raise ValueError("Input data must contain a 'Timestamp' column.") # Convert 'Timestamp' to datetime and set as index new_data['Timestamp'] = pd.to_datetime(new_data['Timestamp'], errors='coerce') new_data = new_data.dropna(subset=['Timestamp']) # Drop rows where Timestamp is NaT new_data.set_index('Timestamp', inplace=True) # Resample and aggregate data to the specified timeframe grouped_data = new_data.resample(self.timeframe).agg({ 'Open': 'first', 'High': 'max', 'Low': 'min', 'Close': 'last', 'Volume': 'sum' }).reset_index() # Reset index to preserve 'Timestamp' as a column if grouped_data.empty: print("No new data found.") return None, None # Preprocess the data grouped_data = self.preprocess_data(grouped_data) if grouped_data.empty: print("No new data after preprocessing.") return None, None # Create sequences for the model X = self.create_sequences_for_prediction(grouped_data[self.feature_columns].values) if len(X) == 0: print("Not enough data to create sequences.") return None, None # Generate predictions predictions = self.model.predict(X) # Trim 'grouped_data' to align with sequence length grouped_data = grouped_data.iloc[self.timesteps:] # Align with sequence length # Add predictions to the grouped_data DataFrame grouped_data['Predictions'] = (predictions > 0.5).astype(int) grouped_data['Prediction_Probability'] = predictions # Calculate reality (actual price movement) grouped_data['Reality'] = (grouped_data['Close'].pct_change() > 0.005).astype(int) # Calculate accuracy grouped_data['Correct'] = (grouped_data['Predictions'] == grouped_data['Reality']).astype(int) accuracy = grouped_data['Correct'].mean() print(f"Prediction accuracy: {accuracy:.2f}") # Return predictions and reality return grouped_data[['Timestamp', 'Predictions', 'Prediction_Probability']], grouped_data['Reality'] def make_predictions(self, new_data): """Make predictions on new data.""" # Ensure the 'Timestamp' column is present if 'Timestamp' not in new_data.columns: raise ValueError("Input data must contain a 'Timestamp' column.") # Convert 'Timestamp' to datetime and set as index new_data['Timestamp'] = pd.to_datetime(new_data['Timestamp'], errors='coerce') new_data = new_data.dropna(subset=['Timestamp']) # Drop rows where Timestamp is NaT new_data.set_index('Timestamp', inplace=True) # Resample and aggregate data to the specified timeframe grouped_data = new_data.resample(self.timeframe).agg({ 'Open': 'first', 'High': 'max', 'Low': 'min', 'Close': 'last', 'Volume': 'sum' }).reset_index() # Reset index to preserve 'Timestamp' as a column if grouped_data.empty: print("No new data found.") return None # Preprocess the data grouped_data = self.preprocess_data(grouped_data) if grouped_data.empty: print("No new data after preprocessing.") return None # Create sequences for the model X = self.create_sequences_for_prediction(grouped_data[self.feature_columns].values) if len(X) == 0: print("Not enough data to create sequences.") return None # Generate predictions predictions = self.model.predict(X) # Trim 'grouped_data' to align with sequence length grouped_data = grouped_data.iloc[self.timesteps:] # Add predictions to the grouped_data DataFrame grouped_data['Predictions'] = (predictions > 0.5).astype(int) grouped_data['Prediction_Probability'] = predictions.flatten() # Return prediction results return grouped_data[['Timestamp', 'Predictions', 'Prediction_Probability']] def update_database(self, missing_data): """Update the database with the missing data.""" if missing_data.empty: print("No new data to add to the database.") return missing_data.to_sql('bitcoin_data', self.engine, if_exists='append', index=False) print(f"Database updated with {len(missing_data)} new rows.") def train_model(self): """Train the LSTM model with early stopping and checkpointing.""" if self.X_train is None or self.y_train is None: raise ValueError("Data not loaded. Call load_and_prepare_data() first.") # Create model directory if it doesn't exist os.makedirs("./models", exist_ok=True) # Configure TensorFlow to use memory growth gpus = tf.config.experimental.list_physical_devices('GPU') if gpus: try: # Limit TensorFlow to use only 80% of GPU memory for gpu in gpus: tf.config.experimental.set_virtual_device_configuration( gpu, [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=2048)] # Set to 2GB or adjust as needed ) print("GPU memory limit set") except RuntimeError as e: print(f"GPU memory limit setting failed: {e}") # Create the model self.model = self.create_model(input_shape=(self.timesteps, len(self.feature_columns))) # Setup callbacks current_date = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') checkpoint_path = f"./models/model_checkpoint_{current_date}.keras" callbacks = [ EarlyStopping( monitor='val_loss', patience=10, restore_best_weights=True, verbose=1 ), ModelCheckpoint( filepath=checkpoint_path, save_best_only=True, monitor='val_loss', verbose=1 ) ] # Train the model print(f"Training model with {self.epochs} epochs and batch size {self.batch_size}...") self.history = self.model.fit( self.X_train, self.y_train, validation_data=(self.X_test, self.y_test), epochs=self.epochs, batch_size=self.batch_size, callbacks=callbacks, verbose=1 ) # Save the final model final_model_path = f"./models/model_final_{current_date}.keras" self.model.save(final_model_path) print(f"Model saved to {final_model_path}") def evaluate_model(self): """Evaluate the trained model on test data.""" if self.model is None: raise ValueError("Model not trained. Call train_model() first.") # Basic evaluation test_loss, test_accuracy = self.model.evaluate(self.X_test, self.y_test, verbose=1) print(f"\nTest Accuracy: {test_accuracy:.4f}") print(f"Test Loss: {test_loss:.4f}") # Make predictions on test data y_pred = (self.model.predict(self.X_test) > 0.5).astype(int) # Calculate confusion matrix cm = confusion_matrix(self.y_test, y_pred) print("\nConfusion Matrix:") print(cm) # Print classification report print("\nClassification Report:") print(classification_report(self.y_test, y_pred)) def plot_history(self): """Plot the training history metrics.""" if self.history is None: raise ValueError("No training history available. Train the model first.") plt.figure(figsize=(15, 6)) # Plot accuracy plt.subplot(1, 2, 1) plt.plot(self.history.history['accuracy'], label='Training Accuracy') plt.plot(self.history.history['val_accuracy'], label='Validation Accuracy') plt.title('Model Accuracy') plt.xlabel('Epoch') plt.ylabel('Accuracy') plt.legend(loc='lower right') plt.grid(True) # Plot loss plt.subplot(1, 2, 2) plt.plot(self.history.history['loss'], label='Training Loss') plt.plot(self.history.history['val_loss'], label='Validation Loss') plt.title('Model Loss') plt.xlabel('Epoch') plt.ylabel('Loss') plt.legend(loc='upper right') plt.grid(True) plt.tight_layout() # Save the plot os.makedirs("./plots", exist_ok=True) current_date = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') plt.savefig(f"./plots/training_history_{current_date}.png") plt.show()