12 Commits

Author SHA1 Message Date
Simon Moisy
e5c2988d71 Refactor Backtest class and update strategy functions for improved modularity
- Refactored the Backtest class to encapsulate state and behavior, enhancing clarity and maintainability.
- Updated strategy functions to accept the Backtest instance, streamlining data access and manipulation.
- Introduced a new plotting method in BacktestCharts for visualizing close prices with trend indicators.
- Improved handling of meta_trend data to ensure proper visualization and trend representation.
- Adjusted main execution logic to support the new Backtest structure and enhanced debugging capabilities.
2025-05-22 20:02:14 +08:00
Ajasra
00873d593f Enhance strategy output standardization and improve plotting logic
- Introduced a new method to standardize output column names across different strategies, ensuring consistency in data handling and plotting.
- Updated plotting logic in test_bbrsi.py to utilize standardized column names, improving clarity and maintainability.
- Enhanced error handling for missing data in plots and adjusted visual elements for better representation of trading signals.
- Improved the overall structure of strategy implementations to support additional indicators and metadata for better analysis.
2025-05-22 18:16:23 +08:00
Ajasra
3a9dec543c Refactor test_bbrsi.py and enhance strategy implementations
- Removed unused configuration for daily data and consolidated minute configuration into a single config dictionary.
- Updated plotting logic to dynamically handle different strategies, ensuring appropriate bands and signals are displayed based on the selected strategy.
- Improved error handling and logging for missing data in plots.
- Enhanced the Bollinger Bands and RSI classes to support adaptive parameters based on market regimes, improving flexibility in strategy execution.
- Added new CryptoTradingStrategy with multi-timeframe analysis and volume confirmation for better trading signal accuracy.
- Updated documentation to reflect changes in strategy implementations and configuration requirements.
2025-05-22 17:57:04 +08:00
Ajasra
934c807246 fixed depricated parameters 2025-05-22 17:24:16 +08:00
Ajasra
8e220b564c Merge branch 'main' of https://dep.sokaris.link/Simon/Cycles 2025-05-22 17:15:55 +08:00
Ajasra
1107346594 refactor to move inside strategy calculations 2025-05-22 17:15:51 +08:00
Simon Moisy
45c853efab Moved supertrend.py to Analysis subfolder 2025-05-22 17:09:29 +08:00
Simon Moisy
268bc33bbf Merge branch 'main' of ssh://dep.sokaris.link:2222/Simon/Cycles 2025-05-22 17:05:39 +08:00
Simon Moisy
e286dd881a - Refactored the Backtest class for strategy modularity
- Updated entry and exit strategy functions
2025-05-22 17:05:19 +08:00
Ajasra
736b278ee2 aggregate for specific condition 2025-05-22 16:53:23 +08:00
Ajasra
a924328c90 Implement Market Regime Strategy and refactor Bollinger Bands and RSI classes
- Introduced a new Strategy class to encapsulate trading strategies, including the Market Regime Strategy that adapts to different market conditions.
- Refactored BollingerBands and RSI classes to accept configuration parameters for improved flexibility and maintainability.
- Updated test_bbrsi.py to utilize the new strategy implementation and adjusted date ranges for testing.
- Enhanced documentation to include details about the new Strategy class and its components.
2025-05-22 16:44:59 +08:00
Simon Moisy
f4873c56ff minor fixes 2025-05-21 17:23:35 +08:00
13 changed files with 1552 additions and 688 deletions

View File

@@ -1,26 +1,29 @@
import pandas as pd
import numpy as np
class BollingerBands:
"""
Calculates Bollinger Bands for given financial data.
"""
def __init__(self, period: int = 20, std_dev_multiplier: float = 2.0):
def __init__(self, config):
"""
Initializes the BollingerBands calculator.
Args:
period (int): The period for the moving average and standard deviation.
std_dev_multiplier (float): The number of standard deviations for the upper and lower bands.
bb_width (float): The width of the Bollinger Bands.
"""
if period <= 0:
if config['bb_period'] <= 0:
raise ValueError("Period must be a positive integer.")
if std_dev_multiplier <= 0:
if config['trending']['bb_std_dev_multiplier'] <= 0 or config['sideways']['bb_std_dev_multiplier'] <= 0:
raise ValueError("Standard deviation multiplier must be positive.")
if config['bb_width'] <= 0:
raise ValueError("BB width must be positive.")
self.period = period
self.std_dev_multiplier = std_dev_multiplier
self.config = config
def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame:
def calculate(self, data_df: pd.DataFrame, price_column: str = 'close', squeeze = False) -> pd.DataFrame:
"""
Calculates Bollinger Bands and adds them to the DataFrame.
@@ -37,14 +40,105 @@ class BollingerBands:
if price_column not in data_df.columns:
raise ValueError(f"Price column '{price_column}' not found in DataFrame.")
# Work on a copy to avoid modifying the original DataFrame passed to the function
data_df = data_df.copy()
if not squeeze:
period = self.config['bb_period']
bb_width_threshold = self.config['bb_width']
trending_std_multiplier = self.config['trending']['bb_std_dev_multiplier']
sideways_std_multiplier = self.config['sideways']['bb_std_dev_multiplier']
# Calculate SMA
data_df['SMA'] = data_df[price_column].rolling(window=self.period).mean()
data_df['SMA'] = data_df[price_column].rolling(window=period).mean()
# Calculate Standard Deviation
std_dev = data_df[price_column].rolling(window=self.period).std()
std_dev = data_df[price_column].rolling(window=period).std()
# Calculate Upper and Lower Bands
data_df['UpperBand'] = data_df['SMA'] + (self.std_dev_multiplier * std_dev)
data_df['LowerBand'] = data_df['SMA'] - (self.std_dev_multiplier * std_dev)
# Calculate reference Upper and Lower Bands for BBWidth calculation (e.g., using 2.0 std dev)
# This ensures BBWidth is calculated based on a consistent band definition before applying adaptive multipliers.
ref_upper_band = data_df['SMA'] + (2.0 * std_dev)
ref_lower_band = data_df['SMA'] - (2.0 * std_dev)
# Calculate the width of the Bollinger Bands
# Avoid division by zero or NaN if SMA is zero or NaN by replacing with np.nan
data_df['BBWidth'] = np.where(data_df['SMA'] != 0, (ref_upper_band - ref_lower_band) / data_df['SMA'], np.nan)
# Calculate the market regime (1 = sideways, 0 = trending)
# Handle NaN in BBWidth: if BBWidth is NaN, MarketRegime should also be NaN or a default (e.g. trending)
data_df['MarketRegime'] = np.where(data_df['BBWidth'].isna(), np.nan,
(data_df['BBWidth'] < bb_width_threshold).astype(float)) # Use float for NaN compatibility
# Determine the std dev multiplier for each row based on its market regime
conditions = [
data_df['MarketRegime'] == 1, # Sideways market
data_df['MarketRegime'] == 0 # Trending market
]
choices = [
sideways_std_multiplier,
trending_std_multiplier
]
# Default multiplier if MarketRegime is NaN (e.g., use trending or a neutral default like 2.0)
# For now, let's use trending_std_multiplier as default if MarketRegime is NaN.
# This can be adjusted based on desired behavior for periods where regime is undetermined.
row_specific_std_multiplier = np.select(conditions, choices, default=trending_std_multiplier)
# Calculate final Upper and Lower Bands using the row-specific multiplier
data_df['UpperBand'] = data_df['SMA'] + (row_specific_std_multiplier * std_dev)
data_df['LowerBand'] = data_df['SMA'] - (row_specific_std_multiplier * std_dev)
else: # squeeze is True
price_series = data_df[price_column]
# Use the static method for the squeeze case with fixed parameters
upper_band, sma, lower_band = self.calculate_custom_bands(
price_series,
window=14,
num_std=1.5,
min_periods=14 # Match typical squeeze behavior where bands appear after full period
)
data_df['SMA'] = sma
data_df['UpperBand'] = upper_band
data_df['LowerBand'] = lower_band
# BBWidth and MarketRegime are not typically calculated/used in a simple squeeze context by this method
# If needed, they could be added, but the current structure implies they are part of the non-squeeze path.
data_df['BBWidth'] = np.nan
data_df['MarketRegime'] = np.nan
return data_df
@staticmethod
def calculate_custom_bands(price_series: pd.Series, window: int = 20, num_std: float = 2.0, min_periods: int = None) -> tuple[pd.Series, pd.Series, pd.Series]:
"""
Calculates Bollinger Bands with specified window and standard deviation multiplier.
Args:
price_series (pd.Series): Series of prices.
window (int): The period for the moving average and standard deviation.
num_std (float): The number of standard deviations for the upper and lower bands.
min_periods (int, optional): Minimum number of observations in window required to have a value.
Defaults to `window` if None.
Returns:
tuple[pd.Series, pd.Series, pd.Series]: Upper band, SMA, Lower band.
"""
if not isinstance(price_series, pd.Series):
raise TypeError("price_series must be a pandas Series.")
if not isinstance(window, int) or window <= 0:
raise ValueError("window must be a positive integer.")
if not isinstance(num_std, (int, float)) or num_std <= 0:
raise ValueError("num_std must be a positive number.")
if min_periods is not None and (not isinstance(min_periods, int) or min_periods <= 0):
raise ValueError("min_periods must be a positive integer if provided.")
actual_min_periods = window if min_periods is None else min_periods
sma = price_series.rolling(window=window, min_periods=actual_min_periods).mean()
std = price_series.rolling(window=window, min_periods=actual_min_periods).std()
# Replace NaN std with 0 to avoid issues if sma is present but std is not (e.g. constant price in window)
std = std.fillna(0)
upper_band = sma + (std * num_std)
lower_band = sma - (std * num_std)
return upper_band, sma, lower_band

View File

@@ -5,7 +5,7 @@ class RSI:
"""
A class to calculate the Relative Strength Index (RSI).
"""
def __init__(self, period: int = 14):
def __init__(self, config):
"""
Initializes the RSI calculator.
@@ -13,13 +13,13 @@ class RSI:
period (int): The period for RSI calculation. Default is 14.
Must be a positive integer.
"""
if not isinstance(period, int) or period <= 0:
if not isinstance(config['rsi_period'], int) or config['rsi_period'] <= 0:
raise ValueError("Period must be a positive integer.")
self.period = period
self.period = config['rsi_period']
def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame:
"""
Calculates the RSI and adds it as a column to the input DataFrame.
Calculates the RSI (using Wilder's smoothing) and adds it as a column to the input DataFrame.
Args:
data_df (pd.DataFrame): DataFrame with historical price data.
@@ -35,75 +35,79 @@ class RSI:
if price_column not in data_df.columns:
raise ValueError(f"Price column '{price_column}' not found in DataFrame.")
if len(data_df) < self.period:
print(f"Warning: Data length ({len(data_df)}) is less than RSI period ({self.period}). RSI will not be calculated.")
return data_df.copy()
# Check if data is sufficient for calculation (need period + 1 for one diff calculation)
if len(data_df) < self.period + 1:
print(f"Warning: Data length ({len(data_df)}) is less than RSI period ({self.period}) + 1. RSI will not be calculated meaningfully.")
df_copy = data_df.copy()
df_copy['RSI'] = np.nan # Add an RSI column with NaNs
return df_copy
df = data_df.copy()
delta = df[price_column].diff(1)
df = data_df.copy() # Work on a copy
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0) # Ensure loss is positive
price_series = df[price_column]
# Calculate initial average gain and loss (SMA)
avg_gain = gain.rolling(window=self.period, min_periods=self.period).mean().iloc[self.period -1:self.period]
avg_loss = loss.rolling(window=self.period, min_periods=self.period).mean().iloc[self.period -1:self.period]
# Call the static custom RSI calculator, defaulting to EMA for Wilder's smoothing
rsi_series = self.calculate_custom_rsi(price_series, window=self.period, smoothing='EMA')
# Calculate subsequent average gains and losses (EMA-like)
# Pre-allocate lists for gains and losses to avoid repeated appending to Series
gains = [0.0] * len(df)
losses = [0.0] * len(df)
if not avg_gain.empty:
gains[self.period -1] = avg_gain.iloc[0]
if not avg_loss.empty:
losses[self.period -1] = avg_loss.iloc[0]
for i in range(self.period, len(df)):
gains[i] = ((gains[i-1] * (self.period - 1)) + gain.iloc[i]) / self.period
losses[i] = ((losses[i-1] * (self.period - 1)) + loss.iloc[i]) / self.period
df['avg_gain'] = pd.Series(gains, index=df.index)
df['avg_loss'] = pd.Series(losses, index=df.index)
# Calculate RS
# Handle division by zero: if avg_loss is 0, RS is undefined or infinite.
# If avg_loss is 0 and avg_gain is also 0, RSI is conventionally 50.
# If avg_loss is 0 and avg_gain > 0, RSI is conventionally 100.
rs = df['avg_gain'] / df['avg_loss']
# Calculate RSI
# RSI = 100 - (100 / (1 + RS))
# If avg_loss is 0:
# If avg_gain > 0, RS -> inf, RSI -> 100
# If avg_gain == 0, RS -> NaN (0/0), RSI -> 50 (conventionally, or could be 0 or 100 depending on interpretation)
# We will use a common convention where RSI is 100 if avg_loss is 0 and avg_gain > 0,
# and RSI is 0 if avg_loss is 0 and avg_gain is 0 (or 50, let's use 0 to indicate no strength if both are 0).
# However, to avoid NaN from 0/0, it's better to calculate RSI directly with conditions.
rsi_values = []
for i in range(len(df)):
avg_g = df['avg_gain'].iloc[i]
avg_l = df['avg_loss'].iloc[i]
if i < self.period -1 : # Not enough data for initial SMA
rsi_values.append(np.nan)
continue
if avg_l == 0:
if avg_g == 0:
rsi_values.append(50) # Or 0, or np.nan depending on how you want to treat this. 50 implies neutrality.
else:
rsi_values.append(100) # Max strength
else:
rs_val = avg_g / avg_l
rsi_values.append(100 - (100 / (1 + rs_val)))
df['RSI'] = pd.Series(rsi_values, index=df.index)
# Remove intermediate columns if desired, or keep them for debugging
# df.drop(columns=['avg_gain', 'avg_loss'], inplace=True)
df['RSI'] = rsi_series
return df
@staticmethod
def calculate_custom_rsi(price_series: pd.Series, window: int = 14, smoothing: str = 'SMA') -> pd.Series:
"""
Calculates RSI with specified window and smoothing (SMA or EMA).
Args:
price_series (pd.Series): Series of prices.
window (int): The period for RSI calculation. Must be a positive integer.
smoothing (str): Smoothing method, 'SMA' or 'EMA'. Defaults to 'SMA'.
Returns:
pd.Series: Series containing the RSI values.
"""
if not isinstance(price_series, pd.Series):
raise TypeError("price_series must be a pandas Series.")
if not isinstance(window, int) or window <= 0:
raise ValueError("window must be a positive integer.")
if smoothing not in ['SMA', 'EMA']:
raise ValueError("smoothing must be either 'SMA' or 'EMA'.")
if len(price_series) < window + 1: # Need at least window + 1 prices for one diff
# print(f"Warning: Data length ({len(price_series)}) is less than RSI window ({window}) + 1. RSI will be all NaN.")
return pd.Series(np.nan, index=price_series.index)
delta = price_series.diff()
# The first delta is NaN. For gain/loss calculations, it can be treated as 0.
# However, subsequent rolling/ewm will handle NaNs appropriately if min_periods is set.
gain = delta.where(delta > 0, 0.0)
loss = -delta.where(delta < 0, 0.0) # Ensure loss is positive
# Ensure gain and loss Series have the same index as price_series for rolling/ewm
# This is important if price_series has missing dates/times
gain = gain.reindex(price_series.index, fill_value=0.0)
loss = loss.reindex(price_series.index, fill_value=0.0)
if smoothing == 'EMA':
# adjust=False for Wilder's smoothing used in RSI
avg_gain = gain.ewm(alpha=1/window, adjust=False, min_periods=window).mean()
avg_loss = loss.ewm(alpha=1/window, adjust=False, min_periods=window).mean()
else: # SMA
avg_gain = gain.rolling(window=window, min_periods=window).mean()
avg_loss = loss.rolling(window=window, min_periods=window).mean()
# Handle division by zero for RS calculation
# If avg_loss is 0, RS can be considered infinite (if avg_gain > 0) or undefined (if avg_gain also 0)
rs = avg_gain / avg_loss.replace(0, 1e-9) # Replace 0 with a tiny number to avoid direct division by zero warning
rsi = 100 - (100 / (1 + rs))
# Correct RSI values for edge cases where avg_loss was 0
# If avg_loss is 0 and avg_gain is > 0, RSI is 100.
# If avg_loss is 0 and avg_gain is 0, RSI is 50 (neutral).
rsi[avg_loss == 0] = np.where(avg_gain[avg_loss == 0] > 0, 100, 50)
# Ensure RSI is NaN where avg_gain or avg_loss is NaN (due to min_periods)
rsi[avg_gain.isna() | avg_loss.isna()] = np.nan
return rsi

View File

@@ -0,0 +1,364 @@
import pandas as pd
import numpy as np
from cycles.Analysis.boillinger_band import BollingerBands
from cycles.Analysis.rsi import RSI
from cycles.utils.data_utils import aggregate_to_daily, aggregate_to_hourly, aggregate_to_minutes
class Strategy:
def __init__(self, config = None, logging = None):
if config is None:
raise ValueError("Config must be provided.")
self.config = config
self.logging = logging
def run(self, data, strategy_name):
if strategy_name == "MarketRegimeStrategy":
result = self.MarketRegimeStrategy(data)
return self.standardize_output(result, strategy_name)
elif strategy_name == "CryptoTradingStrategy":
result = self.CryptoTradingStrategy(data)
return self.standardize_output(result, strategy_name)
else:
if self.logging is not None:
self.logging.warning(f"Strategy {strategy_name} not found. Using no_strategy instead.")
return self.no_strategy(data)
def standardize_output(self, data, strategy_name):
"""
Standardize column names across different strategies to ensure consistent plotting and analysis
Args:
data (DataFrame): Strategy output DataFrame
strategy_name (str): Name of the strategy that generated this data
Returns:
DataFrame: Data with standardized column names
"""
if data.empty:
return data
# Create a copy to avoid modifying the original
standardized = data.copy()
# Standardize column names based on strategy
if strategy_name == "MarketRegimeStrategy":
# MarketRegimeStrategy already has standard column names for most fields
# Just ensure all standard columns exist
pass
elif strategy_name == "CryptoTradingStrategy":
# Map strategy-specific column names to standard names
column_mapping = {
'UpperBand_15m': 'UpperBand',
'LowerBand_15m': 'LowerBand',
'SMA_15m': 'SMA',
'RSI_15m': 'RSI',
'VolumeMA_15m': 'VolumeMA',
# Keep StopLoss and TakeProfit as they are
}
# Add standard columns from mapped columns
for old_col, new_col in column_mapping.items():
if old_col in standardized.columns and new_col not in standardized.columns:
standardized[new_col] = standardized[old_col]
# Add additional strategy-specific data as metadata columns
if 'UpperBand_1h' in standardized.columns:
standardized['UpperBand_1h_meta'] = standardized['UpperBand_1h']
if 'LowerBand_1h' in standardized.columns:
standardized['LowerBand_1h_meta'] = standardized['LowerBand_1h']
# Ensure all strategies have BBWidth if possible
if 'BBWidth' not in standardized.columns and 'UpperBand' in standardized.columns and 'LowerBand' in standardized.columns:
standardized['BBWidth'] = (standardized['UpperBand'] - standardized['LowerBand']) / standardized['SMA'] if 'SMA' in standardized.columns else np.nan
return standardized
def no_strategy(self, data):
"""No strategy: returns False for both buy and sell conditions"""
buy_condition = pd.Series([False] * len(data), index=data.index)
sell_condition = pd.Series([False] * len(data), index=data.index)
return buy_condition, sell_condition
def rsi_bollinger_confirmation(self, rsi, window=14, std_mult=1.5):
"""Calculate RSI Bollinger Bands for confirmation
Args:
rsi (Series): RSI values
window (int): Rolling window for SMA
std_mult (float): Standard deviation multiplier
Returns:
tuple: (oversold condition, overbought condition)
"""
valid_rsi = ~rsi.isna()
if not valid_rsi.any():
# Return empty Series if no valid RSI data
return pd.Series(False, index=rsi.index), pd.Series(False, index=rsi.index)
rsi_sma = rsi.rolling(window).mean()
rsi_std = rsi.rolling(window).std()
upper_rsi_band = rsi_sma + std_mult * rsi_std
lower_rsi_band = rsi_sma - std_mult * rsi_std
return (rsi < lower_rsi_band), (rsi > upper_rsi_band)
def MarketRegimeStrategy(self, data):
"""Optimized Bollinger Bands + RSI Strategy for Crypto Trading (Including Sideways Markets)
with adaptive Bollinger Bands
This advanced strategy combines volatility analysis, momentum confirmation, and regime detection
to adapt to Bitcoin's unique market conditions.
Entry Conditions:
- Trending Market (Breakout Mode):
Buy: Price < Lower Band ∧ RSI < 50 ∧ Volume Spike (≥1.5× 20D Avg)
Sell: Price > Upper Band ∧ RSI > 50 ∧ Volume Spike
- Sideways Market (Mean Reversion):
Buy: Price ≤ Lower Band ∧ RSI ≤ 40
Sell: Price ≥ Upper Band ∧ RSI ≥ 60
Enhanced with RSI Bollinger Squeeze for signal confirmation when enabled.
Returns:
DataFrame: A unified DataFrame containing original data, BB, RSI, and signals.
"""
# data = aggregate_to_hourly(data, 4)
data = aggregate_to_daily(data)
# Calculate Bollinger Bands
bb_calculator = BollingerBands(config=self.config)
# Ensure we are working with a copy to avoid modifying the original DataFrame upstream
data_bb = bb_calculator.calculate(data.copy())
# Calculate RSI
rsi_calculator = RSI(config=self.config)
# Use the original data's copy for RSI calculation as well, to maintain index integrity
data_with_rsi = rsi_calculator.calculate(data.copy(), price_column='close')
# Combine BB and RSI data into a single DataFrame for signal generation
# Ensure indices are aligned; they should be as both are from data.copy()
if 'RSI' in data_with_rsi.columns:
data_bb['RSI'] = data_with_rsi['RSI']
else:
# If RSI wasn't calculated (e.g., not enough data), create a dummy column with NaNs
# to prevent errors later, though signals won't be generated.
data_bb['RSI'] = pd.Series(index=data_bb.index, dtype=float)
if self.logging:
self.logging.warning("RSI column not found or not calculated. Signals relying on RSI may not be generated.")
# Initialize conditions as all False
buy_condition = pd.Series(False, index=data_bb.index)
sell_condition = pd.Series(False, index=data_bb.index)
# Create masks for different market regimes
# MarketRegime is expected to be in data_bb from BollingerBands calculation
sideways_mask = data_bb['MarketRegime'] > 0
trending_mask = data_bb['MarketRegime'] <= 0
valid_data_mask = ~data_bb['MarketRegime'].isna() # Handle potential NaN values
# Calculate volume spike (≥1.5× 20D Avg)
# 'volume' column should be present in the input 'data', and thus in 'data_bb'
if 'volume' in data_bb.columns:
volume_20d_avg = data_bb['volume'].rolling(window=20).mean()
volume_spike = data_bb['volume'] >= 1.5 * volume_20d_avg
# Additional volume contraction filter for sideways markets
volume_30d_avg = data_bb['volume'].rolling(window=30).mean()
volume_contraction = data_bb['volume'] < 0.7 * volume_30d_avg
else:
# If volume data is not available, assume no volume spike
volume_spike = pd.Series(False, index=data_bb.index)
volume_contraction = pd.Series(False, index=data_bb.index)
if self.logging is not None:
self.logging.warning("Volume data not available. Volume conditions will not be triggered.")
# Calculate RSI Bollinger Squeeze confirmation
# RSI column is now part of data_bb
if 'RSI' in data_bb.columns and not data_bb['RSI'].isna().all():
oversold_rsi, overbought_rsi = self.rsi_bollinger_confirmation(data_bb['RSI'])
else:
oversold_rsi = pd.Series(False, index=data_bb.index)
overbought_rsi = pd.Series(False, index=data_bb.index)
if self.logging is not None and ('RSI' not in data_bb.columns or data_bb['RSI'].isna().all()):
self.logging.warning("RSI data not available or all NaN. RSI Bollinger Squeeze will not be triggered.")
# Calculate conditions for sideways market (Mean Reversion)
if sideways_mask.any():
sideways_buy = (data_bb['close'] <= data_bb['LowerBand']) & (data_bb['RSI'] <= 40)
sideways_sell = (data_bb['close'] >= data_bb['UpperBand']) & (data_bb['RSI'] >= 60)
# Add enhanced confirmation for sideways markets
if self.config.get("SqueezeStrategy", False):
sideways_buy = sideways_buy & oversold_rsi & volume_contraction
sideways_sell = sideways_sell & overbought_rsi & volume_contraction
# Apply only where market is sideways and data is valid
buy_condition = buy_condition | (sideways_buy & sideways_mask & valid_data_mask)
sell_condition = sell_condition | (sideways_sell & sideways_mask & valid_data_mask)
# Calculate conditions for trending market (Breakout Mode)
if trending_mask.any():
trending_buy = (data_bb['close'] < data_bb['LowerBand']) & (data_bb['RSI'] < 50) & volume_spike
trending_sell = (data_bb['close'] > data_bb['UpperBand']) & (data_bb['RSI'] > 50) & volume_spike
# Add enhanced confirmation for trending markets
if self.config.get("SqueezeStrategy", False):
trending_buy = trending_buy & oversold_rsi
trending_sell = trending_sell & overbought_rsi
# Apply only where market is trending and data is valid
buy_condition = buy_condition | (trending_buy & trending_mask & valid_data_mask)
sell_condition = sell_condition | (trending_sell & trending_mask & valid_data_mask)
# Add buy/sell conditions as columns to the DataFrame
data_bb['BuySignal'] = buy_condition
data_bb['SellSignal'] = sell_condition
return data_bb
# Helper functions for CryptoTradingStrategy
def _volume_confirmation_crypto(self, current_volume, volume_ma):
"""Check volume surge against moving average for crypto strategy"""
if pd.isna(current_volume) or pd.isna(volume_ma) or volume_ma == 0:
return False
return current_volume > 1.5 * volume_ma
def _multi_timeframe_signal_crypto(self, current_price, rsi_value,
lower_band_15m, lower_band_1h,
upper_band_15m, upper_band_1h):
"""Generate signals with multi-timeframe confirmation for crypto strategy"""
# Ensure all inputs are not NaN before making comparisons
if any(pd.isna(val) for val in [current_price, rsi_value, lower_band_15m, lower_band_1h, upper_band_15m, upper_band_1h]):
return False, False
buy_signal = (current_price <= lower_band_15m and
current_price <= lower_band_1h and
rsi_value < 35)
sell_signal = (current_price >= upper_band_15m and
current_price >= upper_band_1h and
rsi_value > 65)
return buy_signal, sell_signal
def CryptoTradingStrategy(self, data):
"""Core trading algorithm with risk management
- Multi-Timeframe Confirmation: Combines 15-minute and 1-hour Bollinger Bands
- Adaptive Volatility Filtering: Uses ATR for dynamic stop-loss/take-profit
- Volume Spike Detection: Requires 1.5× average volume for confirmation
- EMA-Smoothed RSI: Reduces false signals in choppy markets
- Regime-Adaptive Parameters:
- Trending: 2σ bands, RSI 35/65 thresholds
- Sideways: 1.8σ bands, RSI 40/60 thresholds
- Strategy Logic:
- Long Entry: Price ≤ both 15m & 1h lower bands + RSI < 35 + Volume surge
- Short Entry: Price ≥ both 15m & 1h upper bands + RSI > 65 + Volume surge
- Exit: 2:1 risk-reward ratio with ATR-based stops
"""
if data.empty or 'close' not in data.columns or 'volume' not in data.columns:
if self.logging:
self.logging.warning("CryptoTradingStrategy: Input data is empty or missing 'close'/'volume' columns.")
return pd.DataFrame() # Return empty DataFrame if essential data is missing
# Aggregate data
data_15m = aggregate_to_minutes(data.copy(), 15)
data_1h = aggregate_to_hourly(data.copy(), 1)
if data_15m.empty or data_1h.empty:
if self.logging:
self.logging.warning("CryptoTradingStrategy: Not enough data for 15m or 1h aggregation.")
return pd.DataFrame() # Return original data if aggregation fails
# --- Calculate indicators for 15m timeframe ---
# Ensure 'close' and 'volume' exist before trying to access them
if 'close' not in data_15m.columns or 'volume' not in data_15m.columns:
if self.logging: self.logging.warning("CryptoTradingStrategy: 15m data missing close or volume.")
return data # Or an empty DF
price_data_15m = data_15m['close']
volume_data_15m = data_15m['volume']
upper_15m, sma_15m, lower_15m = BollingerBands.calculate_custom_bands(price_data_15m, window=20, num_std=2, min_periods=1)
# Use the static method from RSI class
rsi_15m = RSI.calculate_custom_rsi(price_data_15m, window=14, smoothing='EMA')
volume_ma_15m = volume_data_15m.rolling(window=20, min_periods=1).mean()
# Add 15m indicators to data_15m DataFrame
data_15m['UpperBand_15m'] = upper_15m
data_15m['SMA_15m'] = sma_15m
data_15m['LowerBand_15m'] = lower_15m
data_15m['RSI_15m'] = rsi_15m
data_15m['VolumeMA_15m'] = volume_ma_15m
# --- Calculate indicators for 1h timeframe ---
if 'close' not in data_1h.columns:
if self.logging: self.logging.warning("CryptoTradingStrategy: 1h data missing close.")
return data_15m # Return 15m data as 1h failed
price_data_1h = data_1h['close']
# Use the static method from BollingerBands class, setting min_periods to 1 explicitly
upper_1h, _, lower_1h = BollingerBands.calculate_custom_bands(price_data_1h, window=50, num_std=1.8, min_periods=1)
# Add 1h indicators to a temporary DataFrame to be merged
df_1h_indicators = pd.DataFrame(index=data_1h.index)
df_1h_indicators['UpperBand_1h'] = upper_1h
df_1h_indicators['LowerBand_1h'] = lower_1h
# Merge 1h indicators into 15m DataFrame
# Use reindex and ffill to propagate 1h values to 15m intervals
data_15m = pd.merge(data_15m, df_1h_indicators, left_index=True, right_index=True, how='left')
data_15m['UpperBand_1h'] = data_15m['UpperBand_1h'].ffill()
data_15m['LowerBand_1h'] = data_15m['LowerBand_1h'].ffill()
# --- Generate Signals ---
buy_signals = pd.Series(False, index=data_15m.index)
sell_signals = pd.Series(False, index=data_15m.index)
stop_loss_levels = pd.Series(np.nan, index=data_15m.index)
take_profit_levels = pd.Series(np.nan, index=data_15m.index)
# ATR calculation needs a rolling window, apply to 'high', 'low', 'close' if available
# Using a simplified ATR for now: std of close prices over the last 4 15-min periods (1 hour)
if 'close' in data_15m.columns:
atr_series = price_data_15m.rolling(window=4, min_periods=1).std()
else:
atr_series = pd.Series(0, index=data_15m.index) # No ATR if close is missing
for i in range(len(data_15m)):
if i == 0: continue # Skip first row for volume_ma_15m[i-1]
current_price = data_15m['close'].iloc[i]
current_volume = data_15m['volume'].iloc[i]
rsi_val = data_15m['RSI_15m'].iloc[i]
lb_15m = data_15m['LowerBand_15m'].iloc[i]
ub_15m = data_15m['UpperBand_15m'].iloc[i]
lb_1h = data_15m['LowerBand_1h'].iloc[i]
ub_1h = data_15m['UpperBand_1h'].iloc[i]
vol_ma = data_15m['VolumeMA_15m'].iloc[i-1] # Use previous period's MA
atr = atr_series.iloc[i]
vol_confirm = self._volume_confirmation_crypto(current_volume, vol_ma)
buy_signal, sell_signal = self._multi_timeframe_signal_crypto(
current_price, rsi_val, lb_15m, lb_1h, ub_15m, ub_1h
)
if buy_signal and vol_confirm:
buy_signals.iloc[i] = True
if not pd.isna(atr) and atr > 0:
stop_loss_levels.iloc[i] = current_price - 2 * atr
take_profit_levels.iloc[i] = current_price + 4 * atr
elif sell_signal and vol_confirm:
sell_signals.iloc[i] = True
if not pd.isna(atr) and atr > 0:
stop_loss_levels.iloc[i] = current_price + 2 * atr
take_profit_levels.iloc[i] = current_price - 4 * atr
data_15m['BuySignal'] = buy_signals
data_15m['SellSignal'] = sell_signals
data_15m['StopLoss'] = stop_loss_levels
data_15m['TakeProfit'] = take_profit_levels
return data_15m

View File

@@ -0,0 +1,336 @@
import pandas as pd
import numpy as np
import logging
from scipy.signal import find_peaks
from matplotlib.patches import Rectangle
from scipy import stats
import concurrent.futures
from functools import partial
from functools import lru_cache
import matplotlib.pyplot as plt
# Color configuration
# Plot colors
DARK_BG_COLOR = '#181C27'
LEGEND_BG_COLOR = '#333333'
TITLE_COLOR = 'white'
AXIS_LABEL_COLOR = 'white'
# Candlestick colors
CANDLE_UP_COLOR = '#089981' # Green
CANDLE_DOWN_COLOR = '#F23645' # Red
# Marker colors
MIN_COLOR = 'red'
MAX_COLOR = 'green'
# Line style colors
MIN_LINE_STYLE = 'g--' # Green dashed
MAX_LINE_STYLE = 'r--' # Red dashed
SMA7_LINE_STYLE = 'y-' # Yellow solid
SMA15_LINE_STYLE = 'm-' # Magenta solid
# SuperTrend colors
ST_COLOR_UP = 'g-'
ST_COLOR_DOWN = 'r-'
# Cache the calculation results by function parameters
@lru_cache(maxsize=32)
def cached_supertrend_calculation(period, multiplier, data_tuple):
# Convert tuple back to numpy arrays
high = np.array(data_tuple[0])
low = np.array(data_tuple[1])
close = np.array(data_tuple[2])
# Calculate TR and ATR using vectorized operations
tr = np.zeros_like(close)
tr[0] = high[0] - low[0]
hc_range = np.abs(high[1:] - close[:-1])
lc_range = np.abs(low[1:] - close[:-1])
hl_range = high[1:] - low[1:]
tr[1:] = np.maximum.reduce([hl_range, hc_range, lc_range])
# Use numpy's exponential moving average
atr = np.zeros_like(tr)
atr[0] = tr[0]
multiplier_ema = 2.0 / (period + 1)
for i in range(1, len(tr)):
atr[i] = (tr[i] * multiplier_ema) + (atr[i-1] * (1 - multiplier_ema))
# Calculate bands
upper_band = np.zeros_like(close)
lower_band = np.zeros_like(close)
for i in range(len(close)):
hl_avg = (high[i] + low[i]) / 2
upper_band[i] = hl_avg + (multiplier * atr[i])
lower_band[i] = hl_avg - (multiplier * atr[i])
final_upper = np.zeros_like(close)
final_lower = np.zeros_like(close)
supertrend = np.zeros_like(close)
trend = np.zeros_like(close)
final_upper[0] = upper_band[0]
final_lower[0] = lower_band[0]
if close[0] <= upper_band[0]:
supertrend[0] = upper_band[0]
trend[0] = -1
else:
supertrend[0] = lower_band[0]
trend[0] = 1
for i in range(1, len(close)):
if (upper_band[i] < final_upper[i-1]) or (close[i-1] > final_upper[i-1]):
final_upper[i] = upper_band[i]
else:
final_upper[i] = final_upper[i-1]
if (lower_band[i] > final_lower[i-1]) or (close[i-1] < final_lower[i-1]):
final_lower[i] = lower_band[i]
else:
final_lower[i] = final_lower[i-1]
if supertrend[i-1] == final_upper[i-1] and close[i] <= final_upper[i]:
supertrend[i] = final_upper[i]
trend[i] = -1
elif supertrend[i-1] == final_upper[i-1] and close[i] > final_upper[i]:
supertrend[i] = final_lower[i]
trend[i] = 1
elif supertrend[i-1] == final_lower[i-1] and close[i] >= final_lower[i]:
supertrend[i] = final_lower[i]
trend[i] = 1
elif supertrend[i-1] == final_lower[i-1] and close[i] < final_lower[i]:
supertrend[i] = final_upper[i]
trend[i] = -1
return {
'supertrend': supertrend,
'trend': trend,
'upper_band': final_upper,
'lower_band': final_lower
}
def calculate_supertrend_external(data, period, multiplier):
# Convert DataFrame columns to hashable tuples
high_tuple = tuple(data['high'])
low_tuple = tuple(data['low'])
close_tuple = tuple(data['close'])
# Call the cached function
return cached_supertrend_calculation(period, multiplier, (high_tuple, low_tuple, close_tuple))
class Supertrends:
def __init__(self, data, verbose=False, display=False):
"""
Initialize the TrendDetectorSimple class.
Parameters:
- data: pandas DataFrame containing price data
- verbose: boolean, whether to display detailed logging information
- display: boolean, whether to enable display/plotting features
"""
self.data = data
self.verbose = verbose
self.display = display
# Only define display-related variables if display is True
if self.display:
# Plot style configuration
self.plot_style = 'dark_background'
self.bg_color = DARK_BG_COLOR
self.plot_size = (12, 8)
# Candlestick configuration
self.candle_width = 0.6
self.candle_up_color = CANDLE_UP_COLOR
self.candle_down_color = CANDLE_DOWN_COLOR
self.candle_alpha = 0.8
self.wick_width = 1
# Marker configuration
self.min_marker = '^'
self.min_color = MIN_COLOR
self.min_size = 100
self.max_marker = 'v'
self.max_color = MAX_COLOR
self.max_size = 100
self.marker_zorder = 100
# Line configuration
self.line_width = 1
self.min_line_style = MIN_LINE_STYLE
self.max_line_style = MAX_LINE_STYLE
self.sma7_line_style = SMA7_LINE_STYLE
self.sma15_line_style = SMA15_LINE_STYLE
# Text configuration
self.title_size = 14
self.title_color = TITLE_COLOR
self.axis_label_size = 12
self.axis_label_color = AXIS_LABEL_COLOR
# Legend configuration
self.legend_loc = 'best'
self.legend_bg_color = LEGEND_BG_COLOR
# Configure logging
logging.basicConfig(level=logging.INFO if verbose else logging.WARNING,
format='%(asctime)s - %(levelname)s - %(message)s')
self.logger = logging.getLogger('TrendDetectorSimple')
# Convert data to pandas DataFrame if it's not already
if not isinstance(self.data, pd.DataFrame):
if isinstance(self.data, list):
self.data = pd.DataFrame({'close': self.data})
else:
raise ValueError("Data must be a pandas DataFrame or a list")
def calculate_tr(self):
"""
Calculate True Range (TR) for the price data.
True Range is the greatest of:
1. Current high - current low
2. |Current high - previous close|
3. |Current low - previous close|
Returns:
- Numpy array of TR values
"""
df = self.data.copy()
high = df['high'].values
low = df['low'].values
close = df['close'].values
tr = np.zeros_like(close)
tr[0] = high[0] - low[0] # First TR is just the first day's range
for i in range(1, len(close)):
# Current high - current low
hl_range = high[i] - low[i]
# |Current high - previous close|
hc_range = abs(high[i] - close[i-1])
# |Current low - previous close|
lc_range = abs(low[i] - close[i-1])
# TR is the maximum of these three values
tr[i] = max(hl_range, hc_range, lc_range)
return tr
def calculate_atr(self, period=14):
"""
Calculate Average True Range (ATR) for the price data.
ATR is the exponential moving average of the True Range over a specified period.
Parameters:
- period: int, the period for the ATR calculation (default: 14)
Returns:
- Numpy array of ATR values
"""
tr = self.calculate_tr()
atr = np.zeros_like(tr)
# First ATR value is just the first TR
atr[0] = tr[0]
# Calculate exponential moving average (EMA) of TR
multiplier = 2.0 / (period + 1)
for i in range(1, len(tr)):
atr[i] = (tr[i] * multiplier) + (atr[i-1] * (1 - multiplier))
return atr
def detect_trends(self):
"""
Detect trends by identifying local minima and maxima in the price data
using scipy.signal.find_peaks.
Parameters:
- prominence: float, required prominence of peaks (relative to the price range)
- width: int, required width of peaks in data points
Returns:
- DataFrame with columns for timestamps, prices, and trend indicators
- Dictionary containing analysis results including linear regression, SMAs, and SuperTrend indicators
"""
df = self.data
# close_prices = df['close'].values
# max_peaks, _ = find_peaks(close_prices)
# min_peaks, _ = find_peaks(-close_prices)
# df['is_min'] = False
# df['is_max'] = False
# for peak in max_peaks:
# df.at[peak, 'is_max'] = True
# for peak in min_peaks:
# df.at[peak, 'is_min'] = True
# result = df[['timestamp', 'close', 'is_min', 'is_max']].copy()
# Perform linear regression on min_peaks and max_peaks
# min_prices = df['close'].iloc[min_peaks].values
# max_prices = df['close'].iloc[max_peaks].values
# Linear regression for min peaks if we have at least 2 points
# min_slope, min_intercept, min_r_value, _, _ = stats.linregress(min_peaks, min_prices)
# Linear regression for max peaks if we have at least 2 points
# max_slope, max_intercept, max_r_value, _, _ = stats.linregress(max_peaks, max_prices)
# Calculate Simple Moving Averages (SMA) for 7 and 15 periods
# sma_7 = pd.Series(close_prices).rolling(window=7, min_periods=1).mean().values
# sma_15 = pd.Series(close_prices).rolling(window=15, min_periods=1).mean().values
analysis_results = {}
# analysis_results['linear_regression'] = {
# 'min': {
# 'slope': min_slope,
# 'intercept': min_intercept,
# 'r_squared': min_r_value ** 2
# },
# 'max': {
# 'slope': max_slope,
# 'intercept': max_intercept,
# 'r_squared': max_r_value ** 2
# }
# }
# analysis_results['sma'] = {
# '7': sma_7,
# '15': sma_15
# }
# Calculate SuperTrend indicators
supertrend_results_list = self._calculate_supertrend_indicators()
analysis_results['supertrend'] = supertrend_results_list
return analysis_results
def calculate_supertrend_indicators(self):
"""
Calculate SuperTrend indicators with different parameter sets in parallel.
Returns:
- list, the SuperTrend results
"""
supertrend_params = [
{"period": 12, "multiplier": 3.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN},
{"period": 10, "multiplier": 1.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN},
{"period": 11, "multiplier": 2.0, "color_up": ST_COLOR_UP, "color_down": ST_COLOR_DOWN}
]
data = self.data.copy()
# For just 3 calculations, direct calculation might be faster than process pool
results = []
for p in supertrend_params:
result = calculate_supertrend_external(data, p["period"], p["multiplier"])
results.append(result)
supertrend_results_list = []
for params, result in zip(supertrend_params, results):
supertrend_results_list.append({
"results": result,
"params": params
})
return supertrend_results_list

View File

@@ -1,123 +1,88 @@
import pandas as pd
import numpy as np
import time
from cycles.supertrend import Supertrends
from cycles.market_fees import MarketFees
class Backtest:
@staticmethod
def run(min1_df, df, initial_usd, stop_loss_pct, debug=False):
def __init__(self, initial_usd, df, min1_df, init_strategy_fields) -> None:
self.initial_usd = initial_usd
self.usd = initial_usd
self.max_balance = initial_usd
self.coin = 0
self.position = 0
self.entry_price = 0
self.entry_time = None
self.current_trade_min1_start_idx = None
self.current_min1_end_idx = None
self.price_open = None
self.price_close = None
self.current_date = None
self.strategies = {}
self.df = df
self.min1_df = min1_df
self.trade_log = []
self.drawdowns = []
self.trades = []
self = init_strategy_fields(self)
def run(self, entry_strategy, exit_strategy, debug=False):
"""
Backtest a simple strategy using the meta supertrend (all three supertrends agree).
Buys when meta supertrend is positive, sells when negative, applies a percentage stop loss.
Runs the backtest using provided entry and exit strategy functions.
The method iterates over the main DataFrame (self.df), simulating trades based on the entry and exit strategies. It tracks balances, drawdowns, and logs each trade, including fees. At the end, it returns a dictionary of performance statistics.
Parameters:
- min1_df: pandas DataFrame, 1-minute timeframe data for more accurate stop loss checking (optional)
- initial_usd: float, starting USD amount
- stop_loss_pct: float, stop loss as a fraction (e.g. 0.05 for 5%)
- debug: bool, whether to print debug info
- entry_strategy: function, determines when to enter a trade. Should accept (self, i) and return True to enter.
- exit_strategy: function, determines when to exit a trade. Should accept (self, i) and return (exit_reason, sell_price) or (None, None) to hold.
- debug: bool, whether to print debug info (default: False)
Returns:
- dict with keys: initial_usd, final_usd, n_trades, win_rate, max_drawdown, avg_trade, trade_log, trades, total_fees_usd, and optionally first_trade and last_trade.
"""
_df = df.copy().reset_index(drop=True)
_df['timestamp'] = pd.to_datetime(_df['timestamp'])
supertrends = Supertrends(_df, verbose=False)
for i in range(1, len(self.df)):
self.price_open = self.df['open'].iloc[i]
self.price_close = self.df['close'].iloc[i]
supertrend_results_list = supertrends.calculate_supertrend_indicators()
trends = [st['results']['trend'] for st in supertrend_results_list]
trends_arr = np.stack(trends, axis=1)
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
trends_arr[:,0], 0)
# Shift meta_trend by one to avoid lookahead bias
meta_trend_signal = np.roll(meta_trend, 1)
meta_trend_signal[0] = 0 # or np.nan, but 0 means 'no signal' for first bar
self.current_date = self.df['timestamp'].iloc[i]
position = 0 # 0 = no position, 1 = long
entry_price = 0
usd = initial_usd
coin = 0
trade_log = []
max_balance = initial_usd
drawdowns = []
trades = []
entry_time = None
current_trade_min1_start_idx = None
if self.position == 0:
if entry_strategy(self, i):
self.handle_entry()
elif self.position == 1:
exit_test_results, sell_price = exit_strategy(self, i)
min1_df.index = pd.to_datetime(min1_df.index)
min1_timestamps = min1_df.index.values
last_print_time = time.time()
for i in range(1, len(_df)):
current_time = time.time()
if current_time - last_print_time >= 5:
progress = (i / len(_df)) * 100
print(f"\rProgress: {progress:.1f}%", end="", flush=True)
last_print_time = current_time
price_open = _df['open'].iloc[i]
price_close = _df['close'].iloc[i]
date = _df['timestamp'].iloc[i]
prev_mt = meta_trend_signal[i-1]
curr_mt = meta_trend_signal[i]
# Check stop loss if in position
if position == 1:
stop_loss_result = Backtest.check_stop_loss(
min1_df,
entry_time,
date,
entry_price,
stop_loss_pct,
coin,
usd,
debug,
current_trade_min1_start_idx
)
if stop_loss_result is not None:
trade_log_entry, current_trade_min1_start_idx, position, coin, entry_price = stop_loss_result
trade_log.append(trade_log_entry)
continue
# Update the start index for next check
current_trade_min1_start_idx = min1_df.index[min1_df.index <= date][-1]
# Entry: only if not in position and signal changes to 1
if position == 0 and prev_mt != 1 and curr_mt == 1:
entry_result = Backtest.handle_entry(usd, price_open, date)
coin, entry_price, entry_time, usd, position, trade_log_entry = entry_result
trade_log.append(trade_log_entry)
# Exit: only if in position and signal changes from 1 to -1
elif position == 1 and prev_mt == 1 and curr_mt == -1:
exit_result = Backtest.handle_exit(coin, price_open, entry_price, entry_time, date)
usd, coin, position, entry_price, trade_log_entry = exit_result
trade_log.append(trade_log_entry)
if exit_test_results is not None:
self.handle_exit(exit_test_results, sell_price)
# Track drawdown
balance = usd if position == 0 else coin * price_close
if balance > max_balance:
max_balance = balance
drawdown = (max_balance - balance) / max_balance
drawdowns.append(drawdown)
balance = self.usd if self.position == 0 else self.coin * self.price_close
print("\rProgress: 100%\r\n", end="", flush=True)
if balance > self.max_balance:
self.max_balance = balance
drawdown = (self.max_balance - balance) / self.max_balance
self.drawdowns.append(drawdown)
# If still in position at end, sell at last close
if position == 1:
exit_result = Backtest.handle_exit(coin, _df['close'].iloc[-1], entry_price, entry_time, _df['timestamp'].iloc[-1])
usd, coin, position, entry_price, trade_log_entry = exit_result
trade_log.append(trade_log_entry)
if self.position == 1:
self.handle_exit("EOD", None)
# Calculate statistics
final_balance = usd
n_trades = len(trade_log)
wins = [1 for t in trade_log if t['exit'] is not None and t['exit'] > t['entry']]
final_balance = self.usd
n_trades = len(self.trade_log)
wins = [1 for t in self.trade_log if t['exit'] is not None and t['exit'] > t['entry']]
win_rate = len(wins) / n_trades if n_trades > 0 else 0
max_drawdown = max(drawdowns) if drawdowns else 0
avg_trade = np.mean([t['exit']/t['entry']-1 for t in trade_log if t['exit'] is not None]) if trade_log else 0
max_drawdown = max(self.drawdowns) if self.drawdowns else 0
avg_trade = np.mean([t['exit']/t['entry']-1 for t in self.trade_log if t['exit'] is not None]) if self.trade_log else 0
trades = []
total_fees_usd = 0.0
for trade in trade_log:
for trade in self.trade_log:
if trade['exit'] is not None:
profit_pct = (trade['exit'] - trade['entry']) / trade['entry']
else:
@@ -128,103 +93,73 @@ class Backtest:
'entry': trade['entry'],
'exit': trade['exit'],
'profit_pct': profit_pct,
'type': trade.get('type', 'SELL'),
'fee_usd': trade.get('fee_usd')
'type': trade['type'],
'fee_usd': trade['fee_usd']
})
fee_usd = trade.get('fee_usd')
total_fees_usd += fee_usd
results = {
"initial_usd": initial_usd,
"initial_usd": self.initial_usd,
"final_usd": final_balance,
"n_trades": n_trades,
"win_rate": win_rate,
"max_drawdown": max_drawdown,
"avg_trade": avg_trade,
"trade_log": trade_log,
"trade_log": self.trade_log,
"trades": trades,
"total_fees_usd": total_fees_usd,
}
if n_trades > 0:
results["first_trade"] = {
"entry_time": trade_log[0]['entry_time'],
"entry": trade_log[0]['entry']
"entry_time": self.trade_log[0]['entry_time'],
"entry": self.trade_log[0]['entry']
}
results["last_trade"] = {
"exit_time": trade_log[-1]['exit_time'],
"exit": trade_log[-1]['exit']
"exit_time": self.trade_log[-1]['exit_time'],
"exit": self.trade_log[-1]['exit']
}
return results
@staticmethod
def check_stop_loss(min1_df, entry_time, date, entry_price, stop_loss_pct, coin, usd, debug, current_trade_min1_start_idx):
stop_price = entry_price * (1 - stop_loss_pct)
def handle_entry(self):
entry_fee = MarketFees.calculate_okx_taker_maker_fee(self.usd, is_maker=False)
usd_after_fee = self.usd - entry_fee
if current_trade_min1_start_idx is None:
current_trade_min1_start_idx = min1_df.index[min1_df.index >= entry_time][0]
current_min1_end_idx = min1_df.index[min1_df.index <= date][-1]
self.coin = usd_after_fee / self.price_open
self.entry_price = self.price_open
self.entry_time = self.current_date
self.usd = 0
self.position = 1
# Check all 1-minute candles in between for stop loss
min1_slice = min1_df.loc[current_trade_min1_start_idx:current_min1_end_idx]
if (min1_slice['low'] <= stop_price).any():
# Stop loss triggered, find the exact candle
stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0]
# More realistic fill: if open < stop, fill at open, else at stop
if stop_candle['open'] < stop_price:
sell_price = stop_candle['open']
else:
sell_price = stop_price
if debug:
print(f"STOP LOSS triggered: entry={entry_price}, stop={stop_price}, sell_price={sell_price}, entry_time={entry_time}, stop_time={stop_candle.name}")
btc_to_sell = coin
usd_gross = btc_to_sell * sell_price
exit_fee = MarketFees.calculate_okx_taker_maker_fee(usd_gross, is_maker=False)
trade_log_entry = {
'type': 'STOP',
'entry': entry_price,
'exit': sell_price,
'entry_time': entry_time,
'exit_time': stop_candle.name,
'fee_usd': exit_fee
}
# After stop loss, reset position and entry
return trade_log_entry, None, 0, 0, 0
return None
@staticmethod
def handle_entry(usd, price_open, date):
entry_fee = MarketFees.calculate_okx_taker_maker_fee(usd, is_maker=False)
usd_after_fee = usd - entry_fee
coin = usd_after_fee / price_open
entry_price = price_open
entry_time = date
usd = 0
position = 1
trade_log_entry = {
'type': 'BUY',
'entry': entry_price,
'entry': self.entry_price,
'exit': None,
'entry_time': entry_time,
'entry_time': self.entry_time,
'exit_time': None,
'fee_usd': entry_fee
}
return coin, entry_price, entry_time, usd, position, trade_log_entry
self.trade_log.append(trade_log_entry)
@staticmethod
def handle_exit(coin, price_open, entry_price, entry_time, date):
btc_to_sell = coin
usd_gross = btc_to_sell * price_open
def handle_exit(self, exit_reason, sell_price):
btc_to_sell = self.coin
exit_price = sell_price if sell_price is not None else self.price_open
usd_gross = btc_to_sell * exit_price
exit_fee = MarketFees.calculate_okx_taker_maker_fee(usd_gross, is_maker=False)
usd = usd_gross - exit_fee
trade_log_entry = {
'type': 'SELL',
'entry': entry_price,
'exit': price_open,
'entry_time': entry_time,
'exit_time': date,
self.usd = usd_gross - exit_fee
exit_log_entry = {
'type': exit_reason,
'entry': self.entry_price,
'exit': exit_price,
'entry_time': self.entry_time,
'exit_time': self.current_date,
'fee_usd': exit_fee
}
coin = 0
position = 0
entry_price = 0
return usd, coin, position, entry_price, trade_log_entry
self.coin = 0
self.position = 0
self.entry_price = 0
self.trade_log.append(exit_log_entry)

View File

@@ -1,86 +1,71 @@
import os
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
class BacktestCharts:
def __init__(self, charts_dir="charts"):
self.charts_dir = charts_dir
os.makedirs(self.charts_dir, exist_ok=True)
def plot_profit_ratio_vs_stop_loss(self, results, filename="profit_ratio_vs_stop_loss.png"):
@staticmethod
def plot(df, meta_trend):
"""
Plots profit ratio vs stop loss percentage for each timeframe.
Parameters:
- results: list of dicts, each with keys: 'timeframe', 'stop_loss_pct', 'profit_ratio'
- filename: output filename (will be saved in charts_dir)
Plot close price line chart with a bar at the bottom: green when trend is 1, red when trend is 0.
The bar stays at the bottom even when zooming/panning.
- df: DataFrame with columns ['close', ...] and a datetime index or 'timestamp' column.
- meta_trend: array-like, same length as df, values 1 (green) or 0 (red).
"""
# Organize data by timeframe
from collections import defaultdict
data = defaultdict(lambda: {"stop_loss_pct": [], "profit_ratio": []})
for row in results:
tf = row["timeframe"]
data[tf]["stop_loss_pct"].append(row["stop_loss_pct"])
data[tf]["profit_ratio"].append(row["profit_ratio"])
plt.figure(figsize=(10, 6))
for tf, vals in data.items():
# Sort by stop_loss_pct for smooth lines
sorted_pairs = sorted(zip(vals["stop_loss_pct"], vals["profit_ratio"]))
stop_loss, profit_ratio = zip(*sorted_pairs)
plt.plot(
[s * 100 for s in stop_loss], # Convert to percent
profit_ratio,
marker="o",
label=tf
fig, (ax_price, ax_bar) = plt.subplots(
nrows=2, ncols=1, figsize=(16, 8), sharex=True,
gridspec_kw={'height_ratios': [12, 1]}
)
plt.xlabel("Stop Loss (%)")
plt.ylabel("Profit Ratio")
plt.title("Profit Ratio vs Stop Loss (%) per Timeframe")
plt.legend(title="Timeframe")
plt.grid(True, linestyle="--", alpha=0.5)
plt.tight_layout()
sns.lineplot(x=df.index, y=df['close'], label='Close Price', color='blue', ax=ax_price)
ax_price.set_title('Close Price with Trend Bar (Green=1, Red=0)')
ax_price.set_ylabel('Price')
ax_price.grid(True, alpha=0.3)
ax_price.legend()
output_path = os.path.join(self.charts_dir, filename)
plt.savefig(output_path)
plt.close()
# Clean meta_trend: ensure only 0/1, handle NaNs by forward-fill then fill remaining with 0
meta_trend_arr = np.asarray(meta_trend)
if not np.issubdtype(meta_trend_arr.dtype, np.number):
meta_trend_arr = pd.Series(meta_trend_arr).astype(float).to_numpy()
if np.isnan(meta_trend_arr).any():
meta_trend_arr = pd.Series(meta_trend_arr).fillna(method='ffill').fillna(0).astype(int).to_numpy()
else:
meta_trend_arr = meta_trend_arr.astype(int)
meta_trend_arr = np.where(meta_trend_arr != 1, 0, 1) # force only 0 or 1
if hasattr(df.index, 'to_numpy'):
x_vals = df.index.to_numpy()
else:
x_vals = np.array(df.index)
def plot_average_trade_vs_stop_loss(self, results, filename="average_trade_vs_stop_loss.png"):
"""
Plots average trade vs stop loss percentage for each timeframe.
# Find contiguous regions
regions = []
start = 0
for i in range(1, len(meta_trend_arr)):
if meta_trend_arr[i] != meta_trend_arr[i-1]:
regions.append((start, i-1, meta_trend_arr[i-1]))
start = i
regions.append((start, len(meta_trend_arr)-1, meta_trend_arr[-1]))
Parameters:
- results: list of dicts, each with keys: 'timeframe', 'stop_loss_pct', 'average_trade'
- filename: output filename (will be saved in charts_dir)
"""
from collections import defaultdict
data = defaultdict(lambda: {"stop_loss_pct": [], "average_trade": []})
for row in results:
tf = row["timeframe"]
if "average_trade" not in row:
continue # Skip rows without average_trade
data[tf]["stop_loss_pct"].append(row["stop_loss_pct"])
data[tf]["average_trade"].append(row["average_trade"])
# Draw red vertical lines at the start of each new region (except the first)
for region_idx in range(1, len(regions)):
region_start = regions[region_idx][0]
ax_price.axvline(x=x_vals[region_start], color='black', linestyle='--', alpha=0.7, linewidth=1)
plt.figure(figsize=(10, 6))
for tf, vals in data.items():
# Sort by stop_loss_pct for smooth lines
sorted_pairs = sorted(zip(vals["stop_loss_pct"], vals["average_trade"]))
stop_loss, average_trade = zip(*sorted_pairs)
plt.plot(
[s * 100 for s in stop_loss], # Convert to percent
average_trade,
marker="o",
label=tf
)
for start, end, trend in regions:
color = '#089981' if trend == 1 else '#F23645'
# Offset by 1 on x: span from x_vals[start] to x_vals[end+1] if possible
x_start = x_vals[start]
x_end = x_vals[end+1] if end+1 < len(x_vals) else x_vals[end]
ax_bar.axvspan(x_start, x_end, color=color, alpha=1, ymin=0, ymax=1)
plt.xlabel("Stop Loss (%)")
plt.ylabel("Average Trade")
plt.title("Average Trade vs Stop Loss (%) per Timeframe")
plt.legend(title="Timeframe")
plt.grid(True, linestyle="--", alpha=0.5)
plt.tight_layout()
ax_bar.set_ylim(0, 1)
ax_bar.set_yticks([])
ax_bar.set_ylabel('Trend')
ax_bar.set_xlabel('Time')
ax_bar.grid(False)
ax_bar.set_title('Meta Trend')
plt.tight_layout(h_pad=0.1)
plt.show()
output_path = os.path.join(self.charts_dir, filename)
plt.savefig(output_path)
plt.close()

View File

@@ -2,6 +2,6 @@ import pandas as pd
class MarketFees:
@staticmethod
def calculate_okx_taker_maker_fee(amount, is_maker=True):
def calculate_okx_taker_maker_fee(amount, is_maker=True) -> float:
fee_rate = 0.0008 if is_maker else 0.0010
return amount * fee_rate

View File

@@ -1,185 +0,0 @@
import pandas as pd
import numpy as np
import logging
from functools import lru_cache
@lru_cache(maxsize=32)
def cached_supertrend_calculation(period, multiplier, data_tuple):
high = np.array(data_tuple[0])
low = np.array(data_tuple[1])
close = np.array(data_tuple[2])
tr = np.zeros_like(close)
tr[0] = high[0] - low[0]
hc_range = np.abs(high[1:] - close[:-1])
lc_range = np.abs(low[1:] - close[:-1])
hl_range = high[1:] - low[1:]
tr[1:] = np.maximum.reduce([hl_range, hc_range, lc_range])
atr = np.zeros_like(tr)
atr[0] = tr[0]
multiplier_ema = 2.0 / (period + 1)
for i in range(1, len(tr)):
atr[i] = (tr[i] * multiplier_ema) + (atr[i-1] * (1 - multiplier_ema))
upper_band = np.zeros_like(close)
lower_band = np.zeros_like(close)
for i in range(len(close)):
hl_avg = (high[i] + low[i]) / 2
upper_band[i] = hl_avg + (multiplier * atr[i])
lower_band[i] = hl_avg - (multiplier * atr[i])
final_upper = np.zeros_like(close)
final_lower = np.zeros_like(close)
supertrend = np.zeros_like(close)
trend = np.zeros_like(close)
final_upper[0] = upper_band[0]
final_lower[0] = lower_band[0]
if close[0] <= upper_band[0]:
supertrend[0] = upper_band[0]
trend[0] = -1
else:
supertrend[0] = lower_band[0]
trend[0] = 1
for i in range(1, len(close)):
if (upper_band[i] < final_upper[i-1]) or (close[i-1] > final_upper[i-1]):
final_upper[i] = upper_band[i]
else:
final_upper[i] = final_upper[i-1]
if (lower_band[i] > final_lower[i-1]) or (close[i-1] < final_lower[i-1]):
final_lower[i] = lower_band[i]
else:
final_lower[i] = final_lower[i-1]
if supertrend[i-1] == final_upper[i-1] and close[i] <= final_upper[i]:
supertrend[i] = final_upper[i]
trend[i] = -1
elif supertrend[i-1] == final_upper[i-1] and close[i] > final_upper[i]:
supertrend[i] = final_lower[i]
trend[i] = 1
elif supertrend[i-1] == final_lower[i-1] and close[i] >= final_lower[i]:
supertrend[i] = final_lower[i]
trend[i] = 1
elif supertrend[i-1] == final_lower[i-1] and close[i] < final_lower[i]:
supertrend[i] = final_upper[i]
trend[i] = -1
return {
'supertrend': supertrend,
'trend': trend,
'upper_band': final_upper,
'lower_band': final_lower
}
def calculate_supertrend_external(data, period, multiplier):
high_tuple = tuple(data['high'])
low_tuple = tuple(data['low'])
close_tuple = tuple(data['close'])
return cached_supertrend_calculation(period, multiplier, (high_tuple, low_tuple, close_tuple))
class Supertrends:
def __init__(self, data, verbose=False, display=False):
self.data = data
self.verbose = verbose
logging.basicConfig(level=logging.INFO if verbose else logging.WARNING,
format='%(asctime)s - %(levelname)s - %(message)s')
self.logger = logging.getLogger('TrendDetectorSimple')
if not isinstance(self.data, pd.DataFrame):
if isinstance(self.data, list):
self.data = pd.DataFrame({'close': self.data})
else:
raise ValueError("Data must be a pandas DataFrame or a list")
def calculate_tr(self):
df = self.data.copy()
high = df['high'].values
low = df['low'].values
close = df['close'].values
tr = np.zeros_like(close)
tr[0] = high[0] - low[0]
for i in range(1, len(close)):
hl_range = high[i] - low[i]
hc_range = abs(high[i] - close[i-1])
lc_range = abs(low[i] - close[i-1])
tr[i] = max(hl_range, hc_range, lc_range)
return tr
def calculate_atr(self, period=14):
tr = self.calculate_tr()
atr = np.zeros_like(tr)
atr[0] = tr[0]
multiplier = 2.0 / (period + 1)
for i in range(1, len(tr)):
atr[i] = (tr[i] * multiplier) + (atr[i-1] * (1 - multiplier))
return atr
def calculate_supertrend(self, period=10, multiplier=3.0):
"""
Calculate SuperTrend indicator for the price data.
SuperTrend is a trend-following indicator that uses ATR to determine the trend direction.
Parameters:
- period: int, the period for the ATR calculation (default: 10)
- multiplier: float, the multiplier for the ATR (default: 3.0)
Returns:
- Dictionary containing SuperTrend values, trend direction, and upper/lower bands
"""
df = self.data.copy()
high = df['high'].values
low = df['low'].values
close = df['close'].values
atr = self.calculate_atr(period)
upper_band = np.zeros_like(close)
lower_band = np.zeros_like(close)
for i in range(len(close)):
hl_avg = (high[i] + low[i]) / 2
upper_band[i] = hl_avg + (multiplier * atr[i])
lower_band[i] = hl_avg - (multiplier * atr[i])
final_upper = np.zeros_like(close)
final_lower = np.zeros_like(close)
supertrend = np.zeros_like(close)
trend = np.zeros_like(close)
final_upper[0] = upper_band[0]
final_lower[0] = lower_band[0]
if close[0] <= upper_band[0]:
supertrend[0] = upper_band[0]
trend[0] = -1
else:
supertrend[0] = lower_band[0]
trend[0] = 1
for i in range(1, len(close)):
if (upper_band[i] < final_upper[i-1]) or (close[i-1] > final_upper[i-1]):
final_upper[i] = upper_band[i]
else:
final_upper[i] = final_upper[i-1]
if (lower_band[i] > final_lower[i-1]) or (close[i-1] < final_lower[i-1]):
final_lower[i] = lower_band[i]
else:
final_lower[i] = final_lower[i-1]
if supertrend[i-1] == final_upper[i-1] and close[i] <= final_upper[i]:
supertrend[i] = final_upper[i]
trend[i] = -1
elif supertrend[i-1] == final_upper[i-1] and close[i] > final_upper[i]:
supertrend[i] = final_lower[i]
trend[i] = 1
elif supertrend[i-1] == final_lower[i-1] and close[i] >= final_lower[i]:
supertrend[i] = final_lower[i]
trend[i] = 1
elif supertrend[i-1] == final_lower[i-1] and close[i] < final_lower[i]:
supertrend[i] = final_upper[i]
trend[i] = -1
supertrend_results = {
'supertrend': supertrend,
'trend': trend,
'upper_band': final_upper,
'lower_band': final_lower
}
return supertrend_results
def calculate_supertrend_indicators(self):
supertrend_params = [
{"period": 12, "multiplier": 3.0},
{"period": 10, "multiplier": 1.0},
{"period": 11, "multiplier": 2.0}
]
results = []
for p in supertrend_params:
result = self.calculate_supertrend(period=p["period"], multiplier=p["multiplier"])
results.append({
"results": result,
"params": p
})
return results

View File

@@ -1,5 +1,80 @@
import pandas as pd
def check_data(data_df: pd.DataFrame) -> bool:
"""
Checks if the input DataFrame has a DatetimeIndex.
Args:
data_df (pd.DataFrame): DataFrame to check.
Returns:
bool: True if the DataFrame has a DatetimeIndex, False otherwise.
"""
if not isinstance(data_df.index, pd.DatetimeIndex):
print("Warning: Input DataFrame must have a DatetimeIndex.")
return False
agg_rules = {}
# Define aggregation rules based on available columns
if 'open' in data_df.columns:
agg_rules['open'] = 'first'
if 'high' in data_df.columns:
agg_rules['high'] = 'max'
if 'low' in data_df.columns:
agg_rules['low'] = 'min'
if 'close' in data_df.columns:
agg_rules['close'] = 'last'
if 'volume' in data_df.columns:
agg_rules['volume'] = 'sum'
if not agg_rules:
print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for daily aggregation.")
return False
return agg_rules
def aggregate_to_weekly(data_df: pd.DataFrame, weeks: int = 1) -> pd.DataFrame:
"""
Aggregates time-series financial data to weekly OHLCV format.
The input DataFrame is expected to have a DatetimeIndex.
'open' will be the first 'open' price of the week.
'close' will be the last 'close' price of the week.
'high' will be the maximum 'high' price of the week.
'low' will be the minimum 'low' price of the week.
'volume' (if present) will be the sum of volumes for the week.
Args:
data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
like 'open', 'high', 'low', 'close', and optionally 'volume'.
weeks (int): The number of weeks to aggregate to. Default is 1.
Returns:
pd.DataFrame: DataFrame aggregated to weekly OHLCV data.
The index will be a DatetimeIndex with the time set to the start of the week.
Returns an empty DataFrame if no relevant OHLCV columns are found.
"""
agg_rules = check_data(data_df)
if not agg_rules:
print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for weekly aggregation.")
return pd.DataFrame(index=pd.to_datetime([]))
# Resample to weekly frequency and apply aggregation rules
weekly_data = data_df.resample(f'{weeks}W').agg(agg_rules)
weekly_data.dropna(how='all', inplace=True)
# Adjust timestamps to the start of the week
if not weekly_data.empty and isinstance(weekly_data.index, pd.DatetimeIndex):
weekly_data.index = weekly_data.index.floor('W')
return weekly_data
def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
"""
Aggregates time-series financial data to daily OHLCV format.
@@ -24,22 +99,8 @@ def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
Raises:
ValueError: If the input DataFrame does not have a DatetimeIndex.
"""
if not isinstance(data_df.index, pd.DatetimeIndex):
raise ValueError("Input DataFrame must have a DatetimeIndex.")
agg_rules = {}
# Define aggregation rules based on available columns
if 'open' in data_df.columns:
agg_rules['open'] = 'first'
if 'high' in data_df.columns:
agg_rules['high'] = 'max'
if 'low' in data_df.columns:
agg_rules['low'] = 'min'
if 'close' in data_df.columns:
agg_rules['close'] = 'last'
if 'volume' in data_df.columns:
agg_rules['volume'] = 'sum'
agg_rules = check_data(data_df)
if not agg_rules:
# Log a warning or raise an error if no relevant columns are found
@@ -58,3 +119,81 @@ def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
daily_data.dropna(how='all', inplace=True)
return daily_data
def aggregate_to_hourly(data_df: pd.DataFrame, hours: int = 1) -> pd.DataFrame:
"""
Aggregates time-series financial data to hourly OHLCV format.
The input DataFrame is expected to have a DatetimeIndex.
'open' will be the first 'open' price of the hour.
'close' will be the last 'close' price of the hour.
'high' will be the maximum 'high' price of the hour.
'low' will be the minimum 'low' price of the hour.
'volume' (if present) will be the sum of volumes for the hour.
Args:
data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
like 'open', 'high', 'low', 'close', and optionally 'volume'.
hours (int): The number of hours to aggregate to. Default is 1.
Returns:
pd.DataFrame: DataFrame aggregated to hourly OHLCV data.
The index will be a DatetimeIndex with the time set to the start of the hour.
Returns an empty DataFrame if no relevant OHLCV columns are found.
"""
agg_rules = check_data(data_df)
if not agg_rules:
print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for hourly aggregation.")
return pd.DataFrame(index=pd.to_datetime([]))
# Resample to hourly frequency and apply aggregation rules
hourly_data = data_df.resample(f'{hours}h').agg(agg_rules)
hourly_data.dropna(how='all', inplace=True)
# Adjust timestamps to the start of the hour
if not hourly_data.empty and isinstance(hourly_data.index, pd.DatetimeIndex):
hourly_data.index = hourly_data.index.floor('h')
return hourly_data
def aggregate_to_minutes(data_df: pd.DataFrame, minutes: int) -> pd.DataFrame:
"""
Aggregates time-series financial data to N-minute OHLCV format.
The input DataFrame is expected to have a DatetimeIndex.
'open' will be the first 'open' price of the N-minute interval.
'close' will be the last 'close' price of the N-minute interval.
'high' will be the maximum 'high' price of the N-minute interval.
'low' will be the minimum 'low' price of the N-minute interval.
'volume' (if present) will be the sum of volumes for the N-minute interval.
Args:
data_df (pd.DataFrame): DataFrame with a DatetimeIndex and columns
like 'open', 'high', 'low', 'close', and optionally 'volume'.
minutes (int): The number of minutes to aggregate to.
Returns:
pd.DataFrame: DataFrame aggregated to N-minute OHLCV data.
The index will be a DatetimeIndex.
Returns an empty DataFrame if no relevant OHLCV columns are found or
if the input DataFrame does not have a DatetimeIndex.
"""
agg_rules_obj = check_data(data_df) # check_data returns rules or False
if not agg_rules_obj:
# check_data already prints a warning if index is not DatetimeIndex or no OHLCV columns
# Ensure an empty DataFrame with a DatetimeIndex is returned for consistency
return pd.DataFrame(index=pd.to_datetime([]))
# Resample to N-minute frequency and apply aggregation rules
# Using .agg(agg_rules_obj) where agg_rules_obj is the dict from check_data
resampled_data = data_df.resample(f'{minutes}min').agg(agg_rules_obj)
resampled_data.dropna(how='all', inplace=True)
return resampled_data

View File

@@ -8,6 +8,7 @@ The `Analysis` module includes classes for calculating common technical indicato
- **Relative Strength Index (RSI)**: Implemented in `cycles/Analysis/rsi.py`.
- **Bollinger Bands**: Implemented in `cycles/Analysis/boillinger_band.py`.
- Note: Trading strategies are detailed in `strategies.md`.
## Class: `RSI`
@@ -15,64 +16,91 @@ Found in `cycles/Analysis/rsi.py`.
Calculates the Relative Strength Index.
### Mathematical Model
1. **Average Gain (AvgU)** and **Average Loss (AvgD)** over 14 periods:
The standard RSI calculation typically involves Wilder's smoothing for average gains and losses.
1. **Price Change (Delta)**: Difference between consecutive closing prices.
2. **Gain and Loss**: Separate positive (gain) and negative (loss, expressed as positive) price changes.
3. **Average Gain (AvgU)** and **Average Loss (AvgD)**: Smoothed averages of gains and losses over the RSI period. Wilder's smoothing is a specific type of exponential moving average (EMA):
- Initial AvgU/AvgD: Simple Moving Average (SMA) over the first `period` values.
- Subsequent AvgU: `(Previous AvgU * (period - 1) + Current Gain) / period`
- Subsequent AvgD: `(Previous AvgD * (period - 1) + Current Loss) / period`
4. **Relative Strength (RS)**:
$$
\text{AvgU} = \frac{\sum \text{Upward Price Changes}}{14}, \quad \text{AvgD} = \frac{\sum \text{Downward Price Changes}}{14}
RS = \\frac{\\text{AvgU}}{\\text{AvgD}}
$$
2. **Relative Strength (RS)**:
5. **RSI**:
$$
RS = \frac{\text{AvgU}}{\text{AvgD}}
$$
3. **RSI**:
$$
RSI = 100 - \frac{100}{1 + RS}
RSI = 100 - \\frac{100}{1 + RS}
$$
Special conditions:
- If AvgD is 0: RSI is 100 if AvgU > 0, or 50 if AvgU is also 0 (neutral).
### `__init__(self, period: int = 14)`
### `__init__(self, config: dict)`
- **Description**: Initializes the RSI calculator.
- **Parameters**:
- `period` (int, optional): The period for RSI calculation. Defaults to 14. Must be a positive integer.
- **Parameters**:\n - `config` (dict): Configuration dictionary. Must contain an `'rsi_period'` key with a positive integer value (e.g., `{'rsi_period': 14}`).
### `calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame`
- **Description**: Calculates the RSI and adds it as an 'RSI' column to the input DataFrame. Handles cases where data length is less than the period by returning the original DataFrame with a warning.
- **Description**: Calculates the RSI (using Wilder's smoothing by default) and adds it as an 'RSI' column to the input DataFrame. This method utilizes `calculate_custom_rsi` internally with `smoothing='EMA'`.
- **Parameters**:\n - `data_df` (pd.DataFrame): DataFrame with historical price data. Must contain the `price_column`.\n - `price_column` (str, optional): The name of the column containing price data. Defaults to 'close'.
- **Returns**: `pd.DataFrame` - A copy of the input DataFrame with an added 'RSI' column. If data length is insufficient for the period, the 'RSI' column will contain `np.nan`.
### `calculate_custom_rsi(price_series: pd.Series, window: int = 14, smoothing: str = 'SMA') -> pd.Series` (Static Method)
- **Description**: Calculates RSI with a specified window and smoothing method (SMA or EMA). This is the core calculation engine.
- **Parameters**:
- `data_df` (pd.DataFrame): DataFrame with historical price data. Must contain the `price_column`.
- `price_column` (str, optional): The name of the column containing price data. Defaults to 'close'.
- **Returns**: `pd.DataFrame` - The input DataFrame with an added 'RSI' column (containing `np.nan` for initial periods where RSI cannot be calculated). Returns a copy of the original DataFrame if the period is larger than the number of data points.
- `price_series` (pd.Series): Series of prices.
- `window` (int, optional): The period for RSI calculation. Defaults to 14. Must be a positive integer.
- `smoothing` (str, optional): Smoothing method, can be 'SMA' (Simple Moving Average) or 'EMA' (Exponential Moving Average, specifically Wilder's smoothing when `alpha = 1/window`). Defaults to 'SMA'.
- **Returns**: `pd.Series` - Series containing the RSI values. Returns a series of NaNs if data length is insufficient.
## Class: `BollingerBands`
Found in `cycles/Analysis/boillinger_band.py`.
## **Bollinger Bands**
Calculates Bollinger Bands.
### Mathematical Model
1. **Middle Band**: 20-day Simple Moving Average (SMA)
1. **Middle Band**: Simple Moving Average (SMA) over `period`.
$$
\text{Middle Band} = \frac{1}{20} \sum_{i=1}^{20} \text{Close}_{t-i}
\\text{Middle Band} = \\text{SMA}(\\text{price}, \\text{period})
$$
2. **Upper Band**: Middle Band + 2 × 20-day Standard Deviation (σ)
2. **Standard Deviation (σ)**: Standard deviation of price over `period`.
3. **Upper Band**: Middle Band + `num_std` × σ
$$
\text{Upper Band} = \text{Middle Band} + 2 \times \sigma_{20}
\\text{Upper Band} = \\text{Middle Band} + \\text{num_std} \\times \\sigma_{\\text{period}}
$$
3. **Lower Band**: Middle Band 2 × 20-day Standard Deviation (σ)
4. **Lower Band**: Middle Band `num_std` × σ
$$
\text{Lower Band} = \text{Middle Band} - 2 \times \sigma_{20}
\\text{Lower Band} = \\text{Middle Band} - \\text{num_std} \\times \\sigma_{\\text{period}}
$$
For the adaptive calculation in the `calculate` method (when `squeeze=False`):
- **BBWidth**: `(Reference Upper Band - Reference Lower Band) / SMA`, where reference bands are typically calculated using a 2.0 standard deviation multiplier.
- **MarketRegime**: Determined by comparing `BBWidth` to a threshold from the configuration. `1` for sideways, `0` for trending.
- The `num_std` used for the final Upper and Lower Bands then varies based on this `MarketRegime` and the `bb_std_dev_multiplier` values for "trending" and "sideways" markets from the configuration, applied row-wise.
### `__init__(self, period: int = 20, std_dev_multiplier: float = 2.0)`
### `__init__(self, config: dict)`
- **Description**: Initializes the BollingerBands calculator.
- **Parameters**:
- `period` (int, optional): The period for the moving average and standard deviation. Defaults to 20. Must be a positive integer.
- `std_dev_multiplier` (float, optional): The number of standard deviations for the upper and lower bands. Defaults to 2.0. Must be positive.
- **Parameters**:\n - `config` (dict): Configuration dictionary. It must contain:
- `'bb_period'` (int): Positive integer for the moving average and standard deviation period.
- `'trending'` (dict): Containing `'bb_std_dev_multiplier'` (float, positive) for trending markets.
- `'sideways'` (dict): Containing `'bb_std_dev_multiplier'` (float, positive) for sideways markets.
- `'bb_width'` (float): Positive float threshold for determining market regime.
### `calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame`
### `calculate(self, data_df: pd.DataFrame, price_column: str = 'close', squeeze: bool = False) -> pd.DataFrame`
- **Description**: Calculates Bollinger Bands and adds 'SMA' (Simple Moving Average), 'UpperBand', and 'LowerBand' columns to the DataFrame.
- **Description**: Calculates Bollinger Bands and adds relevant columns to the DataFrame.
- If `squeeze` is `False` (default): Calculates adaptive Bollinger Bands. It determines the market regime (trending/sideways) based on `BBWidth` and applies different standard deviation multipliers (from the `config`) on a row-by-row basis. Adds 'SMA', 'UpperBand', 'LowerBand', 'BBWidth', and 'MarketRegime' columns.
- If `squeeze` is `True`: Calculates simpler Bollinger Bands with a fixed window of 14 and a standard deviation multiplier of 1.5 by calling `calculate_custom_bands`. Adds 'SMA', 'UpperBand', 'LowerBand' columns; 'BBWidth' and 'MarketRegime' will be `NaN`.
- **Parameters**:\n - `data_df` (pd.DataFrame): DataFrame with price data. Must include the `price_column`.\n - `price_column` (str, optional): The name of the column containing the price data. Defaults to 'close'.\n - `squeeze` (bool, optional): If `True`, calculates bands with fixed parameters (window 14, std 1.5). Defaults to `False`.
- **Returns**: `pd.DataFrame` - A copy of the original DataFrame with added Bollinger Band related columns.
### `calculate_custom_bands(price_series: pd.Series, window: int = 20, num_std: float = 2.0, min_periods: int = None) -> tuple[pd.Series, pd.Series, pd.Series]` (Static Method)
- **Description**: Calculates Bollinger Bands with a specified window, standard deviation multiplier, and minimum periods.
- **Parameters**:
- `data_df` (pd.DataFrame): DataFrame with price data. Must include the `price_column`.
- `price_column` (str, optional): The name of the column containing the price data (e.g., 'close'). Defaults to 'close'.
- **Returns**: `pd.DataFrame` - The original DataFrame with added columns: 'SMA', 'UpperBand', 'LowerBand'.
- `price_series` (pd.Series): Series of prices.
- `window` (int, optional): The period for the moving average and standard deviation. Defaults to 20.
- `num_std` (float, optional): The number of standard deviations for the upper and lower bands. Defaults to 2.0.
- `min_periods` (int, optional): Minimum number of observations in window required to have a value. Defaults to `window` if `None`.
- **Returns**: `tuple[pd.Series, pd.Series, pd.Series]` - A tuple containing the Upper band, SMA, and Lower band series.

98
docs/strategies.md Normal file
View File

@@ -0,0 +1,98 @@
# Trading Strategies (`cycles/Analysis/strategies.py`)
This document outlines the trading strategies implemented within the `Strategy` class. These strategies utilize technical indicators calculated by other classes in the `Analysis` module.
## Class: `Strategy`
Manages and executes different trading strategies.
### `__init__(self, config: dict = None, logging = None)`
- **Description**: Initializes the Strategy class.
- **Parameters**:
- `config` (dict, optional): Configuration dictionary containing parameters for various indicators and strategy settings. Must be provided if strategies requiring config are used.
- `logging` (logging.Logger, optional): Logger object for outputting messages. Defaults to `None`.
### `run(self, data: pd.DataFrame, strategy_name: str) -> pd.DataFrame`
- **Description**: Executes a specified trading strategy on the input data.
- **Parameters**:
- `data` (pd.DataFrame): Input DataFrame containing at least price data (e.g., 'close', 'volume'). Specific strategies might require other columns or will calculate them.
- `strategy_name` (str): The name of the strategy to run. Supported names include:
- `"MarketRegimeStrategy"`
- `"CryptoTradingStrategy"`
- `"no_strategy"` (or any other unrecognized name will default to this)
- **Returns**: `pd.DataFrame` - A DataFrame containing the original data augmented with indicator values, and `BuySignal` and `SellSignal` (boolean) columns specific to the executed strategy. The structure of the DataFrame (e.g., daily, 15-minute) depends on the strategy.
### `no_strategy(self, data: pd.DataFrame) -> pd.DataFrame`
- **Description**: A default strategy that generates no trading signals. It can serve as a baseline or placeholder.
- **Parameters**:
- `data` (pd.DataFrame): Input data DataFrame.
- **Returns**: `pd.DataFrame` - The input DataFrame with `BuySignal` and `SellSignal` columns added, both containing all `False` values.
---
## Implemented Strategies
### 1. `MarketRegimeStrategy`
- **Description**: An adaptive strategy that combines Bollinger Bands and RSI, adjusting its parameters based on detected market regimes (trending vs. sideways). It operates on daily aggregated data (aggregation is performed internally).
- **Core Logic**:
- Calculates Bollinger Bands (using `BollingerBands` class) with adaptive standard deviation multipliers based on `MarketRegime` (derived from `BBWidth`).
- Calculates RSI (using `RSI` class).
- **Trending Market (Breakout Mode)**:
- Buy: Price < Lower Band ∧ RSI < 50 ∧ Volume Spike.
- Sell: Price > Upper Band ∧ RSI > 50 ∧ Volume Spike.
- **Sideways Market (Mean Reversion)**:
- Buy: Price ≤ Lower Band ∧ RSI ≤ 40.
- Sell: Price ≥ Upper Band ∧ RSI ≥ 60.
- **Squeeze Confirmation** (if `config["SqueezeStrategy"]` is `True`):
- Requires additional confirmation from RSI Bollinger Bands (calculated by `rsi_bollinger_confirmation` helper method).
- Sideways markets also check for volume contraction.
- **Key Configuration Parameters (from `config` dict)**:
- `bb_period`, `bb_width`
- `trending['bb_std_dev_multiplier']`, `trending['rsi_threshold']`
- `sideways['bb_std_dev_multiplier']`, `sideways['rsi_threshold']`
- `rsi_period`
- `SqueezeStrategy` (boolean)
- **Output DataFrame Columns (Daily)**: Includes input columns plus `SMA`, `UpperBand`, `LowerBand`, `BBWidth`, `MarketRegime`, `RSI`, `BuySignal`, `SellSignal`.
#### `rsi_bollinger_confirmation(self, rsi: pd.Series, window: int = 14, std_mult: float = 1.5) -> tuple`
- **Description** (Helper for `MarketRegimeStrategy`): Calculates Bollinger Bands on RSI values for signal confirmation.
- **Parameters**:
- `rsi` (pd.Series): Series containing RSI values.
- `window` (int, optional): The period for the moving average. Defaults to 14.
- `std_mult` (float, optional): Standard deviation multiplier for bands. Defaults to 1.5.
- **Returns**: `tuple` - (oversold_condition, overbought_condition) as pandas Series (boolean).
### 2. `CryptoTradingStrategy`
- **Description**: A multi-timeframe strategy primarily designed for volatile assets like cryptocurrencies. It aggregates input data into 15-minute and 1-hour intervals for analysis.
- **Core Logic**:
- Aggregates data to 15-minute (`data_15m`) and 1-hour (`data_1h`) resolutions using `aggregate_to_minutes` and `aggregate_to_hourly` from `data_utils.py`.
- Calculates 15-minute Bollinger Bands (20-period, 2 std dev) and 15-minute EMA-smoothed RSI (14-period) using `BollingerBands.calculate_custom_bands` and `RSI.calculate_custom_rsi`.
- Calculates 1-hour Bollinger Bands (50-period, 1.8 std dev) using `BollingerBands.calculate_custom_bands`.
- **Signal Generation (on 15m timeframe)**:
- Buy Signal: Price ≤ Lower 15m Band ∧ Price ≤ Lower 1h Band ∧ RSI_15m < 35 ∧ Volume Confirmation.
- Sell Signal: Price ≥ Upper 15m Band ∧ Price ≥ Upper 1h Band ∧ RSI_15m > 65 ∧ Volume Confirmation.
- **Volume Confirmation**: Current 15m volume > 1.5 × 20-period MA of 15m volume.
- **Risk Management**: Calculates `StopLoss` and `TakeProfit` levels based on a simplified ATR (standard deviation of 15m close prices over the last 4 periods).
- Buy: SL = Price - 2 * ATR; TP = Price + 4 * ATR
- Sell: SL = Price + 2 * ATR; TP = Price - 4 * ATR
- **Key Configuration Parameters**: While this strategy uses fixed parameters for its core indicator calculations, the `config` object passed to the `Strategy` class might be used by helper functions or for future extensions (though not heavily used by the current `CryptoTradingStrategy` logic itself for primary indicator settings).
- **Output DataFrame Columns (15-minute)**: Includes resampled 15m OHLCV, plus `UpperBand_15m`, `SMA_15m`, `LowerBand_15m`, `RSI_15m`, `VolumeMA_15m`, `UpperBand_1h` (forward-filled), `LowerBand_1h` (forward-filled), `BuySignal`, `SellSignal`, `StopLoss`, `TakeProfit`.
---
## General Strategy Concepts (from previous high-level notes)
While the specific implementations above have their own detailed logic, some general concepts that often inspire trading strategies include:
- **Adaptive Parameters**: Adjusting indicator settings (like Bollinger Band width or RSI thresholds) based on market conditions (e.g., trending vs. sideways).
- **Multi-Timeframe Analysis**: Confirming signals on one timeframe with trends or levels on another (e.g., 15-minute signals confirmed by 1-hour context).
- **Volume Confirmation**: Using volume spikes or contractions to validate price-based signals.
- **Volatility-Adjusted Risk Management**: Using measures like ATR (Average True Range) to set stop-loss and take-profit levels, or to size positions dynamically.
These concepts are partially reflected in the implemented strategies, particularly in `MarketRegimeStrategy` (adaptive parameters) and `CryptoTradingStrategy` (multi-timeframe, volume confirmation, ATR-based risk levels).

144
main.py
View File

@@ -10,6 +10,8 @@ import json
from cycles.utils.storage import Storage
from cycles.utils.system import SystemUtils
from cycles.backtest import Backtest
from cycles.Analysis.supertrend import Supertrends
from cycles.charts import BacktestCharts
logging.basicConfig(
level=logging.INFO,
@@ -20,6 +22,62 @@ logging.basicConfig(
]
)
def default_init_strategy(backtester: Backtest):
supertrends = Supertrends(backtester.df, verbose=False)
supertrend_results_list = supertrends.calculate_supertrend_indicators()
trends = [st['results']['trend'] for st in supertrend_results_list]
trends_arr = np.stack(trends, axis=1)
meta_trend = np.where((trends_arr[:,0] == trends_arr[:,1]) & (trends_arr[:,1] == trends_arr[:,2]),
trends_arr[:,0], 0)
backtester.strategies["meta_trend"] = meta_trend
def default_entry_strategy(backtester: Backtest, df_index):
return backtester.strategies["meta_trend"][df_index - 1] != 1 and backtester.strategies["meta_trend"][df_index] == 1
def stop_loss_strategy(backtester: Backtest):
stop_price = backtester.entry_price * (1 - backtester.strategies["stop_loss_pct"])
min1_index = backtester.min1_df.index
start_candidates = min1_index[min1_index >= backtester.entry_time]
backtester.current_trade_min1_start_idx = start_candidates[0]
end_candidates = min1_index[min1_index <= backtester.current_date]
if len(end_candidates) == 0:
print("Warning: no end candidate here. Need to be checked")
return False, None
backtester.current_min1_end_idx = end_candidates[-1]
min1_slice = backtester.min1_df.loc[backtester.current_trade_min1_start_idx:backtester.current_min1_end_idx]
# print(f"lowest low in that range: {min1_slice['low'].min()}, count: {len(min1_slice)}")
# print(f"slice start: {min1_slice.index[0]}, slice end: {min1_slice.index[-1]}")
if (min1_slice['low'] <= stop_price).any():
stop_candle = min1_slice[min1_slice['low'] <= stop_price].iloc[0]
if stop_candle['open'] < stop_price:
sell_price = stop_candle['open']
else:
sell_price = stop_price
return True, sell_price
return False, None
def default_exit_strategy(backtester: Backtest, df_index):
if backtester.strategies["meta_trend"][df_index - 1] != 1 and \
backtester.strategies["meta_trend"][df_index] == -1:
return "META_TREND_EXIT_SIGNAL", None
stop_loss_result, sell_price = stop_loss_strategy(backtester)
if stop_loss_result:
backtester.strategies["current_trade_min1_start_idx"] = \
backtester.min1_df.index[backtester.min1_df.index <= backtester.current_date][-1]
return "STOP_LOSS", sell_price
return None, None
def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, debug=False):
"""Process the entire timeframe with all stop loss values (no monthly split)"""
df = df.copy().reset_index(drop=True)
@@ -27,13 +85,16 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
results_rows = []
trade_rows = []
min1_df['timestamp'] = pd.to_datetime(min1_df.index) # need ?
for stop_loss_pct in stop_loss_pcts:
results = Backtest.run(
min1_df,
df,
initial_usd=initial_usd,
stop_loss_pct=stop_loss_pct,
debug=debug
backtester = Backtest(initial_usd, df, min1_df, default_init_strategy)
backtester.strategies["stop_loss_pct"] = stop_loss_pct
results = backtester.run(
default_entry_strategy,
default_exit_strategy,
debug
)
n_trades = results["n_trades"]
trades = results.get('trades', [])
@@ -50,9 +111,11 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
for trade in trades:
cumulative_profit += trade['profit_pct']
if cumulative_profit > peak:
peak = cumulative_profit
drawdown = peak - cumulative_profit
if drawdown > max_drawdown:
max_drawdown = drawdown
@@ -61,13 +124,13 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
for trade in trades:
final_usd *= (1 + trade['profit_pct'])
total_fees_usd = sum(trade['fee_usd'] for trade in trades)
total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades)
row = {
"timeframe": rule_name,
"stop_loss_pct": stop_loss_pct,
"n_trades": n_trades,
"n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP'),
"n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP_LOSS'),
"win_rate": win_rate,
"max_drawdown": max_drawdown,
"avg_trade": avg_trade,
@@ -92,26 +155,23 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
"type": trade.get("type"),
"fee_usd": trade.get("fee_usd"),
})
logging.info(f"Timeframe: {rule_name}, Stop Loss: {stop_loss_pct}, Trades: {n_trades}")
if debug:
for trade in trades:
if trade['type'] == 'STOP':
print(trade)
for trade in trades:
if trade['profit_pct'] < -0.09: # or whatever is close to -0.10
print("Large loss trade:", trade)
# Plot after each backtest run
try:
meta_trend = backtester.strategies["meta_trend"]
BacktestCharts.plot(df, meta_trend)
except Exception as e:
print(f"Plotting failed: {e}")
return results_rows, trade_rows
def process(timeframe_info, debug=False):
from cycles.utils.storage import Storage # import inside function for safety
storage = Storage(logging=None) # or pass a logger if you want, but None is safest for multiprocessing
"""Process a single (timeframe, stop_loss_pct) combination (no monthly split)"""
rule, data_1min, stop_loss_pct, initial_usd = timeframe_info
if rule == "1T" or rule == "1min":
if rule == "1min":
df = data_1min.copy()
else:
df = data_1min.resample(rule).agg({
@@ -122,33 +182,7 @@ def process(timeframe_info, debug=False):
'volume': 'sum'
}).dropna()
df = df.reset_index()
results_rows, all_trade_rows = process_timeframe_data(data_1min, df, [stop_loss_pct], rule, initial_usd, debug=debug)
if all_trade_rows:
trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"]
# Prepare header
summary_fields = ["timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate", "max_drawdown", "avg_trade", "profit_ratio", "final_usd"]
summary_row = results_rows[0]
header_line = "\t".join(summary_fields) + "\n"
value_line = "\t".join(str(summary_row.get(f, "")) for f in summary_fields) + "\n"
# File name
tf = summary_row["timeframe"]
sl = summary_row["stop_loss_pct"]
sl_percent = int(round(sl * 100))
trades_filename = os.path.join(storage.results_dir, f"trades_{tf}_ST{sl_percent}pct.csv")
# Write header
with open(trades_filename, "w") as f:
f.write(header_line)
f.write(value_line)
# Now write trades (append mode, skip header)
with open(trades_filename, "a", newline="") as f:
import csv
writer = csv.DictWriter(f, fieldnames=trades_fieldnames)
writer.writeheader()
for trade in all_trade_rows:
writer.writerow({k: trade.get(k, "") for k in trades_fieldnames})
return results_rows, all_trade_rows
def aggregate_results(all_rows):
@@ -162,6 +196,7 @@ def aggregate_results(all_rows):
summary_rows = []
for (rule, stop_loss_pct), rows in grouped.items():
n_months = len(rows)
total_trades = sum(r['n_trades'] for r in rows)
total_stop_loss = sum(r['n_stop_loss'] for r in rows)
avg_win_rate = np.mean([r['win_rate'] for r in rows])
@@ -198,7 +233,7 @@ def get_nearest_price(df, target_date):
return nearest_time, price
if __name__ == "__main__":
debug = False
debug = True
parser = argparse.ArgumentParser(description="Run backtest with config file.")
parser.add_argument("config", type=str, nargs="?", help="Path to config JSON file.")
@@ -209,14 +244,14 @@ if __name__ == "__main__":
"start_date": "2025-05-01",
"stop_date": datetime.datetime.today().strftime('%Y-%m-%d'),
"initial_usd": 10000,
"timeframes": ["1D", "6h", "3h", "1h", "30m", "15m", "5m", "1m"],
"stop_loss_pcts": [0.01, 0.02, 0.03, 0.05],
"timeframes": ["15min"],
"stop_loss_pcts": [0.03],
}
if args.config:
with open(args.config, 'r') as f:
config = json.load(f)
else:
elif not debug:
print("No config file provided. Please enter the following values (press Enter to use default):")
start_date = input(f"Start date [{default_config['start_date']}]: ") or default_config['start_date']
@@ -238,8 +273,9 @@ if __name__ == "__main__":
'timeframes': timeframes,
'stop_loss_pcts': stop_loss_pcts,
}
else:
config = default_config
# Use config values
start_date = config['start_date']
stop_date = config['stop_date']
initial_usd = config['initial_usd']
@@ -268,18 +304,17 @@ if __name__ == "__main__":
for stop_loss_pct in stop_loss_pcts
]
workers = system_utils.get_optimal_workers()
if debug:
all_results_rows = []
all_trade_rows = []
for task in tasks:
results, trades = process(task, debug)
if results or trades:
all_results_rows.extend(results)
all_trade_rows.extend(trades)
else:
workers = system_utils.get_optimal_workers()
with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
futures = {executor.submit(process, task, debug): task for task in tasks}
all_results_rows = []
@@ -299,4 +334,7 @@ if __name__ == "__main__":
]
storage.write_backtest_results(backtest_filename, backtest_fieldnames, all_results_rows, metadata_lines)
trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"]
storage.write_trades(all_trade_rows, trades_fieldnames)

View File

@@ -4,9 +4,7 @@ import matplotlib.pyplot as plt
import pandas as pd
from cycles.utils.storage import Storage
from cycles.utils.data_utils import aggregate_to_daily
from cycles.Analysis.boillinger_band import BollingerBands
from cycles.Analysis.rsi import RSI
from cycles.Analysis.strategies import Strategy
logging.basicConfig(
level=logging.INFO,
@@ -17,115 +15,145 @@ logging.basicConfig(
]
)
config_minute = {
"start_date": "2022-01-01",
"stop_date": "2023-01-01",
config = {
"start_date": "2023-01-01",
"stop_date": "2024-01-01",
"data_file": "btcusd_1-min_data.csv"
}
config_day = {
"start_date": "2022-01-01",
"stop_date": "2023-01-01",
"data_file": "btcusd_1-day_data.csv"
config_strategy = {
"bb_width": 0.05,
"bb_period": 20,
"rsi_period": 14,
"trending": {
"rsi_threshold": [30, 70],
"bb_std_dev_multiplier": 2.5,
},
"sideways": {
"rsi_threshold": [40, 60],
"bb_std_dev_multiplier": 1.8,
},
"strategy_name": "MarketRegimeStrategy", # CryptoTradingStrategy
"SqueezeStrategy": True
}
IS_DAY = True
def no_strategy(data_bb, data_with_rsi):
buy_condition = pd.Series([False] * len(data_bb), index=data_bb.index)
sell_condition = pd.Series([False] * len(data_bb), index=data_bb.index)
return buy_condition, sell_condition
def strategy_1(data_bb, data_with_rsi):
# Long trade: price move below lower Bollinger band and RSI go below 25
buy_condition = (data_bb['close'] < data_bb['LowerBand']) & (data_bb['RSI'] < 25)
# Short only: price move above top Bollinger band and RSI goes over 75
sell_condition = (data_bb['close'] > data_bb['UpperBand']) & (data_bb['RSI'] > 75)
return buy_condition, sell_condition
IS_DAY = False
if __name__ == "__main__":
# Load data
storage = Storage(logging=logging)
if IS_DAY:
config = config_day
else:
config = config_minute
data = storage.load_data(config["data_file"], config["start_date"], config["stop_date"])
if not IS_DAY:
data_daily = aggregate_to_daily(data)
storage.save_data(data, "btcusd_1-day_data.csv")
df_to_plot = data_daily
else:
df_to_plot = data
# Run strategy
strategy = Strategy(config=config_strategy, logging=logging)
processed_data = strategy.run(data.copy(), config_strategy["strategy_name"])
bb = BollingerBands(period=30, std_dev_multiplier=2.0)
data_bb = bb.calculate(df_to_plot.copy())
# Get buy and sell signals
buy_condition = processed_data.get('BuySignal', pd.Series(False, index=processed_data.index)).astype(bool)
sell_condition = processed_data.get('SellSignal', pd.Series(False, index=processed_data.index)).astype(bool)
rsi_calculator = RSI(period=13)
data_with_rsi = rsi_calculator.calculate(df_to_plot.copy(), price_column='close')
buy_signals = processed_data[buy_condition]
sell_signals = processed_data[sell_condition]
# Combine BB and RSI data into a single DataFrame for signal generation
# Ensure indices are aligned; they should be as both are from df_to_plot.copy()
if 'RSI' in data_with_rsi.columns:
data_bb['RSI'] = data_with_rsi['RSI']
else:
# If RSI wasn't calculated (e.g., not enough data), create a dummy column with NaNs
# to prevent errors later, though signals won't be generated.
data_bb['RSI'] = pd.Series(index=data_bb.index, dtype=float)
logging.warning("RSI column not found or not calculated. Signals relying on RSI may not be generated.")
strategy = 1
if strategy == 1:
buy_condition, sell_condition = strategy_1(data_bb, data_with_rsi)
else:
buy_condition, sell_condition = no_strategy(data_bb, data_with_rsi)
buy_signals = data_bb[buy_condition]
sell_signals = data_bb[sell_condition]
# plot the data with seaborn library
if df_to_plot is not None and not df_to_plot.empty:
# Plot the data with seaborn library
if processed_data is not None and not processed_data.empty:
# Create a figure with two subplots, sharing the x-axis
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 8), sharex=True)
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(16, 8), sharex=True)
strategy_name = config_strategy["strategy_name"]
# Plot 1: Close Price and Strategy-Specific Bands/Levels
sns.lineplot(x=processed_data.index, y='close', data=processed_data, label='Close Price', ax=ax1)
# Use standardized column names for bands
if 'UpperBand' in processed_data.columns and 'LowerBand' in processed_data.columns:
# Instead of lines, shade the area between upper and lower bands
ax1.fill_between(processed_data.index,
processed_data['LowerBand'],
processed_data['UpperBand'],
alpha=0.1, color='blue', label='Bollinger Bands')
else:
logging.warning(f"{strategy_name}: UpperBand or LowerBand not found for plotting.")
# Add strategy-specific extra indicators if available
if strategy_name == "CryptoTradingStrategy":
if 'StopLoss' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='StopLoss', data=processed_data, label='Stop Loss', ax=ax1, linestyle='--', color='orange')
if 'TakeProfit' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='TakeProfit', data=processed_data, label='Take Profit', ax=ax1, linestyle='--', color='purple')
# Plot 1: Close Price and Bollinger Bands
sns.lineplot(x=data_bb.index, y='close', data=data_bb, label='Close Price', ax=ax1)
sns.lineplot(x=data_bb.index, y='UpperBand', data=data_bb, label='Upper Band (BB)', ax=ax1)
sns.lineplot(x=data_bb.index, y='LowerBand', data=data_bb, label='Lower Band (BB)', ax=ax1)
# Plot Buy/Sell signals on Price chart
if not buy_signals.empty:
ax1.scatter(buy_signals.index, buy_signals['close'], color='green', marker='o', s=20, label='Buy Signal', zorder=5)
if not sell_signals.empty:
ax1.scatter(sell_signals.index, sell_signals['close'], color='red', marker='o', s=20, label='Sell Signal', zorder=5)
ax1.set_title('Price and Bollinger Bands with Signals')
ax1.set_title(f'Price and Signals ({strategy_name})')
ax1.set_ylabel('Price')
ax1.legend()
ax1.grid(True)
# Plot 2: RSI
if 'RSI' in data_bb.columns: # Check data_bb now as it should contain RSI
sns.lineplot(x=data_bb.index, y='RSI', data=data_bb, label='RSI (14)', ax=ax2, color='purple')
ax2.axhline(75, color='red', linestyle='--', linewidth=0.8, label='Overbought (75)')
ax2.axhline(25, color='green', linestyle='--', linewidth=0.8, label='Oversold (25)')
# Plot 2: RSI and Strategy-Specific Thresholds
if 'RSI' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='RSI', data=processed_data, label=f'RSI (' + str(config_strategy.get("rsi_period", 14)) + ')', ax=ax2, color='purple')
if strategy_name == "MarketRegimeStrategy":
# Get threshold values
upper_threshold = config_strategy.get("trending", {}).get("rsi_threshold", [30,70])[1]
lower_threshold = config_strategy.get("trending", {}).get("rsi_threshold", [30,70])[0]
# Shade overbought area (upper)
ax2.fill_between(processed_data.index, upper_threshold, 100,
alpha=0.1, color='red', label=f'Overbought (>{upper_threshold})')
# Shade oversold area (lower)
ax2.fill_between(processed_data.index, 0, lower_threshold,
alpha=0.1, color='green', label=f'Oversold (<{lower_threshold})')
elif strategy_name == "CryptoTradingStrategy":
# Shade overbought area (upper)
ax2.fill_between(processed_data.index, 65, 100,
alpha=0.1, color='red', label='Overbought (>65)')
# Shade oversold area (lower)
ax2.fill_between(processed_data.index, 0, 35,
alpha=0.1, color='green', label='Oversold (<35)')
# Plot Buy/Sell signals on RSI chart
if not buy_signals.empty:
if not buy_signals.empty and 'RSI' in buy_signals.columns:
ax2.scatter(buy_signals.index, buy_signals['RSI'], color='green', marker='o', s=20, label='Buy Signal (RSI)', zorder=5)
if not sell_signals.empty:
if not sell_signals.empty and 'RSI' in sell_signals.columns:
ax2.scatter(sell_signals.index, sell_signals['RSI'], color='red', marker='o', s=20, label='Sell Signal (RSI)', zorder=5)
ax2.set_title('Relative Strength Index (RSI) with Signals')
ax2.set_ylabel('RSI Value')
ax2.set_ylim(0, 100) # RSI is typically bounded between 0 and 100
ax2.set_ylim(0, 100)
ax2.legend()
ax2.grid(True)
else:
logging.info("RSI data not available for plotting.")
plt.xlabel('Date') # Common X-axis label
fig.tight_layout() # Adjust layout to prevent overlapping titles/labels
# Plot 3: Strategy-Specific Indicators
ax3.clear() # Clear previous plot content if any
if 'BBWidth' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='BBWidth', data=processed_data, label='BB Width', ax=ax3)
if strategy_name == "MarketRegimeStrategy":
if 'MarketRegime' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='MarketRegime', data=processed_data, label='Market Regime (Sideways: 1, Trending: 0)', ax=ax3)
ax3.set_title('Bollinger Bands Width & Market Regime')
ax3.set_ylabel('Value')
elif strategy_name == "CryptoTradingStrategy":
if 'VolumeMA' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='VolumeMA', data=processed_data, label='Volume MA', ax=ax3)
if 'volume' in processed_data.columns:
sns.lineplot(x=processed_data.index, y='volume', data=processed_data, label='Volume', ax=ax3, alpha=0.5)
ax3.set_title('Volume Analysis')
ax3.set_ylabel('Volume')
ax3.legend()
ax3.grid(True)
plt.xlabel('Date')
fig.tight_layout()
plt.show()
else:
logging.info("No data to plot.")