2025-05-28 22:37:53 +08:00

11 KiB

Base Indicator Classes

Overview

All indicators in IncrementalTrader are built on a foundation of base classes that provide common functionality for incremental computation. These base classes ensure consistent behavior, memory efficiency, and real-time capability across all indicators.

Available indicators

IndicatorState

The foundation class for all indicators in the framework.

Features

  • Incremental Computation: O(1) time complexity per update
  • Constant Memory: O(1) space complexity regardless of data history
  • State Management: Maintains internal state efficiently
  • Ready State Tracking: Indicates when indicator has sufficient data

Class Definition

from IncrementalTrader.strategies.indicators import IndicatorState

class IndicatorState:
    def __init__(self, period: int):
        self.period = period
        self.data_count = 0
    
    def update(self, value: float):
        """Update indicator with new value."""
        raise NotImplementedError("Subclasses must implement update method")
    
    def get_value(self) -> float:
        """Get current indicator value."""
        raise NotImplementedError("Subclasses must implement get_value method")
    
    def is_ready(self) -> bool:
        """Check if indicator has enough data."""
        return self.data_count >= self.period
    
    def reset(self):
        """Reset indicator state."""
        self.data_count = 0

Methods

Method Description Returns
update(value: float) Update indicator with new value None
get_value() -> float Get current indicator value float
is_ready() -> bool Check if indicator has enough data bool
reset() Reset indicator state None

Usage Example

class MyCustomIndicator(IndicatorState):
    def __init__(self, period: int):
        super().__init__(period)
        self.sum = 0.0
        self.values = []
    
    def update(self, value: float):
        self.values.append(value)
        self.sum += value
        
        if len(self.values) > self.period:
            old_value = self.values.pop(0)
            self.sum -= old_value
        
        self.data_count += 1
    
    def get_value(self) -> float:
        if not self.is_ready():
            return 0.0
        return self.sum / min(len(self.values), self.period)

# Usage
indicator = MyCustomIndicator(period=10)
for price in [100, 101, 99, 102, 98]:
    indicator.update(price)
    if indicator.is_ready():
        print(f"Value: {indicator.get_value():.2f}")

SimpleIndicatorState

For indicators that only need the current value and don't require a period.

Features

  • Immediate Ready: Always ready after first update
  • No Period Requirement: Doesn't need historical data
  • Minimal State: Stores only current value

Class Definition

class SimpleIndicatorState(IndicatorState):
    def __init__(self):
        super().__init__(period=1)
        self.current_value = 0.0
    
    def update(self, value: float):
        self.current_value = value
        self.data_count = 1  # Always ready
    
    def get_value(self) -> float:
        return self.current_value

Usage Example

# Simple price tracker
price_tracker = SimpleIndicatorState()

for price in [100, 101, 99, 102]:
    price_tracker.update(price)
    print(f"Current price: {price_tracker.get_value():.2f}")

OHLCIndicatorState

For indicators that require OHLC (Open, High, Low, Close) data instead of just a single price value.

Features

  • OHLC Data Support: Handles high, low, close data
  • Flexible Updates: Can update with individual OHLC components
  • Typical Price Calculation: Built-in typical price (HLC/3) calculation

Class Definition

class OHLCIndicatorState(IndicatorState):
    def __init__(self, period: int):
        super().__init__(period)
        self.current_high = 0.0
        self.current_low = 0.0
        self.current_close = 0.0
    
    def update_ohlc(self, high: float, low: float, close: float):
        """Update with OHLC data."""
        self.current_high = high
        self.current_low = low
        self.current_close = close
        self._process_ohlc_data(high, low, close)
        self.data_count += 1
    
    def _process_ohlc_data(self, high: float, low: float, close: float):
        """Process OHLC data - to be implemented by subclasses."""
        raise NotImplementedError("Subclasses must implement _process_ohlc_data")
    
    def get_typical_price(self) -> float:
        """Calculate typical price (HLC/3)."""
        return (self.current_high + self.current_low + self.current_close) / 3.0
    
    def get_true_range(self, prev_close: float = None) -> float:
        """Calculate True Range."""
        if prev_close is None:
            return self.current_high - self.current_low
        
        return max(
            self.current_high - self.current_low,
            abs(self.current_high - prev_close),
            abs(self.current_low - prev_close)
        )

Methods

Method Description Returns
update_ohlc(high, low, close) Update with OHLC data None
get_typical_price() Get typical price (HLC/3) float
get_true_range(prev_close) Calculate True Range float

Usage Example

class MyOHLCIndicator(OHLCIndicatorState):
    def __init__(self, period: int):
        super().__init__(period)
        self.hl_sum = 0.0
        self.count = 0
    
    def _process_ohlc_data(self, high: float, low: float, close: float):
        self.hl_sum += (high - low)
        self.count += 1
    
    def get_value(self) -> float:
        if self.count == 0:
            return 0.0
        return self.hl_sum / self.count

# Usage
ohlc_indicator = MyOHLCIndicator(period=10)
ohlc_data = [(105, 95, 100), (108, 98, 102), (110, 100, 105)]

for high, low, close in ohlc_data:
    ohlc_indicator.update_ohlc(high, low, close)
    if ohlc_indicator.is_ready():
        print(f"Average Range: {ohlc_indicator.get_value():.2f}")
        print(f"Typical Price: {ohlc_indicator.get_typical_price():.2f}")

Best Practices

1. Always Check Ready State

indicator = MovingAverageState(period=20)

for price in price_data:
    indicator.update(price)
    
    # Always check if ready before using value
    if indicator.is_ready():
        value = indicator.get_value()
        # Use the value...

2. Initialize Once, Reuse Many Times

# Good: Initialize once
sma = MovingAverageState(period=20)

# Process many data points
for price in large_dataset:
    sma.update(price)
    if sma.is_ready():
        process_signal(sma.get_value())

# Bad: Don't recreate indicators
for price in large_dataset:
    sma = MovingAverageState(period=20)  # Wasteful!
    sma.update(price)

3. Handle Edge Cases

def safe_indicator_update(indicator, value):
    """Safely update indicator with error handling."""
    try:
        if value is not None and not math.isnan(value):
            indicator.update(value)
            return True
    except Exception as e:
        logger.error(f"Error updating indicator: {e}")
    return False

4. Batch Updates for Multiple Indicators

# Update all indicators together
indicators = [sma_20, ema_12, rsi_14]

for price in price_stream:
    # Update all indicators
    for indicator in indicators:
        indicator.update(price)
    
    # Check if all are ready
    if all(ind.is_ready() for ind in indicators):
        # Use all indicator values
        values = [ind.get_value() for ind in indicators]
        process_signals(values)

Performance Characteristics

Memory Usage

  • IndicatorState: O(period) memory usage
  • SimpleIndicatorState: O(1) memory usage
  • OHLCIndicatorState: O(period) memory usage

Processing Speed

  • Update Time: O(1) per data point for all base classes
  • Value Retrieval: O(1) for getting current value
  • Ready Check: O(1) for checking ready state

Scalability

# Memory usage remains constant regardless of data volume
indicator = MovingAverageState(period=20)

# Process 1 million data points - memory usage stays O(20)
for i in range(1_000_000):
    indicator.update(i)
    if indicator.is_ready():
        value = indicator.get_value()  # Always O(1)

Error Handling

Common Patterns

class RobustIndicator(IndicatorState):
    def update(self, value: float):
        try:
            # Validate input
            if value is None or math.isnan(value) or math.isinf(value):
                self.logger.warning(f"Invalid value: {value}")
                return
            
            # Process value
            self._process_value(value)
            self.data_count += 1
            
        except Exception as e:
            self.logger.error(f"Error in indicator update: {e}")
    
    def get_value(self) -> float:
        try:
            if not self.is_ready():
                return 0.0
            return self._calculate_value()
        except Exception as e:
            self.logger.error(f"Error calculating indicator value: {e}")
            return 0.0

Integration with Strategies

Strategy Usage Pattern

class MyStrategy(IncStrategyBase):
    def __init__(self, name: str, params: dict = None):
        super().__init__(name, params)
        
        # Initialize indicators
        self.sma = MovingAverageState(period=20)
        self.rsi = RSIState(period=14)
        self.atr = ATRState(period=14)
    
    def _process_aggregated_data(self, timestamp: int, ohlcv: tuple) -> IncStrategySignal:
        open_price, high, low, close, volume = ohlcv
        
        # Update all indicators
        self.sma.update(close)
        self.rsi.update(close)
        self.atr.update_ohlc(high, low, close)
        
        # Check if all indicators are ready
        if not all([self.sma.is_ready(), self.rsi.is_ready(), self.atr.is_ready()]):
            return IncStrategySignal.HOLD()
        
        # Use indicator values for signal generation
        sma_value = self.sma.get_value()
        rsi_value = self.rsi.get_value()
        atr_value = self.atr.get_value()
        
        # Generate signals based on indicator values
        return self._generate_signal(close, sma_value, rsi_value, atr_value)

The base indicator classes provide a solid foundation for building efficient, real-time indicators that maintain constant memory usage and processing time regardless of data history length.