11 Commits

Author SHA1 Message Date
Ajasra
00873d593f Enhance strategy output standardization and improve plotting logic
- Introduced a new method to standardize output column names across different strategies, ensuring consistency in data handling and plotting.
- Updated plotting logic in test_bbrsi.py to utilize standardized column names, improving clarity and maintainability.
- Enhanced error handling for missing data in plots and adjusted visual elements for better representation of trading signals.
- Improved the overall structure of strategy implementations to support additional indicators and metadata for better analysis.
2025-05-22 18:16:23 +08:00
Ajasra
3a9dec543c Refactor test_bbrsi.py and enhance strategy implementations
- Removed unused configuration for daily data and consolidated minute configuration into a single config dictionary.
- Updated plotting logic to dynamically handle different strategies, ensuring appropriate bands and signals are displayed based on the selected strategy.
- Improved error handling and logging for missing data in plots.
- Enhanced the Bollinger Bands and RSI classes to support adaptive parameters based on market regimes, improving flexibility in strategy execution.
- Added new CryptoTradingStrategy with multi-timeframe analysis and volume confirmation for better trading signal accuracy.
- Updated documentation to reflect changes in strategy implementations and configuration requirements.
2025-05-22 17:57:04 +08:00
Ajasra
934c807246 fixed depricated parameters 2025-05-22 17:24:16 +08:00
Ajasra
8e220b564c Merge branch 'main' of https://dep.sokaris.link/Simon/Cycles 2025-05-22 17:15:55 +08:00
Ajasra
1107346594 refactor to move inside strategy calculations 2025-05-22 17:15:51 +08:00
Simon Moisy
45c853efab Moved supertrend.py to Analysis subfolder 2025-05-22 17:09:29 +08:00
Simon Moisy
268bc33bbf Merge branch 'main' of ssh://dep.sokaris.link:2222/Simon/Cycles 2025-05-22 17:05:39 +08:00
Simon Moisy
e286dd881a - Refactored the Backtest class for strategy modularity
- Updated entry and exit strategy functions
2025-05-22 17:05:19 +08:00
Ajasra
736b278ee2 aggregate for specific condition 2025-05-22 16:53:23 +08:00
Ajasra
a924328c90 Implement Market Regime Strategy and refactor Bollinger Bands and RSI classes
- Introduced a new Strategy class to encapsulate trading strategies, including the Market Regime Strategy that adapts to different market conditions.
- Refactored BollingerBands and RSI classes to accept configuration parameters for improved flexibility and maintainability.
- Updated test_bbrsi.py to utilize the new strategy implementation and adjusted date ranges for testing.
- Enhanced documentation to include details about the new Strategy class and its components.
2025-05-22 16:44:59 +08:00
Simon Moisy
f4873c56ff minor fixes 2025-05-21 17:23:35 +08:00
11 changed files with 1134 additions and 357 deletions

View File

@@ -1,26 +1,29 @@
import pandas as pd import pandas as pd
import numpy as np
class BollingerBands: class BollingerBands:
""" """
Calculates Bollinger Bands for given financial data. Calculates Bollinger Bands for given financial data.
""" """
def __init__(self, period: int = 20, std_dev_multiplier: float = 2.0): def __init__(self, config):
""" """
Initializes the BollingerBands calculator. Initializes the BollingerBands calculator.
Args: Args:
period (int): The period for the moving average and standard deviation. period (int): The period for the moving average and standard deviation.
std_dev_multiplier (float): The number of standard deviations for the upper and lower bands. std_dev_multiplier (float): The number of standard deviations for the upper and lower bands.
bb_width (float): The width of the Bollinger Bands.
""" """
if period <= 0: if config['bb_period'] <= 0:
raise ValueError("Period must be a positive integer.") raise ValueError("Period must be a positive integer.")
if std_dev_multiplier <= 0: if config['trending']['bb_std_dev_multiplier'] <= 0 or config['sideways']['bb_std_dev_multiplier'] <= 0:
raise ValueError("Standard deviation multiplier must be positive.") raise ValueError("Standard deviation multiplier must be positive.")
if config['bb_width'] <= 0:
raise ValueError("BB width must be positive.")
self.period = period self.config = config
self.std_dev_multiplier = std_dev_multiplier
def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame: def calculate(self, data_df: pd.DataFrame, price_column: str = 'close', squeeze = False) -> pd.DataFrame:
""" """
Calculates Bollinger Bands and adds them to the DataFrame. Calculates Bollinger Bands and adds them to the DataFrame.
@@ -37,14 +40,105 @@ class BollingerBands:
if price_column not in data_df.columns: if price_column not in data_df.columns:
raise ValueError(f"Price column '{price_column}' not found in DataFrame.") raise ValueError(f"Price column '{price_column}' not found in DataFrame.")
# Calculate SMA # Work on a copy to avoid modifying the original DataFrame passed to the function
data_df['SMA'] = data_df[price_column].rolling(window=self.period).mean() data_df = data_df.copy()
# Calculate Standard Deviation if not squeeze:
std_dev = data_df[price_column].rolling(window=self.period).std() period = self.config['bb_period']
bb_width_threshold = self.config['bb_width']
trending_std_multiplier = self.config['trending']['bb_std_dev_multiplier']
sideways_std_multiplier = self.config['sideways']['bb_std_dev_multiplier']
# Calculate Upper and Lower Bands # Calculate SMA
data_df['UpperBand'] = data_df['SMA'] + (self.std_dev_multiplier * std_dev) data_df['SMA'] = data_df[price_column].rolling(window=period).mean()
data_df['LowerBand'] = data_df['SMA'] - (self.std_dev_multiplier * std_dev)
# Calculate Standard Deviation
std_dev = data_df[price_column].rolling(window=period).std()
# Calculate reference Upper and Lower Bands for BBWidth calculation (e.g., using 2.0 std dev)
# This ensures BBWidth is calculated based on a consistent band definition before applying adaptive multipliers.
ref_upper_band = data_df['SMA'] + (2.0 * std_dev)
ref_lower_band = data_df['SMA'] - (2.0 * std_dev)
# Calculate the width of the Bollinger Bands
# Avoid division by zero or NaN if SMA is zero or NaN by replacing with np.nan
data_df['BBWidth'] = np.where(data_df['SMA'] != 0, (ref_upper_band - ref_lower_band) / data_df['SMA'], np.nan)
# Calculate the market regime (1 = sideways, 0 = trending)
# Handle NaN in BBWidth: if BBWidth is NaN, MarketRegime should also be NaN or a default (e.g. trending)
data_df['MarketRegime'] = np.where(data_df['BBWidth'].isna(), np.nan,
(data_df['BBWidth'] < bb_width_threshold).astype(float)) # Use float for NaN compatibility
# Determine the std dev multiplier for each row based on its market regime
conditions = [
data_df['MarketRegime'] == 1, # Sideways market
data_df['MarketRegime'] == 0 # Trending market
]
choices = [
sideways_std_multiplier,
trending_std_multiplier
]
# Default multiplier if MarketRegime is NaN (e.g., use trending or a neutral default like 2.0)
# For now, let's use trending_std_multiplier as default if MarketRegime is NaN.
# This can be adjusted based on desired behavior for periods where regime is undetermined.
row_specific_std_multiplier = np.select(conditions, choices, default=trending_std_multiplier)
# Calculate final Upper and Lower Bands using the row-specific multiplier
data_df['UpperBand'] = data_df['SMA'] + (row_specific_std_multiplier * std_dev)
data_df['LowerBand'] = data_df['SMA'] - (row_specific_std_multiplier * std_dev)
else: # squeeze is True
price_series = data_df[price_column]
# Use the static method for the squeeze case with fixed parameters
upper_band, sma, lower_band = self.calculate_custom_bands(
price_series,
window=14,
num_std=1.5,
min_periods=14 # Match typical squeeze behavior where bands appear after full period
)
data_df['SMA'] = sma
data_df['UpperBand'] = upper_band
data_df['LowerBand'] = lower_band
# BBWidth and MarketRegime are not typically calculated/used in a simple squeeze context by this method
# If needed, they could be added, but the current structure implies they are part of the non-squeeze path.
data_df['BBWidth'] = np.nan
data_df['MarketRegime'] = np.nan
return data_df return data_df
@staticmethod
def calculate_custom_bands(price_series: pd.Series, window: int = 20, num_std: float = 2.0, min_periods: int = None) -> tuple[pd.Series, pd.Series, pd.Series]:
"""
Calculates Bollinger Bands with specified window and standard deviation multiplier.
Args:
price_series (pd.Series): Series of prices.
window (int): The period for the moving average and standard deviation.
num_std (float): The number of standard deviations for the upper and lower bands.
min_periods (int, optional): Minimum number of observations in window required to have a value.
Defaults to `window` if None.
Returns:
tuple[pd.Series, pd.Series, pd.Series]: Upper band, SMA, Lower band.
"""
if not isinstance(price_series, pd.Series):
raise TypeError("price_series must be a pandas Series.")
if not isinstance(window, int) or window <= 0:
raise ValueError("window must be a positive integer.")
if not isinstance(num_std, (int, float)) or num_std <= 0:
raise ValueError("num_std must be a positive number.")
if min_periods is not None and (not isinstance(min_periods, int) or min_periods <= 0):
raise ValueError("min_periods must be a positive integer if provided.")
actual_min_periods = window if min_periods is None else min_periods
sma = price_series.rolling(window=window, min_periods=actual_min_periods).mean()
std = price_series.rolling(window=window, min_periods=actual_min_periods).std()
# Replace NaN std with 0 to avoid issues if sma is present but std is not (e.g. constant price in window)
std = std.fillna(0)
upper_band = sma + (std * num_std)
lower_band = sma - (std * num_std)
return upper_band, sma, lower_band

View File

@@ -5,7 +5,7 @@ class RSI:
""" """
A class to calculate the Relative Strength Index (RSI). A class to calculate the Relative Strength Index (RSI).
""" """
def __init__(self, period: int = 14): def __init__(self, config):
""" """
Initializes the RSI calculator. Initializes the RSI calculator.
@@ -13,13 +13,13 @@ class RSI:
period (int): The period for RSI calculation. Default is 14. period (int): The period for RSI calculation. Default is 14.
Must be a positive integer. Must be a positive integer.
""" """
if not isinstance(period, int) or period <= 0: if not isinstance(config['rsi_period'], int) or config['rsi_period'] <= 0:
raise ValueError("Period must be a positive integer.") raise ValueError("Period must be a positive integer.")
self.period = period self.period = config['rsi_period']
def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame: def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame:
""" """
Calculates the RSI and adds it as a column to the input DataFrame. Calculates the RSI (using Wilder's smoothing) and adds it as a column to the input DataFrame.
Args: Args:
data_df (pd.DataFrame): DataFrame with historical price data. data_df (pd.DataFrame): DataFrame with historical price data.
@@ -35,75 +35,79 @@ class RSI:
if price_column not in data_df.columns: if price_column not in data_df.columns:
raise ValueError(f"Price column '{price_column}' not found in DataFrame.") raise ValueError(f"Price column '{price_column}' not found in DataFrame.")
if len(data_df) < self.period: # Check if data is sufficient for calculation (need period + 1 for one diff calculation)
print(f"Warning: Data length ({len(data_df)}) is less than RSI period ({self.period}). RSI will not be calculated.") if len(data_df) < self.period + 1:
return data_df.copy() print(f"Warning: Data length ({len(data_df)}) is less than RSI period ({self.period}) + 1. RSI will not be calculated meaningfully.")
df_copy = data_df.copy()
df_copy['RSI'] = np.nan # Add an RSI column with NaNs
return df_copy
df = data_df.copy() df = data_df.copy() # Work on a copy
delta = df[price_column].diff(1)
gain = delta.where(delta > 0, 0) price_series = df[price_column]
loss = -delta.where(delta < 0, 0) # Ensure loss is positive
# Calculate initial average gain and loss (SMA) # Call the static custom RSI calculator, defaulting to EMA for Wilder's smoothing
avg_gain = gain.rolling(window=self.period, min_periods=self.period).mean().iloc[self.period -1:self.period] rsi_series = self.calculate_custom_rsi(price_series, window=self.period, smoothing='EMA')
avg_loss = loss.rolling(window=self.period, min_periods=self.period).mean().iloc[self.period -1:self.period]
df['RSI'] = rsi_series
# Calculate subsequent average gains and losses (EMA-like)
# Pre-allocate lists for gains and losses to avoid repeated appending to Series
gains = [0.0] * len(df)
losses = [0.0] * len(df)
if not avg_gain.empty:
gains[self.period -1] = avg_gain.iloc[0]
if not avg_loss.empty:
losses[self.period -1] = avg_loss.iloc[0]
for i in range(self.period, len(df)):
gains[i] = ((gains[i-1] * (self.period - 1)) + gain.iloc[i]) / self.period
losses[i] = ((losses[i-1] * (self.period - 1)) + loss.iloc[i]) / self.period
df['avg_gain'] = pd.Series(gains, index=df.index)
df['avg_loss'] = pd.Series(losses, index=df.index)
# Calculate RS
# Handle division by zero: if avg_loss is 0, RS is undefined or infinite.
# If avg_loss is 0 and avg_gain is also 0, RSI is conventionally 50.
# If avg_loss is 0 and avg_gain > 0, RSI is conventionally 100.
rs = df['avg_gain'] / df['avg_loss']
# Calculate RSI
# RSI = 100 - (100 / (1 + RS))
# If avg_loss is 0:
# If avg_gain > 0, RS -> inf, RSI -> 100
# If avg_gain == 0, RS -> NaN (0/0), RSI -> 50 (conventionally, or could be 0 or 100 depending on interpretation)
# We will use a common convention where RSI is 100 if avg_loss is 0 and avg_gain > 0,
# and RSI is 0 if avg_loss is 0 and avg_gain is 0 (or 50, let's use 0 to indicate no strength if both are 0).
# However, to avoid NaN from 0/0, it's better to calculate RSI directly with conditions.
rsi_values = []
for i in range(len(df)):
avg_g = df['avg_gain'].iloc[i]
avg_l = df['avg_loss'].iloc[i]
if i < self.period -1 : # Not enough data for initial SMA
rsi_values.append(np.nan)
continue
if avg_l == 0:
if avg_g == 0:
rsi_values.append(50) # Or 0, or np.nan depending on how you want to treat this. 50 implies neutrality.
else:
rsi_values.append(100) # Max strength
else:
rs_val = avg_g / avg_l
rsi_values.append(100 - (100 / (1 + rs_val)))
df['RSI'] = pd.Series(rsi_values, index=df.index)
# Remove intermediate columns if desired, or keep them for debugging
# df.drop(columns=['avg_gain', 'avg_loss'], inplace=True)
return df return df
@staticmethod
def calculate_custom_rsi(price_series: pd.Series, window: int = 14, smoothing: str = 'SMA') -> pd.Series:
"""
Calculates RSI with specified window and smoothing (SMA or EMA).
Args:
price_series (pd.Series): Series of prices.
window (int): The period for RSI calculation. Must be a positive integer.
smoothing (str): Smoothing method, 'SMA' or 'EMA'. Defaults to 'SMA'.
Returns:
pd.Series: Series containing the RSI values.
"""
if not isinstance(price_series, pd.Series):
raise TypeError("price_series must be a pandas Series.")
if not isinstance(window, int) or window <= 0:
raise ValueError("window must be a positive integer.")
if smoothing not in ['SMA', 'EMA']:
raise ValueError("smoothing must be either 'SMA' or 'EMA'.")
if len(price_series) < window + 1: # Need at least window + 1 prices for one diff
# print(f"Warning: Data length ({len(price_series)}) is less than RSI window ({window}) + 1. RSI will be all NaN.")
return pd.Series(np.nan, index=price_series.index)
delta = price_series.diff()
# The first delta is NaN. For gain/loss calculations, it can be treated as 0.
# However, subsequent rolling/ewm will handle NaNs appropriately if min_periods is set.
gain = delta.where(delta > 0, 0.0)
loss = -delta.where(delta < 0, 0.0) # Ensure loss is positive
# Ensure gain and loss Series have the same index as price_series for rolling/ewm
# This is important if price_series has missing dates/times
gain = gain.reindex(price_series.index, fill_value=0.0)
loss = loss.reindex(price_series.index, fill_value=0.0)
if smoothing == 'EMA':
# adjust=False for Wilder's smoothing used in RSI
avg_gain = gain.ewm(alpha=1/window, adjust=False, min_periods=window).mean()
avg_loss = loss.ewm(alpha=1/window, adjust=False, min_periods=window).mean()
else: # SMA
avg_gain = gain.rolling(window=window, min_periods=window).mean()
avg_loss = loss.rolling(window=window, min_periods=window).mean()
# Handle division by zero for RS calculation
# If avg_loss is 0, RS can be considered infinite (if avg_gain > 0) or undefined (if avg_gain also 0)
rs = avg_gain / avg_loss.replace(0, 1e-9) # Replace 0 with a tiny number to avoid direct division by zero warning
rsi = 100 - (100 / (1 + rs))
# Correct RSI values for edge cases where avg_loss was 0
# If avg_loss is 0 and avg_gain is > 0, RSI is 100.
# If avg_loss is 0 and avg_gain is 0, RSI is 50 (neutral).
rsi[avg_loss == 0] = np.where(avg_gain[avg_loss == 0] > 0, 100, 50)
# Ensure RSI is NaN where avg_gain or avg_loss is NaN (due to min_periods)
rsi[avg_gain.isna() | avg_loss.isna()] = np.nan
return rsi

View File

@@ -0,0 +1,364 @@
import pandas as pd
import numpy as np
from cycles.Analysis.boillinger_band import BollingerBands
from cycles.Analysis.rsi import RSI
from cycles.utils.data_utils import aggregate_to_daily, aggregate_to_hourly, aggregate_to_minutes
class Strategy:
def __init__(self, config = None, logging = None):
if config is None:
raise ValueError("Config must be provided.")
self.config = config
self.logging = logging
def run(self, data, strategy_name):
if strategy_name == "MarketRegimeStrategy":
result = self.MarketRegimeStrategy(data)
return self.standardize_output(result, strategy_name)
elif strategy_name == "CryptoTradingStrategy":
result = self.CryptoTradingStrategy(data)
return self.standardize_output(result, strategy_name)
else:
if self.logging is not None:
self.logging.warning(f"Strategy {strategy_name} not found. Using no_strategy instead.")
return self.no_strategy(data)
def standardize_output(self, data, strategy_name):
"""
Standardize column names across different strategies to ensure consistent plotting and analysis
Args:
data (DataFrame): Strategy output DataFrame
strategy_name (str): Name of the strategy that generated this data
Returns:
DataFrame: Data with standardized column names
"""
if data.empty:
return data
# Create a copy to avoid modifying the original
standardized = data.copy()
# Standardize column names based on strategy
if strategy_name == "MarketRegimeStrategy":
# MarketRegimeStrategy already has standard column names for most fields
# Just ensure all standard columns exist
pass
elif strategy_name == "CryptoTradingStrategy":
# Map strategy-specific column names to standard names
column_mapping = {
'UpperBand_15m': 'UpperBand',
'LowerBand_15m': 'LowerBand',
'SMA_15m': 'SMA',
'RSI_15m': 'RSI',
'VolumeMA_15m': 'VolumeMA',
# Keep StopLoss and TakeProfit as they are
}
# Add standard columns from mapped columns
for old_col, new_col in column_mapping.items():
if old_col in standardized.columns and new_col not in standardized.columns:
standardized[new_col] = standardized[old_col]
# Add additional strategy-specific data as metadata columns
if 'UpperBand_1h' in standardized.columns:
standardized['UpperBand_1h_meta'] = standardized['UpperBand_1h']
if 'LowerBand_1h' in standardized.columns:
standardized['LowerBand_1h_meta'] = standardized['LowerBand_1h']
# Ensure all strategies have BBWidth if possible
if 'BBWidth' not in standardized.columns and 'UpperBand' in standardized.columns and 'LowerBand' in standardized.columns:
standardized['BBWidth'] = (standardized['UpperBand'] - standardized['LowerBand']) / standardized['SMA'] if 'SMA' in standardized.columns else np.nan
return standardized
def no_strategy(self, data):
"""No strategy: returns False for both buy and sell conditions"""
buy_condition = pd.Series([False] * len(data), index=data.index)
sell_condition = pd.Series([False] * len(data), index=data.index)
return buy_condition, sell_condition
def rsi_bollinger_confirmation(self, rsi, window=14, std_mult=1.5):
"""Calculate RSI Bollinger Bands for confirmation
Args:
rsi (Series): RSI values
window (int): Rolling window for SMA
std_mult (float): Standard deviation multiplier
Returns:
tuple: (oversold condition, overbought condition)
"""
valid_rsi = ~rsi.isna()
if not valid_rsi.any():
# Return empty Series if no valid RSI data
return pd.Series(False, index=rsi.index), pd.Series(False, index=rsi.index)
rsi_sma = rsi.rolling(window).mean()
rsi_std = rsi.rolling(window).std()
upper_rsi_band = rsi_sma + std_mult * rsi_std
lower_rsi_band = rsi_sma - std_mult * rsi_std
return (rsi < lower_rsi_band), (rsi > upper_rsi_band)
def MarketRegimeStrategy(self, data):
"""Optimized Bollinger Bands + RSI Strategy for Crypto Trading (Including Sideways Markets)
with adaptive Bollinger Bands
This advanced strategy combines volatility analysis, momentum confirmation, and regime detection
to adapt to Bitcoin's unique market conditions.
Entry Conditions:
- Trending Market (Breakout Mode):
Buy: Price < Lower Band ∧ RSI < 50 ∧ Volume Spike (≥1.5× 20D Avg)
Sell: Price > Upper Band ∧ RSI > 50 ∧ Volume Spike
- Sideways Market (Mean Reversion):
Buy: Price ≤ Lower Band ∧ RSI ≤ 40
Sell: Price ≥ Upper Band ∧ RSI ≥ 60
Enhanced with RSI Bollinger Squeeze for signal confirmation when enabled.
Returns:
DataFrame: A unified DataFrame containing original data, BB, RSI, and signals.
"""
# data = aggregate_to_hourly(data, 4)
data = aggregate_to_daily(data)
# Calculate Bollinger Bands
bb_calculator = BollingerBands(config=self.config)
# Ensure we are working with a copy to avoid modifying the original DataFrame upstream
data_bb = bb_calculator.calculate(data.copy())
# Calculate RSI
rsi_calculator = RSI(config=self.config)
# Use the original data's copy for RSI calculation as well, to maintain index integrity
data_with_rsi = rsi_calculator.calculate(data.copy(), price_column='close')
# Combine BB and RSI data into a single DataFrame for signal generation
# Ensure indices are aligned; they should be as both are from data.copy()
if 'RSI' in data_with_rsi.columns:
data_bb['RSI'] = data_with_rsi['RSI']
else:
# If RSI wasn't calculated (e.g., not enough data), create a dummy column with NaNs
# to prevent errors later, though signals won't be generated.
data_bb['RSI'] = pd.Series(index=data_bb.index, dtype=float)
if self.logging:
self.logging.warning("RSI column not found or not calculated. Signals relying on RSI may not be generated.")
# Initialize conditions as all False
buy_condition = pd.Series(False, index=data_bb.index)
sell_condition = pd.Series(False, index=data_bb.index)
# Create masks for different market regimes
# MarketRegime is expected to be in data_bb from BollingerBands calculation
sideways_mask = data_bb['MarketRegime'] > 0
trending_mask = data_bb['MarketRegime'] <= 0
valid_data_mask = ~data_bb['MarketRegime'].isna() # Handle potential NaN values
# Calculate volume spike (≥1.5× 20D Avg)
# 'volume' column should be present in the input 'data', and thus in 'data_bb'
if 'volume' in data_bb.columns:
volume_20d_avg = data_bb['volume'].rolling(window=20).mean()
volume_spike = data_bb['volume'] >= 1.5 * volume_20d_avg
# Additional volume contraction filter for sideways markets
volume_30d_avg = data_bb['volume'].rolling(window=30).mean()
volume_contraction = data_bb['volume'] < 0.7 * volume_30d_avg
else:
# If volume data is not available, assume no volume spike
volume_spike = pd.Series(False, index=data_bb.index)
volume_contraction = pd.Series(False, index=data_bb.index)
if self.logging is not None:
self.logging.warning("Volume data not available. Volume conditions will not be triggered.")
# Calculate RSI Bollinger Squeeze confirmation
# RSI column is now part of data_bb
if 'RSI' in data_bb.columns and not data_bb['RSI'].isna().all():
oversold_rsi, overbought_rsi = self.rsi_bollinger_confirmation(data_bb['RSI'])
else:
oversold_rsi = pd.Series(False, index=data_bb.index)
overbought_rsi = pd.Series(False, index=data_bb.index)
if self.logging is not None and ('RSI' not in data_bb.columns or data_bb['RSI'].isna().all()):
self.logging.warning("RSI data not available or all NaN. RSI Bollinger Squeeze will not be triggered.")
# Calculate conditions for sideways market (Mean Reversion)
if sideways_mask.any():
sideways_buy = (data_bb['close'] <= data_bb['LowerBand']) & (data_bb['RSI'] <= 40)
sideways_sell = (data_bb['close'] >= data_bb['UpperBand']) & (data_bb['RSI'] >= 60)
# Add enhanced confirmation for sideways markets
if self.config.get("SqueezeStrategy", False):
sideways_buy = sideways_buy & oversold_rsi & volume_contraction
sideways_sell = sideways_sell & overbought_rsi & volume_contraction
# Apply only where market is sideways and data is valid
buy_condition = buy_condition | (sideways_buy & sideways_mask & valid_data_mask)
sell_condition = sell_condition | (sideways_sell & sideways_mask & valid_data_mask)
# Calculate conditions for trending market (Breakout Mode)
if trending_mask.any():
trending_buy = (data_bb['close'] < data_bb['LowerBand']) & (data_bb['RSI'] < 50) & volume_spike
trending_sell = (data_bb['close'] > data_bb['UpperBand']) & (data_bb['RSI'] > 50) & volume_spike
# Add enhanced confirmation for trending markets
if self.config.get("SqueezeStrategy", False):
trending_buy = trending_buy & oversold_rsi
trending_sell = trending_sell & overbought_rsi
# Apply only where market is trending and data is valid
buy_condition = buy_condition | (trending_buy & trending_mask & valid_data_mask)
sell_condition = sell_condition | (trending_sell & trending_mask & valid_data_mask)
# Add buy/sell conditions as columns to the DataFrame
data_bb['BuySignal'] = buy_condition
data_bb['SellSignal'] = sell_condition
return data_bb
# Helper functions for CryptoTradingStrategy
def _volume_confirmation_crypto(self, current_volume, volume_ma):
"""Check volume surge against moving average for crypto strategy"""
if pd.isna(current_volume) or pd.isna(volume_ma) or volume_ma == 0:
return False
return current_volume > 1.5 * volume_ma
def _multi_timeframe_signal_crypto(self, current_price, rsi_value,
lower_band_15m, lower_band_1h,
upper_band_15m, upper_band_1h):
"""Generate signals with multi-timeframe confirmation for crypto strategy"""
# Ensure all inputs are not NaN before making comparisons
if any(pd.isna(val) for val in [current_price, rsi_value, lower_band_15m, lower_band_1h, upper_band_15m, upper_band_1h]):
return False, False
buy_signal = (current_price <= lower_band_15m and
current_price <= lower_band_1h and
rsi_value < 35)
sell_signal = (current_price >= upper_band_15m and
current_price >= upper_band_1h and
rsi_value > 65)
return buy_signal, sell_signal
def CryptoTradingStrategy(self, data):
"""Core trading algorithm with risk management
- Multi-Timeframe Confirmation: Combines 15-minute and 1-hour Bollinger Bands
- Adaptive Volatility Filtering: Uses ATR for dynamic stop-loss/take-profit
- Volume Spike Detection: Requires 1.5× average volume for confirmation
- EMA-Smoothed RSI: Reduces false signals in choppy markets
- Regime-Adaptive Parameters:
- Trending: 2σ bands, RSI 35/65 thresholds
- Sideways: 1.8σ bands, RSI 40/60 thresholds
- Strategy Logic:
- Long Entry: Price ≤ both 15m & 1h lower bands + RSI < 35 + Volume surge
- Short Entry: Price ≥ both 15m & 1h upper bands + RSI > 65 + Volume surge
- Exit: 2:1 risk-reward ratio with ATR-based stops
"""
if data.empty or 'close' not in data.columns or 'volume' not in data.columns:
if self.logging:
self.logging.warning("CryptoTradingStrategy: Input data is empty or missing 'close'/'volume' columns.")
return pd.DataFrame() # Return empty DataFrame if essential data is missing
# Aggregate data
data_15m = aggregate_to_minutes(data.copy(), 15)
data_1h = aggregate_to_hourly(data.copy(), 1)
if data_15m.empty or data_1h.empty:
if self.logging:
self.logging.warning("CryptoTradingStrategy: Not enough data for 15m or 1h aggregation.")
return pd.DataFrame() # Return original data if aggregation fails
# --- Calculate indicators for 15m timeframe ---
# Ensure 'close' and 'volume' exist before trying to access them
if 'close' not in data_15m.columns or 'volume' not in data_15m.columns:
if self.logging: self.logging.warning("CryptoTradingStrategy: 15m data missing close or volume.")
return data # Or an empty DF
price_data_15m = data_15m['close']
volume_data_15m = data_15m['volume']
upper_15m, sma_15m, lower_15m = BollingerBands.calculate_custom_bands(price_data_15m, window=20, num_std=2, min_periods=1)
# Use the static method from RSI class
rsi_15m = RSI.calculate_custom_rsi(price_data_15m, window=14, smoothing='EMA')
volume_ma_15m = volume_data_15m.rolling(window=20, min_periods=1).mean()
# Add 15m indicators to data_15m DataFrame
data_15m['UpperBand_15m'] = upper_15m
data_15m['SMA_15m'] = sma_15m
data_15m['LowerBand_15m'] = lower_15m
data_15m['RSI_15m'] = rsi_15m
data_15m['VolumeMA_15m'] = volume_ma_15m
# --- Calculate indicators for 1h timeframe ---
if 'close' not in data_1h.columns:
if self.logging: self.logging.warning("CryptoTradingStrategy: 1h data missing close.")
return data_15m # Return 15m data as 1h failed
price_data_1h = data_1h['close']
# Use the static method from BollingerBands class, setting min_periods to 1 explicitly
upper_1h, _, lower_1h = BollingerBands.calculate_custom_bands(price_data_1h, window=50, num_std=1.8, min_periods=1)
# Add 1h indicators to a temporary DataFrame to be merged
df_1h_indicators = pd.DataFrame(index=data_1h.index)
df_1h_indicators['UpperBand_1h'] = upper_1h
df_1h_indicators['LowerBand_1h'] = lower_1h
# Merge 1h indicators into 15m DataFrame
# Use reindex and ffill to propagate 1h values to 15m intervals
data_15m = pd.merge(data_15m, df_1h_indicators, left_index=True, right_index=True, how='left')
data_15m['UpperBand_1h'] = data_15m['UpperBand_1h'].ffill()
data_15m['LowerBand_1h'] = data_15m['LowerBand_1h'].ffill()
# --- Generate Signals ---
buy_signals = pd.Series(False, index=data_15m.index)
sell_signals = pd.Series(False, index=data_15m.index)
stop_loss_levels = pd.Series(np.nan, index=data_15m.index)
take_profit_levels = pd.Series(np.nan, index=data_15m.index)
# ATR calculation needs a rolling window, apply to 'high', 'low', 'close' if available
# Using a simplified ATR for now: std of close prices over the last 4 15-min periods (1 hour)
if 'close' in data_15m.columns:
atr_series = price_data_15m.rolling(window=4, min_periods=1).std()
else:
atr_series = pd.Series(0, index=data_15m.index) # No ATR if close is missing
for i in range(len(data_15m)):
if i == 0: continue # Skip first row for volume_ma_15m[i-1]
current_price = data_15m['close'].iloc[i]
current_volume = data_15m['volume'].iloc[i]
rsi_val = data_15m['RSI_15m'].iloc[i]
lb_15m = data_15m['LowerBand_15m'].iloc[i]
ub_15m = data_15m['UpperBand_15m'].iloc[i]
lb_1h = data_15m['LowerBand_1h'].iloc[i]
ub_1h = data_15m['UpperBand_1h'].iloc[i]
vol_ma = data_15m['VolumeMA_15m'].iloc[i-1] # Use previous period's MA
atr = atr_series.iloc[i]
vol_confirm = self._volume_confirmation_crypto(current_volume, vol_ma)
buy_signal, sell_signal = self._multi_timeframe_signal_crypto(
current_price, rsi_val, lb_15m, lb_1h, ub_15m, ub_1h
)
if buy_signal and vol_confirm:
buy_signals.iloc[i] = True
if not pd.isna(atr) and atr > 0:
stop_loss_levels.iloc[i] = current_price - 2 * atr
take_profit_levels.iloc[i] = current_price + 4 * atr
elif sell_signal and vol_confirm:
sell_signals.iloc[i] = True
if not pd.isna(atr) and atr > 0:
stop_loss_levels.iloc[i] = current_price + 2 * atr
take_profit_levels.iloc[i] = current_price - 4 * atr
data_15m['BuySignal'] = buy_signals
data_15m['SellSignal'] = sell_signals
data_15m['StopLoss'] = stop_loss_levels
data_15m['TakeProfit'] = take_profit_levels
return data_15m

View File

@@ -1,12 +1,31 @@
import pandas as pd import pandas as pd
import numpy as np import numpy as np
from cycles.supertrend import Supertrends
from cycles.market_fees import MarketFees from cycles.market_fees import MarketFees
class Backtest: class Backtest:
class Data:
def __init__(self, initial_usd, df, min1_df, init_strategy_fields) -> None:
self.initial_usd = initial_usd
self.usd = initial_usd
self.max_balance = initial_usd
self.coin = 0
self.position = 0
self.entry_price = 0
self.entry_time = None
self.current_trade_min1_start_idx = None
self.current_min1_end_idx = None
self.price_open = None
self.price_close = None
self.current_date = None
self.strategies = {}
self.df = df
self.min1_df = min1_df
self = init_strategy_fields(self)
@staticmethod @staticmethod
def run(min1_df, df, initial_usd, stop_loss_pct, debug=False): def run(data, entry_strategy, exit_strategy, debug=False):
""" """
Backtest a simple strategy using the meta supertrend (all three supertrends agree). Backtest a simple strategy using the meta supertrend (all three supertrends agree).
Buys when meta supertrend is positive, sells when negative, applies a percentage stop loss. Buys when meta supertrend is positive, sells when negative, applies a percentage stop loss.
@@ -17,84 +36,43 @@ class Backtest:
- stop_loss_pct: float, stop loss as a fraction (e.g. 0.05 for 5%) - stop_loss_pct: float, stop loss as a fraction (e.g. 0.05 for 5%)
- debug: bool, whether to print debug info - debug: bool, whether to print debug info
""" """
_df = df.copy().reset_index(drop=True)
_df['timestamp'] = pd.to_datetime(_df['timestamp'])
supertrends = Supertrends(_df, verbose=False)
supertrend_results_list = supertrends.calculate_supertrend_indicators()
trends = [st['results']['trend'] for st in supertrend_results_list]
trends_arr = np.stack(trends, axis=1)
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
trends_arr[:,0], 0)
position = 0 # 0 = no position, 1 = long
entry_price = 0
usd = initial_usd
coin = 0
trade_log = [] trade_log = []
max_balance = initial_usd
drawdowns = [] drawdowns = []
trades = [] trades = []
entry_time = None
current_trade_min1_start_idx = None
min1_df['timestamp'] = pd.to_datetime(min1_df.index) for i in range(1, len(data.df)):
data.price_open = data.df['open'].iloc[i]
data.price_close = data.df['close'].iloc[i]
for i in range(1, len(_df)): data.current_date = data.df['timestamp'].iloc[i]
price_open = _df['open'].iloc[i]
price_close = _df['close'].iloc[i]
date = _df['timestamp'].iloc[i]
prev_mt = meta_trend[i-1]
curr_mt = meta_trend[i]
# Check stop loss if in position if data.position == 0:
if position == 1: if entry_strategy(data, i):
stop_loss_result = Backtest.check_stop_loss( data, entry_log_entry = Backtest.handle_entry(data)
min1_df, trade_log.append(entry_log_entry)
entry_time, elif data.position == 1:
date, exit_test_results, data, sell_price = exit_strategy(data, i)
entry_price,
stop_loss_pct,
coin,
usd,
debug,
current_trade_min1_start_idx
)
if stop_loss_result is not None:
trade_log_entry, current_trade_min1_start_idx, position, coin, entry_price = stop_loss_result
trade_log.append(trade_log_entry)
continue
# Update the start index for next check
current_trade_min1_start_idx = min1_df.index[min1_df.index <= date][-1]
# Entry: only if not in position and signal changes to 1 if exit_test_results is not None:
if position == 0 and prev_mt != 1 and curr_mt == 1: data, exit_log_entry = Backtest.handle_exit(data, exit_test_results, sell_price)
entry_result = Backtest.handle_entry(usd, price_open, date) trade_log.append(exit_log_entry)
coin, entry_price, entry_time, usd, position, trade_log_entry = entry_result
trade_log.append(trade_log_entry)
# Exit: only if in position and signal changes from 1 to -1
elif position == 1 and prev_mt == 1 and curr_mt == -1:
exit_result = Backtest.handle_exit(coin, price_open, entry_price, entry_time, date)
usd, coin, position, entry_price, trade_log_entry = exit_result
trade_log.append(trade_log_entry)
# Track drawdown # Track drawdown
balance = usd if position == 0 else coin * price_close balance = data.usd if data.position == 0 else data.coin * data.price_close
if balance > max_balance:
max_balance = balance if balance > data.max_balance:
drawdown = (max_balance - balance) / max_balance data.max_balance = balance
drawdown = (data.max_balance - balance) / data.max_balance
drawdowns.append(drawdown) drawdowns.append(drawdown)
# If still in position at end, sell at last close # If still in position at end, sell at last close
if position == 1: if data.position == 1:
exit_result = Backtest.handle_exit(coin, _df['close'].iloc[-1], entry_price, entry_time, _df['timestamp'].iloc[-1]) data, exit_log_entry = Backtest.handle_exit(data, "EOD", None)
usd, coin, position, entry_price, trade_log_entry = exit_result trade_log.append(exit_log_entry)
trade_log.append(trade_log_entry)
# Calculate statistics # Calculate statistics
final_balance = usd final_balance = data.usd
n_trades = len(trade_log) n_trades = len(trade_log)
wins = [1 for t in trade_log if t['exit'] is not None and t['exit'] > t['entry']] wins = [1 for t in trade_log if t['exit'] is not None and t['exit'] > t['entry']]
win_rate = len(wins) / n_trades if n_trades > 0 else 0 win_rate = len(wins) / n_trades if n_trades > 0 else 0
@@ -114,14 +92,14 @@ class Backtest:
'entry': trade['entry'], 'entry': trade['entry'],
'exit': trade['exit'], 'exit': trade['exit'],
'profit_pct': profit_pct, 'profit_pct': profit_pct,
'type': trade.get('type', 'SELL'), 'type': trade['type'],
'fee_usd': trade.get('fee_usd') 'fee_usd': trade['fee_usd']
}) })
fee_usd = trade.get('fee_usd') fee_usd = trade.get('fee_usd')
total_fees_usd += fee_usd total_fees_usd += fee_usd
results = { results = {
"initial_usd": initial_usd, "initial_usd": data.initial_usd,
"final_usd": final_balance, "final_usd": final_balance,
"n_trades": n_trades, "n_trades": n_trades,
"win_rate": win_rate, "win_rate": win_rate,
@@ -143,74 +121,45 @@ class Backtest:
return results return results
@staticmethod @staticmethod
def check_stop_loss(min1_df, entry_time, date, entry_price, stop_loss_pct, coin, usd, debug, current_trade_min1_start_idx): def handle_entry(data):
stop_price = entry_price * (1 - stop_loss_pct) entry_fee = MarketFees.calculate_okx_taker_maker_fee(data.usd, is_maker=False)
usd_after_fee = data.usd - entry_fee
if current_trade_min1_start_idx is None: data.coin = usd_after_fee / data.price_open
current_trade_min1_start_idx = min1_df.index[min1_df.index >= entry_time][0] data.entry_price = data.price_open
current_min1_end_idx = min1_df.index[min1_df.index <= date][-1] data.entry_time = data.current_date
data.usd = 0
data.position = 1
# Check all 1-minute candles in between for stop loss
min1_slice = min1_df.loc[current_trade_min1_start_idx:current_min1_end_idx]
if (min1_slice['low'] <= stop_price).any():
# Stop loss triggered, find the exact candle
stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0]
# More realistic fill: if open < stop, fill at open, else at stop
if stop_candle['open'] < stop_price:
sell_price = stop_candle['open']
else:
sell_price = stop_price
if debug:
print(f"STOP LOSS triggered: entry={entry_price}, stop={stop_price}, sell_price={sell_price}, entry_time={entry_time}, stop_time={stop_candle.name}")
btc_to_sell = coin
usd_gross = btc_to_sell * sell_price
exit_fee = MarketFees.calculate_okx_taker_maker_fee(usd_gross, is_maker=False)
trade_log_entry = {
'type': 'STOP',
'entry': entry_price,
'exit': sell_price,
'entry_time': entry_time,
'exit_time': stop_candle.name,
'fee_usd': exit_fee
}
# After stop loss, reset position and entry
return trade_log_entry, None, 0, 0, 0
return None
@staticmethod
def handle_entry(usd, price_open, date):
entry_fee = MarketFees.calculate_okx_taker_maker_fee(usd, is_maker=False)
usd_after_fee = usd - entry_fee
coin = usd_after_fee / price_open
entry_price = price_open
entry_time = date
usd = 0
position = 1
trade_log_entry = { trade_log_entry = {
'type': 'BUY', 'type': 'BUY',
'entry': entry_price, 'entry': data.entry_price,
'exit': None, 'exit': None,
'entry_time': entry_time, 'entry_time': data.entry_time,
'exit_time': None, 'exit_time': None,
'fee_usd': entry_fee 'fee_usd': entry_fee
} }
return coin, entry_price, entry_time, usd, position, trade_log_entry return data, trade_log_entry
@staticmethod @staticmethod
def handle_exit(coin, price_open, entry_price, entry_time, date): def handle_exit(data, exit_reason, sell_price):
btc_to_sell = coin btc_to_sell = data.coin
usd_gross = btc_to_sell * price_open exit_price = sell_price if sell_price is not None else data.price_open
usd_gross = btc_to_sell * exit_price
exit_fee = MarketFees.calculate_okx_taker_maker_fee(usd_gross, is_maker=False) exit_fee = MarketFees.calculate_okx_taker_maker_fee(usd_gross, is_maker=False)
usd = usd_gross - exit_fee
trade_log_entry = { data.usd = usd_gross - exit_fee
'type': 'SELL',
'entry': entry_price, exit_log_entry = {
'exit': price_open, 'type': exit_reason,
'entry_time': entry_time, 'entry': data.entry_price,
'exit_time': date, 'exit': exit_price,
'entry_time': data.entry_time,
'exit_time': data.current_date,
'fee_usd': exit_fee 'fee_usd': exit_fee
} }
coin = 0 data.coin = 0
position = 0 data.position = 0
entry_price = 0 data.entry_price = 0
return usd, coin, position, entry_price, trade_log_entry
return data, exit_log_entry

View File

@@ -2,6 +2,6 @@ import pandas as pd
class MarketFees: class MarketFees:
@staticmethod @staticmethod
def calculate_okx_taker_maker_fee(amount, is_maker=True): def calculate_okx_taker_maker_fee(amount, is_maker=True) -> float:
fee_rate = 0.0008 if is_maker else 0.0010 fee_rate = 0.0008 if is_maker else 0.0010
return amount * fee_rate return amount * fee_rate

View File

@@ -1,5 +1,80 @@
import pandas as pd import pandas as pd
def check_data(data_df: pd.DataFrame) -> bool:
"""
Checks if the input DataFrame has a DatetimeIndex.
Args:
data_df (pd.DataFrame): DataFrame to check.
Returns:
bool: True if the DataFrame has a DatetimeIndex, False otherwise.
"""
if not isinstance(data_df.index, pd.DatetimeIndex):
print("Warning: Input DataFrame must have a DatetimeIndex.")
return False
agg_rules = {}
# Define aggregation rules based on available columns
if 'open' in data_df.columns:
agg_rules['open'] = 'first'
if 'high' in data_df.columns:
agg_rules['high'] = 'max'
if 'low' in data_df.columns:
agg_rules['low'] = 'min'
if 'close' in data_df.columns:
agg_rules['close'] = 'last'
if 'volume' in data_df.columns:
agg_rules['volume'] = 'sum'
if not agg_rules:
print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for daily aggregation.")
return False
return agg_rules
def aggregate_to_weekly(data_df: pd.DataFrame, weeks: int = 1) -> pd.DataFrame:
"""
Aggregates time-series financial data to weekly OHLCV format.
The input DataFrame is expected to have a DatetimeIndex.
'open' will be the first 'open' price of the week.
'close' will be the last 'close' price of the week.
'high' will be the maximum 'high' price of the week.
'low' will be the minimum 'low' price of the week.
'volume' (if present) will be the sum of volumes for the week.
Args:
data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
like 'open', 'high', 'low', 'close', and optionally 'volume'.
weeks (int): The number of weeks to aggregate to. Default is 1.
Returns:
pd.DataFrame: DataFrame aggregated to weekly OHLCV data.
The index will be a DatetimeIndex with the time set to the start of the week.
Returns an empty DataFrame if no relevant OHLCV columns are found.
"""
agg_rules = check_data(data_df)
if not agg_rules:
print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for weekly aggregation.")
return pd.DataFrame(index=pd.to_datetime([]))
# Resample to weekly frequency and apply aggregation rules
weekly_data = data_df.resample(f'{weeks}W').agg(agg_rules)
weekly_data.dropna(how='all', inplace=True)
# Adjust timestamps to the start of the week
if not weekly_data.empty and isinstance(weekly_data.index, pd.DatetimeIndex):
weekly_data.index = weekly_data.index.floor('W')
return weekly_data
def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame: def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
""" """
Aggregates time-series financial data to daily OHLCV format. Aggregates time-series financial data to daily OHLCV format.
@@ -24,22 +99,8 @@ def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
Raises: Raises:
ValueError: If the input DataFrame does not have a DatetimeIndex. ValueError: If the input DataFrame does not have a DatetimeIndex.
""" """
if not isinstance(data_df.index, pd.DatetimeIndex):
raise ValueError("Input DataFrame must have a DatetimeIndex.")
agg_rules = {} agg_rules = check_data(data_df)
# Define aggregation rules based on available columns
if 'open' in data_df.columns:
agg_rules['open'] = 'first'
if 'high' in data_df.columns:
agg_rules['high'] = 'max'
if 'low' in data_df.columns:
agg_rules['low'] = 'min'
if 'close' in data_df.columns:
agg_rules['close'] = 'last'
if 'volume' in data_df.columns:
agg_rules['volume'] = 'sum'
if not agg_rules: if not agg_rules:
# Log a warning or raise an error if no relevant columns are found # Log a warning or raise an error if no relevant columns are found
@@ -58,3 +119,81 @@ def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
daily_data.dropna(how='all', inplace=True) daily_data.dropna(how='all', inplace=True)
return daily_data return daily_data
def aggregate_to_hourly(data_df: pd.DataFrame, hours: int = 1) -> pd.DataFrame:
"""
Aggregates time-series financial data to hourly OHLCV format.
The input DataFrame is expected to have a DatetimeIndex.
'open' will be the first 'open' price of the hour.
'close' will be the last 'close' price of the hour.
'high' will be the maximum 'high' price of the hour.
'low' will be the minimum 'low' price of the hour.
'volume' (if present) will be the sum of volumes for the hour.
Args:
data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
like 'open', 'high', 'low', 'close', and optionally 'volume'.
hours (int): The number of hours to aggregate to. Default is 1.
Returns:
pd.DataFrame: DataFrame aggregated to hourly OHLCV data.
The index will be a DatetimeIndex with the time set to the start of the hour.
Returns an empty DataFrame if no relevant OHLCV columns are found.
"""
agg_rules = check_data(data_df)
if not agg_rules:
print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for hourly aggregation.")
return pd.DataFrame(index=pd.to_datetime([]))
# Resample to hourly frequency and apply aggregation rules
hourly_data = data_df.resample(f'{hours}h').agg(agg_rules)
hourly_data.dropna(how='all', inplace=True)
# Adjust timestamps to the start of the hour
if not hourly_data.empty and isinstance(hourly_data.index, pd.DatetimeIndex):
hourly_data.index = hourly_data.index.floor('h')
return hourly_data
def aggregate_to_minutes(data_df: pd.DataFrame, minutes: int) -> pd.DataFrame:
"""
Aggregates time-series financial data to N-minute OHLCV format.
The input DataFrame is expected to have a DatetimeIndex.
'open' will be the first 'open' price of the N-minute interval.
'close' will be the last 'close' price of the N-minute interval.
'high' will be the maximum 'high' price of the N-minute interval.
'low' will be the minimum 'low' price of the N-minute interval.
'volume' (if present) will be the sum of volumes for the N-minute interval.
Args:
data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
like 'open', 'high', 'low', 'close', and optionally 'volume'.
minutes (int): The number of minutes to aggregate to.
Returns:
pd.DataFrame: DataFrame aggregated to N-minute OHLCV data.
The index will be a DatetimeIndex.
Returns an empty DataFrame if no relevant OHLCV columns are found or
if the input DataFrame does not have a DatetimeIndex.
"""
agg_rules_obj = check_data(data_df) # check_data returns rules or False
if not agg_rules_obj:
# check_data already prints a warning if index is not DatetimeIndex or no OHLCV columns
# Ensure an empty DataFrame with a DatetimeIndex is returned for consistency
return pd.DataFrame(index=pd.to_datetime([]))
# Resample to N-minute frequency and apply aggregation rules
# Using .agg(agg_rules_obj) where agg_rules_obj is the dict from check_data
resampled_data = data_df.resample(f'{minutes}min').agg(agg_rules_obj)
resampled_data.dropna(how='all', inplace=True)
return resampled_data

View File

@@ -8,6 +8,7 @@ The `Analysis` module includes classes for calculating common technical indicato
- **Relative Strength Index (RSI)**: Implemented in `cycles/Analysis/rsi.py`. - **Relative Strength Index (RSI)**: Implemented in `cycles/Analysis/rsi.py`.
- **Bollinger Bands**: Implemented in `cycles/Analysis/boillinger_band.py`. - **Bollinger Bands**: Implemented in `cycles/Analysis/boillinger_band.py`.
- Note: Trading strategies are detailed in `strategies.md`.
## Class: `RSI` ## Class: `RSI`
@@ -15,64 +16,91 @@ Found in `cycles/Analysis/rsi.py`.
Calculates the Relative Strength Index. Calculates the Relative Strength Index.
### Mathematical Model ### Mathematical Model
1. **Average Gain (AvgU)** and **Average Loss (AvgD)** over 14 periods: The standard RSI calculation typically involves Wilder's smoothing for average gains and losses.
1. **Price Change (Delta)**: Difference between consecutive closing prices.
2. **Gain and Loss**: Separate positive (gain) and negative (loss, expressed as positive) price changes.
3. **Average Gain (AvgU)** and **Average Loss (AvgD)**: Smoothed averages of gains and losses over the RSI period. Wilder's smoothing is a specific type of exponential moving average (EMA):
- Initial AvgU/AvgD: Simple Moving Average (SMA) over the first `period` values.
- Subsequent AvgU: `(Previous AvgU * (period - 1) + Current Gain) / period`
- Subsequent AvgD: `(Previous AvgD * (period - 1) + Current Loss) / period`
4. **Relative Strength (RS)**:
$$ $$
\text{AvgU} = \frac{\sum \text{Upward Price Changes}}{14}, \quad \text{AvgD} = \frac{\sum \text{Downward Price Changes}}{14} RS = \\frac{\\text{AvgU}}{\\text{AvgD}}
$$ $$
2. **Relative Strength (RS)**: 5. **RSI**:
$$ $$
RS = \frac{\text{AvgU}}{\text{AvgD}} RSI = 100 - \\frac{100}{1 + RS}
$$
3. **RSI**:
$$
RSI = 100 - \frac{100}{1 + RS}
$$ $$
Special conditions:
- If AvgD is 0: RSI is 100 if AvgU > 0, or 50 if AvgU is also 0 (neutral).
### `__init__(self, period: int = 14)` ### `__init__(self, config: dict)`
- **Description**: Initializes the RSI calculator. - **Description**: Initializes the RSI calculator.
- **Parameters**: - **Parameters**:\n - `config` (dict): Configuration dictionary. Must contain an `'rsi_period'` key with a positive integer value (e.g., `{'rsi_period': 14}`).
- `period` (int, optional): The period for RSI calculation. Defaults to 14. Must be a positive integer.
### `calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame` ### `calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame`
- **Description**: Calculates the RSI and adds it as an 'RSI' column to the input DataFrame. Handles cases where data length is less than the period by returning the original DataFrame with a warning. - **Description**: Calculates the RSI (using Wilder's smoothing by default) and adds it as an 'RSI' column to the input DataFrame. This method utilizes `calculate_custom_rsi` internally with `smoothing='EMA'`.
- **Parameters**:\n - `data_df` (pd.DataFrame): DataFrame with historical price data. Must contain the `price_column`.\n - `price_column` (str, optional): The name of the column containing price data. Defaults to 'close'.
- **Returns**: `pd.DataFrame` - A copy of the input DataFrame with an added 'RSI' column. If data length is insufficient for the period, the 'RSI' column will contain `np.nan`.
### `calculate_custom_rsi(price_series: pd.Series, window: int = 14, smoothing: str = 'SMA') -> pd.Series` (Static Method)
- **Description**: Calculates RSI with a specified window and smoothing method (SMA or EMA). This is the core calculation engine.
- **Parameters**: - **Parameters**:
- `data_df` (pd.DataFrame): DataFrame with historical price data. Must contain the `price_column`. - `price_series` (pd.Series): Series of prices.
- `price_column` (str, optional): The name of the column containing price data. Defaults to 'close'. - `window` (int, optional): The period for RSI calculation. Defaults to 14. Must be a positive integer.
- **Returns**: `pd.DataFrame` - The input DataFrame with an added 'RSI' column (containing `np.nan` for initial periods where RSI cannot be calculated). Returns a copy of the original DataFrame if the period is larger than the number of data points. - `smoothing` (str, optional): Smoothing method, can be 'SMA' (Simple Moving Average) or 'EMA' (Exponential Moving Average, specifically Wilder's smoothing when `alpha = 1/window`). Defaults to 'SMA'.
- **Returns**: `pd.Series` - Series containing the RSI values. Returns a series of NaNs if data length is insufficient.
## Class: `BollingerBands` ## Class: `BollingerBands`
Found in `cycles/Analysis/boillinger_band.py`. Found in `cycles/Analysis/boillinger_band.py`.
## **Bollinger Bands** Calculates Bollinger Bands.
### Mathematical Model ### Mathematical Model
1. **Middle Band**: 20-day Simple Moving Average (SMA) 1. **Middle Band**: Simple Moving Average (SMA) over `period`.
$$ $$
\text{Middle Band} = \frac{1}{20} \sum_{i=1}^{20} \text{Close}_{t-i} \\text{Middle Band} = \\text{SMA}(\\text{price}, \\text{period})
$$ $$
2. **Upper Band**: Middle Band + 2 × 20-day Standard Deviation (σ) 2. **Standard Deviation (σ)**: Standard deviation of price over `period`.
3. **Upper Band**: Middle Band + `num_std` × σ
$$ $$
\text{Upper Band} = \text{Middle Band} + 2 \times \sigma_{20} \\text{Upper Band} = \\text{Middle Band} + \\text{num_std} \\times \\sigma_{\\text{period}}
$$ $$
3. **Lower Band**: Middle Band 2 × 20-day Standard Deviation (σ) 4. **Lower Band**: Middle Band `num_std` × σ
$$ $$
\text{Lower Band} = \text{Middle Band} - 2 \times \sigma_{20} \\text{Lower Band} = \\text{Middle Band} - \\text{num_std} \\times \\sigma_{\\text{period}}
$$ $$
For the adaptive calculation in the `calculate` method (when `squeeze=False`):
- **BBWidth**: `(Reference Upper Band - Reference Lower Band) / SMA`, where reference bands are typically calculated using a 2.0 standard deviation multiplier.
- **MarketRegime**: Determined by comparing `BBWidth` to a threshold from the configuration. `1` for sideways, `0` for trending.
- The `num_std` used for the final Upper and Lower Bands then varies based on this `MarketRegime` and the `bb_std_dev_multiplier` values for "trending" and "sideways" markets from the configuration, applied row-wise.
### `__init__(self, config: dict)`
### `__init__(self, period: int = 20, std_dev_multiplier: float = 2.0)`
- **Description**: Initializes the BollingerBands calculator. - **Description**: Initializes the BollingerBands calculator.
- **Parameters**: - **Parameters**:\n - `config` (dict): Configuration dictionary. It must contain:
- `period` (int, optional): The period for the moving average and standard deviation. Defaults to 20. Must be a positive integer. - `'bb_period'` (int): Positive integer for the moving average and standard deviation period.
- `std_dev_multiplier` (float, optional): The number of standard deviations for the upper and lower bands. Defaults to 2.0. Must be positive. - `'trending'` (dict): Containing `'bb_std_dev_multiplier'` (float, positive) for trending markets.
- `'sideways'` (dict): Containing `'bb_std_dev_multiplier'` (float, positive) for sideways markets.
- `'bb_width'` (float): Positive float threshold for determining market regime.
### `calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame` ### `calculate(self, data_df: pd.DataFrame, price_column: str = 'close', squeeze: bool = False) -> pd.DataFrame`
- **Description**: Calculates Bollinger Bands and adds 'SMA' (Simple Moving Average), 'UpperBand', and 'LowerBand' columns to the DataFrame. - **Description**: Calculates Bollinger Bands and adds relevant columns to the DataFrame.
- If `squeeze` is `False` (default): Calculates adaptive Bollinger Bands. It determines the market regime (trending/sideways) based on `BBWidth` and applies different standard deviation multipliers (from the `config`) on a row-by-row basis. Adds 'SMA', 'UpperBand', 'LowerBand', 'BBWidth', and 'MarketRegime' columns.
- If `squeeze` is `True`: Calculates simpler Bollinger Bands with a fixed window of 14 and a standard deviation multiplier of 1.5 by calling `calculate_custom_bands`. Adds 'SMA', 'UpperBand', 'LowerBand' columns; 'BBWidth' and 'MarketRegime' will be `NaN`.
- **Parameters**:\n - `data_df` (pd.DataFrame): DataFrame with price data. Must include the `price_column`.\n - `price_column` (str, optional): The name of the column containing the price data. Defaults to 'close'.\n - `squeeze` (bool, optional): If `True`, calculates bands with fixed parameters (window 14, std 1.5). Defaults to `False`.
- **Returns**: `pd.DataFrame` - A copy of the original DataFrame with added Bollinger Band related columns.
### `calculate_custom_bands(price_series: pd.Series, window: int = 20, num_std: float = 2.0, min_periods: int = None) -> tuple[pd.Series, pd.Series, pd.Series]` (Static Method)
- **Description**: Calculates Bollinger Bands with a specified window, standard deviation multiplier, and minimum periods.
- **Parameters**: - **Parameters**:
- `data_df` (pd.DataFrame): DataFrame with price data. Must include the `price_column`. - `price_series` (pd.Series): Series of prices.
- `price_column` (str, optional): The name of the column containing the price data (e.g., 'close'). Defaults to 'close'. - `window` (int, optional): The period for the moving average and standard deviation. Defaults to 20.
- **Returns**: `pd.DataFrame` - The original DataFrame with added columns: 'SMA', 'UpperBand', 'LowerBand'. - `num_std` (float, optional): The number of standard deviations for the upper and lower bands. Defaults to 2.0.
- `min_periods` (int, optional): Minimum number of observations in window required to have a value. Defaults to `window` if `None`.
- **Returns**: `tuple[pd.Series, pd.Series, pd.Series]` - A tuple containing the Upper band, SMA, and Lower band series.

98
docs/strategies.md Normal file
View File

@@ -0,0 +1,98 @@
# Trading Strategies (`cycles/Analysis/strategies.py`)
This document outlines the trading strategies implemented within the `Strategy` class. These strategies utilize technical indicators calculated by other classes in the `Analysis` module.
## Class: `Strategy`
Manages and executes different trading strategies.
### `__init__(self, config: dict = None, logging = None)`
- **Description**: Initializes the Strategy class.
- **Parameters**:
- `config` (dict, optional): Configuration dictionary containing parameters for various indicators and strategy settings. Must be provided if strategies requiring config are used.
- `logging` (logging.Logger, optional): Logger object for outputting messages. Defaults to `None`.
### `run(self, data: pd.DataFrame, strategy_name: str) -> pd.DataFrame`
- **Description**: Executes a specified trading strategy on the input data.
- **Parameters**:
- `data` (pd.DataFrame): Input DataFrame containing at least price data (e.g., 'close', 'volume'). Specific strategies might require other columns or will calculate them.
- `strategy_name` (str): The name of the strategy to run. Supported names include:
- `"MarketRegimeStrategy"`
- `"CryptoTradingStrategy"`
- `"no_strategy"` (or any other unrecognized name will default to this)
- **Returns**: `pd.DataFrame` - A DataFrame containing the original data augmented with indicator values, and `BuySignal` and `SellSignal` (boolean) columns specific to the executed strategy. The structure of the DataFrame (e.g., daily, 15-minute) depends on the strategy.
### `no_strategy(self, data: pd.DataFrame) -> pd.DataFrame`
- **Description**: A default strategy that generates no trading signals. It can serve as a baseline or placeholder.
- **Parameters**:
- `data` (pd.DataFrame): Input data DataFrame.
- **Returns**: `pd.DataFrame` - The input DataFrame with `BuySignal` and `SellSignal` columns added, both containing all `False` values.
---
## Implemented Strategies
### 1. `MarketRegimeStrategy`
- **Description**: An adaptive strategy that combines Bollinger Bands and RSI, adjusting its parameters based on detected market regimes (trending vs. sideways). It operates on daily aggregated data (aggregation is performed internally).
- **Core Logic**:
- Calculates Bollinger Bands (using `BollingerBands` class) with adaptive standard deviation multipliers based on `MarketRegime` (derived from `BBWidth`).
- Calculates RSI (using `RSI` class).
- **Trending Market (Breakout Mode)**:
- Buy: Price < Lower Band ∧ RSI < 50 ∧ Volume Spike.
- Sell: Price > Upper Band ∧ RSI > 50 ∧ Volume Spike.
- **Sideways Market (Mean Reversion)**:
- Buy: Price ≤ Lower Band ∧ RSI ≤ 40.
- Sell: Price ≥ Upper Band ∧ RSI ≥ 60.
- **Squeeze Confirmation** (if `config["SqueezeStrategy"]` is `True`):
- Requires additional confirmation from RSI Bollinger Bands (calculated by `rsi_bollinger_confirmation` helper method).
- Sideways markets also check for volume contraction.
- **Key Configuration Parameters (from `config` dict)**:
- `bb_period`, `bb_width`
- `trending['bb_std_dev_multiplier']`, `trending['rsi_threshold']`
- `sideways['bb_std_dev_multiplier']`, `sideways['rsi_threshold']`
- `rsi_period`
- `SqueezeStrategy` (boolean)
- **Output DataFrame Columns (Daily)**: Includes input columns plus `SMA`, `UpperBand`, `LowerBand`, `BBWidth`, `MarketRegime`, `RSI`, `BuySignal`, `SellSignal`.
#### `rsi_bollinger_confirmation(self, rsi: pd.Series, window: int = 14, std_mult: float = 1.5) -> tuple`
- **Description** (Helper for `MarketRegimeStrategy`): Calculates Bollinger Bands on RSI values for signal confirmation.
- **Parameters**:
- `rsi` (pd.Series): Series containing RSI values.
- `window` (int, optional): The period for the moving average. Defaults to 14.
- `std_mult` (float, optional): Standard deviation multiplier for bands. Defaults to 1.5.
- **Returns**: `tuple` - (oversold_condition, overbought_condition) as pandas Series (boolean).
### 2. `CryptoTradingStrategy`
- **Description**: A multi-timeframe strategy primarily designed for volatile assets like cryptocurrencies. It aggregates input data into 15-minute and 1-hour intervals for analysis.
- **Core Logic**:
- Aggregates data to 15-minute (`data_15m`) and 1-hour (`data_1h`) resolutions using `aggregate_to_minutes` and `aggregate_to_hourly` from `data_utils.py`.
- Calculates 15-minute Bollinger Bands (20-period, 2 std dev) and 15-minute EMA-smoothed RSI (14-period) using `BollingerBands.calculate_custom_bands` and `RSI.calculate_custom_rsi`.
- Calculates 1-hour Bollinger Bands (50-period, 1.8 std dev) using `BollingerBands.calculate_custom_bands`.
- **Signal Generation (on 15m timeframe)**:
- Buy Signal: Price ≤ Lower 15m Band ∧ Price ≤ Lower 1h Band ∧ RSI_15m < 35 ∧ Volume Confirmation.
- Sell Signal: Price ≥ Upper 15m Band ∧ Price ≥ Upper 1h Band ∧ RSI_15m > 65 ∧ Volume Confirmation.
- **Volume Confirmation**: Current 15m volume > 1.5 × 20-period MA of 15m volume.
- **Risk Management**: Calculates `StopLoss` and `TakeProfit` levels based on a simplified ATR (standard deviation of 15m close prices over the last 4 periods).
- Buy: SL = Price - 2 * ATR; TP = Price + 4 * ATR
- Sell: SL = Price + 2 * ATR; TP = Price - 4 * ATR
- **Key Configuration Parameters**: While this strategy uses fixed parameters for its core indicator calculations, the `config` object passed to the `Strategy` class might be used by helper functions or for future extensions (though not heavily used by the current `CryptoTradingStrategy` logic itself for primary indicator settings).
- **Output DataFrame Columns (15-minute)**: Includes resampled 15m OHLCV, plus `UpperBand_15m`, `SMA_15m`, `LowerBand_15m`, `RSI_15m`, `VolumeMA_15m`, `UpperBand_1h` (forward-filled), `LowerBand_1h` (forward-filled), `BuySignal`, `SellSignal`, `StopLoss`, `TakeProfit`.
---
## General Strategy Concepts (from previous high-level notes)
While the specific implementations above have their own detailed logic, some general concepts that often inspire trading strategies include:
- **Adaptive Parameters**: Adjusting indicator settings (like Bollinger Band width or RSI thresholds) based on market conditions (e.g., trending vs. sideways).
- **Multi-Timeframe Analysis**: Confirming signals on one timeframe with trends or levels on another (e.g., 15-minute signals confirmed by 1-hour context).
- **Volume Confirmation**: Using volume spikes or contractions to validate price-based signals.
- **Volatility-Adjusted Risk Management**: Using measures like ATR (Average True Range) to set stop-loss and take-profit levels, or to size positions dynamically.
These concepts are partially reflected in the implemented strategies, particularly in `MarketRegimeStrategy` (adaptive parameters) and `CryptoTradingStrategy` (multi-timeframe, volume confirmation, ATR-based risk levels).

109
main.py
View File

@@ -6,11 +6,11 @@ import os
import datetime import datetime
import argparse import argparse
import json import json
import ast
from cycles.utils.storage import Storage from cycles.utils.storage import Storage
from cycles.utils.system import SystemUtils from cycles.utils.system import SystemUtils
from cycles.backtest import Backtest from cycles.backtest import Backtest
from cycles.Analysis.supertrend import Supertrends
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@@ -21,6 +21,68 @@ logging.basicConfig(
] ]
) )
def default_init_strategy(data: Backtest.Data) -> Backtest.Data:
supertrends = Supertrends(data.df, verbose=False)
supertrend_results_list = supertrends.calculate_supertrend_indicators()
trends = [st['results']['trend'] for st in supertrend_results_list]
trends_arr = np.stack(trends, axis=1)
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
trends_arr[:,0], 0)
data.strategies["meta_trend"] = meta_trend
return data
def default_entry_strategy(data, df_index):
return data.strategies["meta_trend"][df_index - 1] != 1 and data.strategies["meta_trend"][df_index] == 1
def stop_loss_strategy(data):
stop_price = data.entry_price * (1 - data.strategies["stop_loss_pct"])
# Ensure index is sorted and is a DatetimeIndex
min1_index = data.min1_df.index
# Find the first index >= entry_time
start_candidates = min1_index[min1_index >= data.entry_time]
data.current_trade_min1_start_idx = start_candidates[0]
# Find the last index <= current_date
end_candidates = min1_index[min1_index <= data.current_date]
if len(end_candidates) == 0:
print("Warning: no end candidate here. Need to be checked")
return False, None
data.current_min1_end_idx = end_candidates[-1]
min1_slice = data.min1_df.loc[data.current_trade_min1_start_idx:data.current_min1_end_idx]
# print(f"lowest low in that range: {min1_slice['low'].min()}, count: {len(min1_slice)}")
# print(f"slice start: {min1_slice.index[0]}, slice end: {min1_slice.index[-1]}")
if (min1_slice['low'] <= stop_price).any():
stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0]
if stop_candle['open'] < stop_price:
sell_price = stop_candle['open']
else:
sell_price = stop_price
return True, sell_price
return False, None
def default_exit_strategy(data: Backtest.Data, df_index):
if data.strategies["meta_trend"][df_index - 1] != 1 and \
data.strategies["meta_trend"][df_index] == -1:
return "META_TREND_EXIT_SIGNAL", data, None
stop_loss_result, sell_price = stop_loss_strategy(data)
if stop_loss_result:
data.strategies["current_trade_min1_start_idx"] = \
data.min1_df.index[data.min1_df.index <= data.current_date][-1]
return "STOP_LOSS", data, sell_price
return None, data, None
def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False): def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False):
"""Process the entire timeframe with all stop loss values (no monthly split)""" """Process the entire timeframe with all stop loss values (no monthly split)"""
df = df.copy().reset_index(drop=True) df = df.copy().reset_index(drop=True)
@@ -28,13 +90,17 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
results_rows = [] results_rows = []
trade_rows = [] trade_rows = []
min1_df['timestamp'] = pd.to_datetime(min1_df.index) # need ?
for stop_loss_pct in stop_loss_pcts: for stop_loss_pct in stop_loss_pcts:
data = Backtest.Data(initial_usd, df, min1_df, default_init_strategy)
data.strategies["stop_loss_pct"] = stop_loss_pct
results = Backtest.run( results = Backtest.run(
min1_df, data,
df, default_entry_strategy,
initial_usd=initial_usd, default_exit_strategy,
stop_loss_pct=stop_loss_pct, debug
debug=debug
) )
n_trades = results["n_trades"] n_trades = results["n_trades"]
trades = results.get('trades', []) trades = results.get('trades', [])
@@ -48,22 +114,29 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
cumulative_profit = 0 cumulative_profit = 0
max_drawdown = 0 max_drawdown = 0
peak = 0 peak = 0
for trade in trades: for trade in trades:
cumulative_profit += trade['profit_pct'] cumulative_profit += trade['profit_pct']
if cumulative_profit > peak: if cumulative_profit > peak:
peak = cumulative_profit peak = cumulative_profit
drawdown = peak - cumulative_profit drawdown = peak - cumulative_profit
if drawdown > max_drawdown: if drawdown > max_drawdown:
max_drawdown = drawdown max_drawdown = drawdown
final_usd = initial_usd final_usd = initial_usd
for trade in trades: for trade in trades:
final_usd *= (1 + trade['profit_pct']) final_usd *= (1 + trade['profit_pct'])
total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades) total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades)
row = { row = {
"timeframe": rule_name, "timeframe": rule_name,
"stop_loss_pct": stop_loss_pct, "stop_loss_pct": stop_loss_pct,
"n_trades": n_trades, "n_trades": n_trades,
"n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP'), "n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP_LOSS'),
"win_rate": win_rate, "win_rate": win_rate,
"max_drawdown": max_drawdown, "max_drawdown": max_drawdown,
"avg_trade": avg_trade, "avg_trade": avg_trade,
@@ -75,6 +148,7 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
"total_fees_usd": total_fees_usd, "total_fees_usd": total_fees_usd,
} }
results_rows.append(row) results_rows.append(row)
for trade in trades: for trade in trades:
trade_rows.append({ trade_rows.append({
"timeframe": rule_name, "timeframe": rule_name,
@@ -88,20 +162,18 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
"fee_usd": trade.get("fee_usd"), "fee_usd": trade.get("fee_usd"),
}) })
logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}") logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}")
if debug: if debug:
for trade in trades: for trade in trades:
if trade['type'] == 'STOP': print(trade)
print(trade)
for trade in trades:
if trade['profit_pct'] < -0.09: # or whatever is close to -0.10
print("Large loss trade:", trade)
return results_rows, trade_rows return results_rows, trade_rows
def process(timeframe_info, debug=False): def process(timeframe_info, debug=False):
"""Process a single (timeframe, stop_loss_pct) combination (no monthly split)""" """Process a single (timeframe, stop_loss_pct) combination (no monthly split)"""
rule, data_1min, stop_loss_pct, initial_usd = timeframe_info rule, data_1min, stop_loss_pct, initial_usd = timeframe_info
if rule == "1T": if rule == "1min":
df = data_1min.copy() df = data_1min.copy()
else: else:
df = data_1min.resample(rule).agg({ df = data_1min.resample(rule).agg({
@@ -163,7 +235,7 @@ def get_nearest_price(df, target_date):
return nearest_time, price return nearest_time, price
if __name__ == "__main__": if __name__ == "__main__":
debug = True debug = False
parser = argparse.ArgumentParser(description="Run backtest with config file.") parser = argparse.ArgumentParser(description="Run backtest with config file.")
parser.add_argument("config", type=str, nargs="?", help="Path to config JSON file.") parser.add_argument("config", type=str, nargs="?", help="Path to config JSON file.")
@@ -174,14 +246,14 @@ if __name__ == "__main__":
"start_date": "2024-05-15", "start_date": "2024-05-15",
"stop_date": datetime.datetime.today().strftime('%Y-%m-%d'), "stop_date": datetime.datetime.today().strftime('%Y-%m-%d'),
"initial_usd": 10000, "initial_usd": 10000,
"timeframes": ["1D"], "timeframes": ["15min"],
"stop_loss_pcts": [0.01, 0.02, 0.03], "stop_loss_pcts": [0.03],
} }
if args.config: if args.config:
with open(args.config, 'r') as f: with open(args.config, 'r') as f:
config = json.load(f) config = json.load(f)
else: elif not debug:
print("No config file provided. Please enter the following values (press Enter to use default):") print("No config file provided. Please enter the following values (press Enter to use default):")
start_date = input(f"Start date [{default_config['start_date']}]: ") or default_config['start_date'] start_date = input(f"Start date [{default_config['start_date']}]: ") or default_config['start_date']
@@ -203,8 +275,9 @@ if __name__ == "__main__":
'timeframes': timeframes, 'timeframes': timeframes,
'stop_loss_pcts': stop_loss_pcts, 'stop_loss_pcts': stop_loss_pcts,
} }
else:
config = default_config
# Use config values
start_date = config['start_date'] start_date = config['start_date']
stop_date = config['stop_date'] stop_date = config['stop_date']
initial_usd = config['initial_usd'] initial_usd = config['initial_usd']

View File

@@ -4,9 +4,7 @@ import matplotlib.pyplot as plt
import pandas as pd import pandas as pd
from cycles.utils.storage import Storage from cycles.utils.storage import Storage
from cycles.utils.data_utils import aggregate_to_daily from cycles.Analysis.strategies import Strategy
from cycles.Analysis.boillinger_band import BollingerBands
from cycles.Analysis.rsi import RSI
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@@ -17,115 +15,145 @@ logging.basicConfig(
] ]
) )
config_minute = { config = {
"start_date": "2022-01-01", "start_date": "2023-01-01",
"stop_date": "2023-01-01", "stop_date": "2024-01-01",
"data_file": "btcusd_1-min_data.csv" "data_file": "btcusd_1-min_data.csv"
} }
config_day = { config_strategy = {
"start_date": "2022-01-01", "bb_width": 0.05,
"stop_date": "2023-01-01", "bb_period": 20,
"data_file": "btcusd_1-day_data.csv" "rsi_period": 14,
"trending": {
"rsi_threshold": [30, 70],
"bb_std_dev_multiplier": 2.5,
},
"sideways": {
"rsi_threshold": [40, 60],
"bb_std_dev_multiplier": 1.8,
},
"strategy_name": "MarketRegimeStrategy", # CryptoTradingStrategy
"SqueezeStrategy": True
} }
IS_DAY = True IS_DAY = False
def no_strategy(data_bb, data_with_rsi):
buy_condition = pd.Series([False] * len(data_bb), index=data_bb.index)
sell_condition = pd.Series([False] * len(data_bb), index=data_bb.index)
return buy_condition, sell_condition
def strategy_1(data_bb, data_with_rsi):
# Long trade: price move below lower Bollinger band and RSI go below 25
buy_condition = (data_bb['close'] < data_bb['LowerBand']) & (data_bb['RSI'] < 25)
# Short only: price move above top Bollinger band and RSI goes over 75
sell_condition = (data_bb['close'] > data_bb['UpperBand']) & (data_bb['RSI'] > 75)
return buy_condition, sell_condition
if __name__ == "__main__": if __name__ == "__main__":
# Load data
storage = Storage(logging=logging) storage = Storage(logging=logging)
if IS_DAY:
config = config_day
else:
config = config_minute
data = storage.load_data(config["data_file"], config["start_date"], config["stop_date"]) data = storage.load_data(config["data_file"], config["start_date"], config["stop_date"])
if not IS_DAY: # Run strategy
data_daily = aggregate_to_daily(data) strategy = Strategy(config=config_strategy, logging=logging)
storage.save_data(data, "btcusd_1-day_data.csv") processed_data = strategy.run(data.copy(), config_strategy["strategy_name"])
df_to_plot = data_daily
else:
df_to_plot = data
bb = BollingerBands(period=30, std_dev_multiplier=2.0) # Get buy and sell signals
data_bb = bb.calculate(df_to_plot.copy()) buy_condition = processed_data.get('BuySignal', pd.Series(False, index=processed_data.index)).astype(bool)
sell_condition = processed_data.get('SellSignal', pd.Series(False, index=processed_data.index)).astype(bool)
rsi_calculator = RSI(period=13) buy_signals = processed_data[buy_condition]
data_with_rsi = rsi_calculator.calculate(df_to_plot.copy(), price_column='close') sell_signals = processed_data[sell_condition]
# Combine BB and RSI data into a single DataFrame for signal generation # Plot the data with seaborn library
# Ensure indices are aligned; they should be as both are from df_to_plot.copy() if processed_data is not None and not processed_data.empty:
if 'RSI' in data_with_rsi.columns:
data_bb['RSI'] = data_with_rsi['RSI']
else:
# If RSI wasn't calculated (e.g., not enough data), create a dummy column with NaNs
# to prevent errors later, though signals won't be generated.
data_bb['RSI'] = pd.Series(index=data_bb.index, dtype=float)
logging.warning("RSI column not found or not calculated. Signals relying on RSI may not be generated.")
strategy = 1
if strategy == 1:
buy_condition, sell_condition = strategy_1(data_bb, data_with_rsi)
else:
buy_condition, sell_condition = no_strategy(data_bb, data_with_rsi)
buy_signals = data_bb[buy_condition]
sell_signals = data_bb[sell_condition]
# plot the data with seaborn library
if df_to_plot is not None and not df_to_plot.empty:
# Create a figure with two subplots, sharing the x-axis # Create a figure with two subplots, sharing the x-axis
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 8), sharex=True) fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(16, 8), sharex=True)
strategy_name = config_strategy["strategy_name"]
# Plot 1: Close Price and Strategy-Specific Bands/Levels
sns.lineplot(x=processed_data.index, y='close', data=processed_data, label='Close Price', ax=ax1)
# Use standardized column names for bands
if 'UpperBand' in processed_data.columns and 'LowerBand' in processed_data.columns:
# Instead of lines, shade the area between upper and lower bands
ax1.fill_between(processed_data.index,
processed_data['LowerBand'],
processed_data['UpperBand'],
alpha=0.1, color='blue', label='Bollinger Bands')
else:
logging.warning(f"{strategy_name}: UpperBand or LowerBand not found for plotting.")
# Add strategy-specific extra indicators if available
if strategy_name == "CryptoTradingStrategy":
if 'StopLoss' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='StopLoss', data=processed_data, label='Stop Loss', ax=ax1, linestyle='--', color='orange')
if 'TakeProfit' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='TakeProfit', data=processed_data, label='Take Profit', ax=ax1, linestyle='--', color='purple')
# Plot 1: Close Price and Bollinger Bands
sns.lineplot(x=data_bb.index, y='close', data=data_bb, label='Close Price', ax=ax1)
sns.lineplot(x=data_bb.index, y='UpperBand', data=data_bb, label='Upper Band (BB)', ax=ax1)
sns.lineplot(x=data_bb.index, y='LowerBand', data=data_bb, label='Lower Band (BB)', ax=ax1)
# Plot Buy/Sell signals on Price chart # Plot Buy/Sell signals on Price chart
if not buy_signals.empty: if not buy_signals.empty:
ax1.scatter(buy_signals.index, buy_signals['close'], color='green', marker='o', s=20, label='Buy Signal', zorder=5) ax1.scatter(buy_signals.index, buy_signals['close'], color='green', marker='o', s=20, label='Buy Signal', zorder=5)
if not sell_signals.empty: if not sell_signals.empty:
ax1.scatter(sell_signals.index, sell_signals['close'], color='red', marker='o', s=20, label='Sell Signal', zorder=5) ax1.scatter(sell_signals.index, sell_signals['close'], color='red', marker='o', s=20, label='Sell Signal', zorder=5)
ax1.set_title('Price and Bollinger Bands with Signals') ax1.set_title(f'Price and Signals ({strategy_name})')
ax1.set_ylabel('Price') ax1.set_ylabel('Price')
ax1.legend() ax1.legend()
ax1.grid(True) ax1.grid(True)
# Plot 2: RSI # Plot 2: RSI and Strategy-Specific Thresholds
if 'RSI' in data_bb.columns: # Check data_bb now as it should contain RSI if 'RSI' in processed_data.columns:
sns.lineplot(x=data_bb.index, y='RSI', data=data_bb, label='RSI (14)', ax=ax2, color='purple') sns.lineplot(x=processed_data.index, y='RSI', data=processed_data, label=f'RSI (' + str(config_strategy.get("rsi_period", 14)) + ')', ax=ax2, color='purple')
ax2.axhline(75, color='red', linestyle='--', linewidth=0.8, label='Overbought (75)') if strategy_name == "MarketRegimeStrategy":
ax2.axhline(25, color='green', linestyle='--', linewidth=0.8, label='Oversold (25)') # Get threshold values
upper_threshold = config_strategy.get("trending", {}).get("rsi_threshold", [30,70])[1]
lower_threshold = config_strategy.get("trending", {}).get("rsi_threshold", [30,70])[0]
# Shade overbought area (upper)
ax2.fill_between(processed_data.index, upper_threshold, 100,
alpha=0.1, color='red', label=f'Overbought (>{upper_threshold})')
# Shade oversold area (lower)
ax2.fill_between(processed_data.index, 0, lower_threshold,
alpha=0.1, color='green', label=f'Oversold (<{lower_threshold})')
elif strategy_name == "CryptoTradingStrategy":
# Shade overbought area (upper)
ax2.fill_between(processed_data.index, 65, 100,
alpha=0.1, color='red', label='Overbought (>65)')
# Shade oversold area (lower)
ax2.fill_between(processed_data.index, 0, 35,
alpha=0.1, color='green', label='Oversold (<35)')
# Plot Buy/Sell signals on RSI chart # Plot Buy/Sell signals on RSI chart
if not buy_signals.empty: if not buy_signals.empty and 'RSI' in buy_signals.columns:
ax2.scatter(buy_signals.index, buy_signals['RSI'], color='green', marker='o', s=20, label='Buy Signal (RSI)', zorder=5) ax2.scatter(buy_signals.index, buy_signals['RSI'], color='green', marker='o', s=20, label='Buy Signal (RSI)', zorder=5)
if not sell_signals.empty: if not sell_signals.empty and 'RSI' in sell_signals.columns:
ax2.scatter(sell_signals.index, sell_signals['RSI'], color='red', marker='o', s=20, label='Sell Signal (RSI)', zorder=5) ax2.scatter(sell_signals.index, sell_signals['RSI'], color='red', marker='o', s=20, label='Sell Signal (RSI)', zorder=5)
ax2.set_title('Relative Strength Index (RSI) with Signals') ax2.set_title('Relative Strength Index (RSI) with Signals')
ax2.set_ylabel('RSI Value') ax2.set_ylabel('RSI Value')
ax2.set_ylim(0, 100) # RSI is typically bounded between 0 and 100 ax2.set_ylim(0, 100)
ax2.legend() ax2.legend()
ax2.grid(True) ax2.grid(True)
else: else:
logging.info("RSI data not available for plotting.") logging.info("RSI data not available for plotting.")
plt.xlabel('Date') # Common X-axis label # Plot 3: Strategy-Specific Indicators
fig.tight_layout() # Adjust layout to prevent overlapping titles/labels ax3.clear() # Clear previous plot content if any
if 'BBWidth' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='BBWidth', data=processed_data, label='BB Width', ax=ax3)
if strategy_name == "MarketRegimeStrategy":
if 'MarketRegime' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='MarketRegime', data=processed_data, label='Market Regime (Sideways: 1, Trending: 0)', ax=ax3)
ax3.set_title('Bollinger Bands Width & Market Regime')
ax3.set_ylabel('Value')
elif strategy_name == "CryptoTradingStrategy":
if 'VolumeMA' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='VolumeMA', data=processed_data, label='Volume MA', ax=ax3)
if 'volume' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='volume', data=processed_data, label='Volume', ax=ax3, alpha=0.5)
ax3.set_title('Volume Analysis')
ax3.set_ylabel('Volume')
ax3.legend()
ax3.grid(True)
plt.xlabel('Date')
fig.tight_layout()
plt.show() plt.show()
else: else:
logging.info("No data to plot.") logging.info("No data to plot.")