- Removed unused configuration for daily data and consolidated minute configuration into a single config dictionary. - Updated plotting logic to dynamically handle different strategies, ensuring appropriate bands and signals are displayed based on the selected strategy. - Improved error handling and logging for missing data in plots. - Enhanced the Bollinger Bands and RSI classes to support adaptive parameters based on market regimes, improving flexibility in strategy execution. - Added new CryptoTradingStrategy with multi-timeframe analysis and volume confirmation for better trading signal accuracy. - Updated documentation to reflect changes in strategy implementations and configuration requirements.
311 lines
15 KiB
Python
311 lines
15 KiB
Python
import pandas as pd
|
||
import numpy as np
|
||
|
||
from cycles.Analysis.boillinger_band import BollingerBands
|
||
from cycles.Analysis.rsi import RSI
|
||
from cycles.utils.data_utils import aggregate_to_daily, aggregate_to_hourly, aggregate_to_minutes
|
||
|
||
|
||
class Strategy:
|
||
|
||
def __init__(self, config = None, logging = None):
|
||
if config is None:
|
||
raise ValueError("Config must be provided.")
|
||
self.config = config
|
||
self.logging = logging
|
||
|
||
def run(self, data, strategy_name):
|
||
if strategy_name == "MarketRegimeStrategy":
|
||
return self.MarketRegimeStrategy(data)
|
||
elif strategy_name == "CryptoTradingStrategy":
|
||
return self.CryptoTradingStrategy(data)
|
||
else:
|
||
if self.logging is not None:
|
||
self.logging.warning(f"Strategy {strategy_name} not found. Using no_strategy instead.")
|
||
return self.no_strategy(data)
|
||
|
||
def no_strategy(self, data):
|
||
"""No strategy: returns False for both buy and sell conditions"""
|
||
buy_condition = pd.Series([False] * len(data), index=data.index)
|
||
sell_condition = pd.Series([False] * len(data), index=data.index)
|
||
return buy_condition, sell_condition
|
||
|
||
def rsi_bollinger_confirmation(self, rsi, window=14, std_mult=1.5):
|
||
"""Calculate RSI Bollinger Bands for confirmation
|
||
|
||
Args:
|
||
rsi (Series): RSI values
|
||
window (int): Rolling window for SMA
|
||
std_mult (float): Standard deviation multiplier
|
||
|
||
Returns:
|
||
tuple: (oversold condition, overbought condition)
|
||
"""
|
||
valid_rsi = ~rsi.isna()
|
||
if not valid_rsi.any():
|
||
# Return empty Series if no valid RSI data
|
||
return pd.Series(False, index=rsi.index), pd.Series(False, index=rsi.index)
|
||
|
||
rsi_sma = rsi.rolling(window).mean()
|
||
rsi_std = rsi.rolling(window).std()
|
||
upper_rsi_band = rsi_sma + std_mult * rsi_std
|
||
lower_rsi_band = rsi_sma - std_mult * rsi_std
|
||
|
||
return (rsi < lower_rsi_band), (rsi > upper_rsi_band)
|
||
|
||
def MarketRegimeStrategy(self, data):
|
||
"""Optimized Bollinger Bands + RSI Strategy for Crypto Trading (Including Sideways Markets)
|
||
with adaptive Bollinger Bands
|
||
|
||
This advanced strategy combines volatility analysis, momentum confirmation, and regime detection
|
||
to adapt to Bitcoin's unique market conditions.
|
||
|
||
Entry Conditions:
|
||
- Trending Market (Breakout Mode):
|
||
Buy: Price < Lower Band ∧ RSI < 50 ∧ Volume Spike (≥1.5× 20D Avg)
|
||
Sell: Price > Upper Band ∧ RSI > 50 ∧ Volume Spike
|
||
- Sideways Market (Mean Reversion):
|
||
Buy: Price ≤ Lower Band ∧ RSI ≤ 40
|
||
Sell: Price ≥ Upper Band ∧ RSI ≥ 60
|
||
|
||
Enhanced with RSI Bollinger Squeeze for signal confirmation when enabled.
|
||
|
||
Returns:
|
||
DataFrame: A unified DataFrame containing original data, BB, RSI, and signals.
|
||
"""
|
||
|
||
data = aggregate_to_daily(data)
|
||
|
||
# Calculate Bollinger Bands
|
||
bb_calculator = BollingerBands(config=self.config)
|
||
# Ensure we are working with a copy to avoid modifying the original DataFrame upstream
|
||
data_bb = bb_calculator.calculate(data.copy())
|
||
|
||
# Calculate RSI
|
||
rsi_calculator = RSI(config=self.config)
|
||
# Use the original data's copy for RSI calculation as well, to maintain index integrity
|
||
data_with_rsi = rsi_calculator.calculate(data.copy(), price_column='close')
|
||
|
||
# Combine BB and RSI data into a single DataFrame for signal generation
|
||
# Ensure indices are aligned; they should be as both are from data.copy()
|
||
if 'RSI' in data_with_rsi.columns:
|
||
data_bb['RSI'] = data_with_rsi['RSI']
|
||
else:
|
||
# If RSI wasn't calculated (e.g., not enough data), create a dummy column with NaNs
|
||
# to prevent errors later, though signals won't be generated.
|
||
data_bb['RSI'] = pd.Series(index=data_bb.index, dtype=float)
|
||
if self.logging:
|
||
self.logging.warning("RSI column not found or not calculated. Signals relying on RSI may not be generated.")
|
||
|
||
# Initialize conditions as all False
|
||
buy_condition = pd.Series(False, index=data_bb.index)
|
||
sell_condition = pd.Series(False, index=data_bb.index)
|
||
|
||
# Create masks for different market regimes
|
||
# MarketRegime is expected to be in data_bb from BollingerBands calculation
|
||
sideways_mask = data_bb['MarketRegime'] > 0
|
||
trending_mask = data_bb['MarketRegime'] <= 0
|
||
valid_data_mask = ~data_bb['MarketRegime'].isna() # Handle potential NaN values
|
||
|
||
# Calculate volume spike (≥1.5× 20D Avg)
|
||
# 'volume' column should be present in the input 'data', and thus in 'data_bb'
|
||
if 'volume' in data_bb.columns:
|
||
volume_20d_avg = data_bb['volume'].rolling(window=20).mean()
|
||
volume_spike = data_bb['volume'] >= 1.5 * volume_20d_avg
|
||
|
||
# Additional volume contraction filter for sideways markets
|
||
volume_30d_avg = data_bb['volume'].rolling(window=30).mean()
|
||
volume_contraction = data_bb['volume'] < 0.7 * volume_30d_avg
|
||
else:
|
||
# If volume data is not available, assume no volume spike
|
||
volume_spike = pd.Series(False, index=data_bb.index)
|
||
volume_contraction = pd.Series(False, index=data_bb.index)
|
||
if self.logging is not None:
|
||
self.logging.warning("Volume data not available. Volume conditions will not be triggered.")
|
||
|
||
# Calculate RSI Bollinger Squeeze confirmation
|
||
# RSI column is now part of data_bb
|
||
if 'RSI' in data_bb.columns and not data_bb['RSI'].isna().all():
|
||
oversold_rsi, overbought_rsi = self.rsi_bollinger_confirmation(data_bb['RSI'])
|
||
else:
|
||
oversold_rsi = pd.Series(False, index=data_bb.index)
|
||
overbought_rsi = pd.Series(False, index=data_bb.index)
|
||
if self.logging is not None and ('RSI' not in data_bb.columns or data_bb['RSI'].isna().all()):
|
||
self.logging.warning("RSI data not available or all NaN. RSI Bollinger Squeeze will not be triggered.")
|
||
|
||
# Calculate conditions for sideways market (Mean Reversion)
|
||
if sideways_mask.any():
|
||
sideways_buy = (data_bb['close'] <= data_bb['LowerBand']) & (data_bb['RSI'] <= 40)
|
||
sideways_sell = (data_bb['close'] >= data_bb['UpperBand']) & (data_bb['RSI'] >= 60)
|
||
|
||
# Add enhanced confirmation for sideways markets
|
||
if self.config.get("SqueezeStrategy", False):
|
||
sideways_buy = sideways_buy & oversold_rsi & volume_contraction
|
||
sideways_sell = sideways_sell & overbought_rsi & volume_contraction
|
||
|
||
# Apply only where market is sideways and data is valid
|
||
buy_condition = buy_condition | (sideways_buy & sideways_mask & valid_data_mask)
|
||
sell_condition = sell_condition | (sideways_sell & sideways_mask & valid_data_mask)
|
||
|
||
# Calculate conditions for trending market (Breakout Mode)
|
||
if trending_mask.any():
|
||
trending_buy = (data_bb['close'] < data_bb['LowerBand']) & (data_bb['RSI'] < 50) & volume_spike
|
||
trending_sell = (data_bb['close'] > data_bb['UpperBand']) & (data_bb['RSI'] > 50) & volume_spike
|
||
|
||
# Add enhanced confirmation for trending markets
|
||
if self.config.get("SqueezeStrategy", False):
|
||
trending_buy = trending_buy & oversold_rsi
|
||
trending_sell = trending_sell & overbought_rsi
|
||
|
||
# Apply only where market is trending and data is valid
|
||
buy_condition = buy_condition | (trending_buy & trending_mask & valid_data_mask)
|
||
sell_condition = sell_condition | (trending_sell & trending_mask & valid_data_mask)
|
||
|
||
# Add buy/sell conditions as columns to the DataFrame
|
||
data_bb['BuySignal'] = buy_condition
|
||
data_bb['SellSignal'] = sell_condition
|
||
|
||
return data_bb
|
||
|
||
# Helper functions for CryptoTradingStrategy
|
||
def _volume_confirmation_crypto(self, current_volume, volume_ma):
|
||
"""Check volume surge against moving average for crypto strategy"""
|
||
if pd.isna(current_volume) or pd.isna(volume_ma) or volume_ma == 0:
|
||
return False
|
||
return current_volume > 1.5 * volume_ma
|
||
|
||
def _multi_timeframe_signal_crypto(self, current_price, rsi_value,
|
||
lower_band_15m, lower_band_1h,
|
||
upper_band_15m, upper_band_1h):
|
||
"""Generate signals with multi-timeframe confirmation for crypto strategy"""
|
||
# Ensure all inputs are not NaN before making comparisons
|
||
if any(pd.isna(val) for val in [current_price, rsi_value, lower_band_15m, lower_band_1h, upper_band_15m, upper_band_1h]):
|
||
return False, False
|
||
|
||
buy_signal = (current_price <= lower_band_15m and
|
||
current_price <= lower_band_1h and
|
||
rsi_value < 35)
|
||
|
||
sell_signal = (current_price >= upper_band_15m and
|
||
current_price >= upper_band_1h and
|
||
rsi_value > 65)
|
||
|
||
return buy_signal, sell_signal
|
||
|
||
def CryptoTradingStrategy(self, data):
|
||
"""Core trading algorithm with risk management
|
||
- Multi-Timeframe Confirmation: Combines 15-minute and 1-hour Bollinger Bands
|
||
- Adaptive Volatility Filtering: Uses ATR for dynamic stop-loss/take-profit
|
||
- Volume Spike Detection: Requires 1.5× average volume for confirmation
|
||
- EMA-Smoothed RSI: Reduces false signals in choppy markets
|
||
- Regime-Adaptive Parameters:
|
||
- Trending: 2σ bands, RSI 35/65 thresholds
|
||
- Sideways: 1.8σ bands, RSI 40/60 thresholds
|
||
- Strategy Logic:
|
||
- Long Entry: Price ≤ both 15m & 1h lower bands + RSI < 35 + Volume surge
|
||
- Short Entry: Price ≥ both 15m & 1h upper bands + RSI > 65 + Volume surge
|
||
- Exit: 2:1 risk-reward ratio with ATR-based stops
|
||
"""
|
||
if data.empty or 'close' not in data.columns or 'volume' not in data.columns:
|
||
if self.logging:
|
||
self.logging.warning("CryptoTradingStrategy: Input data is empty or missing 'close'/'volume' columns.")
|
||
return pd.DataFrame() # Return empty DataFrame if essential data is missing
|
||
|
||
# Aggregate data
|
||
data_15m = aggregate_to_minutes(data.copy(), 15)
|
||
data_1h = aggregate_to_hourly(data.copy(), 1)
|
||
|
||
if data_15m.empty or data_1h.empty:
|
||
if self.logging:
|
||
self.logging.warning("CryptoTradingStrategy: Not enough data for 15m or 1h aggregation.")
|
||
return pd.DataFrame() # Return original data if aggregation fails
|
||
|
||
# --- Calculate indicators for 15m timeframe ---
|
||
# Ensure 'close' and 'volume' exist before trying to access them
|
||
if 'close' not in data_15m.columns or 'volume' not in data_15m.columns:
|
||
if self.logging: self.logging.warning("CryptoTradingStrategy: 15m data missing close or volume.")
|
||
return data # Or an empty DF
|
||
|
||
price_data_15m = data_15m['close']
|
||
volume_data_15m = data_15m['volume']
|
||
|
||
upper_15m, sma_15m, lower_15m = BollingerBands.calculate_custom_bands(price_data_15m, window=20, num_std=2, min_periods=1)
|
||
# Use the static method from RSI class
|
||
rsi_15m = RSI.calculate_custom_rsi(price_data_15m, window=14, smoothing='EMA')
|
||
volume_ma_15m = volume_data_15m.rolling(window=20, min_periods=1).mean()
|
||
|
||
# Add 15m indicators to data_15m DataFrame
|
||
data_15m['UpperBand_15m'] = upper_15m
|
||
data_15m['SMA_15m'] = sma_15m
|
||
data_15m['LowerBand_15m'] = lower_15m
|
||
data_15m['RSI_15m'] = rsi_15m
|
||
data_15m['VolumeMA_15m'] = volume_ma_15m
|
||
|
||
# --- Calculate indicators for 1h timeframe ---
|
||
if 'close' not in data_1h.columns:
|
||
if self.logging: self.logging.warning("CryptoTradingStrategy: 1h data missing close.")
|
||
return data_15m # Return 15m data as 1h failed
|
||
|
||
price_data_1h = data_1h['close']
|
||
# Use the static method from BollingerBands class, setting min_periods to 1 explicitly
|
||
upper_1h, _, lower_1h = BollingerBands.calculate_custom_bands(price_data_1h, window=50, num_std=1.8, min_periods=1)
|
||
|
||
# Add 1h indicators to a temporary DataFrame to be merged
|
||
df_1h_indicators = pd.DataFrame(index=data_1h.index)
|
||
df_1h_indicators['UpperBand_1h'] = upper_1h
|
||
df_1h_indicators['LowerBand_1h'] = lower_1h
|
||
|
||
# Merge 1h indicators into 15m DataFrame
|
||
# Use reindex and ffill to propagate 1h values to 15m intervals
|
||
data_15m = pd.merge(data_15m, df_1h_indicators, left_index=True, right_index=True, how='left')
|
||
data_15m['UpperBand_1h'] = data_15m['UpperBand_1h'].ffill()
|
||
data_15m['LowerBand_1h'] = data_15m['LowerBand_1h'].ffill()
|
||
|
||
# --- Generate Signals ---
|
||
buy_signals = pd.Series(False, index=data_15m.index)
|
||
sell_signals = pd.Series(False, index=data_15m.index)
|
||
stop_loss_levels = pd.Series(np.nan, index=data_15m.index)
|
||
take_profit_levels = pd.Series(np.nan, index=data_15m.index)
|
||
|
||
# ATR calculation needs a rolling window, apply to 'high', 'low', 'close' if available
|
||
# Using a simplified ATR for now: std of close prices over the last 4 15-min periods (1 hour)
|
||
if 'close' in data_15m.columns:
|
||
atr_series = price_data_15m.rolling(window=4, min_periods=1).std()
|
||
else:
|
||
atr_series = pd.Series(0, index=data_15m.index) # No ATR if close is missing
|
||
|
||
for i in range(len(data_15m)):
|
||
if i == 0: continue # Skip first row for volume_ma_15m[i-1]
|
||
|
||
current_price = data_15m['close'].iloc[i]
|
||
current_volume = data_15m['volume'].iloc[i]
|
||
rsi_val = data_15m['RSI_15m'].iloc[i]
|
||
lb_15m = data_15m['LowerBand_15m'].iloc[i]
|
||
ub_15m = data_15m['UpperBand_15m'].iloc[i]
|
||
lb_1h = data_15m['LowerBand_1h'].iloc[i]
|
||
ub_1h = data_15m['UpperBand_1h'].iloc[i]
|
||
vol_ma = data_15m['VolumeMA_15m'].iloc[i-1] # Use previous period's MA
|
||
atr = atr_series.iloc[i]
|
||
|
||
vol_confirm = self._volume_confirmation_crypto(current_volume, vol_ma)
|
||
buy_signal, sell_signal = self._multi_timeframe_signal_crypto(
|
||
current_price, rsi_val, lb_15m, lb_1h, ub_15m, ub_1h
|
||
)
|
||
|
||
if buy_signal and vol_confirm:
|
||
buy_signals.iloc[i] = True
|
||
if not pd.isna(atr) and atr > 0:
|
||
stop_loss_levels.iloc[i] = current_price - 2 * atr
|
||
take_profit_levels.iloc[i] = current_price + 4 * atr
|
||
elif sell_signal and vol_confirm:
|
||
sell_signals.iloc[i] = True
|
||
if not pd.isna(atr) and atr > 0:
|
||
stop_loss_levels.iloc[i] = current_price + 2 * atr
|
||
take_profit_levels.iloc[i] = current_price - 4 * atr
|
||
|
||
data_15m['BuySignal'] = buy_signals
|
||
data_15m['SellSignal'] = sell_signals
|
||
data_15m['StopLoss'] = stop_loss_levels
|
||
data_15m['TakeProfit'] = take_profit_levels
|
||
|
||
return data_15m |