CryptoMarketParser/BitcoinPricePredictor.py
2025-05-13 16:13:52 +08:00

659 lines
27 KiB
Python

import os
from datetime import datetime
import time
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import confusion_matrix, classification_report
from sqlalchemy import create_engine
from tensorflow.keras.models import Sequential
from tensorflow.keras.regularizers import l1_l2
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler
import gc
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
from matplotlib.backends.backend_agg import FigureCanvasAgg
from matplotlib.figure import Figure
import matplotlib
from trend_detector_simple import TrendDetectorSimple
class BitcoinPricePredictor:
def __init__(self, db_path, timeframe, model=None, timesteps=10, batch_size=32, learning_rate=0.001, epochs=50):
    """Store configuration and initialise empty training state.

    Args:
        db_path: Path of the SQLite database file; also used to build the
            SQLAlchemy engine URL.
        timeframe: Resampling rule handed to pandas ``resample``.
        model: Optional pre-built model used for predictions.
        timesteps: Number of consecutive rows in one LSTM input sequence.
        batch_size: Mini-batch size used when fitting the model.
        learning_rate: Learning rate for the Adam optimizer.
        epochs: Maximum number of training epochs.
    """
    # Data source configuration.
    self.db_path = db_path
    self.engine = create_engine(f'sqlite:///{self.db_path}')
    self.timeframe = timeframe

    # Training hyper-parameters.
    self.timesteps, self.batch_size = timesteps, batch_size
    self.learning_rate, self.epochs = learning_rate, epochs

    # Model and artefacts that are filled in by later pipeline steps.
    self.model = model
    self.history = None
    self.scaler = None
    self.X_train = self.X_test = None
    self.y_train = self.y_test = None
    self.df = None

    # Default feature set; add_essential_features() overwrites this list.
    self.feature_columns = [
        'open', 'high', 'low', 'close', 'volume',
        'hl_ratio', 'sma_7', 'sma_21', 'price_change',
    ]
@staticmethod
def reduce_mem_usage(df):
"""Optimize memory usage of the dataframe by downcasting numeric types."""
numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
start_mem = df.memory_usage().sum() / 1024**2
for col in df.columns:
col_type = df[col].dtypes
if col_type in numerics: # Only process numeric columns
c_min = df[col].min()
c_max = df[col].max()
if str(col_type)[:3] == 'int':
if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
df[col] = df[col].astype(np.int8)
elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
df[col] = df[col].astype(np.int16)
elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
df[col] = df[col].astype(np.int32)
elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
df[col] = df[col].astype(np.int64)
else:
if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
df[col] = df[col].astype(np.float16)
elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
df[col] = df[col].astype(np.float32)
else:
df[col] = df[col].astype(np.float64)
end_mem = df.memory_usage().sum() / 1024**2
print(f'Memory usage reduced from {start_mem:.2f} MB to {end_mem:.2f} MB ({(1 - end_mem/start_mem)*100:.1f}% reduction)')
return df
def add_essential_features(self, df):
    """Add technical indicators and trend signals to a copy of ``df``.

    Computes price ratios, simple/exponential moving averages, MACD, RSI,
    Bollinger Bands, clipped percentage changes, rolling volatility and the
    signals produced by ``TrendDetectorSimple``. NaN and infinite values are
    replaced by 0 afterwards.

    Side effect: ``self.feature_columns`` is replaced by the list of numeric
    columns of the result, excluding the target columns and the time columns
    (``datetime`` / ``timestamp``).

    Args:
        df: DataFrame with at least ``high``, ``low`` and ``close`` columns.

    Returns:
        A new DataFrame containing the original columns plus the features.
    """
    print("Adding technical indicators and features...")
    df = df.copy()
    close = df['close']

    # Price ratio features
    df['hl_ratio'] = (df['high'] / df['low']).clip(lower=0.8, upper=1.2)

    # Moving averages
    df['sma_7'] = close.rolling(window=7, min_periods=1).mean()
    df['sma_21'] = close.rolling(window=21, min_periods=1).mean()
    df['sma_50'] = close.rolling(window=50, min_periods=1).mean()

    # Exponential moving averages
    df['ema_12'] = close.ewm(span=12, adjust=False).mean()
    df['ema_26'] = close.ewm(span=26, adjust=False).mean()

    # MACD
    df['macd'] = df['ema_12'] - df['ema_26']
    df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean()

    # RSI (rolling-mean variant over 14 periods)
    delta = close.diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    rs = gain / loss
    df['rsi'] = 100 - (100 / (1 + rs))

    # Bollinger Bands
    df['bb_middle'] = close.rolling(window=20).mean()
    df['bb_std'] = close.rolling(window=20).std()
    df['bb_upper'] = df['bb_middle'] + 2 * df['bb_std']
    df['bb_lower'] = df['bb_middle'] - 2 * df['bb_std']
    df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['bb_middle']

    # Price changes, clipped to +/-50% to tame outliers
    df['price_change_1d'] = close.pct_change(periods=1).clip(lower=-0.5, upper=0.5)
    df['price_change_3d'] = close.pct_change(periods=3).clip(lower=-0.5, upper=0.5)
    df['price_change_7d'] = close.pct_change(periods=7).clip(lower=-0.5, upper=0.5)

    # Volatility (coefficient of variation over 14 periods)
    df['volatility'] = close.rolling(window=14).std() / close.rolling(window=14).mean()

    # Get trend indicators from TrendDetector
    # TrendDetector already expects lowercase columns, so no need for conversion
    if 'datetime' not in df.columns:
        df['datetime'] = df.index
    trend_detector = TrendDetectorSimple(df)
    _, trend_analysis = trend_detector.detect_trends()

    # Add supertrend signals
    for i, st in enumerate(trend_analysis['supertrend']):
        df[f'supertrend_{i+1}'] = st['results']['trend']

    # Add meta-supertrend consensus
    meta_results = trend_detector.calculate_metasupertrend(df, trend_analysis['supertrend'])
    df['metasupertrend'] = meta_results['meta_trends']

    # Add SMA crossover signals
    df['sma_cross'] = np.where(trend_analysis['sma']['7'] > trend_analysis['sma']['15'], 1, -1)

    # Clean up NaN or infinite values
    df = df.fillna(0)
    df = df.replace([np.inf, -np.inf], 0)

    # Update feature columns list. Filtering by name alone let a 'timestamp'
    # column (present after reset_index in the prediction path) or any other
    # non-numeric column slip into the feature set and break scaling, so the
    # dtype is checked as well and 'timestamp' is treated as a time column.
    excluded = {'next_period_return', 'next_period_up', 'datetime', 'timestamp'}
    self.feature_columns = [
        col for col in df.columns
        if col not in excluded and pd.api.types.is_numeric_dtype(df[col])
    ]
    print(f"Shape after adding features: {df.shape}")
    return df
def create_sequences(self, data, target):
"""Create sequences of data for LSTM input with corresponding targets."""
x, y = [], []
for i in range(len(data) - self.timesteps):
x.append(data[i:i + self.timesteps])
y.append(target[i + self.timesteps])
return np.array(x, dtype=np.float32), np.array(y, dtype=np.float32)
def create_sequences_for_prediction(self, data):
"""Create sequences of data for prediction without targets."""
return np.array([data[i:i + self.timesteps] for i in range(len(data) - self.timesteps)], dtype=np.float32)
def create_model(self, input_shape):
    """Create and compile the LSTM binary classifier.

    The network stacks three LSTM layers (64, 32, 16 units) with dropout in
    between, followed by a small dense layer and a single sigmoid output.

    Args:
        input_shape: ``(timesteps, n_features)`` shape of one input sequence.

    Returns:
        The compiled Keras model (Adam optimizer, binary cross-entropy loss,
        accuracy metric).
    """
    model = Sequential([
        LSTM(64, return_sequences=True, input_shape=input_shape,
             recurrent_dropout=0.2, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
        Dropout(0.3),
        LSTM(32, return_sequences=True, recurrent_dropout=0.1, kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
        Dropout(0.2),
        LSTM(16),
        Dropout(0.2),
        Dense(8, activation='relu'),
        Dense(1, activation='sigmoid')
    ])
    optimizer = tf.keras.optimizers.Adam(learning_rate=self.learning_rate)
    model.compile(optimizer=optimizer,
                  loss='binary_crossentropy',
                  metrics=['accuracy'])
    # summary() prints the table itself and returns None, so wrapping it in
    # print() only added a stray "None" line to the output.
    model.summary()
    return model
def load_data_csv(self, file_path):
"""Load Bitcoin price data from a CSV file."""
try:
# Read the CSV file
self.df = pd.read_csv(file_path)
# Convert column names to lowercase
self.df.columns = self.df.columns.str.lower()
# Convert timestamp to datetime
self.df['timestamp'] = pd.to_datetime(self.df['timestamp'])
self.df.set_index('timestamp', inplace=True)
if self.df is not None and not self.df.empty:
print(f"Data loaded successfully from CSV. Shape: {self.df.shape}")
else:
print("Failed to load data. DataFrame is empty or None.")
except Exception as e:
print(f"Error loading CSV data: {str(e)}")
self.df = None
def load_data(self):
"""Load data from SQLite database."""
try:
import sqlite3
conn = sqlite3.connect(self.db_path)
self.df = pd.read_sql_query("SELECT * FROM bitcoin_data", conn)
# Convert column names to lowercase
self.df.columns = self.df.columns.str.lower()
if self.df is not None and not self.df.empty:
print(f"Data loaded successfully. Shape: {self.df.shape}")
else:
print("Failed to load data. DataFrame is empty or None.")
conn.close()
except Exception as e:
print(f"Error loading database data: {str(e)}")
self.df = None
def prepare_data(self):
    """Engineer features, build the target and create train/test sequences.

    Steps: add features, derive the binary ``next_period_up`` target from
    the next period's clipped return, scale the numeric features with a
    ``RobustScaler``, cut sliding windows and split them chronologically
    80/20 into ``X_train``/``X_test`` and ``y_train``/``y_test``.

    ``self.feature_columns`` is set to the numeric columns that were actually
    scaled, so its length matches the feature width of the sequences.

    Raises:
        ValueError: If no data has been loaded, or if there are not more
            rows than ``self.timesteps`` after feature engineering.
    """
    if self.df is None:
        raise ValueError("Data not loaded. Call load_data() or load_data_csv() first.")

    start_time = time.time()
    self.df = self.add_essential_features(self.df)

    # Define target variable - binary classification for price movement
    self.df['next_period_return'] = self.df['close'].pct_change(periods=1).shift(-1).clip(lower=-0.5, upper=0.5)
    self.df['next_period_up'] = (self.df['next_period_return'] > 0).astype(np.int8)
    # The last row has no next period, so its return is NaN and it is dropped.
    self.df = self.df.dropna()

    # Scale features
    # NOTE(review): the scaler is fitted on all rows, including the ones that
    # end up in the test split - confirm this is intended.
    self.scaler = RobustScaler()
    # Ensure we're only scaling numeric features
    numeric_features = [col for col in self.feature_columns if col != 'datetime' and pd.api.types.is_numeric_dtype(self.df[col])]
    self.df[numeric_features] = self.scaler.fit_transform(self.df[numeric_features])
    # Keep the public feature list identical to what the model will see.
    self.feature_columns = numeric_features

    # Create sequences for LSTM
    x, y = self.create_sequences(self.df[numeric_features].values, self.df['next_period_up'].values)
    if len(y) == 0:
        raise ValueError(
            f"Not enough rows ({len(self.df)}) to build sequences of {self.timesteps} timesteps."
        )
    print(f"Sequence shape: {x.shape}, Target shape: {y.shape}")

    # Class balance check; minlength keeps both counters present even when
    # only one class occurs in the data.
    class_distribution = np.bincount(y.astype(int), minlength=2)
    print(f"Class distribution - 0: {class_distribution[0]}, 1: {class_distribution[1]}")
    print(f"Positive class ratio: {class_distribution[1]/len(y):.2f}")

    # Train-test split (chronological)
    split_idx = int(len(x) * 0.8)
    self.X_train, self.X_test = x[:split_idx], x[split_idx:]
    self.y_train, self.y_test = y[:split_idx], y[split_idx:]
    print(f"Training data shape: {self.X_train.shape}, Test data shape: {self.X_test.shape}")
    print(f"Data preparation completed in {time.time() - start_time:.2f} seconds")
def resample_data(self, df):
"""Resample data to specified timeframe."""
print(f"Resampling data to {self.timeframe} timeframe...")
df = df.resample(self.timeframe).agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum'
})
print(f"Shape after resampling: {df.shape}")
return df
def load_new_data_from_model(self):
    """Load the 1-minute CSV and return the rows newer than the database.

    Reads ``./data/btcusd_1-min_data.csv`` (timestamps in Unix seconds) and
    the ``bitcoin_data`` table, prints the ten most recent stored rows and
    some counts, and selects the CSV rows whose timestamp is later than the
    latest timestamp in the table.

    Returns:
        DataFrame of CSV rows that are not yet in the database. When the
        table is empty, every CSV row is returned.
    """
    new_data = pd.read_csv("./data/btcusd_1-min_data.csv")
    # Convert column names to lowercase
    new_data.columns = new_data.columns.str.lower()
    new_data['timestamp'] = pd.to_datetime(new_data['timestamp'], unit='s')

    existing_data = pd.read_sql('SELECT * FROM bitcoin_data', self.engine, index_col='timestamp',
                                parse_dates=['timestamp'])
    # Convert column names to lowercase
    existing_data.columns = existing_data.columns.str.lower()

    # Show the most recent entries in the database
    last_entries = existing_data.sort_index(ascending=False).head(10)
    print("Most recent entries in database:")
    print(last_entries)

    # Find missing data
    latest_timestamp = existing_data.index.max()
    if pd.isna(latest_timestamp):
        # Empty table: comparing against NaT is False for every row, which
        # used to report "nothing missing". Everything is missing instead.
        missing_data = new_data
    else:
        missing_data = new_data[new_data['timestamp'] > latest_timestamp]

    print(f"New data total length: {len(new_data)}")
    print(f"Missing data entries: {len(missing_data)}")
    print(f"Existing data entries: {len(existing_data)}")
    return missing_data
def preprocess_data(self, data):
    """Run feature engineering on ``data`` and scale its feature columns.

    The scaler fitted during training is reused when available; otherwise a
    throw-away ``RobustScaler`` is fitted on ``data`` itself (it is not
    stored on the instance).

    Args:
        data: Raw OHLCV DataFrame.

    Returns:
        DataFrame with engineered features, feature columns scaled.
    """
    # Feature engineering also refreshes self.feature_columns, so the column
    # list must be read afterwards.
    data = self.add_essential_features(data)
    columns = self.feature_columns

    if self.scaler is None:
        # No training scaler available: fit a fresh one on this batch.
        data[columns] = RobustScaler().fit_transform(data[columns])
    else:
        # Apply the same scaling that the training data received.
        data[columns] = self.scaler.transform(data[columns])
    return data
def _prepare_prediction_data(self, new_data):
"""Helper method to prepare data for prediction."""
# Ensure the 'timestamp' column is present
if 'timestamp' not in new_data.columns:
raise ValueError("Input data must contain a 'timestamp' column.")
# Convert 'timestamp' to datetime and set as index
new_data['timestamp'] = pd.to_datetime(new_data['timestamp'], errors='coerce')
new_data = new_data.dropna(subset=['timestamp']) # Drop rows where Timestamp is NaT
new_data.set_index('timestamp', inplace=True)
# Resample and aggregate data to the specified timeframe
grouped_data = new_data.resample(self.timeframe).agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum'
}).reset_index() # Reset index to preserve 'timestamp' as a column
if grouped_data.empty:
print("No new data found.")
return None
# Preprocess the data
grouped_data = self.preprocess_data(grouped_data)
if grouped_data.empty:
print("No new data after preprocessing.")
return None
# Create sequences for the model
X = self.create_sequences_for_prediction(grouped_data[self.feature_columns].values)
if len(X) == 0:
print("Not enough data to create sequences.")
return None, None
return X, grouped_data
def make_predictions_w_reality(self, new_data):
    """Predict on ``new_data`` and compare with the observed price moves.

    Args:
        new_data: Raw OHLCV DataFrame with a ``timestamp`` column. Its column
            names are lower-cased in place.

    Returns:
        Tuple ``(predictions, reality)``: a DataFrame with ``timestamp``,
        ``predictions`` and ``prediction_probability`` columns, and the
        ``reality`` Series. ``(None, None)`` when no sequences could be
        built.
    """
    # Convert column names to lowercase if needed
    new_data.columns = new_data.columns.str.lower()
    prepared_data = self._prepare_prediction_data(new_data)
    # The helper signals failure with None; a (None, None) pair is treated
    # the same way so model.predict is never called without input.
    if prepared_data is None or prepared_data[0] is None:
        return None, None
    X, grouped_data = prepared_data

    # Generate predictions; the model output is flattened to one value per
    # sequence so it can be stored as ordinary columns (as make_predictions
    # does for the probability).
    probabilities = np.asarray(self.model.predict(X)).reshape(-1)

    # Trim 'grouped_data' to align with sequence length; copy so the new
    # columns are written to an independent frame, not to a slice.
    grouped_data = grouped_data.iloc[self.timesteps:].copy()

    # Add predictions to the grouped_data DataFrame
    grouped_data['predictions'] = (probabilities > 0.5).astype(int)
    grouped_data['prediction_probability'] = probabilities

    # Calculate reality (actual price movement)
    # NOTE(review): 'close' has already been scaled by preprocess_data here,
    # so this percentage change is computed on scaled values - confirm that
    # this is the intended definition of "reality".
    grouped_data['reality'] = (grouped_data['close'].pct_change() > 0.005).astype(int)

    # Calculate accuracy
    grouped_data['correct'] = (grouped_data['predictions'] == grouped_data['reality']).astype(int)
    accuracy = grouped_data['correct'].mean()
    print(f"Prediction accuracy: {accuracy:.2f}")

    # Return predictions and reality
    return grouped_data[['timestamp', 'predictions', 'prediction_probability']], grouped_data['reality']
def make_predictions(self, new_data):
    """Predict the next-period direction for ``new_data``.

    Args:
        new_data: Raw OHLCV DataFrame with a ``timestamp`` column. Its column
            names are lower-cased in place.

    Returns:
        DataFrame with ``timestamp``, ``predictions`` (0/1) and
        ``prediction_probability`` columns, or ``None`` when no sequences
        could be built.
    """
    # Convert column names to lowercase if needed
    new_data.columns = new_data.columns.str.lower()
    prepared_data = self._prepare_prediction_data(new_data)
    # The helper signals failure with None; a (None, None) pair is treated
    # the same way so model.predict is never called without input.
    if prepared_data is None or prepared_data[0] is None:
        return None
    X, grouped_data = prepared_data

    # Generate predictions and flatten them once, so both derived columns
    # are one-dimensional.
    probabilities = np.asarray(self.model.predict(X)).reshape(-1)

    # Trim 'grouped_data' to align with sequence length; copy so the new
    # columns are written to an independent frame, not to a slice.
    grouped_data = grouped_data.iloc[self.timesteps:].copy()

    # Add predictions to the grouped_data DataFrame
    grouped_data['predictions'] = (probabilities > 0.5).astype(int)
    grouped_data['prediction_probability'] = probabilities

    # Return prediction results
    return grouped_data[['timestamp', 'predictions', 'prediction_probability']]
def update_database(self, missing_data):
"""Update the database with the missing data."""
if missing_data.empty:
print("No new data to add to the database.")
return
missing_data.to_sql('bitcoin_data', self.engine, if_exists='append', index=False)
print(f"Database updated with {len(missing_data)} new rows.")
def train_model(self):
"""Train the LSTM model with early stopping and checkpointing."""
if self.X_train is None or self.y_train is None:
raise ValueError("Data not loaded. Call prepare_data() first.")
# Create model directory if it doesn't exist
os.makedirs("./models", exist_ok=True)
# Configure TensorFlow for GPU memory
self._configure_gpu_memory()
# Create the model
self.model = self.create_model(input_shape=(self.timesteps, len(self.feature_columns)))
# Setup callbacks
current_date = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
checkpoint_path = f"./models/model_checkpoint_{current_date}.keras"
callbacks = [
EarlyStopping(
monitor='val_loss',
patience=10,
restore_best_weights=True,
verbose=1
),
ModelCheckpoint(
filepath=checkpoint_path,
save_best_only=True,
monitor='val_loss',
verbose=1
)
]
# Train the model
print(f"Training model with {self.epochs} epochs and batch size {self.batch_size}...")
self.history = self.model.fit(
self.X_train, self.y_train,
validation_data=(self.X_test, self.y_test),
epochs=self.epochs,
batch_size=self.batch_size,
callbacks=callbacks,
verbose=1
)
# Save the final model
final_model_path = f"./models/model_final_{current_date}.keras"
self.model.save(final_model_path)
print(f"Model saved to {final_model_path}")
def _configure_gpu_memory(self, memory_limit_mb=2048):
    """Cap the GPU memory TensorFlow may allocate on each physical GPU.

    Args:
        memory_limit_mb: Absolute limit per GPU in megabytes (not a
            percentage of the device memory). Defaults to 2048.

    A ``RuntimeError`` from TensorFlow is reported on stdout and otherwise
    ignored.
    """
    gpus = tf.config.experimental.list_physical_devices('GPU')
    if not gpus:
        return
    try:
        # Give every GPU a single virtual device with a fixed memory budget.
        for gpu in gpus:
            tf.config.experimental.set_virtual_device_configuration(
                gpu,
                [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=memory_limit_mb)]
            )
        print("GPU memory limit set")
    except RuntimeError as e:
        print(f"GPU memory limit setting failed: {e}")
def evaluate_model(self):
"""Evaluate the trained model on test data."""
if self.model is None:
raise ValueError("Model not trained. Call train_model() first.")
# Basic evaluation
test_loss, test_accuracy = self.model.evaluate(self.X_test, self.y_test, verbose=1)
print(f"\nTest Accuracy: {test_accuracy:.4f}")
print(f"Test Loss: {test_loss:.4f}")
# Make predictions on test data
y_pred = (self.model.predict(self.X_test) > 0.5).astype(int)
# Calculate confusion matrix
cm = confusion_matrix(self.y_test, y_pred)
print("\nConfusion Matrix:")
print(cm)
# Print classification report
print("\nClassification Report:")
print(classification_report(self.y_test, y_pred))
def plot_history(self):
"""Plot the training history metrics."""
if self.history is None:
raise ValueError("No training history available. Train the model first.")
plt.figure(figsize=(15, 6))
# Plot accuracy
plt.subplot(1, 2, 1)
plt.plot(self.history.history['accuracy'], label='Training Accuracy')
plt.plot(self.history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Model Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend(loc='lower right')
plt.grid(True)
# Plot loss
plt.subplot(1, 2, 2)
plt.plot(self.history.history['loss'], label='Training Loss')
plt.plot(self.history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(loc='upper right')
plt.grid(True)
plt.tight_layout()
# Save the plot
os.makedirs("./plots", exist_ok=True)
current_date = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
plt.savefig(f"./plots/training_history_{current_date}.png")
plt.show()
def analyze_market_trends(self, window_size=100, prominence=0.01, height=None, threshold=0.0, distance=None):
    """Locate local maxima and minima of the closing price and report them.

    Args:
        window_size: Minimum spacing between extrema, used when ``distance``
            is not given.
        prominence: Required peak prominence. Values below 1 are taken as a
            fraction of the price range, other values as absolute prices.
        height: Optional height constraint forwarded to ``find_peaks``
            (negated for the valley search).
        threshold: Threshold constraint forwarded to ``find_peaks``.
        distance: Minimum number of samples between neighbouring extrema.

    Returns:
        Tuple ``(peaks, valleys)`` of index arrays, or ``None`` when no data
        is loaded.
    """
    matplotlib.use('TkAgg')  # Use TkAgg backend for interactive plotting

    # Bail out early when there is nothing to analyse.
    if getattr(self, 'df', None) is None:
        print("Data not loaded. Call load_and_prepare_data() first.")
        return

    prices = self.df['close'].values

    # A prominence below 1 is relative to the observed price range.
    price_range = np.max(prices) - np.min(prices)
    prominence_abs = prominence * price_range if prominence < 1 else prominence

    # Fall back to the window size for the minimum spacing.
    if distance is None:
        distance = window_size

    # Constraints shared by the peak and the valley search.
    shared_constraints = {
        'threshold': threshold,
        'distance': distance,
        'prominence': prominence_abs,
    }
    inverted_height = None if height is None else -height

    # Maxima are peaks of the series, minima are peaks of its negation.
    peaks, peaks_props = find_peaks(prices, height=height, **shared_constraints)
    valleys, valleys_props = find_peaks(-prices, height=inverted_height, **shared_constraints)

    # Plot first (this also fills self.up_trends / self.down_trends),
    # then print the statistics derived from it.
    self._plot_trend_analysis(prices, peaks, valleys)
    self._print_trend_statistics(prices, peaks, valleys)
    return peaks, valleys
def _plot_trend_analysis(self, prices, peaks, valleys):
    """Plot prices with their extrema and record the trends between them.

    Consecutive extrema are connected: valley -> peak is an uptrend (green),
    peak -> valley a downtrend (red). For each trend a tuple
    ``(duration, magnitude, percent_change)`` is appended to
    ``self.up_trends`` or ``self.down_trends``. The figure is saved as
    ``bitcoin_trends_analysis.png`` and shown in a blocking window.

    Args:
        prices: Array of closing prices, aligned with ``self.df.index``.
        peaks: Index array of local maxima.
        valleys: Index array of local minima.
    """
    timeline = self.df.index

    plt.figure(figsize=(14, 7))
    # Price curve plus markers for every extremum.
    plt.plot(timeline, prices, label='Bitcoin Price')
    plt.scatter(timeline[peaks], prices[peaks], color='green', s=100, marker='^', label='Local Maxima')
    plt.scatter(timeline[valleys], prices[valleys], color='red', s=100, marker='v', label='Local Minima')

    # Walk through neighbouring extrema in chronological order.
    extrema = np.sort(np.concatenate([peaks, valleys]))
    self.up_trends = []
    self.down_trends = []
    for start_idx, end_idx in zip(extrema[:-1], extrema[1:]):
        rising = start_idx in valleys and end_idx in peaks
        falling = (not rising) and start_idx in peaks and end_idx in valleys
        if not (rising or falling):
            continue

        start_price, end_price = prices[start_idx], prices[end_idx]
        line_style = 'g-' if rising else 'r-'
        plt.plot([timeline[start_idx], timeline[end_idx]],
                 [start_price, end_price],
                 line_style, linewidth=2, alpha=0.7)

        duration = end_idx - start_idx
        # Magnitude is stored as a positive move in the trend's direction.
        magnitude = end_price - start_price if rising else start_price - end_price
        percent_change = 100 * magnitude / start_price
        bucket = self.up_trends if rising else self.down_trends
        bucket.append((duration, magnitude, percent_change))

    plt.title('Bitcoin Price Trends Analysis')
    plt.xlabel('Date')
    plt.ylabel('Price')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig('bitcoin_trends_analysis.png')
    plt.show(block=True)
def _print_trend_statistics(self, prices, peaks, valleys):
"""Helper method to print trend statistics."""
print(f"Found {len(peaks)} local maxima and {len(valleys)} local minima")
# Calculate average trend durations and magnitudes
if hasattr(self, 'up_trends') and self.up_trends:
avg_up_duration = sum(t[0] for t in self.up_trends) / len(self.up_trends)
avg_up_magnitude = sum(t[1] for t in self.up_trends) / len(self.up_trends)
avg_up_percent = sum(t[2] for t in self.up_trends) / len(self.up_trends)
print(f"Average uptrend: {avg_up_duration:.1f} periods, {avg_up_magnitude:.2f} price change ({avg_up_percent:.2f}%)")
if hasattr(self, 'down_trends') and self.down_trends:
avg_down_duration = sum(t[0] for t in self.down_trends) / len(self.down_trends)
avg_down_magnitude = sum(t[1] for t in self.down_trends) / len(self.down_trends)
avg_down_percent = sum(t[2] for t in self.down_trends) / len(self.down_trends)
print(f"Average downtrend: {avg_down_duration:.1f} periods, {avg_down_magnitude:.2f} price change ({avg_down_percent:.2f}%)")