From d985830ecdbe60c4325cefcdc5bc397772ff51d4 Mon Sep 17 00:00:00 2001 From: "Vasily.onl" Date: Mon, 26 May 2025 13:26:07 +0800 Subject: [PATCH] indicators --- cycles/IncStrategies/indicators/__init__.py | 36 ++ cycles/IncStrategies/indicators/atr.py | 242 +++++++++++++ cycles/IncStrategies/indicators/base.py | 197 +++++++++++ .../indicators/bollinger_bands.py | 325 +++++++++++++++++ .../indicators/moving_average.py | 228 ++++++++++++ cycles/IncStrategies/indicators/rsi.py | 276 +++++++++++++++ cycles/IncStrategies/indicators/supertrend.py | 332 ++++++++++++++++++ 7 files changed, 1636 insertions(+) create mode 100644 cycles/IncStrategies/indicators/__init__.py create mode 100644 cycles/IncStrategies/indicators/atr.py create mode 100644 cycles/IncStrategies/indicators/base.py create mode 100644 cycles/IncStrategies/indicators/bollinger_bands.py create mode 100644 cycles/IncStrategies/indicators/moving_average.py create mode 100644 cycles/IncStrategies/indicators/rsi.py create mode 100644 cycles/IncStrategies/indicators/supertrend.py diff --git a/cycles/IncStrategies/indicators/__init__.py b/cycles/IncStrategies/indicators/__init__.py new file mode 100644 index 0000000..3544080 --- /dev/null +++ b/cycles/IncStrategies/indicators/__init__.py @@ -0,0 +1,36 @@ +""" +Incremental Indicator States Module + +This module contains indicator state classes that maintain calculation state +for incremental processing of technical indicators. + +All indicator states implement the IndicatorState interface and provide: +- Incremental updates with new data points +- Constant memory usage regardless of data history +- Identical results to traditional batch calculations +- Warm-up detection for reliable indicator values + +Classes: + IndicatorState: Abstract base class for all indicator states + MovingAverageState: Incremental moving average calculation + RSIState: Incremental RSI calculation + ATRState: Incremental Average True Range calculation + SupertrendState: Incremental Supertrend calculation + BollingerBandsState: Incremental Bollinger Bands calculation +""" + +from .base import IndicatorState +from .moving_average import MovingAverageState +from .rsi import RSIState +from .atr import ATRState +from .supertrend import SupertrendState +from .bollinger_bands import BollingerBandsState + +__all__ = [ + 'IndicatorState', + 'MovingAverageState', + 'RSIState', + 'ATRState', + 'SupertrendState', + 'BollingerBandsState' +] \ No newline at end of file diff --git a/cycles/IncStrategies/indicators/atr.py b/cycles/IncStrategies/indicators/atr.py new file mode 100644 index 0000000..73aac1c --- /dev/null +++ b/cycles/IncStrategies/indicators/atr.py @@ -0,0 +1,242 @@ +""" +Average True Range (ATR) Indicator State + +This module implements incremental ATR calculation that maintains constant memory usage +and provides identical results to traditional batch calculations. ATR is used by +Supertrend and other volatility-based indicators. +""" + +from typing import Dict, Union, Optional +from .base import OHLCIndicatorState +from .moving_average import ExponentialMovingAverageState + + +class ATRState(OHLCIndicatorState): + """ + Incremental Average True Range calculation state. + + ATR measures market volatility by calculating the average of true ranges over + a specified period. True Range is the maximum of: + 1. Current High - Current Low + 2. |Current High - Previous Close| + 3. |Current Low - Previous Close| + + This implementation uses exponential moving average for smoothing, which is + more responsive than simple moving average and requires less memory. + + Attributes: + period (int): The ATR period + ema_state (ExponentialMovingAverageState): EMA state for smoothing true ranges + previous_close (float): Previous period's close price + + Example: + atr = ATRState(period=14) + + # Add OHLC data incrementally + ohlc = {'open': 100, 'high': 105, 'low': 98, 'close': 103} + atr_value = atr.update(ohlc) # Returns current ATR value + + # Check if warmed up + if atr.is_warmed_up(): + current_atr = atr.get_current_value() + """ + + def __init__(self, period: int = 14): + """ + Initialize ATR state. + + Args: + period: Number of periods for ATR calculation (default: 14) + + Raises: + ValueError: If period is not a positive integer + """ + super().__init__(period) + self.ema_state = ExponentialMovingAverageState(period) + self.previous_close = None + self.is_initialized = True + + def update(self, ohlc_data: Dict[str, float]) -> float: + """ + Update ATR with new OHLC data. + + Args: + ohlc_data: Dictionary with 'open', 'high', 'low', 'close' keys + + Returns: + Current ATR value + + Raises: + ValueError: If OHLC data is invalid + TypeError: If ohlc_data is not a dictionary + """ + # Validate input + if not isinstance(ohlc_data, dict): + raise TypeError(f"ohlc_data must be a dictionary, got {type(ohlc_data)}") + + self.validate_input(ohlc_data) + + high = float(ohlc_data['high']) + low = float(ohlc_data['low']) + close = float(ohlc_data['close']) + + # Calculate True Range + if self.previous_close is None: + # First period - True Range is just High - Low + true_range = high - low + else: + # True Range is the maximum of: + # 1. Current High - Current Low + # 2. |Current High - Previous Close| + # 3. |Current Low - Previous Close| + tr1 = high - low + tr2 = abs(high - self.previous_close) + tr3 = abs(low - self.previous_close) + true_range = max(tr1, tr2, tr3) + + # Update EMA with the true range + atr_value = self.ema_state.update(true_range) + + # Store current close as previous close for next calculation + self.previous_close = close + self.values_received += 1 + + # Store current ATR value + self._current_values = {'atr': atr_value} + + return atr_value + + def is_warmed_up(self) -> bool: + """ + Check if ATR has enough data for reliable values. + + Returns: + True if EMA state is warmed up (has enough true range values) + """ + return self.ema_state.is_warmed_up() + + def reset(self) -> None: + """Reset ATR state to initial conditions.""" + self.ema_state.reset() + self.previous_close = None + self.values_received = 0 + self._current_values = {} + + def get_current_value(self) -> Optional[float]: + """ + Get current ATR value without updating. + + Returns: + Current ATR value, or None if not warmed up + """ + if not self.is_warmed_up(): + return None + return self.ema_state.get_current_value() + + def get_state_summary(self) -> dict: + """Get detailed state summary for debugging.""" + base_summary = super().get_state_summary() + base_summary.update({ + 'previous_close': self.previous_close, + 'ema_state': self.ema_state.get_state_summary(), + 'current_atr': self.get_current_value() + }) + return base_summary + + +class SimpleATRState(OHLCIndicatorState): + """ + Simple ATR implementation using simple moving average instead of EMA. + + This version uses a simple moving average for smoothing true ranges, + which matches some traditional ATR implementations but requires more memory. + """ + + def __init__(self, period: int = 14): + """ + Initialize simple ATR state. + + Args: + period: Number of periods for ATR calculation (default: 14) + """ + super().__init__(period) + from collections import deque + self.true_ranges = deque(maxlen=period) + self.tr_sum = 0.0 + self.previous_close = None + self.is_initialized = True + + def update(self, ohlc_data: Dict[str, float]) -> float: + """ + Update simple ATR with new OHLC data. + + Args: + ohlc_data: Dictionary with 'open', 'high', 'low', 'close' keys + + Returns: + Current ATR value + """ + # Validate input + if not isinstance(ohlc_data, dict): + raise TypeError(f"ohlc_data must be a dictionary, got {type(ohlc_data)}") + + self.validate_input(ohlc_data) + + high = float(ohlc_data['high']) + low = float(ohlc_data['low']) + close = float(ohlc_data['close']) + + # Calculate True Range + if self.previous_close is None: + true_range = high - low + else: + tr1 = high - low + tr2 = abs(high - self.previous_close) + tr3 = abs(low - self.previous_close) + true_range = max(tr1, tr2, tr3) + + # Update rolling sum + if len(self.true_ranges) == self.period: + self.tr_sum -= self.true_ranges[0] # Remove oldest value + + self.true_ranges.append(true_range) + self.tr_sum += true_range + + # Calculate ATR as simple moving average + atr_value = self.tr_sum / len(self.true_ranges) + + # Store state + self.previous_close = close + self.values_received += 1 + self._current_values = {'atr': atr_value} + + return atr_value + + def is_warmed_up(self) -> bool: + """Check if simple ATR is warmed up.""" + return len(self.true_ranges) >= self.period + + def reset(self) -> None: + """Reset simple ATR state.""" + self.true_ranges.clear() + self.tr_sum = 0.0 + self.previous_close = None + self.values_received = 0 + self._current_values = {} + + def get_current_value(self) -> Optional[float]: + """Get current simple ATR value.""" + if not self.is_warmed_up(): + return None + return self.tr_sum / len(self.true_ranges) + + def get_state_summary(self) -> dict: + """Get detailed state summary for debugging.""" + base_summary = super().get_state_summary() + base_summary.update({ + 'previous_close': self.previous_close, + 'tr_window_size': len(self.true_ranges), + 'tr_sum': self.tr_sum, + 'current_atr': self.get_current_value() + }) + return base_summary \ No newline at end of file diff --git a/cycles/IncStrategies/indicators/base.py b/cycles/IncStrategies/indicators/base.py new file mode 100644 index 0000000..e3cfb50 --- /dev/null +++ b/cycles/IncStrategies/indicators/base.py @@ -0,0 +1,197 @@ +""" +Base Indicator State Class + +This module contains the abstract base class for all incremental indicator states. +All indicator implementations must inherit from IndicatorState and implement +the required methods for incremental calculation. +""" + +from abc import ABC, abstractmethod +from typing import Any, Dict, Optional, Union +import numpy as np + + +class IndicatorState(ABC): + """ + Abstract base class for maintaining indicator calculation state. + + This class defines the interface that all incremental indicators must implement. + Indicators maintain their internal state and can be updated incrementally with + new data points, providing constant memory usage and high performance. + + Attributes: + period (int): The period/window size for the indicator + values_received (int): Number of values processed so far + is_initialized (bool): Whether the indicator has been initialized + + Example: + class MyIndicator(IndicatorState): + def __init__(self, period: int): + super().__init__(period) + self._sum = 0.0 + + def update(self, new_value: float) -> float: + self._sum += new_value + self.values_received += 1 + return self._sum / min(self.values_received, self.period) + """ + + def __init__(self, period: int): + """ + Initialize the indicator state. + + Args: + period: The period/window size for the indicator calculation + + Raises: + ValueError: If period is not a positive integer + """ + if not isinstance(period, int) or period <= 0: + raise ValueError(f"Period must be a positive integer, got {period}") + + self.period = period + self.values_received = 0 + self.is_initialized = False + + @abstractmethod + def update(self, new_value: Union[float, Dict[str, float]]) -> Union[float, Dict[str, float]]: + """ + Update indicator with new value and return current indicator value. + + This method processes a new data point and updates the internal state + of the indicator. It returns the current indicator value after the update. + + Args: + new_value: New data point (can be single value or OHLCV dict) + + Returns: + Current indicator value after update (single value or dict) + + Raises: + ValueError: If new_value is invalid or incompatible + """ + pass + + @abstractmethod + def is_warmed_up(self) -> bool: + """ + Check whether indicator has enough data for reliable values. + + Returns: + True if indicator has received enough data points for reliable calculation + """ + pass + + @abstractmethod + def reset(self) -> None: + """ + Reset indicator state to initial conditions. + + This method clears all internal state and resets the indicator + as if it was just initialized. + """ + pass + + @abstractmethod + def get_current_value(self) -> Union[float, Dict[str, float], None]: + """ + Get the current indicator value without updating. + + Returns: + Current indicator value, or None if not warmed up + """ + pass + + def get_state_summary(self) -> Dict[str, Any]: + """ + Get summary of current indicator state for debugging. + + Returns: + Dictionary containing indicator state information + """ + return { + 'indicator_type': self.__class__.__name__, + 'period': self.period, + 'values_received': self.values_received, + 'is_warmed_up': self.is_warmed_up(), + 'is_initialized': self.is_initialized, + 'current_value': self.get_current_value() + } + + def validate_input(self, value: Union[float, Dict[str, float]]) -> None: + """ + Validate input value for the indicator. + + Args: + value: Input value to validate + + Raises: + ValueError: If value is invalid + TypeError: If value type is incorrect + """ + if isinstance(value, (int, float)): + if not np.isfinite(value): + raise ValueError(f"Input value must be finite, got {value}") + elif isinstance(value, dict): + required_keys = ['open', 'high', 'low', 'close'] + for key in required_keys: + if key not in value: + raise ValueError(f"OHLCV dict missing required key: {key}") + if not np.isfinite(value[key]): + raise ValueError(f"OHLCV value for {key} must be finite, got {value[key]}") + # Validate OHLC relationships + if not (value['low'] <= value['open'] <= value['high'] and + value['low'] <= value['close'] <= value['high']): + raise ValueError(f"Invalid OHLC relationships: {value}") + else: + raise TypeError(f"Input value must be float or OHLCV dict, got {type(value)}") + + def __repr__(self) -> str: + """String representation of the indicator state.""" + return (f"{self.__class__.__name__}(period={self.period}, " + f"values_received={self.values_received}, " + f"warmed_up={self.is_warmed_up()})") + + +class SimpleIndicatorState(IndicatorState): + """ + Base class for simple single-value indicators. + + This class provides common functionality for indicators that work with + single float values and maintain a simple rolling calculation. + """ + + def __init__(self, period: int): + """Initialize simple indicator state.""" + super().__init__(period) + self._current_value = None + + def get_current_value(self) -> Optional[float]: + """Get current indicator value.""" + return self._current_value if self.is_warmed_up() else None + + def is_warmed_up(self) -> bool: + """Check if indicator is warmed up.""" + return self.values_received >= self.period + + +class OHLCIndicatorState(IndicatorState): + """ + Base class for OHLC-based indicators. + + This class provides common functionality for indicators that work with + OHLC data (Open, High, Low, Close) and may return multiple values. + """ + + def __init__(self, period: int): + """Initialize OHLC indicator state.""" + super().__init__(period) + self._current_values = {} + + def get_current_value(self) -> Optional[Dict[str, float]]: + """Get current indicator values.""" + return self._current_values.copy() if self.is_warmed_up() else None + + def is_warmed_up(self) -> bool: + """Check if indicator is warmed up.""" + return self.values_received >= self.period \ No newline at end of file diff --git a/cycles/IncStrategies/indicators/bollinger_bands.py b/cycles/IncStrategies/indicators/bollinger_bands.py new file mode 100644 index 0000000..4cb08bf --- /dev/null +++ b/cycles/IncStrategies/indicators/bollinger_bands.py @@ -0,0 +1,325 @@ +""" +Bollinger Bands Indicator State + +This module implements incremental Bollinger Bands calculation that maintains constant memory usage +and provides identical results to traditional batch calculations. Used by the BBRSStrategy. +""" + +from typing import Dict, Union, Optional +from collections import deque +import math +from .base import OHLCIndicatorState +from .moving_average import MovingAverageState + + +class BollingerBandsState(OHLCIndicatorState): + """ + Incremental Bollinger Bands calculation state. + + Bollinger Bands consist of: + - Middle Band: Simple Moving Average of close prices + - Upper Band: Middle Band + (Standard Deviation * multiplier) + - Lower Band: Middle Band - (Standard Deviation * multiplier) + + This implementation maintains a rolling window for standard deviation calculation + while using the MovingAverageState for the middle band. + + Attributes: + period (int): Period for moving average and standard deviation + std_dev_multiplier (float): Multiplier for standard deviation + ma_state (MovingAverageState): Moving average state for middle band + close_values (deque): Rolling window of close prices for std dev calculation + close_sum_sq (float): Sum of squared close values for variance calculation + + Example: + bb = BollingerBandsState(period=20, std_dev_multiplier=2.0) + + # Add price data incrementally + result = bb.update(103.5) # Close price + upper_band = result['upper_band'] + middle_band = result['middle_band'] + lower_band = result['lower_band'] + bandwidth = result['bandwidth'] + """ + + def __init__(self, period: int = 20, std_dev_multiplier: float = 2.0): + """ + Initialize Bollinger Bands state. + + Args: + period: Period for moving average and standard deviation (default: 20) + std_dev_multiplier: Multiplier for standard deviation (default: 2.0) + + Raises: + ValueError: If period is not positive or multiplier is not positive + """ + super().__init__(period) + + if std_dev_multiplier <= 0: + raise ValueError(f"Standard deviation multiplier must be positive, got {std_dev_multiplier}") + + self.std_dev_multiplier = std_dev_multiplier + self.ma_state = MovingAverageState(period) + + # For incremental standard deviation calculation + self.close_values = deque(maxlen=period) + self.close_sum_sq = 0.0 # Sum of squared values + + self.is_initialized = True + + def update(self, close_price: Union[float, int]) -> Dict[str, float]: + """ + Update Bollinger Bands with new close price. + + Args: + close_price: New closing price + + Returns: + Dictionary with 'upper_band', 'middle_band', 'lower_band', 'bandwidth', 'std_dev' + + Raises: + ValueError: If close_price is not finite + TypeError: If close_price is not numeric + """ + # Validate input + if not isinstance(close_price, (int, float)): + raise TypeError(f"close_price must be numeric, got {type(close_price)}") + + self.validate_input(close_price) + + close_price = float(close_price) + + # Update moving average (middle band) + middle_band = self.ma_state.update(close_price) + + # Update rolling window for standard deviation + if len(self.close_values) == self.period: + # Remove oldest value from sum of squares + old_value = self.close_values[0] + self.close_sum_sq -= old_value * old_value + + # Add new value + self.close_values.append(close_price) + self.close_sum_sq += close_price * close_price + + # Calculate standard deviation + n = len(self.close_values) + if n < 2: + # Not enough data for standard deviation + std_dev = 0.0 + else: + # Incremental variance calculation: Var = (sum_sq - n*mean^2) / (n-1) + mean = middle_band + variance = (self.close_sum_sq - n * mean * mean) / (n - 1) + std_dev = math.sqrt(max(variance, 0.0)) # Ensure non-negative + + # Calculate bands + upper_band = middle_band + (self.std_dev_multiplier * std_dev) + lower_band = middle_band - (self.std_dev_multiplier * std_dev) + + # Calculate bandwidth (normalized band width) + if middle_band != 0: + bandwidth = (upper_band - lower_band) / middle_band + else: + bandwidth = 0.0 + + self.values_received += 1 + + # Store current values + result = { + 'upper_band': upper_band, + 'middle_band': middle_band, + 'lower_band': lower_band, + 'bandwidth': bandwidth, + 'std_dev': std_dev + } + + self._current_values = result + return result + + def is_warmed_up(self) -> bool: + """ + Check if Bollinger Bands has enough data for reliable values. + + Returns: + True if we have at least 'period' number of values + """ + return self.ma_state.is_warmed_up() + + def reset(self) -> None: + """Reset Bollinger Bands state to initial conditions.""" + self.ma_state.reset() + self.close_values.clear() + self.close_sum_sq = 0.0 + self.values_received = 0 + self._current_values = {} + + def get_current_value(self) -> Optional[Dict[str, float]]: + """ + Get current Bollinger Bands values without updating. + + Returns: + Dictionary with current BB values, or None if not warmed up + """ + if not self.is_warmed_up(): + return None + return self._current_values.copy() if self._current_values else None + + def get_squeeze_status(self, squeeze_threshold: float = 0.05) -> bool: + """ + Check if Bollinger Bands are in a squeeze condition. + + Args: + squeeze_threshold: Bandwidth threshold for squeeze detection + + Returns: + True if bandwidth is below threshold (squeeze condition) + """ + if not self.is_warmed_up() or not self._current_values: + return False + + bandwidth = self._current_values.get('bandwidth', float('inf')) + return bandwidth < squeeze_threshold + + def get_position_relative_to_bands(self, current_price: float) -> str: + """ + Get current price position relative to Bollinger Bands. + + Args: + current_price: Current price to evaluate + + Returns: + 'above_upper', 'between_bands', 'below_lower', or 'unknown' + """ + if not self.is_warmed_up() or not self._current_values: + return 'unknown' + + upper_band = self._current_values['upper_band'] + lower_band = self._current_values['lower_band'] + + if current_price > upper_band: + return 'above_upper' + elif current_price < lower_band: + return 'below_lower' + else: + return 'between_bands' + + def get_state_summary(self) -> dict: + """Get detailed state summary for debugging.""" + base_summary = super().get_state_summary() + base_summary.update({ + 'std_dev_multiplier': self.std_dev_multiplier, + 'close_values_count': len(self.close_values), + 'close_sum_sq': self.close_sum_sq, + 'ma_state': self.ma_state.get_state_summary(), + 'current_squeeze': self.get_squeeze_status() if self.is_warmed_up() else None + }) + return base_summary + + +class BollingerBandsOHLCState(OHLCIndicatorState): + """ + Bollinger Bands implementation that works with OHLC data. + + This version can calculate Bollinger Bands based on different price types + (close, typical price, etc.) and provides additional OHLC-based analysis. + """ + + def __init__(self, period: int = 20, std_dev_multiplier: float = 2.0, price_type: str = 'close'): + """ + Initialize OHLC Bollinger Bands state. + + Args: + period: Period for calculation + std_dev_multiplier: Standard deviation multiplier + price_type: Price type to use ('close', 'typical', 'median', 'weighted') + """ + super().__init__(period) + + if price_type not in ['close', 'typical', 'median', 'weighted']: + raise ValueError(f"Invalid price_type: {price_type}") + + self.std_dev_multiplier = std_dev_multiplier + self.price_type = price_type + self.bb_state = BollingerBandsState(period, std_dev_multiplier) + self.is_initialized = True + + def _extract_price(self, ohlc_data: Dict[str, float]) -> float: + """Extract price based on price_type setting.""" + if self.price_type == 'close': + return ohlc_data['close'] + elif self.price_type == 'typical': + return (ohlc_data['high'] + ohlc_data['low'] + ohlc_data['close']) / 3.0 + elif self.price_type == 'median': + return (ohlc_data['high'] + ohlc_data['low']) / 2.0 + elif self.price_type == 'weighted': + return (ohlc_data['high'] + ohlc_data['low'] + 2 * ohlc_data['close']) / 4.0 + else: + return ohlc_data['close'] + + def update(self, ohlc_data: Dict[str, float]) -> Dict[str, float]: + """ + Update Bollinger Bands with OHLC data. + + Args: + ohlc_data: Dictionary with OHLC data + + Returns: + Dictionary with Bollinger Bands values plus OHLC analysis + """ + # Validate input + if not isinstance(ohlc_data, dict): + raise TypeError(f"ohlc_data must be a dictionary, got {type(ohlc_data)}") + + self.validate_input(ohlc_data) + + # Extract price based on type + price = self._extract_price(ohlc_data) + + # Update underlying BB state + bb_result = self.bb_state.update(price) + + # Add OHLC-specific analysis + high = ohlc_data['high'] + low = ohlc_data['low'] + close = ohlc_data['close'] + + # Check if high/low touched bands + upper_band = bb_result['upper_band'] + lower_band = bb_result['lower_band'] + + bb_result.update({ + 'high_above_upper': high > upper_band, + 'low_below_lower': low < lower_band, + 'close_position': self.bb_state.get_position_relative_to_bands(close), + 'price_type': self.price_type, + 'extracted_price': price + }) + + self.values_received += 1 + self._current_values = bb_result + + return bb_result + + def is_warmed_up(self) -> bool: + """Check if OHLC Bollinger Bands is warmed up.""" + return self.bb_state.is_warmed_up() + + def reset(self) -> None: + """Reset OHLC Bollinger Bands state.""" + self.bb_state.reset() + self.values_received = 0 + self._current_values = {} + + def get_current_value(self) -> Optional[Dict[str, float]]: + """Get current OHLC Bollinger Bands values.""" + return self.bb_state.get_current_value() + + def get_state_summary(self) -> dict: + """Get detailed state summary.""" + base_summary = super().get_state_summary() + base_summary.update({ + 'price_type': self.price_type, + 'bb_state': self.bb_state.get_state_summary() + }) + return base_summary \ No newline at end of file diff --git a/cycles/IncStrategies/indicators/moving_average.py b/cycles/IncStrategies/indicators/moving_average.py new file mode 100644 index 0000000..3d0221f --- /dev/null +++ b/cycles/IncStrategies/indicators/moving_average.py @@ -0,0 +1,228 @@ +""" +Moving Average Indicator State + +This module implements incremental moving average calculation that maintains +constant memory usage and provides identical results to traditional batch calculations. +""" + +from collections import deque +from typing import Union +from .base import SimpleIndicatorState + + +class MovingAverageState(SimpleIndicatorState): + """ + Incremental moving average calculation state. + + This class maintains the state for calculating a simple moving average + incrementally. It uses a rolling window approach with constant memory usage. + + Attributes: + period (int): The moving average period + values (deque): Rolling window of values (max length = period) + sum (float): Current sum of values in the window + + Example: + ma = MovingAverageState(period=20) + + # Add values incrementally + ma_value = ma.update(100.0) # Returns current MA value + ma_value = ma.update(105.0) # Updates and returns new MA value + + # Check if warmed up (has enough values) + if ma.is_warmed_up(): + current_ma = ma.get_current_value() + """ + + def __init__(self, period: int): + """ + Initialize moving average state. + + Args: + period: Number of periods for the moving average + + Raises: + ValueError: If period is not a positive integer + """ + super().__init__(period) + self.values = deque(maxlen=period) + self.sum = 0.0 + self.is_initialized = True + + def update(self, new_value: Union[float, int]) -> float: + """ + Update moving average with new value. + + Args: + new_value: New price/value to add to the moving average + + Returns: + Current moving average value + + Raises: + ValueError: If new_value is not finite + TypeError: If new_value is not numeric + """ + # Validate input + if not isinstance(new_value, (int, float)): + raise TypeError(f"new_value must be numeric, got {type(new_value)}") + + self.validate_input(new_value) + + # If deque is at max capacity, subtract the value being removed + if len(self.values) == self.period: + self.sum -= self.values[0] # Will be automatically removed by deque + + # Add new value + self.values.append(float(new_value)) + self.sum += float(new_value) + self.values_received += 1 + + # Calculate current moving average + current_count = len(self.values) + self._current_value = self.sum / current_count + + return self._current_value + + def is_warmed_up(self) -> bool: + """ + Check if moving average has enough data for reliable values. + + Returns: + True if we have at least 'period' number of values + """ + return len(self.values) >= self.period + + def reset(self) -> None: + """Reset moving average state to initial conditions.""" + self.values.clear() + self.sum = 0.0 + self.values_received = 0 + self._current_value = None + + def get_current_value(self) -> Union[float, None]: + """ + Get current moving average value without updating. + + Returns: + Current moving average value, or None if not enough data + """ + if len(self.values) == 0: + return None + return self.sum / len(self.values) + + def get_state_summary(self) -> dict: + """Get detailed state summary for debugging.""" + base_summary = super().get_state_summary() + base_summary.update({ + 'window_size': len(self.values), + 'sum': self.sum, + 'values_in_window': list(self.values) if len(self.values) <= 10 else f"[{len(self.values)} values]" + }) + return base_summary + + +class ExponentialMovingAverageState(SimpleIndicatorState): + """ + Incremental exponential moving average calculation state. + + This class maintains the state for calculating an exponential moving average (EMA) + incrementally. EMA gives more weight to recent values and requires minimal memory. + + Attributes: + period (int): The EMA period (used to calculate smoothing factor) + alpha (float): Smoothing factor (2 / (period + 1)) + ema_value (float): Current EMA value + + Example: + ema = ExponentialMovingAverageState(period=20) + + # Add values incrementally + ema_value = ema.update(100.0) # Returns current EMA value + ema_value = ema.update(105.0) # Updates and returns new EMA value + """ + + def __init__(self, period: int): + """ + Initialize exponential moving average state. + + Args: + period: Number of periods for the EMA (used to calculate alpha) + + Raises: + ValueError: If period is not a positive integer + """ + super().__init__(period) + self.alpha = 2.0 / (period + 1) # Smoothing factor + self.ema_value = None + self.is_initialized = True + + def update(self, new_value: Union[float, int]) -> float: + """ + Update exponential moving average with new value. + + Args: + new_value: New price/value to add to the EMA + + Returns: + Current EMA value + + Raises: + ValueError: If new_value is not finite + TypeError: If new_value is not numeric + """ + # Validate input + if not isinstance(new_value, (int, float)): + raise TypeError(f"new_value must be numeric, got {type(new_value)}") + + self.validate_input(new_value) + + new_value = float(new_value) + + if self.ema_value is None: + # First value - initialize EMA + self.ema_value = new_value + else: + # EMA formula: EMA = alpha * new_value + (1 - alpha) * previous_EMA + self.ema_value = self.alpha * new_value + (1 - self.alpha) * self.ema_value + + self.values_received += 1 + self._current_value = self.ema_value + + return self.ema_value + + def is_warmed_up(self) -> bool: + """ + Check if EMA has enough data for reliable values. + + For EMA, we consider it warmed up after receiving 'period' number of values, + though it starts producing values immediately. + + Returns: + True if we have at least 'period' number of values + """ + return self.values_received >= self.period + + def reset(self) -> None: + """Reset EMA state to initial conditions.""" + self.ema_value = None + self.values_received = 0 + self._current_value = None + + def get_current_value(self) -> Union[float, None]: + """ + Get current EMA value without updating. + + Returns: + Current EMA value, or None if no data received + """ + return self.ema_value + + def get_state_summary(self) -> dict: + """Get detailed state summary for debugging.""" + base_summary = super().get_state_summary() + base_summary.update({ + 'alpha': self.alpha, + 'ema_value': self.ema_value + }) + return base_summary \ No newline at end of file diff --git a/cycles/IncStrategies/indicators/rsi.py b/cycles/IncStrategies/indicators/rsi.py new file mode 100644 index 0000000..3ac0036 --- /dev/null +++ b/cycles/IncStrategies/indicators/rsi.py @@ -0,0 +1,276 @@ +""" +RSI (Relative Strength Index) Indicator State + +This module implements incremental RSI calculation that maintains constant memory usage +and provides identical results to traditional batch calculations. +""" + +from typing import Union, Optional +from .base import SimpleIndicatorState +from .moving_average import ExponentialMovingAverageState + + +class RSIState(SimpleIndicatorState): + """ + Incremental RSI calculation state. + + RSI measures the speed and magnitude of price changes to evaluate overbought + or oversold conditions. It oscillates between 0 and 100. + + RSI = 100 - (100 / (1 + RS)) + where RS = Average Gain / Average Loss over the specified period + + This implementation uses exponential moving averages for gain and loss smoothing, + which is more responsive and memory-efficient than simple moving averages. + + Attributes: + period (int): The RSI period (typically 14) + gain_ema (ExponentialMovingAverageState): EMA state for gains + loss_ema (ExponentialMovingAverageState): EMA state for losses + previous_close (float): Previous period's close price + + Example: + rsi = RSIState(period=14) + + # Add price data incrementally + rsi_value = rsi.update(100.0) # Returns current RSI value + rsi_value = rsi.update(105.0) # Updates and returns new RSI value + + # Check if warmed up + if rsi.is_warmed_up(): + current_rsi = rsi.get_current_value() + """ + + def __init__(self, period: int = 14): + """ + Initialize RSI state. + + Args: + period: Number of periods for RSI calculation (default: 14) + + Raises: + ValueError: If period is not a positive integer + """ + super().__init__(period) + self.gain_ema = ExponentialMovingAverageState(period) + self.loss_ema = ExponentialMovingAverageState(period) + self.previous_close = None + self.is_initialized = True + + def update(self, new_close: Union[float, int]) -> float: + """ + Update RSI with new close price. + + Args: + new_close: New closing price + + Returns: + Current RSI value (0-100) + + Raises: + ValueError: If new_close is not finite + TypeError: If new_close is not numeric + """ + # Validate input + if not isinstance(new_close, (int, float)): + raise TypeError(f"new_close must be numeric, got {type(new_close)}") + + self.validate_input(new_close) + + new_close = float(new_close) + + if self.previous_close is None: + # First value - no gain/loss to calculate + self.previous_close = new_close + self.values_received += 1 + # Return neutral RSI for first value + self._current_value = 50.0 + return self._current_value + + # Calculate price change + price_change = new_close - self.previous_close + + # Separate gains and losses + gain = max(price_change, 0.0) + loss = max(-price_change, 0.0) + + # Update EMAs for gains and losses + avg_gain = self.gain_ema.update(gain) + avg_loss = self.loss_ema.update(loss) + + # Calculate RSI + if avg_loss == 0.0: + # Avoid division by zero - all gains, no losses + rsi_value = 100.0 + else: + rs = avg_gain / avg_loss + rsi_value = 100.0 - (100.0 / (1.0 + rs)) + + # Store state + self.previous_close = new_close + self.values_received += 1 + self._current_value = rsi_value + + return rsi_value + + def is_warmed_up(self) -> bool: + """ + Check if RSI has enough data for reliable values. + + Returns: + True if both gain and loss EMAs are warmed up + """ + return self.gain_ema.is_warmed_up() and self.loss_ema.is_warmed_up() + + def reset(self) -> None: + """Reset RSI state to initial conditions.""" + self.gain_ema.reset() + self.loss_ema.reset() + self.previous_close = None + self.values_received = 0 + self._current_value = None + + def get_current_value(self) -> Optional[float]: + """ + Get current RSI value without updating. + + Returns: + Current RSI value (0-100), or None if not enough data + """ + if self.values_received == 0: + return None + elif self.values_received == 1: + return 50.0 # Neutral RSI for first value + elif not self.is_warmed_up(): + return self._current_value # Return current calculation even if not fully warmed up + else: + return self._current_value + + def get_state_summary(self) -> dict: + """Get detailed state summary for debugging.""" + base_summary = super().get_state_summary() + base_summary.update({ + 'previous_close': self.previous_close, + 'gain_ema': self.gain_ema.get_state_summary(), + 'loss_ema': self.loss_ema.get_state_summary(), + 'current_rsi': self.get_current_value() + }) + return base_summary + + +class SimpleRSIState(SimpleIndicatorState): + """ + Simple RSI implementation using simple moving averages instead of EMAs. + + This version uses simple moving averages for gain and loss smoothing, + which matches traditional RSI implementations but requires more memory. + """ + + def __init__(self, period: int = 14): + """ + Initialize simple RSI state. + + Args: + period: Number of periods for RSI calculation (default: 14) + """ + super().__init__(period) + from collections import deque + self.gains = deque(maxlen=period) + self.losses = deque(maxlen=period) + self.gain_sum = 0.0 + self.loss_sum = 0.0 + self.previous_close = None + self.is_initialized = True + + def update(self, new_close: Union[float, int]) -> float: + """ + Update simple RSI with new close price. + + Args: + new_close: New closing price + + Returns: + Current RSI value (0-100) + """ + # Validate input + if not isinstance(new_close, (int, float)): + raise TypeError(f"new_close must be numeric, got {type(new_close)}") + + self.validate_input(new_close) + + new_close = float(new_close) + + if self.previous_close is None: + # First value + self.previous_close = new_close + self.values_received += 1 + self._current_value = 50.0 + return self._current_value + + # Calculate price change + price_change = new_close - self.previous_close + gain = max(price_change, 0.0) + loss = max(-price_change, 0.0) + + # Update rolling sums + if len(self.gains) == self.period: + self.gain_sum -= self.gains[0] + self.loss_sum -= self.losses[0] + + self.gains.append(gain) + self.losses.append(loss) + self.gain_sum += gain + self.loss_sum += loss + + # Calculate RSI + if len(self.gains) == 0: + rsi_value = 50.0 + else: + avg_gain = self.gain_sum / len(self.gains) + avg_loss = self.loss_sum / len(self.losses) + + if avg_loss == 0.0: + rsi_value = 100.0 + else: + rs = avg_gain / avg_loss + rsi_value = 100.0 - (100.0 / (1.0 + rs)) + + # Store state + self.previous_close = new_close + self.values_received += 1 + self._current_value = rsi_value + + return rsi_value + + def is_warmed_up(self) -> bool: + """Check if simple RSI is warmed up.""" + return len(self.gains) >= self.period + + def reset(self) -> None: + """Reset simple RSI state.""" + self.gains.clear() + self.losses.clear() + self.gain_sum = 0.0 + self.loss_sum = 0.0 + self.previous_close = None + self.values_received = 0 + self._current_value = None + + def get_current_value(self) -> Optional[float]: + """Get current simple RSI value.""" + if self.values_received == 0: + return None + return self._current_value + + def get_state_summary(self) -> dict: + """Get detailed state summary for debugging.""" + base_summary = super().get_state_summary() + base_summary.update({ + 'previous_close': self.previous_close, + 'gains_window_size': len(self.gains), + 'losses_window_size': len(self.losses), + 'gain_sum': self.gain_sum, + 'loss_sum': self.loss_sum, + 'current_rsi': self.get_current_value() + }) + return base_summary \ No newline at end of file diff --git a/cycles/IncStrategies/indicators/supertrend.py b/cycles/IncStrategies/indicators/supertrend.py new file mode 100644 index 0000000..068b0d6 --- /dev/null +++ b/cycles/IncStrategies/indicators/supertrend.py @@ -0,0 +1,332 @@ +""" +Supertrend Indicator State + +This module implements incremental Supertrend calculation that maintains constant memory usage +and provides identical results to traditional batch calculations. Supertrend is used by +the DefaultStrategy for trend detection. +""" + +from typing import Dict, Union, Optional +from .base import OHLCIndicatorState +from .atr import ATRState + + +class SupertrendState(OHLCIndicatorState): + """ + Incremental Supertrend calculation state. + + Supertrend is a trend-following indicator that uses Average True Range (ATR) + to calculate dynamic support and resistance levels. It provides clear trend + direction signals: +1 for uptrend, -1 for downtrend. + + The calculation involves: + 1. Calculate ATR for the given period + 2. Calculate basic upper and lower bands using ATR and multiplier + 3. Calculate final upper and lower bands with trend logic + 4. Determine trend direction based on price vs bands + + Attributes: + period (int): ATR period for Supertrend calculation + multiplier (float): Multiplier for ATR in band calculation + atr_state (ATRState): ATR calculation state + previous_close (float): Previous period's close price + previous_trend (int): Previous trend direction (+1 or -1) + final_upper_band (float): Current final upper band + final_lower_band (float): Current final lower band + + Example: + supertrend = SupertrendState(period=10, multiplier=3.0) + + # Add OHLC data incrementally + ohlc = {'open': 100, 'high': 105, 'low': 98, 'close': 103} + result = supertrend.update(ohlc) + trend = result['trend'] # +1 or -1 + supertrend_value = result['supertrend'] # Supertrend line value + """ + + def __init__(self, period: int = 10, multiplier: float = 3.0): + """ + Initialize Supertrend state. + + Args: + period: ATR period for Supertrend calculation (default: 10) + multiplier: Multiplier for ATR in band calculation (default: 3.0) + + Raises: + ValueError: If period is not positive or multiplier is not positive + """ + super().__init__(period) + + if multiplier <= 0: + raise ValueError(f"Multiplier must be positive, got {multiplier}") + + self.multiplier = multiplier + self.atr_state = ATRState(period) + + # State variables + self.previous_close = None + self.previous_trend = 1 # Start with uptrend assumption + self.final_upper_band = None + self.final_lower_band = None + + # Current values + self.current_trend = 1 + self.current_supertrend = None + + self.is_initialized = True + + def update(self, ohlc_data: Dict[str, float]) -> Dict[str, float]: + """ + Update Supertrend with new OHLC data. + + Args: + ohlc_data: Dictionary with 'open', 'high', 'low', 'close' keys + + Returns: + Dictionary with 'trend', 'supertrend', 'upper_band', 'lower_band' keys + + Raises: + ValueError: If OHLC data is invalid + TypeError: If ohlc_data is not a dictionary + """ + # Validate input + if not isinstance(ohlc_data, dict): + raise TypeError(f"ohlc_data must be a dictionary, got {type(ohlc_data)}") + + self.validate_input(ohlc_data) + + high = float(ohlc_data['high']) + low = float(ohlc_data['low']) + close = float(ohlc_data['close']) + + # Update ATR + atr_value = self.atr_state.update(ohlc_data) + + # Calculate HL2 (typical price) + hl2 = (high + low) / 2.0 + + # Calculate basic upper and lower bands + basic_upper_band = hl2 + (self.multiplier * atr_value) + basic_lower_band = hl2 - (self.multiplier * atr_value) + + # Calculate final upper band + if self.final_upper_band is None or basic_upper_band < self.final_upper_band or self.previous_close > self.final_upper_band: + final_upper_band = basic_upper_band + else: + final_upper_band = self.final_upper_band + + # Calculate final lower band + if self.final_lower_band is None or basic_lower_band > self.final_lower_band or self.previous_close < self.final_lower_band: + final_lower_band = basic_lower_band + else: + final_lower_band = self.final_lower_band + + # Determine trend + if self.previous_close is None: + # First calculation + trend = 1 if close > final_lower_band else -1 + else: + # Trend logic + if self.previous_trend == 1 and close <= final_lower_band: + trend = -1 + elif self.previous_trend == -1 and close >= final_upper_band: + trend = 1 + else: + trend = self.previous_trend + + # Calculate Supertrend value + if trend == 1: + supertrend_value = final_lower_band + else: + supertrend_value = final_upper_band + + # Store current state + self.previous_close = close + self.previous_trend = trend + self.final_upper_band = final_upper_band + self.final_lower_band = final_lower_band + self.current_trend = trend + self.current_supertrend = supertrend_value + self.values_received += 1 + + # Prepare result + result = { + 'trend': trend, + 'supertrend': supertrend_value, + 'upper_band': final_upper_band, + 'lower_band': final_lower_band, + 'atr': atr_value + } + + self._current_values = result + return result + + def is_warmed_up(self) -> bool: + """ + Check if Supertrend has enough data for reliable values. + + Returns: + True if ATR state is warmed up + """ + return self.atr_state.is_warmed_up() + + def reset(self) -> None: + """Reset Supertrend state to initial conditions.""" + self.atr_state.reset() + self.previous_close = None + self.previous_trend = 1 + self.final_upper_band = None + self.final_lower_band = None + self.current_trend = 1 + self.current_supertrend = None + self.values_received = 0 + self._current_values = {} + + def get_current_value(self) -> Optional[Dict[str, float]]: + """ + Get current Supertrend values without updating. + + Returns: + Dictionary with current Supertrend values, or None if not warmed up + """ + if not self.is_warmed_up(): + return None + return self._current_values.copy() if self._current_values else None + + def get_current_trend(self) -> int: + """ + Get current trend direction. + + Returns: + Current trend: +1 for uptrend, -1 for downtrend + """ + return self.current_trend + + def get_current_supertrend_value(self) -> Optional[float]: + """ + Get current Supertrend line value. + + Returns: + Current Supertrend value, or None if not available + """ + return self.current_supertrend + + def get_state_summary(self) -> dict: + """Get detailed state summary for debugging.""" + base_summary = super().get_state_summary() + base_summary.update({ + 'multiplier': self.multiplier, + 'previous_close': self.previous_close, + 'previous_trend': self.previous_trend, + 'current_trend': self.current_trend, + 'current_supertrend': self.current_supertrend, + 'final_upper_band': self.final_upper_band, + 'final_lower_band': self.final_lower_band, + 'atr_state': self.atr_state.get_state_summary() + }) + return base_summary + + +class SupertrendCollection: + """ + Collection of multiple Supertrend indicators with different parameters. + + This class manages multiple Supertrend indicators and provides meta-trend + calculation based on agreement between different Supertrend configurations. + Used by the DefaultStrategy for robust trend detection. + + Example: + # Create collection with three Supertrend indicators + collection = SupertrendCollection([ + (10, 3.0), # period=10, multiplier=3.0 + (11, 2.0), # period=11, multiplier=2.0 + (12, 1.0) # period=12, multiplier=1.0 + ]) + + # Update all indicators + results = collection.update(ohlc_data) + meta_trend = results['meta_trend'] # 1, -1, or 0 (neutral) + """ + + def __init__(self, supertrend_configs: list): + """ + Initialize Supertrend collection. + + Args: + supertrend_configs: List of (period, multiplier) tuples + """ + self.supertrends = [] + for period, multiplier in supertrend_configs: + self.supertrends.append(SupertrendState(period, multiplier)) + + self.values_received = 0 + + def update(self, ohlc_data: Dict[str, float]) -> Dict[str, Union[int, list]]: + """ + Update all Supertrend indicators and calculate meta-trend. + + Args: + ohlc_data: OHLC data dictionary + + Returns: + Dictionary with individual trends and meta-trend + """ + trends = [] + results = [] + + # Update each Supertrend + for supertrend in self.supertrends: + result = supertrend.update(ohlc_data) + trends.append(result['trend']) + results.append(result) + + # Calculate meta-trend: all must agree for directional signal + if all(trend == trends[0] for trend in trends): + meta_trend = trends[0] # All agree + else: + meta_trend = 0 # Neutral when trends don't agree + + self.values_received += 1 + + return { + 'trends': trends, + 'meta_trend': meta_trend, + 'results': results + } + + def is_warmed_up(self) -> bool: + """Check if all Supertrend indicators are warmed up.""" + return all(st.is_warmed_up() for st in self.supertrends) + + def reset(self) -> None: + """Reset all Supertrend indicators.""" + for supertrend in self.supertrends: + supertrend.reset() + self.values_received = 0 + + def get_current_meta_trend(self) -> int: + """ + Get current meta-trend without updating. + + Returns: + Current meta-trend: +1, -1, or 0 + """ + if not self.is_warmed_up(): + return 0 + + trends = [st.get_current_trend() for st in self.supertrends] + + if all(trend == trends[0] for trend in trends): + return trends[0] + else: + return 0 + + def get_state_summary(self) -> dict: + """Get detailed state summary for all Supertrends.""" + return { + 'num_supertrends': len(self.supertrends), + 'values_received': self.values_received, + 'is_warmed_up': self.is_warmed_up(), + 'current_meta_trend': self.get_current_meta_trend(), + 'supertrends': [st.get_state_summary() for st in self.supertrends] + } \ No newline at end of file