from typing import Dict, Union

import pandas as pd

# Aggregation applied to each standard OHLCV column when resampling to a
# coarser bar. Insertion order also fixes the column order of the result.
_OHLCV_AGG_RULES: Dict[str, str] = {
    'open': 'first',
    'high': 'max',
    'low': 'min',
    'close': 'last',
    'volume': 'sum',
}


def check_data(data_df: pd.DataFrame) -> Union[Dict[str, str], bool]:
    """
    Validates the input DataFrame and builds its OHLCV aggregation rules.

    Despite the name, this does more than check: on success it returns the
    mapping that callers pass to ``Resampler.agg``. Failures are reported
    with a printed warning and a falsy return value rather than an
    exception, so callers can test the result with ``if not agg_rules``.

    Args:
        data_df (pd.DataFrame): DataFrame to check.

    Returns:
        dict or bool: Mapping of column name to aggregation name for every
        standard OHLCV column present in ``data_df``; False if the index is
        not a DatetimeIndex or no OHLCV column is present.
    """
    if not isinstance(data_df.index, pd.DatetimeIndex):
        print("Warning: Input DataFrame must have a DatetimeIndex.")
        return False

    agg_rules = {
        column: how
        for column, how in _OHLCV_AGG_RULES.items()
        if column in data_df.columns
    }

    if not agg_rules:
        # Shared by every aggregation helper, so the message must not name
        # one particular target frequency.
        print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for aggregation.")
        return False

    return agg_rules


def _resample_ohlcv(data_df: pd.DataFrame, rule) -> pd.DataFrame:
    """Resamples data_df to `rule` with the OHLCV rules; empty frame if invalid."""
    agg_rules = check_data(data_df)

    if not agg_rules:
        # check_data() has already printed the specific reason.
        return pd.DataFrame(index=pd.to_datetime([]))

    bars = data_df.resample(rule).agg(agg_rules)

    # Bins that received no rows are all-NaN; drop them.
    return bars.dropna(how='all')


def aggregate_to_weekly(data_df: pd.DataFrame, weeks: int = 1) -> pd.DataFrame:
    """
    Aggregates time-series financial data to weekly OHLCV format.

    The input DataFrame is expected to have a DatetimeIndex.
    'open' will be the first 'open' price of the period.
    'close' will be the last 'close' price of the period.
    'high' will be the maximum 'high' price of the period.
    'low' will be the minimum 'low' price of the period.
    'volume' (if present) will be the sum of volumes for the period.

    Args:
        data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
            like 'open', 'high', 'low', 'close', and optionally 'volume'.
        weeks (int): The number of weeks per bar. Default is 1.

    Returns:
        pd.DataFrame: DataFrame aggregated to weekly OHLCV data, indexed by
        the Monday (midnight) on which each bar starts. An empty DataFrame
        is returned if the index is not a DatetimeIndex or no relevant
        OHLCV columns are found.

    Raises:
        ValueError: If ``weeks`` is smaller than 1.
    """
    if weeks < 1:
        raise ValueError("weeks must be a positive integer.")

    weekly_data = _resample_ohlcv(data_df, f'{weeks}W')

    if not weekly_data.empty:
        # 'W' bins run Monday..Sunday and are labelled with the closing
        # Sunday. DatetimeIndex.floor('W') cannot be used to move the label
        # because 'W' is not a fixed frequency (it raises ValueError), so
        # step back to the opening Monday explicitly. DateOffset keeps the
        # wall-clock time for timezone-aware indexes.
        weekly_data.index = weekly_data.index - pd.DateOffset(days=7 * weeks - 1)

    return weekly_data


# NOTE(review): aggregate_to_daily is defined at this point in the module, but
# only fragments of it are visible in the patch under review, so it is not
# reproduced here. The patch replaces its `raise ValueError` with a call to
# check_data(), which only prints and returns False, while its docstring still
# documents the ValueError -- confirm which contract is intended.


def aggregate_to_hourly(data_df: pd.DataFrame, hours: int = 1) -> pd.DataFrame:
    """
    Aggregates time-series financial data to hourly OHLCV format.

    The input DataFrame is expected to have a DatetimeIndex.
    'open' will be the first 'open' price of the period.
    'close' will be the last 'close' price of the period.
    'high' will be the maximum 'high' price of the period.
    'low' will be the minimum 'low' price of the period.
    'volume' (if present) will be the sum of volumes for the period.

    Args:
        data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
            like 'open', 'high', 'low', 'close', and optionally 'volume'.
        hours (int): The number of hours per bar. Default is 1.

    Returns:
        pd.DataFrame: DataFrame aggregated to hourly OHLCV data, indexed by
        the start of each bar. An empty DataFrame is returned if the index
        is not a DatetimeIndex or no relevant OHLCV columns are found.

    Raises:
        ValueError: If ``hours`` is smaller than 1.
    """
    if hours < 1:
        raise ValueError("hours must be a positive integer.")

    # A Timedelta rule avoids the deprecated 'H' alias. Fixed-frequency bins
    # are labelled with their left edge, which already falls on a whole
    # hour, so no further flooring of the index is needed.
    return _resample_ohlcv(data_df, pd.Timedelta(hours=hours))