"""
RSI (Relative Strength Index) Indicator State

This module implements incremental RSI calculation that maintains constant
memory usage and provides identical results to traditional batch calculations.
"""

from collections import deque
from typing import Union, Optional

from .base import SimpleIndicatorState
from .moving_average import ExponentialMovingAverageState


def _rsi_from_averages(avg_gain: float, avg_loss: float) -> float:
    """
    Convert an average gain / average loss pair into an RSI value.

    Args:
        avg_gain: Smoothed (or averaged) gain over the window
        avg_loss: Smoothed (or averaged) loss over the window

    Returns:
        100.0 when avg_loss is exactly zero (this also covers the case where
        both averages are zero), otherwise 100 - 100 / (1 + avg_gain / avg_loss)
    """
    if avg_loss == 0.0:
        # Avoid division by zero - no losses in the window
        return 100.0

    rs = avg_gain / avg_loss
    return 100.0 - (100.0 / (1.0 + rs))


class RSIState(SimpleIndicatorState):
    """
    Incremental RSI calculation state.

    RSI measures the speed and magnitude of price changes to evaluate
    overbought or oversold conditions. It oscillates between 0 and 100.

    RSI = 100 - (100 / (1 + RS))
    where RS = Average Gain / Average Loss over the specified period

    This implementation uses exponential moving averages for gain and loss
    smoothing, which is more responsive and memory-efficient than simple
    moving averages.

    Attributes:
        period (int): The RSI period (typically 14)
        gain_ema (ExponentialMovingAverageState): EMA state for gains
        loss_ema (ExponentialMovingAverageState): EMA state for losses
        previous_close (float): Previous period's close price

    Example:
        rsi = RSIState(period=14)

        # Add price data incrementally
        rsi_value = rsi.update(100.0)  # Returns current RSI value
        rsi_value = rsi.update(105.0)  # Updates and returns new RSI value

        # Check if warmed up
        if rsi.is_warmed_up():
            current_rsi = rsi.get_current_value()
    """

    def __init__(self, period: int = 14):
        """
        Initialize RSI state.

        Args:
            period: Number of periods for RSI calculation (default: 14)

        Raises:
            ValueError: If period is not a positive integer
                (NOTE(review): raised by the base class / EMA constructors,
                which are not visible here - confirm)
        """
        super().__init__(period)
        self.gain_ema = ExponentialMovingAverageState(period)
        self.loss_ema = ExponentialMovingAverageState(period)
        self.previous_close = None
        self.is_initialized = True

    def update(self, new_close: Union[float, int]) -> float:
        """
        Update RSI with new close price.

        The first price only seeds ``previous_close`` and yields a neutral
        50.0; every later price feeds its gain and loss into the two EMAs.

        Args:
            new_close: New closing price

        Returns:
            Current RSI value (0-100)

        Raises:
            TypeError: If new_close is not an int or float
            ValueError: If new_close is rejected by ``validate_input``
                (documented as non-finite values)
        """
        # Validate input
        if not isinstance(new_close, (int, float)):
            raise TypeError(f"new_close must be numeric, got {type(new_close)}")

        self.validate_input(new_close)
        new_close = float(new_close)

        if self.previous_close is None:
            # First value - no gain/loss to calculate, report neutral RSI
            self.previous_close = new_close
            self.values_received += 1
            self._current_value = 50.0
            return self._current_value

        # Split the price change into its non-negative gain and loss parts
        price_change = new_close - self.previous_close
        gain = max(price_change, 0.0)
        loss = max(-price_change, 0.0)

        # Both EMAs are updated on every step so they stay in lockstep
        avg_gain = self.gain_ema.update(gain)
        avg_loss = self.loss_ema.update(loss)

        rsi_value = _rsi_from_averages(avg_gain, avg_loss)

        # Store state
        self.previous_close = new_close
        self.values_received += 1
        self._current_value = rsi_value

        return rsi_value

    def is_warmed_up(self) -> bool:
        """
        Check if RSI has enough data for reliable values.

        Returns:
            True if both gain and loss EMAs are warmed up
        """
        return self.gain_ema.is_warmed_up() and self.loss_ema.is_warmed_up()

    def reset(self) -> None:
        """Reset RSI state to initial conditions."""
        self.gain_ema.reset()
        self.loss_ema.reset()
        self.previous_close = None
        self.values_received = 0
        self._current_value = None

    def get_current_value(self) -> Optional[float]:
        """
        Get current RSI value without updating.

        Returns:
            None before any price has been received; otherwise the value
            produced by the most recent ``update`` call (50.0 after the first
            price). The value is returned even if the EMAs are not yet
            warmed up.
        """
        if self.values_received == 0:
            return None
        return self._current_value

    def get_state_summary(self) -> dict:
        """Get detailed state summary for debugging."""
        base_summary = super().get_state_summary()
        base_summary.update({
            'previous_close': self.previous_close,
            'gain_ema': self.gain_ema.get_state_summary(),
            'loss_ema': self.loss_ema.get_state_summary(),
            'current_rsi': self.get_current_value()
        })
        return base_summary


class SimpleRSIState(SimpleIndicatorState):
    """
    Simple RSI implementation using simple moving averages instead of EMAs.

    This version uses simple moving averages for gain and loss smoothing,
    which matches traditional RSI implementations but requires more memory
    (one bounded window of gains and one of losses, each at most ``period``
    entries long).

    Attributes:
        gains (deque): Most recent gains, at most ``period`` entries
        losses (deque): Most recent losses, at most ``period`` entries
        gain_sum (float): Sum of the values currently in ``gains``
        loss_sum (float): Sum of the values currently in ``losses``
        previous_close (float): Previous period's close price
    """

    def __init__(self, period: int = 14):
        """
        Initialize simple RSI state.

        Args:
            period: Number of periods for RSI calculation (default: 14)
        """
        super().__init__(period)
        # maxlen makes the deques evict their oldest entry automatically
        self.gains = deque(maxlen=period)
        self.losses = deque(maxlen=period)
        self.gain_sum = 0.0
        self.loss_sum = 0.0
        self.previous_close = None
        self.is_initialized = True

    def update(self, new_close: Union[float, int]) -> float:
        """
        Update simple RSI with new close price.

        The first price only seeds ``previous_close`` and yields a neutral
        50.0; every later price appends its gain and loss to the windows.

        Args:
            new_close: New closing price

        Returns:
            Current RSI value (0-100)

        Raises:
            TypeError: If new_close is not an int or float
            ValueError: If new_close is rejected by ``validate_input``
                (NOTE(review): defined outside this file - confirm)
        """
        # Validate input
        if not isinstance(new_close, (int, float)):
            raise TypeError(f"new_close must be numeric, got {type(new_close)}")

        self.validate_input(new_close)
        new_close = float(new_close)

        if self.previous_close is None:
            # First value - no gain/loss to calculate, report neutral RSI
            self.previous_close = new_close
            self.values_received += 1
            self._current_value = 50.0
            return self._current_value

        # Split the price change into its non-negative gain and loss parts
        price_change = new_close - self.previous_close
        gain = max(price_change, 0.0)
        loss = max(-price_change, 0.0)

        self.gains.append(gain)
        self.losses.append(loss)

        # Recompute the sums from the bounded windows instead of adding the
        # new value and subtracting the evicted one: the running add/subtract
        # form accumulates floating-point residue, so a window holding only
        # zero losses could keep a tiny non-zero loss_sum and skip the
        # zero-loss branch. Cost is O(period) per update.
        self.gain_sum = sum(self.gains, 0.0)
        self.loss_sum = sum(self.losses, 0.0)

        # The windows hold at least one entry here, so the divisions are safe
        avg_gain = self.gain_sum / len(self.gains)
        avg_loss = self.loss_sum / len(self.losses)
        rsi_value = _rsi_from_averages(avg_gain, avg_loss)

        # Store state
        self.previous_close = new_close
        self.values_received += 1
        self._current_value = rsi_value

        return rsi_value

    def is_warmed_up(self) -> bool:
        """Check if simple RSI is warmed up (gain window holds ``period`` entries)."""
        return len(self.gains) >= self.period

    def reset(self) -> None:
        """Reset simple RSI state."""
        self.gains.clear()
        self.losses.clear()
        self.gain_sum = 0.0
        self.loss_sum = 0.0
        self.previous_close = None
        self.values_received = 0
        self._current_value = None

    def get_current_value(self) -> Optional[float]:
        """
        Get current simple RSI value.

        Returns:
            None before any price has been received, otherwise the value
            produced by the most recent ``update`` call
        """
        if self.values_received == 0:
            return None
        return self._current_value

    def get_state_summary(self) -> dict:
        """Get detailed state summary for debugging."""
        base_summary = super().get_state_summary()
        base_summary.update({
            'previous_close': self.previous_close,
            'gains_window_size': len(self.gains),
            'losses_window_size': len(self.losses),
            'gain_sum': self.gain_sum,
            'loss_sum': self.loss_sum,
            'current_rsi': self.get_current_value()
        })
        return base_summary