# File: cycles/Analysis/boillinger_band.py
# (The same patch also adds an empty cycles/Analysis/__init__.py.)
import numbers

import pandas as pd


class BollingerBands:
    """
    Calculates Bollinger Bands for given financial data.

    The bands are a simple moving average of the price plus/minus a multiple
    of the rolling standard deviation over the same window.
    """

    def __init__(self, period: int = 20, std_dev_multiplier: float = 2.0):
        """
        Initializes the BollingerBands calculator.

        Args:
            period (int): The period for the moving average and standard deviation.
            std_dev_multiplier (float): The number of standard deviations for the
                upper and lower bands.

        Raises:
            ValueError: If ``period`` is not a positive integer or
                ``std_dev_multiplier`` is not positive.
        """
        # Validate the type up front: a value such as 2.5 would otherwise be
        # accepted here and only fail later, inside pandas' ``rolling``, with an
        # error that never mentions ``period``.
        if (
            isinstance(period, bool)
            or not isinstance(period, numbers.Integral)
            or period <= 0
        ):
            raise ValueError("Period must be a positive integer.")
        if std_dev_multiplier <= 0:
            raise ValueError("Standard deviation multiplier must be positive.")

        self.period = int(period)
        self.std_dev_multiplier = std_dev_multiplier

    def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame:
        """
        Calculates Bollinger Bands and adds them to the DataFrame.

        The columns are written onto ``data_df`` itself (no copy is made); pass
        a copy if the caller's frame must stay untouched. The first
        ``period - 1`` rows of every added column are NaN because the rolling
        window is not yet full.

        Args:
            data_df (pd.DataFrame): DataFrame with price data. Must include the price_column.
            price_column (str): The name of the column containing the price data (e.g., 'close').

        Returns:
            pd.DataFrame: The same DataFrame object with added columns:
                          'SMA' (Simple Moving Average),
                          'UpperBand',
                          'LowerBand'.

        Raises:
            ValueError: If ``price_column`` is not a column of ``data_df``.
        """
        if price_column not in data_df.columns:
            raise ValueError(f"Price column '{price_column}' not found in DataFrame.")

        # Build the rolling window once and reuse it for both statistics.
        window = data_df[price_column].rolling(window=self.period)
        sma = window.mean()
        # ``Rolling.std`` uses pandas' default ddof=1 (sample standard deviation).
        band_offset = self.std_dev_multiplier * window.std()

        data_df['SMA'] = sma
        data_df['UpperBand'] = sma + band_offset
        data_df['LowerBand'] = sma - band_offset

        return data_df
# File: cycles/Analysis/rsi.py
import numpy as np
import pandas as pd


class RSI:
    """
    A class to calculate the Relative Strength Index (RSI).
    """

    def __init__(self, period: int = 14):
        """
        Initializes the RSI calculator.

        Args:
            period (int): The period for RSI calculation. Default is 14.
                          Must be a positive integer.

        Raises:
            ValueError: If ``period`` is not a positive integer.
        """
        if not isinstance(period, int) or period <= 0:
            raise ValueError("Period must be a positive integer.")
        self.period = period

    def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame:
        """
        Calculates the RSI and returns a copy of the input with the result added.

        Gains and losses are the positive and negative parts of the one-step
        price difference (the undefined first difference counts as 0). The
        averages are seeded at row ``period - 1`` with the plain mean of the
        first ``period`` values and then smoothed recursively:
        ``avg[i] = (avg[i - 1] * (period - 1) + value[i]) / period``.

        Args:
            data_df (pd.DataFrame): DataFrame with historical price data.
                                    Must contain the 'price_column'. It is not modified.
            price_column (str): The name of the column containing price data.
                                Default is 'close'.

        Returns:
            pd.DataFrame: A copy of the input with three added columns:
                          'avg_gain' and 'avg_loss' (the smoothed averages, 0.0
                          before the seed row) and 'RSI' (NaN before the seed row).
                          If the data has fewer rows than the period, a warning is
                          printed and a plain copy without these columns is returned.

        Raises:
            ValueError: If ``price_column`` is not a column of ``data_df``.
        """
        if price_column not in data_df.columns:
            raise ValueError(f"Price column '{price_column}' not found in DataFrame.")

        if len(data_df) < self.period:
            print(f"Warning: Data length ({len(data_df)}) is less than RSI period ({self.period}). RSI will not be calculated.")
            return data_df.copy()

        df = data_df.copy()
        period = self.period
        n_rows = len(df)

        delta = df[price_column].diff(1)
        # ``where`` also maps the NaN first difference to 0, so neither array
        # contains NaN for finite prices. Losses are stored as positive numbers.
        gain = delta.where(delta > 0, 0).to_numpy(dtype=float)
        loss = (-delta.where(delta < 0, 0)).to_numpy(dtype=float)

        avg_gain = np.zeros(n_rows)
        avg_loss = np.zeros(n_rows)

        # Seed with the simple average of the first ``period`` values.
        seed = period - 1
        avg_gain[seed] = gain[:period].mean()
        avg_loss[seed] = loss[:period].mean()

        # The smoothing is a recurrence, so it has to run sequentially; working
        # on plain arrays avoids a pandas ``.iloc`` lookup per element.
        for i in range(period, n_rows):
            avg_gain[i] = (avg_gain[i - 1] * (period - 1) + gain[i]) / period
            avg_loss[i] = (avg_loss[i - 1] * (period - 1) + loss[i]) / period

        # RSI = 100 - 100 / (1 + RS) with RS = avg_gain / avg_loss.
        # Rows with avg_loss == 0 are overwritten below, so the division
        # warnings they would trigger are suppressed here.
        with np.errstate(divide='ignore', invalid='ignore'):
            rsi = 100.0 - 100.0 / (1.0 + avg_gain / avg_loss)

        no_loss = avg_loss == 0
        rsi[no_loss & (avg_gain == 0)] = 50.0   # no movement at all: neutral
        rsi[no_loss & (avg_gain != 0)] = 100.0  # only gains: maximum strength
        rsi[:seed] = np.nan                     # not enough data for the seed average

        df['avg_gain'] = avg_gain
        df['avg_loss'] = avg_loss
        df['RSI'] = rsi

        return df
# File: cycles/utils/data_utils.py
import pandas as pd

# Aggregation applied to each OHLCV column when collapsing a day into one row.
_DAILY_AGG_RULES = {
    'open': 'first',
    'high': 'max',
    'low': 'min',
    'close': 'last',
    'volume': 'sum',
}


def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
    """
    Aggregates time-series financial data to daily OHLCV format.

    The input DataFrame is expected to have a DatetimeIndex.
    'open' will be the first 'open' price of the day.
    'close' will be the last 'close' price of the day.
    'high' will be the maximum 'high' price of the day.
    'low' will be the minimum 'low' price of the day.
    'volume' (if present) will be the sum of volumes for the day.

    Args:
        data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
                                like 'open', 'high', 'low', 'close', and optionally 'volume'.
                                Column names are expected to be lowercase.

    Returns:
        pd.DataFrame: DataFrame aggregated to daily OHLCV data, one row per
                      calendar day that has at least one input row.
                      The index will be a DatetimeIndex with the time set to noon (12:00:00) for each day.
                      Returns an empty DataFrame if no relevant OHLCV columns are found.

    Raises:
        ValueError: If the input DataFrame does not have a DatetimeIndex.
    """
    if not isinstance(data_df.index, pd.DatetimeIndex):
        raise ValueError("Input DataFrame must have a DatetimeIndex.")

    # Only aggregate the OHLCV columns that are actually present.
    agg_rules = {
        column: rule
        for column, rule in _DAILY_AGG_RULES.items()
        if column in data_df.columns
    }

    if not agg_rules:
        print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for daily aggregation.")
        return pd.DataFrame(index=pd.to_datetime([]))  # Return empty DF with datetime index

    # Resample to daily frequency and apply aggregation rules
    daily_data = data_df.resample('D').agg(agg_rules)

    # resample() emits a row for every calendar day in the covered range, and
    # 'sum' yields 0 (not NaN) for a day without rows. Such days would survive
    # a NaN-based filter whenever 'volume' is present, so keep only the days
    # that really occur in the input.
    days_with_rows = data_df.index.normalize()
    daily_data = daily_data[daily_data.index.isin(days_with_rows)]

    # Adjust timestamps to noon
    daily_data.index = daily_data.index + pd.Timedelta(hours=12)

    # Drop days whose aggregated values are all NaN (rows existed but held no data)
    daily_data = daily_data.dropna(how='all')

    return daily_data
# File: cycles/utils/storage.py -- method of class Storage.
def save_data(self, data: pd.DataFrame, file_path: str):
    """Save processed data to a CSV file.

    If the DataFrame has a DatetimeIndex, it is converted to float Unix timestamps
    (seconds since epoch) and written as the first column, named 'timestamp'.
    A numeric index is written as the 'timestamp' column unchanged. Any other
    index is written by pandas as a regular index column. The input DataFrame
    is not modified.

    Args:
        data (pd.DataFrame): data to save.
        file_path (str): path to the data file relative to the data_dir.
    """
    frame = data.copy()
    index = frame.index

    if isinstance(index, pd.DatetimeIndex):
        # Divide the offset from the epoch by one second instead of casting to
        # int64 and dividing by 1e9: the cast is only correct for nanosecond
        # resolution, while a DatetimeIndex may also hold s/ms/us values.
        epoch = pd.Timestamp(0, unit='s', tz=index.tz)
        timestamps = ((index - epoch) / pd.Timedelta(seconds=1)).to_numpy()
    elif pd.api.types.is_numeric_dtype(index.dtype):
        # Index is already numeric (e.g. Unix timestamps from a previous
        # save/load cycle): keep the values as they are.
        timestamps = index.to_numpy()
    else:
        timestamps = None

    if timestamps is not None:
        frame = frame.reset_index(drop=True)
        frame['timestamp'] = timestamps
        # Make 'timestamp' the first column.
        ordered = ['timestamp'] + [col for col in frame.columns if col != 'timestamp']
        frame = frame[ordered]

    full_path = os.path.join(self.data_dir, file_path)
    # Only write the DataFrame index when it was not turned into a column;
    # otherwise a non-numeric, non-datetime index would be silently lost.
    frame.to_csv(full_path, index=timestamps is None)
    if self.logging is not None:
        self.logging.info(f"Data saved to {full_path} with Unix timestamp column.")
# File: test_bbrsi.py
"""Plot Bollinger Bands and RSI for BTC/USD data and mark simple buy/sell signals."""
import logging

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

from cycles.Analysis.boillinger_band import BollingerBands
from cycles.Analysis.rsi import RSI
from cycles.utils.data_utils import aggregate_to_daily
from cycles.utils.storage import Storage

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("backtest.log"),
        logging.StreamHandler()
    ]
)

config_minute = {
    "start_date": "2022-01-01",
    "stop_date": "2023-01-01",
    "data_file": "btcusd_1-min_data.csv"
}

config_day = {
    "start_date": "2022-01-01",
    "stop_date": "2023-01-01",
    "data_file": "btcusd_1-day_data.csv"
}

# True: load the daily file directly. False: load minute data, aggregate it to
# daily bars and store the result in the daily file.
IS_DAY = True

# Indicator and strategy settings, kept in one place so that plot labels cannot
# drift away from the values actually used.
BB_PERIOD = 30
BB_STD_DEV_MULTIPLIER = 2.0
RSI_PERIOD = 13
STRATEGY = 1

# Conventional RSI guide levels drawn on the chart. They are display-only and
# differ from the 25/75 thresholds used by strategy_1.
RSI_OVERBOUGHT_LINE = 70
RSI_OVERSOLD_LINE = 30


def no_strategy(data_bb, data_with_rsi):
    """Return all-False buy and sell conditions aligned with ``data_bb``."""
    buy_condition = pd.Series([False] * len(data_bb), index=data_bb.index)
    sell_condition = pd.Series([False] * len(data_bb), index=data_bb.index)
    return buy_condition, sell_condition


def strategy_1(data_bb, data_with_rsi):
    """Return Bollinger-band/RSI buy and sell conditions.

    Only ``data_bb`` is read; it must contain the 'close', 'LowerBand',
    'UpperBand' and 'RSI' columns. ``data_with_rsi`` is accepted so that all
    strategies share one signature.
    """
    # Long trade: price move below lower Bollinger band and RSI go below 25
    buy_condition = (data_bb['close'] < data_bb['LowerBand']) & (data_bb['RSI'] < 25)
    # Short only: price move above top Bollinger band and RSI goes over 75
    sell_condition = (data_bb['close'] > data_bb['UpperBand']) & (data_bb['RSI'] > 75)
    return buy_condition, sell_condition


def plot_signals(data_bb, buy_signals, sell_signals):
    """Draw price with Bollinger Bands (top) and RSI (bottom), marking signals."""
    # Create a figure with two subplots, sharing the x-axis
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 8), sharex=True)

    # Plot 1: Close Price and Bollinger Bands
    sns.lineplot(x=data_bb.index, y='close', data=data_bb, label='Close Price', ax=ax1)
    sns.lineplot(x=data_bb.index, y='UpperBand', data=data_bb, label='Upper Band (BB)', ax=ax1)
    sns.lineplot(x=data_bb.index, y='LowerBand', data=data_bb, label='Lower Band (BB)', ax=ax1)
    if not buy_signals.empty:
        ax1.scatter(buy_signals.index, buy_signals['close'], color='green', marker='o', s=20, label='Buy Signal', zorder=5)
    if not sell_signals.empty:
        ax1.scatter(sell_signals.index, sell_signals['close'], color='red', marker='o', s=20, label='Sell Signal', zorder=5)
    ax1.set_title('Price and Bollinger Bands with Signals')
    ax1.set_ylabel('Price')
    ax1.legend()
    ax1.grid(True)

    # Plot 2: RSI
    sns.lineplot(x=data_bb.index, y='RSI', data=data_bb, label=f'RSI ({RSI_PERIOD})', ax=ax2, color='purple')
    ax2.axhline(RSI_OVERBOUGHT_LINE, color='red', linestyle='--', linewidth=0.8, label=f'Overbought ({RSI_OVERBOUGHT_LINE})')
    ax2.axhline(RSI_OVERSOLD_LINE, color='green', linestyle='--', linewidth=0.8, label=f'Oversold ({RSI_OVERSOLD_LINE})')
    if not buy_signals.empty:
        ax2.scatter(buy_signals.index, buy_signals['RSI'], color='green', marker='o', s=20, label='Buy Signal (RSI)', zorder=5)
    if not sell_signals.empty:
        ax2.scatter(sell_signals.index, sell_signals['RSI'], color='red', marker='o', s=20, label='Sell Signal (RSI)', zorder=5)
    ax2.set_title('Relative Strength Index (RSI) with Signals')
    ax2.set_ylabel('RSI Value')
    ax2.set_ylim(0, 100)  # RSI is typically bounded between 0 and 100
    ax2.legend()
    ax2.grid(True)

    plt.xlabel('Date')  # Common X-axis label
    fig.tight_layout()  # Adjust layout to prevent overlapping titles/labels
    plt.show()


def main():
    """Load the configured data set, compute the indicators and plot them."""
    storage = Storage(logging=logging)
    config = config_day if IS_DAY else config_minute

    data = storage.load_data(config["data_file"], config["start_date"], config["stop_date"])

    # Check before computing indicators: with nothing loaded there is no
    # 'close' column and BollingerBands.calculate would raise.
    if data is None or data.empty:
        logging.info("No data to plot.")
        return

    if IS_DAY:
        df_to_plot = data
    else:
        df_to_plot = aggregate_to_daily(data)
        # Persist the aggregated daily bars (not the raw minute data) so the
        # daily file really contains daily data.
        storage.save_data(df_to_plot, config_day["data_file"])
        if df_to_plot.empty:
            logging.info("No data to plot.")
            return

    bb = BollingerBands(period=BB_PERIOD, std_dev_multiplier=BB_STD_DEV_MULTIPLIER)
    data_bb = bb.calculate(df_to_plot.copy())

    rsi_calculator = RSI(period=RSI_PERIOD)
    data_with_rsi = rsi_calculator.calculate(df_to_plot.copy(), price_column='close')

    # Combine BB and RSI data into a single DataFrame for signal generation.
    # Both frames come from copies of df_to_plot, so their indices match.
    if 'RSI' in data_with_rsi.columns:
        data_bb['RSI'] = data_with_rsi['RSI']
    else:
        # RSI was not calculated (e.g. not enough data): use NaNs so the
        # signal conditions evaluate to False instead of failing.
        data_bb['RSI'] = pd.Series(index=data_bb.index, dtype=float)
        logging.warning("RSI column not found or not calculated. Signals relying on RSI may not be generated.")

    if STRATEGY == 1:
        buy_condition, sell_condition = strategy_1(data_bb, data_with_rsi)
    else:
        buy_condition, sell_condition = no_strategy(data_bb, data_with_rsi)

    buy_signals = data_bb[buy_condition]
    sell_signals = data_bb[sell_condition]

    plot_signals(data_bb, buy_signals, sell_signals)


if __name__ == "__main__":
    main()