Cycles/cycles/utils/data_utils.py
2025-05-22 16:53:23 +08:00

162 lines
6.3 KiB
Python

from typing import Dict, Union

import pandas as pd
def check_data(data_df: pd.DataFrame) -> Union[Dict[str, str], bool]:
    """
    Validates a DataFrame for OHLCV resampling and builds its aggregation rules.

    Args:
        data_df (pd.DataFrame): DataFrame to check.

    Returns:
        dict | bool: A mapping of column name to aggregation name
            ('open' -> 'first', 'high' -> 'max', 'low' -> 'min',
            'close' -> 'last', 'volume' -> 'sum') restricted to the columns
            actually present in ``data_df``. Returns False (after printing a
            warning) when the index is not a DatetimeIndex or when none of
            the OHLCV columns are present, so callers can test the result
            for truthiness.
    """
    if not isinstance(data_df.index, pd.DatetimeIndex):
        print("Warning: Input DataFrame must have a DatetimeIndex.")
        return False

    # Standard reducers per OHLCV column; dict order fixes the output column order.
    ohlcv_rules = {
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'volume': 'sum',
    }
    agg_rules = {
        column: how
        for column, how in ohlcv_rules.items()
        if column in data_df.columns
    }

    if not agg_rules:
        # This helper is shared by every aggregation interval, so the message
        # does not name a specific one.
        print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for aggregation.")
        return False
    return agg_rules
def aggregate_to_weekly(data_df: pd.DataFrame, weeks: int = 1) -> pd.DataFrame:
    """
    Aggregates time-series financial data to weekly OHLCV format.

    The input DataFrame is expected to have a DatetimeIndex.
    'open' will be the first 'open' price of the period.
    'close' will be the last 'close' price of the period.
    'high' will be the maximum 'high' price of the period.
    'low' will be the minimum 'low' price of the period.
    'volume' (if present) will be the sum of volumes for the period.

    Args:
        data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
            like 'open', 'high', 'low', 'close', and optionally 'volume'.
        weeks (int): The number of weeks per aggregated bar. Default is 1.

    Returns:
        pd.DataFrame: DataFrame aggregated to weekly OHLCV data. Each bar
            covers ``weeks`` whole weeks starting on a Monday and is labelled
            with that Monday at 00:00. Periods containing no input rows are
            omitted. Returns an empty DataFrame (with an empty DatetimeIndex)
            if ``check_data`` rejects the input, i.e. the index is not a
            DatetimeIndex or no OHLCV columns are found.
    """
    agg_rules = check_data(data_df)
    if not agg_rules:
        print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for weekly aggregation.")
        return pd.DataFrame(index=pd.to_datetime([]))

    # Bin on Monday-anchored, left-closed intervals labelled by their left
    # edge, so each label already is the start of its week. Flooring the
    # index with 'W' afterwards is not possible: a weekly offset is not a
    # fixed frequency and DatetimeIndex.floor raises ValueError for it.
    resampler = data_df.resample(f'{weeks}W-MON', closed='left', label='left')
    weekly_data = resampler.agg(agg_rules)

    # 'sum' yields 0 rather than NaN for a period with no rows, so
    # dropna(how='all') alone would keep empty periods whenever 'volume' is
    # aggregated. Drop them explicitly by row count instead.
    if not weekly_data.empty:
        has_rows = (resampler.size() > 0).to_numpy()
        weekly_data = weekly_data.loc[has_rows]

    # Periods whose rows carried only NaN values are still removed.
    return weekly_data.dropna(how='all')
def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
    """
    Aggregates time-series financial data to daily OHLCV format.

    The input DataFrame is expected to have a DatetimeIndex.
    'open' will be the first 'open' price of the day.
    'close' will be the last 'close' price of the day.
    'high' will be the maximum 'high' price of the day.
    'low' will be the minimum 'low' price of the day.
    'volume' (if present) will be the sum of volumes for the day.

    Args:
        data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
            like 'open', 'high', 'low', 'close', and optionally 'volume'.
            Column names are expected to be lowercase.

    Returns:
        pd.DataFrame: DataFrame aggregated to daily OHLCV data. The index is
            a DatetimeIndex with the time set to noon (12:00:00) for each
            day. Days containing no input rows are omitted. Returns an empty
            DataFrame (with an empty DatetimeIndex) if ``check_data`` rejects
            the input, i.e. the index is not a DatetimeIndex or no OHLCV
            columns are found; no exception is raised in that case.
    """
    agg_rules = check_data(data_df)
    if not agg_rules:
        print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for daily aggregation.")
        return pd.DataFrame(index=pd.to_datetime([]))

    resampler = data_df.resample('D')
    daily_data = resampler.agg(agg_rules)

    if not daily_data.empty:
        # 'sum' yields 0 rather than NaN for a day with no rows, so
        # dropna(how='all') alone would keep such days whenever 'volume' is
        # aggregated. Drop them explicitly by row count, before the index is
        # shifted and no longer lines up with the resampler's bins.
        has_rows = (resampler.size() > 0).to_numpy()
        daily_data = daily_data.loc[has_rows]
        # Resample labels each day at midnight; move the label to noon.
        daily_data.index = daily_data.index + pd.Timedelta(hours=12)

    # Days whose rows carried only NaN values are still removed.
    return daily_data.dropna(how='all')
def aggregate_to_hourly(data_df: pd.DataFrame, hours: int = 1) -> pd.DataFrame:
    """
    Aggregates time-series financial data to hourly OHLCV format.

    The input DataFrame is expected to have a DatetimeIndex.
    'open' will be the first 'open' price of the period.
    'close' will be the last 'close' price of the period.
    'high' will be the maximum 'high' price of the period.
    'low' will be the minimum 'low' price of the period.
    'volume' (if present) will be the sum of volumes for the period.

    Args:
        data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
            like 'open', 'high', 'low', 'close', and optionally 'volume'.
        hours (int): The number of hours per aggregated bar. Default is 1.

    Returns:
        pd.DataFrame: DataFrame aggregated to ``hours``-hour OHLCV data,
            indexed by the start of each period. Periods containing no input
            rows are omitted. Returns an empty DataFrame (with an empty
            DatetimeIndex) if ``check_data`` rejects the input, i.e. the
            index is not a DatetimeIndex or no OHLCV columns are found.
    """
    agg_rules = check_data(data_df)
    if not agg_rules:
        print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for hourly aggregation.")
        return pd.DataFrame(index=pd.to_datetime([]))

    # A Timedelta rule avoids the 'H' frequency alias, which pandas 2.2
    # deprecated in favour of 'h'. Resample labels each bin with its left
    # edge, which is already a whole hour, so no flooring is needed afterwards.
    resampler = data_df.resample(pd.Timedelta(hours=hours))
    hourly_data = resampler.agg(agg_rules)

    # 'sum' yields 0 rather than NaN for a period with no rows, so
    # dropna(how='all') alone would keep empty periods whenever 'volume' is
    # aggregated. Drop them explicitly by row count instead.
    if not hourly_data.empty:
        has_rows = (resampler.size() > 0).to_numpy()
        hourly_data = hourly_data.loc[has_rows]

    # Periods whose rows carried only NaN values are still removed.
    return hourly_data.dropna(how='all')