indicators

This commit is contained in:
Vasily.onl 2025-05-26 13:26:07 +08:00
parent e89317c65e
commit d985830ecd
7 changed files with 1636 additions and 0 deletions

View File

@ -0,0 +1,36 @@
"""
Incremental Indicator States Module
This module contains indicator state classes that maintain calculation state
for incremental processing of technical indicators.
All indicator states implement the IndicatorState interface and provide:
- Incremental updates with new data points
- Constant memory usage regardless of data history
- Identical results to traditional batch calculations
- Warm-up detection for reliable indicator values
Classes:
IndicatorState: Abstract base class for all indicator states
MovingAverageState: Incremental moving average calculation
RSIState: Incremental RSI calculation
ATRState: Incremental Average True Range calculation
SupertrendState: Incremental Supertrend calculation
BollingerBandsState: Incremental Bollinger Bands calculation
"""
from .base import IndicatorState
from .moving_average import MovingAverageState
from .rsi import RSIState
from .atr import ATRState
from .supertrend import SupertrendState
from .bollinger_bands import BollingerBandsState
__all__ = [
'IndicatorState',
'MovingAverageState',
'RSIState',
'ATRState',
'SupertrendState',
'BollingerBandsState'
]

View File

@ -0,0 +1,242 @@
"""
Average True Range (ATR) Indicator State
This module implements incremental ATR calculation that maintains constant memory usage
and provides identical results to traditional batch calculations. ATR is used by
Supertrend and other volatility-based indicators.
"""
from typing import Dict, Union, Optional
from .base import OHLCIndicatorState
from .moving_average import ExponentialMovingAverageState
class ATRState(OHLCIndicatorState):
"""
Incremental Average True Range calculation state.
ATR measures market volatility by calculating the average of true ranges over
a specified period. True Range is the maximum of:
1. Current High - Current Low
2. |Current High - Previous Close|
3. |Current Low - Previous Close|
This implementation uses exponential moving average for smoothing, which is
more responsive than simple moving average and requires less memory.
Attributes:
period (int): The ATR period
ema_state (ExponentialMovingAverageState): EMA state for smoothing true ranges
previous_close (float): Previous period's close price
Example:
atr = ATRState(period=14)
# Add OHLC data incrementally
ohlc = {'open': 100, 'high': 105, 'low': 98, 'close': 103}
atr_value = atr.update(ohlc) # Returns current ATR value
# Check if warmed up
if atr.is_warmed_up():
current_atr = atr.get_current_value()
"""
def __init__(self, period: int = 14):
"""
Initialize ATR state.
Args:
period: Number of periods for ATR calculation (default: 14)
Raises:
ValueError: If period is not a positive integer
"""
super().__init__(period)
self.ema_state = ExponentialMovingAverageState(period)
self.previous_close = None
self.is_initialized = True
def update(self, ohlc_data: Dict[str, float]) -> float:
"""
Update ATR with new OHLC data.
Args:
ohlc_data: Dictionary with 'open', 'high', 'low', 'close' keys
Returns:
Current ATR value
Raises:
ValueError: If OHLC data is invalid
TypeError: If ohlc_data is not a dictionary
"""
# Validate input
if not isinstance(ohlc_data, dict):
raise TypeError(f"ohlc_data must be a dictionary, got {type(ohlc_data)}")
self.validate_input(ohlc_data)
high = float(ohlc_data['high'])
low = float(ohlc_data['low'])
close = float(ohlc_data['close'])
# Calculate True Range
if self.previous_close is None:
# First period - True Range is just High - Low
true_range = high - low
else:
# True Range is the maximum of:
# 1. Current High - Current Low
# 2. |Current High - Previous Close|
# 3. |Current Low - Previous Close|
tr1 = high - low
tr2 = abs(high - self.previous_close)
tr3 = abs(low - self.previous_close)
true_range = max(tr1, tr2, tr3)
# Update EMA with the true range
atr_value = self.ema_state.update(true_range)
# Store current close as previous close for next calculation
self.previous_close = close
self.values_received += 1
# Store current ATR value
self._current_values = {'atr': atr_value}
return atr_value
def is_warmed_up(self) -> bool:
"""
Check if ATR has enough data for reliable values.
Returns:
True if EMA state is warmed up (has enough true range values)
"""
return self.ema_state.is_warmed_up()
def reset(self) -> None:
"""Reset ATR state to initial conditions."""
self.ema_state.reset()
self.previous_close = None
self.values_received = 0
self._current_values = {}
def get_current_value(self) -> Optional[float]:
"""
Get current ATR value without updating.
Returns:
Current ATR value, or None if not warmed up
"""
if not self.is_warmed_up():
return None
return self.ema_state.get_current_value()
def get_state_summary(self) -> dict:
"""Get detailed state summary for debugging."""
base_summary = super().get_state_summary()
base_summary.update({
'previous_close': self.previous_close,
'ema_state': self.ema_state.get_state_summary(),
'current_atr': self.get_current_value()
})
return base_summary
class SimpleATRState(OHLCIndicatorState):
"""
Simple ATR implementation using simple moving average instead of EMA.
This version uses a simple moving average for smoothing true ranges,
which matches some traditional ATR implementations but requires more memory.
"""
def __init__(self, period: int = 14):
"""
Initialize simple ATR state.
Args:
period: Number of periods for ATR calculation (default: 14)
"""
super().__init__(period)
from collections import deque
self.true_ranges = deque(maxlen=period)
self.tr_sum = 0.0
self.previous_close = None
self.is_initialized = True
def update(self, ohlc_data: Dict[str, float]) -> float:
"""
Update simple ATR with new OHLC data.
Args:
ohlc_data: Dictionary with 'open', 'high', 'low', 'close' keys
Returns:
Current ATR value
"""
# Validate input
if not isinstance(ohlc_data, dict):
raise TypeError(f"ohlc_data must be a dictionary, got {type(ohlc_data)}")
self.validate_input(ohlc_data)
high = float(ohlc_data['high'])
low = float(ohlc_data['low'])
close = float(ohlc_data['close'])
# Calculate True Range
if self.previous_close is None:
true_range = high - low
else:
tr1 = high - low
tr2 = abs(high - self.previous_close)
tr3 = abs(low - self.previous_close)
true_range = max(tr1, tr2, tr3)
# Update rolling sum
if len(self.true_ranges) == self.period:
self.tr_sum -= self.true_ranges[0] # Remove oldest value
self.true_ranges.append(true_range)
self.tr_sum += true_range
# Calculate ATR as simple moving average
atr_value = self.tr_sum / len(self.true_ranges)
# Store state
self.previous_close = close
self.values_received += 1
self._current_values = {'atr': atr_value}
return atr_value
def is_warmed_up(self) -> bool:
"""Check if simple ATR is warmed up."""
return len(self.true_ranges) >= self.period
def reset(self) -> None:
"""Reset simple ATR state."""
self.true_ranges.clear()
self.tr_sum = 0.0
self.previous_close = None
self.values_received = 0
self._current_values = {}
def get_current_value(self) -> Optional[float]:
"""Get current simple ATR value."""
if not self.is_warmed_up():
return None
return self.tr_sum / len(self.true_ranges)
def get_state_summary(self) -> dict:
"""Get detailed state summary for debugging."""
base_summary = super().get_state_summary()
base_summary.update({
'previous_close': self.previous_close,
'tr_window_size': len(self.true_ranges),
'tr_sum': self.tr_sum,
'current_atr': self.get_current_value()
})
return base_summary

View File

@ -0,0 +1,197 @@
"""
Base Indicator State Class
This module contains the abstract base class for all incremental indicator states.
All indicator implementations must inherit from IndicatorState and implement
the required methods for incremental calculation.
"""
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Union
import numpy as np
class IndicatorState(ABC):
"""
Abstract base class for maintaining indicator calculation state.
This class defines the interface that all incremental indicators must implement.
Indicators maintain their internal state and can be updated incrementally with
new data points, providing constant memory usage and high performance.
Attributes:
period (int): The period/window size for the indicator
values_received (int): Number of values processed so far
is_initialized (bool): Whether the indicator has been initialized
Example:
class MyIndicator(IndicatorState):
def __init__(self, period: int):
super().__init__(period)
self._sum = 0.0
def update(self, new_value: float) -> float:
self._sum += new_value
self.values_received += 1
return self._sum / min(self.values_received, self.period)
"""
def __init__(self, period: int):
"""
Initialize the indicator state.
Args:
period: The period/window size for the indicator calculation
Raises:
ValueError: If period is not a positive integer
"""
if not isinstance(period, int) or period <= 0:
raise ValueError(f"Period must be a positive integer, got {period}")
self.period = period
self.values_received = 0
self.is_initialized = False
@abstractmethod
def update(self, new_value: Union[float, Dict[str, float]]) -> Union[float, Dict[str, float]]:
"""
Update indicator with new value and return current indicator value.
This method processes a new data point and updates the internal state
of the indicator. It returns the current indicator value after the update.
Args:
new_value: New data point (can be single value or OHLCV dict)
Returns:
Current indicator value after update (single value or dict)
Raises:
ValueError: If new_value is invalid or incompatible
"""
pass
@abstractmethod
def is_warmed_up(self) -> bool:
"""
Check whether indicator has enough data for reliable values.
Returns:
True if indicator has received enough data points for reliable calculation
"""
pass
@abstractmethod
def reset(self) -> None:
"""
Reset indicator state to initial conditions.
This method clears all internal state and resets the indicator
as if it was just initialized.
"""
pass
@abstractmethod
def get_current_value(self) -> Union[float, Dict[str, float], None]:
"""
Get the current indicator value without updating.
Returns:
Current indicator value, or None if not warmed up
"""
pass
def get_state_summary(self) -> Dict[str, Any]:
"""
Get summary of current indicator state for debugging.
Returns:
Dictionary containing indicator state information
"""
return {
'indicator_type': self.__class__.__name__,
'period': self.period,
'values_received': self.values_received,
'is_warmed_up': self.is_warmed_up(),
'is_initialized': self.is_initialized,
'current_value': self.get_current_value()
}
def validate_input(self, value: Union[float, Dict[str, float]]) -> None:
"""
Validate input value for the indicator.
Args:
value: Input value to validate
Raises:
ValueError: If value is invalid
TypeError: If value type is incorrect
"""
if isinstance(value, (int, float)):
if not np.isfinite(value):
raise ValueError(f"Input value must be finite, got {value}")
elif isinstance(value, dict):
required_keys = ['open', 'high', 'low', 'close']
for key in required_keys:
if key not in value:
raise ValueError(f"OHLCV dict missing required key: {key}")
if not np.isfinite(value[key]):
raise ValueError(f"OHLCV value for {key} must be finite, got {value[key]}")
# Validate OHLC relationships
if not (value['low'] <= value['open'] <= value['high'] and
value['low'] <= value['close'] <= value['high']):
raise ValueError(f"Invalid OHLC relationships: {value}")
else:
raise TypeError(f"Input value must be float or OHLCV dict, got {type(value)}")
def __repr__(self) -> str:
"""String representation of the indicator state."""
return (f"{self.__class__.__name__}(period={self.period}, "
f"values_received={self.values_received}, "
f"warmed_up={self.is_warmed_up()})")
class SimpleIndicatorState(IndicatorState):
"""
Base class for simple single-value indicators.
This class provides common functionality for indicators that work with
single float values and maintain a simple rolling calculation.
"""
def __init__(self, period: int):
"""Initialize simple indicator state."""
super().__init__(period)
self._current_value = None
def get_current_value(self) -> Optional[float]:
"""Get current indicator value."""
return self._current_value if self.is_warmed_up() else None
def is_warmed_up(self) -> bool:
"""Check if indicator is warmed up."""
return self.values_received >= self.period
class OHLCIndicatorState(IndicatorState):
"""
Base class for OHLC-based indicators.
This class provides common functionality for indicators that work with
OHLC data (Open, High, Low, Close) and may return multiple values.
"""
def __init__(self, period: int):
"""Initialize OHLC indicator state."""
super().__init__(period)
self._current_values = {}
def get_current_value(self) -> Optional[Dict[str, float]]:
"""Get current indicator values."""
return self._current_values.copy() if self.is_warmed_up() else None
def is_warmed_up(self) -> bool:
"""Check if indicator is warmed up."""
return self.values_received >= self.period

View File

@ -0,0 +1,325 @@
"""
Bollinger Bands Indicator State
This module implements incremental Bollinger Bands calculation that maintains constant memory usage
and provides identical results to traditional batch calculations. Used by the BBRSStrategy.
"""
from typing import Dict, Union, Optional
from collections import deque
import math
from .base import OHLCIndicatorState
from .moving_average import MovingAverageState
class BollingerBandsState(OHLCIndicatorState):
"""
Incremental Bollinger Bands calculation state.
Bollinger Bands consist of:
- Middle Band: Simple Moving Average of close prices
- Upper Band: Middle Band + (Standard Deviation * multiplier)
- Lower Band: Middle Band - (Standard Deviation * multiplier)
This implementation maintains a rolling window for standard deviation calculation
while using the MovingAverageState for the middle band.
Attributes:
period (int): Period for moving average and standard deviation
std_dev_multiplier (float): Multiplier for standard deviation
ma_state (MovingAverageState): Moving average state for middle band
close_values (deque): Rolling window of close prices for std dev calculation
close_sum_sq (float): Sum of squared close values for variance calculation
Example:
bb = BollingerBandsState(period=20, std_dev_multiplier=2.0)
# Add price data incrementally
result = bb.update(103.5) # Close price
upper_band = result['upper_band']
middle_band = result['middle_band']
lower_band = result['lower_band']
bandwidth = result['bandwidth']
"""
def __init__(self, period: int = 20, std_dev_multiplier: float = 2.0):
"""
Initialize Bollinger Bands state.
Args:
period: Period for moving average and standard deviation (default: 20)
std_dev_multiplier: Multiplier for standard deviation (default: 2.0)
Raises:
ValueError: If period is not positive or multiplier is not positive
"""
super().__init__(period)
if std_dev_multiplier <= 0:
raise ValueError(f"Standard deviation multiplier must be positive, got {std_dev_multiplier}")
self.std_dev_multiplier = std_dev_multiplier
self.ma_state = MovingAverageState(period)
# For incremental standard deviation calculation
self.close_values = deque(maxlen=period)
self.close_sum_sq = 0.0 # Sum of squared values
self.is_initialized = True
def update(self, close_price: Union[float, int]) -> Dict[str, float]:
"""
Update Bollinger Bands with new close price.
Args:
close_price: New closing price
Returns:
Dictionary with 'upper_band', 'middle_band', 'lower_band', 'bandwidth', 'std_dev'
Raises:
ValueError: If close_price is not finite
TypeError: If close_price is not numeric
"""
# Validate input
if not isinstance(close_price, (int, float)):
raise TypeError(f"close_price must be numeric, got {type(close_price)}")
self.validate_input(close_price)
close_price = float(close_price)
# Update moving average (middle band)
middle_band = self.ma_state.update(close_price)
# Update rolling window for standard deviation
if len(self.close_values) == self.period:
# Remove oldest value from sum of squares
old_value = self.close_values[0]
self.close_sum_sq -= old_value * old_value
# Add new value
self.close_values.append(close_price)
self.close_sum_sq += close_price * close_price
# Calculate standard deviation
n = len(self.close_values)
if n < 2:
# Not enough data for standard deviation
std_dev = 0.0
else:
# Incremental variance calculation: Var = (sum_sq - n*mean^2) / (n-1)
mean = middle_band
variance = (self.close_sum_sq - n * mean * mean) / (n - 1)
std_dev = math.sqrt(max(variance, 0.0)) # Ensure non-negative
# Calculate bands
upper_band = middle_band + (self.std_dev_multiplier * std_dev)
lower_band = middle_band - (self.std_dev_multiplier * std_dev)
# Calculate bandwidth (normalized band width)
if middle_band != 0:
bandwidth = (upper_band - lower_band) / middle_band
else:
bandwidth = 0.0
self.values_received += 1
# Store current values
result = {
'upper_band': upper_band,
'middle_band': middle_band,
'lower_band': lower_band,
'bandwidth': bandwidth,
'std_dev': std_dev
}
self._current_values = result
return result
def is_warmed_up(self) -> bool:
"""
Check if Bollinger Bands has enough data for reliable values.
Returns:
True if we have at least 'period' number of values
"""
return self.ma_state.is_warmed_up()
def reset(self) -> None:
"""Reset Bollinger Bands state to initial conditions."""
self.ma_state.reset()
self.close_values.clear()
self.close_sum_sq = 0.0
self.values_received = 0
self._current_values = {}
def get_current_value(self) -> Optional[Dict[str, float]]:
"""
Get current Bollinger Bands values without updating.
Returns:
Dictionary with current BB values, or None if not warmed up
"""
if not self.is_warmed_up():
return None
return self._current_values.copy() if self._current_values else None
def get_squeeze_status(self, squeeze_threshold: float = 0.05) -> bool:
"""
Check if Bollinger Bands are in a squeeze condition.
Args:
squeeze_threshold: Bandwidth threshold for squeeze detection
Returns:
True if bandwidth is below threshold (squeeze condition)
"""
if not self.is_warmed_up() or not self._current_values:
return False
bandwidth = self._current_values.get('bandwidth', float('inf'))
return bandwidth < squeeze_threshold
def get_position_relative_to_bands(self, current_price: float) -> str:
"""
Get current price position relative to Bollinger Bands.
Args:
current_price: Current price to evaluate
Returns:
'above_upper', 'between_bands', 'below_lower', or 'unknown'
"""
if not self.is_warmed_up() or not self._current_values:
return 'unknown'
upper_band = self._current_values['upper_band']
lower_band = self._current_values['lower_band']
if current_price > upper_band:
return 'above_upper'
elif current_price < lower_band:
return 'below_lower'
else:
return 'between_bands'
def get_state_summary(self) -> dict:
"""Get detailed state summary for debugging."""
base_summary = super().get_state_summary()
base_summary.update({
'std_dev_multiplier': self.std_dev_multiplier,
'close_values_count': len(self.close_values),
'close_sum_sq': self.close_sum_sq,
'ma_state': self.ma_state.get_state_summary(),
'current_squeeze': self.get_squeeze_status() if self.is_warmed_up() else None
})
return base_summary
class BollingerBandsOHLCState(OHLCIndicatorState):
"""
Bollinger Bands implementation that works with OHLC data.
This version can calculate Bollinger Bands based on different price types
(close, typical price, etc.) and provides additional OHLC-based analysis.
"""
def __init__(self, period: int = 20, std_dev_multiplier: float = 2.0, price_type: str = 'close'):
"""
Initialize OHLC Bollinger Bands state.
Args:
period: Period for calculation
std_dev_multiplier: Standard deviation multiplier
price_type: Price type to use ('close', 'typical', 'median', 'weighted')
"""
super().__init__(period)
if price_type not in ['close', 'typical', 'median', 'weighted']:
raise ValueError(f"Invalid price_type: {price_type}")
self.std_dev_multiplier = std_dev_multiplier
self.price_type = price_type
self.bb_state = BollingerBandsState(period, std_dev_multiplier)
self.is_initialized = True
def _extract_price(self, ohlc_data: Dict[str, float]) -> float:
"""Extract price based on price_type setting."""
if self.price_type == 'close':
return ohlc_data['close']
elif self.price_type == 'typical':
return (ohlc_data['high'] + ohlc_data['low'] + ohlc_data['close']) / 3.0
elif self.price_type == 'median':
return (ohlc_data['high'] + ohlc_data['low']) / 2.0
elif self.price_type == 'weighted':
return (ohlc_data['high'] + ohlc_data['low'] + 2 * ohlc_data['close']) / 4.0
else:
return ohlc_data['close']
def update(self, ohlc_data: Dict[str, float]) -> Dict[str, float]:
"""
Update Bollinger Bands with OHLC data.
Args:
ohlc_data: Dictionary with OHLC data
Returns:
Dictionary with Bollinger Bands values plus OHLC analysis
"""
# Validate input
if not isinstance(ohlc_data, dict):
raise TypeError(f"ohlc_data must be a dictionary, got {type(ohlc_data)}")
self.validate_input(ohlc_data)
# Extract price based on type
price = self._extract_price(ohlc_data)
# Update underlying BB state
bb_result = self.bb_state.update(price)
# Add OHLC-specific analysis
high = ohlc_data['high']
low = ohlc_data['low']
close = ohlc_data['close']
# Check if high/low touched bands
upper_band = bb_result['upper_band']
lower_band = bb_result['lower_band']
bb_result.update({
'high_above_upper': high > upper_band,
'low_below_lower': low < lower_band,
'close_position': self.bb_state.get_position_relative_to_bands(close),
'price_type': self.price_type,
'extracted_price': price
})
self.values_received += 1
self._current_values = bb_result
return bb_result
def is_warmed_up(self) -> bool:
"""Check if OHLC Bollinger Bands is warmed up."""
return self.bb_state.is_warmed_up()
def reset(self) -> None:
"""Reset OHLC Bollinger Bands state."""
self.bb_state.reset()
self.values_received = 0
self._current_values = {}
def get_current_value(self) -> Optional[Dict[str, float]]:
"""Get current OHLC Bollinger Bands values."""
return self.bb_state.get_current_value()
def get_state_summary(self) -> dict:
"""Get detailed state summary."""
base_summary = super().get_state_summary()
base_summary.update({
'price_type': self.price_type,
'bb_state': self.bb_state.get_state_summary()
})
return base_summary

View File

@ -0,0 +1,228 @@
"""
Moving Average Indicator State
This module implements incremental moving average calculation that maintains
constant memory usage and provides identical results to traditional batch calculations.
"""
from collections import deque
from typing import Union
from .base import SimpleIndicatorState
class MovingAverageState(SimpleIndicatorState):
"""
Incremental moving average calculation state.
This class maintains the state for calculating a simple moving average
incrementally. It uses a rolling window approach with constant memory usage.
Attributes:
period (int): The moving average period
values (deque): Rolling window of values (max length = period)
sum (float): Current sum of values in the window
Example:
ma = MovingAverageState(period=20)
# Add values incrementally
ma_value = ma.update(100.0) # Returns current MA value
ma_value = ma.update(105.0) # Updates and returns new MA value
# Check if warmed up (has enough values)
if ma.is_warmed_up():
current_ma = ma.get_current_value()
"""
def __init__(self, period: int):
"""
Initialize moving average state.
Args:
period: Number of periods for the moving average
Raises:
ValueError: If period is not a positive integer
"""
super().__init__(period)
self.values = deque(maxlen=period)
self.sum = 0.0
self.is_initialized = True
def update(self, new_value: Union[float, int]) -> float:
"""
Update moving average with new value.
Args:
new_value: New price/value to add to the moving average
Returns:
Current moving average value
Raises:
ValueError: If new_value is not finite
TypeError: If new_value is not numeric
"""
# Validate input
if not isinstance(new_value, (int, float)):
raise TypeError(f"new_value must be numeric, got {type(new_value)}")
self.validate_input(new_value)
# If deque is at max capacity, subtract the value being removed
if len(self.values) == self.period:
self.sum -= self.values[0] # Will be automatically removed by deque
# Add new value
self.values.append(float(new_value))
self.sum += float(new_value)
self.values_received += 1
# Calculate current moving average
current_count = len(self.values)
self._current_value = self.sum / current_count
return self._current_value
def is_warmed_up(self) -> bool:
"""
Check if moving average has enough data for reliable values.
Returns:
True if we have at least 'period' number of values
"""
return len(self.values) >= self.period
def reset(self) -> None:
"""Reset moving average state to initial conditions."""
self.values.clear()
self.sum = 0.0
self.values_received = 0
self._current_value = None
def get_current_value(self) -> Union[float, None]:
"""
Get current moving average value without updating.
Returns:
Current moving average value, or None if not enough data
"""
if len(self.values) == 0:
return None
return self.sum / len(self.values)
def get_state_summary(self) -> dict:
"""Get detailed state summary for debugging."""
base_summary = super().get_state_summary()
base_summary.update({
'window_size': len(self.values),
'sum': self.sum,
'values_in_window': list(self.values) if len(self.values) <= 10 else f"[{len(self.values)} values]"
})
return base_summary
class ExponentialMovingAverageState(SimpleIndicatorState):
"""
Incremental exponential moving average calculation state.
This class maintains the state for calculating an exponential moving average (EMA)
incrementally. EMA gives more weight to recent values and requires minimal memory.
Attributes:
period (int): The EMA period (used to calculate smoothing factor)
alpha (float): Smoothing factor (2 / (period + 1))
ema_value (float): Current EMA value
Example:
ema = ExponentialMovingAverageState(period=20)
# Add values incrementally
ema_value = ema.update(100.0) # Returns current EMA value
ema_value = ema.update(105.0) # Updates and returns new EMA value
"""
def __init__(self, period: int):
"""
Initialize exponential moving average state.
Args:
period: Number of periods for the EMA (used to calculate alpha)
Raises:
ValueError: If period is not a positive integer
"""
super().__init__(period)
self.alpha = 2.0 / (period + 1) # Smoothing factor
self.ema_value = None
self.is_initialized = True
def update(self, new_value: Union[float, int]) -> float:
"""
Update exponential moving average with new value.
Args:
new_value: New price/value to add to the EMA
Returns:
Current EMA value
Raises:
ValueError: If new_value is not finite
TypeError: If new_value is not numeric
"""
# Validate input
if not isinstance(new_value, (int, float)):
raise TypeError(f"new_value must be numeric, got {type(new_value)}")
self.validate_input(new_value)
new_value = float(new_value)
if self.ema_value is None:
# First value - initialize EMA
self.ema_value = new_value
else:
# EMA formula: EMA = alpha * new_value + (1 - alpha) * previous_EMA
self.ema_value = self.alpha * new_value + (1 - self.alpha) * self.ema_value
self.values_received += 1
self._current_value = self.ema_value
return self.ema_value
def is_warmed_up(self) -> bool:
"""
Check if EMA has enough data for reliable values.
For EMA, we consider it warmed up after receiving 'period' number of values,
though it starts producing values immediately.
Returns:
True if we have at least 'period' number of values
"""
return self.values_received >= self.period
def reset(self) -> None:
"""Reset EMA state to initial conditions."""
self.ema_value = None
self.values_received = 0
self._current_value = None
def get_current_value(self) -> Union[float, None]:
"""
Get current EMA value without updating.
Returns:
Current EMA value, or None if no data received
"""
return self.ema_value
def get_state_summary(self) -> dict:
"""Get detailed state summary for debugging."""
base_summary = super().get_state_summary()
base_summary.update({
'alpha': self.alpha,
'ema_value': self.ema_value
})
return base_summary

View File

@ -0,0 +1,276 @@
"""
RSI (Relative Strength Index) Indicator State
This module implements incremental RSI calculation that maintains constant memory usage
and provides identical results to traditional batch calculations.
"""
from typing import Union, Optional
from .base import SimpleIndicatorState
from .moving_average import ExponentialMovingAverageState
class RSIState(SimpleIndicatorState):
"""
Incremental RSI calculation state.
RSI measures the speed and magnitude of price changes to evaluate overbought
or oversold conditions. It oscillates between 0 and 100.
RSI = 100 - (100 / (1 + RS))
where RS = Average Gain / Average Loss over the specified period
This implementation uses exponential moving averages for gain and loss smoothing,
which is more responsive and memory-efficient than simple moving averages.
Attributes:
period (int): The RSI period (typically 14)
gain_ema (ExponentialMovingAverageState): EMA state for gains
loss_ema (ExponentialMovingAverageState): EMA state for losses
previous_close (float): Previous period's close price
Example:
rsi = RSIState(period=14)
# Add price data incrementally
rsi_value = rsi.update(100.0) # Returns current RSI value
rsi_value = rsi.update(105.0) # Updates and returns new RSI value
# Check if warmed up
if rsi.is_warmed_up():
current_rsi = rsi.get_current_value()
"""
def __init__(self, period: int = 14):
"""
Initialize RSI state.
Args:
period: Number of periods for RSI calculation (default: 14)
Raises:
ValueError: If period is not a positive integer
"""
super().__init__(period)
self.gain_ema = ExponentialMovingAverageState(period)
self.loss_ema = ExponentialMovingAverageState(period)
self.previous_close = None
self.is_initialized = True
def update(self, new_close: Union[float, int]) -> float:
"""
Update RSI with new close price.
Args:
new_close: New closing price
Returns:
Current RSI value (0-100)
Raises:
ValueError: If new_close is not finite
TypeError: If new_close is not numeric
"""
# Validate input
if not isinstance(new_close, (int, float)):
raise TypeError(f"new_close must be numeric, got {type(new_close)}")
self.validate_input(new_close)
new_close = float(new_close)
if self.previous_close is None:
# First value - no gain/loss to calculate
self.previous_close = new_close
self.values_received += 1
# Return neutral RSI for first value
self._current_value = 50.0
return self._current_value
# Calculate price change
price_change = new_close - self.previous_close
# Separate gains and losses
gain = max(price_change, 0.0)
loss = max(-price_change, 0.0)
# Update EMAs for gains and losses
avg_gain = self.gain_ema.update(gain)
avg_loss = self.loss_ema.update(loss)
# Calculate RSI
if avg_loss == 0.0:
# Avoid division by zero - all gains, no losses
rsi_value = 100.0
else:
rs = avg_gain / avg_loss
rsi_value = 100.0 - (100.0 / (1.0 + rs))
# Store state
self.previous_close = new_close
self.values_received += 1
self._current_value = rsi_value
return rsi_value
def is_warmed_up(self) -> bool:
"""
Check if RSI has enough data for reliable values.
Returns:
True if both gain and loss EMAs are warmed up
"""
return self.gain_ema.is_warmed_up() and self.loss_ema.is_warmed_up()
def reset(self) -> None:
"""Reset RSI state to initial conditions."""
self.gain_ema.reset()
self.loss_ema.reset()
self.previous_close = None
self.values_received = 0
self._current_value = None
def get_current_value(self) -> Optional[float]:
"""
Get current RSI value without updating.
Returns:
Current RSI value (0-100), or None if not enough data
"""
if self.values_received == 0:
return None
elif self.values_received == 1:
return 50.0 # Neutral RSI for first value
elif not self.is_warmed_up():
return self._current_value # Return current calculation even if not fully warmed up
else:
return self._current_value
def get_state_summary(self) -> dict:
"""Get detailed state summary for debugging."""
base_summary = super().get_state_summary()
base_summary.update({
'previous_close': self.previous_close,
'gain_ema': self.gain_ema.get_state_summary(),
'loss_ema': self.loss_ema.get_state_summary(),
'current_rsi': self.get_current_value()
})
return base_summary
class SimpleRSIState(SimpleIndicatorState):
"""
Simple RSI implementation using simple moving averages instead of EMAs.
This version uses simple moving averages for gain and loss smoothing,
which matches traditional RSI implementations but requires more memory.
"""
def __init__(self, period: int = 14):
"""
Initialize simple RSI state.
Args:
period: Number of periods for RSI calculation (default: 14)
"""
super().__init__(period)
from collections import deque
self.gains = deque(maxlen=period)
self.losses = deque(maxlen=period)
self.gain_sum = 0.0
self.loss_sum = 0.0
self.previous_close = None
self.is_initialized = True
def update(self, new_close: Union[float, int]) -> float:
"""
Update simple RSI with new close price.
Args:
new_close: New closing price
Returns:
Current RSI value (0-100)
"""
# Validate input
if not isinstance(new_close, (int, float)):
raise TypeError(f"new_close must be numeric, got {type(new_close)}")
self.validate_input(new_close)
new_close = float(new_close)
if self.previous_close is None:
# First value
self.previous_close = new_close
self.values_received += 1
self._current_value = 50.0
return self._current_value
# Calculate price change
price_change = new_close - self.previous_close
gain = max(price_change, 0.0)
loss = max(-price_change, 0.0)
# Update rolling sums
if len(self.gains) == self.period:
self.gain_sum -= self.gains[0]
self.loss_sum -= self.losses[0]
self.gains.append(gain)
self.losses.append(loss)
self.gain_sum += gain
self.loss_sum += loss
# Calculate RSI
if len(self.gains) == 0:
rsi_value = 50.0
else:
avg_gain = self.gain_sum / len(self.gains)
avg_loss = self.loss_sum / len(self.losses)
if avg_loss == 0.0:
rsi_value = 100.0
else:
rs = avg_gain / avg_loss
rsi_value = 100.0 - (100.0 / (1.0 + rs))
# Store state
self.previous_close = new_close
self.values_received += 1
self._current_value = rsi_value
return rsi_value
def is_warmed_up(self) -> bool:
"""Check if simple RSI is warmed up."""
return len(self.gains) >= self.period
def reset(self) -> None:
"""Reset simple RSI state."""
self.gains.clear()
self.losses.clear()
self.gain_sum = 0.0
self.loss_sum = 0.0
self.previous_close = None
self.values_received = 0
self._current_value = None
def get_current_value(self) -> Optional[float]:
"""Get current simple RSI value."""
if self.values_received == 0:
return None
return self._current_value
def get_state_summary(self) -> dict:
"""Get detailed state summary for debugging."""
base_summary = super().get_state_summary()
base_summary.update({
'previous_close': self.previous_close,
'gains_window_size': len(self.gains),
'losses_window_size': len(self.losses),
'gain_sum': self.gain_sum,
'loss_sum': self.loss_sum,
'current_rsi': self.get_current_value()
})
return base_summary

View File

@ -0,0 +1,332 @@
"""
Supertrend Indicator State
This module implements incremental Supertrend calculation that maintains constant memory usage
and provides identical results to traditional batch calculations. Supertrend is used by
the DefaultStrategy for trend detection.
"""
from typing import Dict, Union, Optional
from .base import OHLCIndicatorState
from .atr import ATRState
class SupertrendState(OHLCIndicatorState):
"""
Incremental Supertrend calculation state.
Supertrend is a trend-following indicator that uses Average True Range (ATR)
to calculate dynamic support and resistance levels. It provides clear trend
direction signals: +1 for uptrend, -1 for downtrend.
The calculation involves:
1. Calculate ATR for the given period
2. Calculate basic upper and lower bands using ATR and multiplier
3. Calculate final upper and lower bands with trend logic
4. Determine trend direction based on price vs bands
Attributes:
period (int): ATR period for Supertrend calculation
multiplier (float): Multiplier for ATR in band calculation
atr_state (ATRState): ATR calculation state
previous_close (float): Previous period's close price
previous_trend (int): Previous trend direction (+1 or -1)
final_upper_band (float): Current final upper band
final_lower_band (float): Current final lower band
Example:
supertrend = SupertrendState(period=10, multiplier=3.0)
# Add OHLC data incrementally
ohlc = {'open': 100, 'high': 105, 'low': 98, 'close': 103}
result = supertrend.update(ohlc)
trend = result['trend'] # +1 or -1
supertrend_value = result['supertrend'] # Supertrend line value
"""
def __init__(self, period: int = 10, multiplier: float = 3.0):
"""
Initialize Supertrend state.
Args:
period: ATR period for Supertrend calculation (default: 10)
multiplier: Multiplier for ATR in band calculation (default: 3.0)
Raises:
ValueError: If period is not positive or multiplier is not positive
"""
super().__init__(period)
if multiplier <= 0:
raise ValueError(f"Multiplier must be positive, got {multiplier}")
self.multiplier = multiplier
self.atr_state = ATRState(period)
# State variables
self.previous_close = None
self.previous_trend = 1 # Start with uptrend assumption
self.final_upper_band = None
self.final_lower_band = None
# Current values
self.current_trend = 1
self.current_supertrend = None
self.is_initialized = True
def update(self, ohlc_data: Dict[str, float]) -> Dict[str, float]:
"""
Update Supertrend with new OHLC data.
Args:
ohlc_data: Dictionary with 'open', 'high', 'low', 'close' keys
Returns:
Dictionary with 'trend', 'supertrend', 'upper_band', 'lower_band' keys
Raises:
ValueError: If OHLC data is invalid
TypeError: If ohlc_data is not a dictionary
"""
# Validate input
if not isinstance(ohlc_data, dict):
raise TypeError(f"ohlc_data must be a dictionary, got {type(ohlc_data)}")
self.validate_input(ohlc_data)
high = float(ohlc_data['high'])
low = float(ohlc_data['low'])
close = float(ohlc_data['close'])
# Update ATR
atr_value = self.atr_state.update(ohlc_data)
# Calculate HL2 (typical price)
hl2 = (high + low) / 2.0
# Calculate basic upper and lower bands
basic_upper_band = hl2 + (self.multiplier * atr_value)
basic_lower_band = hl2 - (self.multiplier * atr_value)
# Calculate final upper band
if self.final_upper_band is None or basic_upper_band < self.final_upper_band or self.previous_close > self.final_upper_band:
final_upper_band = basic_upper_band
else:
final_upper_band = self.final_upper_band
# Calculate final lower band
if self.final_lower_band is None or basic_lower_band > self.final_lower_band or self.previous_close < self.final_lower_band:
final_lower_band = basic_lower_band
else:
final_lower_band = self.final_lower_band
# Determine trend
if self.previous_close is None:
# First calculation
trend = 1 if close > final_lower_band else -1
else:
# Trend logic
if self.previous_trend == 1 and close <= final_lower_band:
trend = -1
elif self.previous_trend == -1 and close >= final_upper_band:
trend = 1
else:
trend = self.previous_trend
# Calculate Supertrend value
if trend == 1:
supertrend_value = final_lower_band
else:
supertrend_value = final_upper_band
# Store current state
self.previous_close = close
self.previous_trend = trend
self.final_upper_band = final_upper_band
self.final_lower_band = final_lower_band
self.current_trend = trend
self.current_supertrend = supertrend_value
self.values_received += 1
# Prepare result
result = {
'trend': trend,
'supertrend': supertrend_value,
'upper_band': final_upper_band,
'lower_band': final_lower_band,
'atr': atr_value
}
self._current_values = result
return result
def is_warmed_up(self) -> bool:
"""
Check if Supertrend has enough data for reliable values.
Returns:
True if ATR state is warmed up
"""
return self.atr_state.is_warmed_up()
def reset(self) -> None:
"""Reset Supertrend state to initial conditions."""
self.atr_state.reset()
self.previous_close = None
self.previous_trend = 1
self.final_upper_band = None
self.final_lower_band = None
self.current_trend = 1
self.current_supertrend = None
self.values_received = 0
self._current_values = {}
def get_current_value(self) -> Optional[Dict[str, float]]:
"""
Get current Supertrend values without updating.
Returns:
Dictionary with current Supertrend values, or None if not warmed up
"""
if not self.is_warmed_up():
return None
return self._current_values.copy() if self._current_values else None
def get_current_trend(self) -> int:
"""
Get current trend direction.
Returns:
Current trend: +1 for uptrend, -1 for downtrend
"""
return self.current_trend
def get_current_supertrend_value(self) -> Optional[float]:
"""
Get current Supertrend line value.
Returns:
Current Supertrend value, or None if not available
"""
return self.current_supertrend
def get_state_summary(self) -> dict:
"""Get detailed state summary for debugging."""
base_summary = super().get_state_summary()
base_summary.update({
'multiplier': self.multiplier,
'previous_close': self.previous_close,
'previous_trend': self.previous_trend,
'current_trend': self.current_trend,
'current_supertrend': self.current_supertrend,
'final_upper_band': self.final_upper_band,
'final_lower_band': self.final_lower_band,
'atr_state': self.atr_state.get_state_summary()
})
return base_summary
class SupertrendCollection:
"""
Collection of multiple Supertrend indicators with different parameters.
This class manages multiple Supertrend indicators and provides meta-trend
calculation based on agreement between different Supertrend configurations.
Used by the DefaultStrategy for robust trend detection.
Example:
# Create collection with three Supertrend indicators
collection = SupertrendCollection([
(10, 3.0), # period=10, multiplier=3.0
(11, 2.0), # period=11, multiplier=2.0
(12, 1.0) # period=12, multiplier=1.0
])
# Update all indicators
results = collection.update(ohlc_data)
meta_trend = results['meta_trend'] # 1, -1, or 0 (neutral)
"""
def __init__(self, supertrend_configs: list):
"""
Initialize Supertrend collection.
Args:
supertrend_configs: List of (period, multiplier) tuples
"""
self.supertrends = []
for period, multiplier in supertrend_configs:
self.supertrends.append(SupertrendState(period, multiplier))
self.values_received = 0
def update(self, ohlc_data: Dict[str, float]) -> Dict[str, Union[int, list]]:
"""
Update all Supertrend indicators and calculate meta-trend.
Args:
ohlc_data: OHLC data dictionary
Returns:
Dictionary with individual trends and meta-trend
"""
trends = []
results = []
# Update each Supertrend
for supertrend in self.supertrends:
result = supertrend.update(ohlc_data)
trends.append(result['trend'])
results.append(result)
# Calculate meta-trend: all must agree for directional signal
if all(trend == trends[0] for trend in trends):
meta_trend = trends[0] # All agree
else:
meta_trend = 0 # Neutral when trends don't agree
self.values_received += 1
return {
'trends': trends,
'meta_trend': meta_trend,
'results': results
}
def is_warmed_up(self) -> bool:
"""Check if all Supertrend indicators are warmed up."""
return all(st.is_warmed_up() for st in self.supertrends)
def reset(self) -> None:
"""Reset all Supertrend indicators."""
for supertrend in self.supertrends:
supertrend.reset()
self.values_received = 0
def get_current_meta_trend(self) -> int:
"""
Get current meta-trend without updating.
Returns:
Current meta-trend: +1, -1, or 0
"""
if not self.is_warmed_up():
return 0
trends = [st.get_current_trend() for st in self.supertrends]
if all(trend == trends[0] for trend in trends):
return trends[0]
else:
return 0
def get_state_summary(self) -> dict:
"""Get detailed state summary for all Supertrends."""
return {
'num_supertrends': len(self.supertrends),
'values_received': self.values_received,
'is_warmed_up': self.is_warmed_up(),
'current_meta_trend': self.get_current_meta_trend(),
'supertrends': [st.get_state_summary() for st in self.supertrends]
}