documentation

This commit is contained in:
Ajasra
2025-05-28 22:37:53 +08:00
parent 1861c336f9
commit 5c6e0598c0
15 changed files with 7537 additions and 25 deletions

View File

@@ -0,0 +1,364 @@
# Base Indicator Classes
## Overview
All indicators in IncrementalTrader are built on a foundation of base classes that provide common functionality for incremental computation. These base classes ensure consistent behavior, memory efficiency, and real-time capability across all indicators.
## Available indicators
- [Moving Averages](moving_averages.md)
- [Volatility](volatility.md) - ATR
- [Trend](trend.md) - Supertrend
- [Oscillators](oscillators.md) - RSI
- [Bollinger Bands](bollinger_bands.md) - Bollinger Bands
## IndicatorState
The foundation class for all indicators in the framework.
### Features
- **Incremental Computation**: O(1) time complexity per update
- **Constant Memory**: O(1) space complexity regardless of data history
- **State Management**: Maintains internal state efficiently
- **Ready State Tracking**: Indicates when indicator has sufficient data
### Class Definition
```python
from IncrementalTrader.strategies.indicators import IndicatorState
class IndicatorState:
def __init__(self, period: int):
self.period = period
self.data_count = 0
def update(self, value: float):
"""Update indicator with new value."""
raise NotImplementedError("Subclasses must implement update method")
def get_value(self) -> float:
"""Get current indicator value."""
raise NotImplementedError("Subclasses must implement get_value method")
def is_ready(self) -> bool:
"""Check if indicator has enough data."""
return self.data_count >= self.period
def reset(self):
"""Reset indicator state."""
self.data_count = 0
```
### Methods
| Method | Description | Returns |
|--------|-------------|---------|
| `update(value: float)` | Update indicator with new value | None |
| `get_value() -> float` | Get current indicator value | float |
| `is_ready() -> bool` | Check if indicator has enough data | bool |
| `reset()` | Reset indicator state | None |
### Usage Example
```python
class MyCustomIndicator(IndicatorState):
def __init__(self, period: int):
super().__init__(period)
self.sum = 0.0
self.values = []
def update(self, value: float):
self.values.append(value)
self.sum += value
if len(self.values) > self.period:
old_value = self.values.pop(0)
self.sum -= old_value
self.data_count += 1
def get_value(self) -> float:
if not self.is_ready():
return 0.0
return self.sum / min(len(self.values), self.period)
# Usage
indicator = MyCustomIndicator(period=10)
for price in [100, 101, 99, 102, 98]:
indicator.update(price)
if indicator.is_ready():
print(f"Value: {indicator.get_value():.2f}")
```
## SimpleIndicatorState
For indicators that only need the current value and don't require a period.
### Features
- **Immediate Ready**: Always ready after first update
- **No Period Requirement**: Doesn't need historical data
- **Minimal State**: Stores only current value
### Class Definition
```python
class SimpleIndicatorState(IndicatorState):
def __init__(self):
super().__init__(period=1)
self.current_value = 0.0
def update(self, value: float):
self.current_value = value
self.data_count = 1 # Always ready
def get_value(self) -> float:
return self.current_value
```
### Usage Example
```python
# Simple price tracker
price_tracker = SimpleIndicatorState()
for price in [100, 101, 99, 102]:
price_tracker.update(price)
print(f"Current price: {price_tracker.get_value():.2f}")
```
## OHLCIndicatorState
For indicators that require OHLC (Open, High, Low, Close) data instead of just a single price value.
### Features
- **OHLC Data Support**: Handles high, low, close data
- **Flexible Updates**: Can update with individual OHLC components
- **Typical Price Calculation**: Built-in typical price (HLC/3) calculation
### Class Definition
```python
class OHLCIndicatorState(IndicatorState):
def __init__(self, period: int):
super().__init__(period)
self.current_high = 0.0
self.current_low = 0.0
self.current_close = 0.0
def update_ohlc(self, high: float, low: float, close: float):
"""Update with OHLC data."""
self.current_high = high
self.current_low = low
self.current_close = close
self._process_ohlc_data(high, low, close)
self.data_count += 1
def _process_ohlc_data(self, high: float, low: float, close: float):
"""Process OHLC data - to be implemented by subclasses."""
raise NotImplementedError("Subclasses must implement _process_ohlc_data")
def get_typical_price(self) -> float:
"""Calculate typical price (HLC/3)."""
return (self.current_high + self.current_low + self.current_close) / 3.0
def get_true_range(self, prev_close: float = None) -> float:
"""Calculate True Range."""
if prev_close is None:
return self.current_high - self.current_low
return max(
self.current_high - self.current_low,
abs(self.current_high - prev_close),
abs(self.current_low - prev_close)
)
```
### Methods
| Method | Description | Returns |
|--------|-------------|---------|
| `update_ohlc(high, low, close)` | Update with OHLC data | None |
| `get_typical_price()` | Get typical price (HLC/3) | float |
| `get_true_range(prev_close)` | Calculate True Range | float |
### Usage Example
```python
class MyOHLCIndicator(OHLCIndicatorState):
def __init__(self, period: int):
super().__init__(period)
self.hl_sum = 0.0
self.count = 0
def _process_ohlc_data(self, high: float, low: float, close: float):
self.hl_sum += (high - low)
self.count += 1
def get_value(self) -> float:
if self.count == 0:
return 0.0
return self.hl_sum / self.count
# Usage
ohlc_indicator = MyOHLCIndicator(period=10)
ohlc_data = [(105, 95, 100), (108, 98, 102), (110, 100, 105)]
for high, low, close in ohlc_data:
ohlc_indicator.update_ohlc(high, low, close)
if ohlc_indicator.is_ready():
print(f"Average Range: {ohlc_indicator.get_value():.2f}")
print(f"Typical Price: {ohlc_indicator.get_typical_price():.2f}")
```
## Best Practices
### 1. Always Check Ready State
```python
indicator = MovingAverageState(period=20)
for price in price_data:
indicator.update(price)
# Always check if ready before using value
if indicator.is_ready():
value = indicator.get_value()
# Use the value...
```
### 2. Initialize Once, Reuse Many Times
```python
# Good: Initialize once
sma = MovingAverageState(period=20)
# Process many data points
for price in large_dataset:
sma.update(price)
if sma.is_ready():
process_signal(sma.get_value())
# Bad: Don't recreate indicators
for price in large_dataset:
sma = MovingAverageState(period=20) # Wasteful!
sma.update(price)
```
### 3. Handle Edge Cases
```python
def safe_indicator_update(indicator, value):
"""Safely update indicator with error handling."""
try:
if value is not None and not math.isnan(value):
indicator.update(value)
return True
except Exception as e:
logger.error(f"Error updating indicator: {e}")
return False
```
### 4. Batch Updates for Multiple Indicators
```python
# Update all indicators together
indicators = [sma_20, ema_12, rsi_14]
for price in price_stream:
# Update all indicators
for indicator in indicators:
indicator.update(price)
# Check if all are ready
if all(ind.is_ready() for ind in indicators):
# Use all indicator values
values = [ind.get_value() for ind in indicators]
process_signals(values)
```
## Performance Characteristics
### Memory Usage
- **IndicatorState**: O(period) memory usage
- **SimpleIndicatorState**: O(1) memory usage
- **OHLCIndicatorState**: O(period) memory usage
### Processing Speed
- **Update Time**: O(1) per data point for all base classes
- **Value Retrieval**: O(1) for getting current value
- **Ready Check**: O(1) for checking ready state
### Scalability
```python
# Memory usage remains constant regardless of data volume
indicator = MovingAverageState(period=20)
# Process 1 million data points - memory usage stays O(20)
for i in range(1_000_000):
indicator.update(i)
if indicator.is_ready():
value = indicator.get_value() # Always O(1)
```
## Error Handling
### Common Patterns
```python
class RobustIndicator(IndicatorState):
def update(self, value: float):
try:
# Validate input
if value is None or math.isnan(value) or math.isinf(value):
self.logger.warning(f"Invalid value: {value}")
return
# Process value
self._process_value(value)
self.data_count += 1
except Exception as e:
self.logger.error(f"Error in indicator update: {e}")
def get_value(self) -> float:
try:
if not self.is_ready():
return 0.0
return self._calculate_value()
except Exception as e:
self.logger.error(f"Error calculating indicator value: {e}")
return 0.0
```
## Integration with Strategies
### Strategy Usage Pattern
```python
class MyStrategy(IncStrategyBase):
def __init__(self, name: str, params: dict = None):
super().__init__(name, params)
# Initialize indicators
self.sma = MovingAverageState(period=20)
self.rsi = RSIState(period=14)
self.atr = ATRState(period=14)
def _process_aggregated_data(self, timestamp: int, ohlcv: tuple) -> IncStrategySignal:
open_price, high, low, close, volume = ohlcv
# Update all indicators
self.sma.update(close)
self.rsi.update(close)
self.atr.update_ohlc(high, low, close)
# Check if all indicators are ready
if not all([self.sma.is_ready(), self.rsi.is_ready(), self.atr.is_ready()]):
return IncStrategySignal.HOLD()
# Use indicator values for signal generation
sma_value = self.sma.get_value()
rsi_value = self.rsi.get_value()
atr_value = self.atr.get_value()
# Generate signals based on indicator values
return self._generate_signal(close, sma_value, rsi_value, atr_value)
```
---
*The base indicator classes provide a solid foundation for building efficient, real-time indicators that maintain constant memory usage and processing time regardless of data history length.*

View File

@@ -0,0 +1,702 @@
# Bollinger Bands Indicators
## Overview
Bollinger Bands are volatility indicators that consist of a moving average (middle band) and two standard deviation bands (upper and lower bands). They help identify overbought/oversold conditions and potential breakout opportunities. IncrementalTrader provides both simple price-based and OHLC-based implementations.
## BollingerBandsState
Standard Bollinger Bands implementation using closing prices and simple moving average.
### Features
- **Three Bands**: Upper, middle (SMA), and lower bands
- **Volatility Measurement**: Bands expand/contract with volatility
- **Mean Reversion Signals**: Price touching bands indicates potential reversal
- **Breakout Detection**: Price breaking through bands signals trend continuation
### Mathematical Formula
```
Middle Band = Simple Moving Average (SMA)
Upper Band = SMA + (Standard Deviation × Multiplier)
Lower Band = SMA - (Standard Deviation × Multiplier)
Standard Deviation = √(Σ(Price - SMA)² / Period)
```
### Class Definition
```python
from IncrementalTrader.strategies.indicators import BollingerBandsState
class BollingerBandsState(IndicatorState):
def __init__(self, period: int, std_dev_multiplier: float = 2.0):
super().__init__(period)
self.std_dev_multiplier = std_dev_multiplier
self.values = []
self.sum = 0.0
self.sum_squares = 0.0
# Band values
self.middle_band = 0.0
self.upper_band = 0.0
self.lower_band = 0.0
def update(self, value: float):
self.values.append(value)
self.sum += value
self.sum_squares += value * value
if len(self.values) > self.period:
old_value = self.values.pop(0)
self.sum -= old_value
self.sum_squares -= old_value * old_value
self.data_count += 1
self._calculate_bands()
def _calculate_bands(self):
if not self.is_ready():
return
n = len(self.values)
# Calculate SMA (middle band)
self.middle_band = self.sum / n
# Calculate standard deviation
variance = (self.sum_squares / n) - (self.middle_band * self.middle_band)
std_dev = math.sqrt(max(variance, 0))
# Calculate upper and lower bands
band_width = std_dev * self.std_dev_multiplier
self.upper_band = self.middle_band + band_width
self.lower_band = self.middle_band - band_width
def get_value(self) -> float:
"""Returns middle band (SMA) value."""
return self.middle_band
def get_upper_band(self) -> float:
return self.upper_band
def get_lower_band(self) -> float:
return self.lower_band
def get_middle_band(self) -> float:
return self.middle_band
def get_band_width(self) -> float:
"""Get the width between upper and lower bands."""
return self.upper_band - self.lower_band
def get_percent_b(self, price: float) -> float:
"""Calculate %B: position of price within the bands."""
if self.get_band_width() == 0:
return 0.5
return (price - self.lower_band) / self.get_band_width()
```
### Usage Examples
#### Basic Bollinger Bands Usage
```python
# Create 20-period Bollinger Bands with 2.0 standard deviation
bb = BollingerBandsState(period=20, std_dev_multiplier=2.0)
# Price data
prices = [100, 101, 99, 102, 98, 103, 97, 104, 96, 105, 95, 106, 94, 107, 93]
for price in prices:
bb.update(price)
if bb.is_ready():
print(f"Price: {price:.2f}")
print(f" Upper: {bb.get_upper_band():.2f}")
print(f" Middle: {bb.get_middle_band():.2f}")
print(f" Lower: {bb.get_lower_band():.2f}")
print(f" %B: {bb.get_percent_b(price):.2f}")
print(f" Width: {bb.get_band_width():.2f}")
```
#### Bollinger Bands Trading Signals
```python
class BollingerBandsSignals:
def __init__(self, period: int = 20, std_dev: float = 2.0):
self.bb = BollingerBandsState(period, std_dev)
self.previous_price = None
self.previous_percent_b = None
def update(self, price: float):
self.bb.update(price)
self.previous_price = price
def get_mean_reversion_signal(self, current_price: float) -> str:
"""Get mean reversion signals based on band touches."""
if not self.bb.is_ready():
return "HOLD"
percent_b = self.bb.get_percent_b(current_price)
# Oversold: price near or below lower band
if percent_b <= 0.1:
return "BUY"
# Overbought: price near or above upper band
elif percent_b >= 0.9:
return "SELL"
# Return to middle: exit positions
elif 0.4 <= percent_b <= 0.6:
return "EXIT"
return "HOLD"
def get_breakout_signal(self, current_price: float) -> str:
"""Get breakout signals based on band penetration."""
if not self.bb.is_ready() or self.previous_price is None:
return "HOLD"
upper_band = self.bb.get_upper_band()
lower_band = self.bb.get_lower_band()
# Bullish breakout: price breaks above upper band
if self.previous_price <= upper_band and current_price > upper_band:
return "BUY_BREAKOUT"
# Bearish breakout: price breaks below lower band
elif self.previous_price >= lower_band and current_price < lower_band:
return "SELL_BREAKOUT"
return "HOLD"
def get_squeeze_condition(self) -> bool:
"""Detect Bollinger Band squeeze (low volatility)."""
if not self.bb.is_ready():
return False
# Simple squeeze detection: band width below threshold
# You might want to compare with historical band width
band_width = self.bb.get_band_width()
middle_band = self.bb.get_middle_band()
# Squeeze when band width is less than 4% of middle band
return (band_width / middle_band) < 0.04
# Usage
bb_signals = BollingerBandsSignals(period=20, std_dev=2.0)
for price in prices:
bb_signals.update(price)
mean_reversion = bb_signals.get_mean_reversion_signal(price)
breakout = bb_signals.get_breakout_signal(price)
squeeze = bb_signals.get_squeeze_condition()
if mean_reversion != "HOLD":
print(f"Mean Reversion Signal: {mean_reversion} at {price:.2f}")
if breakout != "HOLD":
print(f"Breakout Signal: {breakout} at {price:.2f}")
if squeeze:
print(f"Bollinger Band Squeeze detected at {price:.2f}")
```
### Performance Characteristics
- **Time Complexity**: O(1) per update (after initial period)
- **Space Complexity**: O(period)
- **Memory Usage**: ~8 bytes per period + constant overhead
## BollingerBandsOHLCState
OHLC-based Bollinger Bands implementation using typical price (HLC/3) for more accurate volatility measurement.
### Features
- **OHLC Data Support**: Uses high, low, close for typical price calculation
- **Better Volatility Measurement**: More accurate than close-only bands
- **Intraday Analysis**: Accounts for intraday price action
- **Enhanced Signals**: More reliable signals due to complete price information
### Mathematical Formula
```
Typical Price = (High + Low + Close) / 3
Middle Band = SMA(Typical Price)
Upper Band = Middle Band + (Standard Deviation × Multiplier)
Lower Band = Middle Band - (Standard Deviation × Multiplier)
```
### Class Definition
```python
class BollingerBandsOHLCState(OHLCIndicatorState):
def __init__(self, period: int, std_dev_multiplier: float = 2.0):
super().__init__(period)
self.std_dev_multiplier = std_dev_multiplier
self.typical_prices = []
self.sum = 0.0
self.sum_squares = 0.0
# Band values
self.middle_band = 0.0
self.upper_band = 0.0
self.lower_band = 0.0
def _process_ohlc_data(self, high: float, low: float, close: float):
# Calculate typical price
typical_price = (high + low + close) / 3.0
self.typical_prices.append(typical_price)
self.sum += typical_price
self.sum_squares += typical_price * typical_price
if len(self.typical_prices) > self.period:
old_price = self.typical_prices.pop(0)
self.sum -= old_price
self.sum_squares -= old_price * old_price
self._calculate_bands()
def _calculate_bands(self):
if not self.is_ready():
return
n = len(self.typical_prices)
# Calculate SMA (middle band)
self.middle_band = self.sum / n
# Calculate standard deviation
variance = (self.sum_squares / n) - (self.middle_band * self.middle_band)
std_dev = math.sqrt(max(variance, 0))
# Calculate upper and lower bands
band_width = std_dev * self.std_dev_multiplier
self.upper_band = self.middle_band + band_width
self.lower_band = self.middle_band - band_width
def get_value(self) -> float:
"""Returns middle band (SMA) value."""
return self.middle_band
def get_upper_band(self) -> float:
return self.upper_band
def get_lower_band(self) -> float:
return self.lower_band
def get_middle_band(self) -> float:
return self.middle_band
def get_band_width(self) -> float:
return self.upper_band - self.lower_band
def get_percent_b_ohlc(self, high: float, low: float, close: float) -> float:
"""Calculate %B using OHLC data."""
typical_price = (high + low + close) / 3.0
if self.get_band_width() == 0:
return 0.5
return (typical_price - self.lower_band) / self.get_band_width()
```
### Usage Examples
#### OHLC Bollinger Bands Analysis
```python
# Create OHLC-based Bollinger Bands
bb_ohlc = BollingerBandsOHLCState(period=20, std_dev_multiplier=2.0)
# OHLC data: (high, low, close)
ohlc_data = [
(105.0, 102.0, 104.0),
(106.0, 103.0, 105.5),
(107.0, 104.0, 106.0),
(108.0, 105.0, 107.5),
(109.0, 106.0, 108.0)
]
for high, low, close in ohlc_data:
bb_ohlc.update_ohlc(high, low, close)
if bb_ohlc.is_ready():
typical_price = (high + low + close) / 3.0
percent_b = bb_ohlc.get_percent_b_ohlc(high, low, close)
print(f"OHLC: H={high:.2f}, L={low:.2f}, C={close:.2f}")
print(f" Typical Price: {typical_price:.2f}")
print(f" Upper: {bb_ohlc.get_upper_band():.2f}")
print(f" Middle: {bb_ohlc.get_middle_band():.2f}")
print(f" Lower: {bb_ohlc.get_lower_band():.2f}")
print(f" %B: {percent_b:.2f}")
```
#### Advanced OHLC Bollinger Bands Strategy
```python
class OHLCBollingerStrategy:
def __init__(self, period: int = 20, std_dev: float = 2.0):
self.bb = BollingerBandsOHLCState(period, std_dev)
self.previous_ohlc = None
def update(self, high: float, low: float, close: float):
self.bb.update_ohlc(high, low, close)
self.previous_ohlc = (high, low, close)
def analyze_candle_position(self, high: float, low: float, close: float) -> dict:
"""Analyze candle position relative to Bollinger Bands."""
if not self.bb.is_ready():
return {"analysis": "NOT_READY"}
upper_band = self.bb.get_upper_band()
lower_band = self.bb.get_lower_band()
middle_band = self.bb.get_middle_band()
# Analyze different price levels
analysis = {
"high_above_upper": high > upper_band,
"low_below_lower": low < lower_band,
"close_above_middle": close > middle_band,
"body_outside_bands": high > upper_band and low < lower_band,
"squeeze_breakout": False,
"signal": "HOLD"
}
# Detect squeeze breakout
band_width = self.bb.get_band_width()
if band_width / middle_band < 0.03: # Very narrow bands
if high > upper_band:
analysis["squeeze_breakout"] = True
analysis["signal"] = "BUY_BREAKOUT"
elif low < lower_band:
analysis["squeeze_breakout"] = True
analysis["signal"] = "SELL_BREAKOUT"
# Mean reversion signals
percent_b = self.bb.get_percent_b_ohlc(high, low, close)
if percent_b <= 0.1 and close > low: # Bounce from lower band
analysis["signal"] = "BUY_BOUNCE"
elif percent_b >= 0.9 and close < high: # Rejection from upper band
analysis["signal"] = "SELL_REJECTION"
return analysis
def get_support_resistance_levels(self) -> dict:
"""Get dynamic support and resistance levels."""
if not self.bb.is_ready():
return {}
return {
"resistance": self.bb.get_upper_band(),
"support": self.bb.get_lower_band(),
"pivot": self.bb.get_middle_band(),
"band_width": self.bb.get_band_width()
}
# Usage
ohlc_strategy = OHLCBollingerStrategy(period=20, std_dev=2.0)
for high, low, close in ohlc_data:
ohlc_strategy.update(high, low, close)
analysis = ohlc_strategy.analyze_candle_position(high, low, close)
levels = ohlc_strategy.get_support_resistance_levels()
if analysis.get("signal") != "HOLD":
print(f"Signal: {analysis['signal']}")
print(f"Analysis: {analysis}")
print(f"S/R Levels: {levels}")
```
### Performance Characteristics
- **Time Complexity**: O(1) per update (after initial period)
- **Space Complexity**: O(period)
- **Memory Usage**: ~8 bytes per period + constant overhead
## Comparison: BollingerBandsState vs BollingerBandsOHLCState
| Aspect | BollingerBandsState | BollingerBandsOHLCState |
|--------|---------------------|-------------------------|
| **Input Data** | Close prices only | High, Low, Close |
| **Calculation Base** | Close price | Typical price (HLC/3) |
| **Accuracy** | Good for trends | Better for volatility |
| **Signal Quality** | Standard | Enhanced |
| **Data Requirements** | Minimal | Complete OHLC |
### When to Use BollingerBandsState
- **Simple Analysis**: When only closing prices are available
- **Trend Following**: For basic trend and mean reversion analysis
- **Memory Efficiency**: When OHLC data is not necessary
- **Quick Implementation**: For rapid prototyping and testing
### When to Use BollingerBandsOHLCState
- **Complete Analysis**: When full OHLC data is available
- **Volatility Trading**: For more accurate volatility measurement
- **Intraday Trading**: When intraday price action matters
- **Professional Trading**: For more sophisticated trading strategies
## Advanced Usage Patterns
### Multi-Timeframe Bollinger Bands
```python
class MultiBollingerBands:
def __init__(self):
self.bb_short = BollingerBandsState(period=10, std_dev_multiplier=2.0)
self.bb_medium = BollingerBandsState(period=20, std_dev_multiplier=2.0)
self.bb_long = BollingerBandsState(period=50, std_dev_multiplier=2.0)
def update(self, price: float):
self.bb_short.update(price)
self.bb_medium.update(price)
self.bb_long.update(price)
def get_volatility_regime(self) -> str:
"""Determine volatility regime across timeframes."""
if not all([self.bb_short.is_ready(), self.bb_medium.is_ready(), self.bb_long.is_ready()]):
return "UNKNOWN"
# Compare band widths
short_width = self.bb_short.get_band_width() / self.bb_short.get_middle_band()
medium_width = self.bb_medium.get_band_width() / self.bb_medium.get_middle_band()
long_width = self.bb_long.get_band_width() / self.bb_long.get_middle_band()
avg_width = (short_width + medium_width + long_width) / 3
if avg_width > 0.08:
return "HIGH_VOLATILITY"
elif avg_width < 0.03:
return "LOW_VOLATILITY"
else:
return "NORMAL_VOLATILITY"
def get_trend_alignment(self, price: float) -> str:
"""Check trend alignment across timeframes."""
if not all([self.bb_short.is_ready(), self.bb_medium.is_ready(), self.bb_long.is_ready()]):
return "UNKNOWN"
# Check position relative to middle bands
above_short = price > self.bb_short.get_middle_band()
above_medium = price > self.bb_medium.get_middle_band()
above_long = price > self.bb_long.get_middle_band()
if all([above_short, above_medium, above_long]):
return "STRONG_BULLISH"
elif not any([above_short, above_medium, above_long]):
return "STRONG_BEARISH"
elif above_short and above_medium:
return "BULLISH"
elif not above_short and not above_medium:
return "BEARISH"
else:
return "MIXED"
# Usage
multi_bb = MultiBollingerBands()
for price in prices:
multi_bb.update(price)
volatility_regime = multi_bb.get_volatility_regime()
trend_alignment = multi_bb.get_trend_alignment(price)
print(f"Price: {price:.2f}, Volatility: {volatility_regime}, Trend: {trend_alignment}")
```
### Bollinger Bands with RSI Confluence
```python
class BollingerRSIStrategy:
def __init__(self, bb_period: int = 20, rsi_period: int = 14):
self.bb = BollingerBandsState(bb_period, 2.0)
self.rsi = SimpleRSIState(rsi_period)
def update(self, price: float):
self.bb.update(price)
self.rsi.update(price)
def get_confluence_signal(self, price: float) -> dict:
"""Get signals based on Bollinger Bands and RSI confluence."""
if not (self.bb.is_ready() and self.rsi.is_ready()):
return {"signal": "HOLD", "confidence": 0.0}
percent_b = self.bb.get_percent_b(price)
rsi_value = self.rsi.get_value()
# Bullish confluence: oversold RSI + lower band touch
if percent_b <= 0.1 and rsi_value <= 30:
confidence = min(0.9, (30 - rsi_value) / 20 + (0.1 - percent_b) * 5)
return {
"signal": "BUY",
"confidence": confidence,
"reason": "oversold_confluence",
"percent_b": percent_b,
"rsi": rsi_value
}
# Bearish confluence: overbought RSI + upper band touch
elif percent_b >= 0.9 and rsi_value >= 70:
confidence = min(0.9, (rsi_value - 70) / 20 + (percent_b - 0.9) * 5)
return {
"signal": "SELL",
"confidence": confidence,
"reason": "overbought_confluence",
"percent_b": percent_b,
"rsi": rsi_value
}
# Exit signals: return to middle
elif 0.4 <= percent_b <= 0.6 and 40 <= rsi_value <= 60:
return {
"signal": "EXIT",
"confidence": 0.5,
"reason": "return_to_neutral",
"percent_b": percent_b,
"rsi": rsi_value
}
return {"signal": "HOLD", "confidence": 0.0}
# Usage
bb_rsi_strategy = BollingerRSIStrategy(bb_period=20, rsi_period=14)
for price in prices:
bb_rsi_strategy.update(price)
signal_info = bb_rsi_strategy.get_confluence_signal(price)
if signal_info["signal"] != "HOLD":
print(f"Confluence Signal: {signal_info['signal']}")
print(f" Confidence: {signal_info['confidence']:.2f}")
print(f" Reason: {signal_info['reason']}")
print(f" %B: {signal_info.get('percent_b', 0):.2f}")
print(f" RSI: {signal_info.get('rsi', 0):.1f}")
```
## Integration with Strategies
### Bollinger Bands Mean Reversion Strategy
```python
class BollingerMeanReversionStrategy(IncStrategyBase):
def __init__(self, name: str, params: dict = None):
super().__init__(name, params)
# Initialize Bollinger Bands
bb_period = self.params.get('bb_period', 20)
bb_std_dev = self.params.get('bb_std_dev', 2.0)
self.bb = BollingerBandsOHLCState(bb_period, bb_std_dev)
# Strategy parameters
self.entry_threshold = self.params.get('entry_threshold', 0.1) # %B threshold
self.exit_threshold = self.params.get('exit_threshold', 0.5) # Return to middle
# State tracking
self.position_type = None
def _process_aggregated_data(self, timestamp: int, ohlcv: tuple) -> IncStrategySignal:
open_price, high, low, close, volume = ohlcv
# Update Bollinger Bands
self.bb.update_ohlc(high, low, close)
# Wait for indicator to be ready
if not self.bb.is_ready():
return IncStrategySignal.HOLD()
# Calculate %B
percent_b = self.bb.get_percent_b_ohlc(high, low, close)
band_width = self.bb.get_band_width()
middle_band = self.bb.get_middle_band()
# Entry signals
if percent_b <= self.entry_threshold and self.position_type != "LONG":
# Oversold condition - buy signal
confidence = min(0.9, (self.entry_threshold - percent_b) * 5)
self.position_type = "LONG"
return IncStrategySignal.BUY(
confidence=confidence,
metadata={
'percent_b': percent_b,
'band_width': band_width,
'signal_type': 'mean_reversion_buy',
'upper_band': self.bb.get_upper_band(),
'lower_band': self.bb.get_lower_band()
}
)
elif percent_b >= (1.0 - self.entry_threshold) and self.position_type != "SHORT":
# Overbought condition - sell signal
confidence = min(0.9, (percent_b - (1.0 - self.entry_threshold)) * 5)
self.position_type = "SHORT"
return IncStrategySignal.SELL(
confidence=confidence,
metadata={
'percent_b': percent_b,
'band_width': band_width,
'signal_type': 'mean_reversion_sell',
'upper_band': self.bb.get_upper_band(),
'lower_band': self.bb.get_lower_band()
}
)
# Exit signals
elif abs(percent_b - 0.5) <= (0.5 - self.exit_threshold):
# Return to middle - exit position
if self.position_type is not None:
exit_signal = IncStrategySignal.SELL() if self.position_type == "LONG" else IncStrategySignal.BUY()
exit_signal.confidence = 0.6
exit_signal.metadata = {
'percent_b': percent_b,
'signal_type': 'mean_reversion_exit',
'previous_position': self.position_type
}
self.position_type = None
return exit_signal
return IncStrategySignal.HOLD()
```
## Performance Optimization Tips
### 1. Choose the Right Implementation
```python
# For simple price analysis
bb = BollingerBandsState(period=20, std_dev_multiplier=2.0)
# For comprehensive OHLC analysis
bb_ohlc = BollingerBandsOHLCState(period=20, std_dev_multiplier=2.0)
```
### 2. Optimize Standard Deviation Calculation
```python
# Use incremental variance calculation for better performance
def incremental_variance(sum_val: float, sum_squares: float, count: int, mean: float) -> float:
"""Calculate variance incrementally."""
if count == 0:
return 0.0
return max(0.0, (sum_squares / count) - (mean * mean))
```
### 3. Cache Band Values for Multiple Calculations
```python
class CachedBollingerBands:
def __init__(self, period: int, std_dev: float = 2.0):
self.bb = BollingerBandsState(period, std_dev)
self._cached_bands = None
self._cache_valid = False
def update(self, price: float):
self.bb.update(price)
self._cache_valid = False
def get_bands(self) -> tuple:
if not self._cache_valid:
self._cached_bands = (
self.bb.get_upper_band(),
self.bb.get_middle_band(),
self.bb.get_lower_band()
)
self._cache_valid = True
return self._cached_bands
```
---
*Bollinger Bands are versatile indicators for volatility analysis and mean reversion trading. Use BollingerBandsState for simple price analysis or BollingerBandsOHLCState for comprehensive volatility measurement with complete OHLC data.*

View File

@@ -0,0 +1,404 @@
# Moving Average Indicators
## Overview
Moving averages are fundamental trend-following indicators that smooth price data by creating a constantly updated average price. IncrementalTrader provides both Simple Moving Average (SMA) and Exponential Moving Average (EMA) implementations with O(1) time complexity.
## MovingAverageState (SMA)
Simple Moving Average that maintains a rolling window of prices.
### Features
- **O(1) Updates**: Constant time complexity per update
- **Memory Efficient**: Only stores necessary data points
- **Real-time Ready**: Immediate calculation without historical data dependency
### Mathematical Formula
```
SMA = (P₁ + P₂ + ... + Pₙ) / n
Where:
- P₁, P₂, ..., Pₙ are the last n price values
- n is the period
```
### Class Definition
```python
from IncrementalTrader.strategies.indicators import MovingAverageState
class MovingAverageState(IndicatorState):
def __init__(self, period: int):
super().__init__(period)
self.values = []
self.sum = 0.0
def update(self, value: float):
self.values.append(value)
self.sum += value
if len(self.values) > self.period:
old_value = self.values.pop(0)
self.sum -= old_value
self.data_count += 1
def get_value(self) -> float:
if not self.is_ready():
return 0.0
return self.sum / len(self.values)
```
### Usage Examples
#### Basic Usage
```python
# Create 20-period SMA
sma_20 = MovingAverageState(period=20)
# Update with price data
prices = [100, 101, 99, 102, 98, 103, 97, 104]
for price in prices:
sma_20.update(price)
if sma_20.is_ready():
print(f"SMA(20): {sma_20.get_value():.2f}")
```
#### Multiple Timeframes
```python
# Different period SMAs
sma_10 = MovingAverageState(period=10)
sma_20 = MovingAverageState(period=20)
sma_50 = MovingAverageState(period=50)
for price in price_stream:
# Update all SMAs
sma_10.update(price)
sma_20.update(price)
sma_50.update(price)
# Check for golden cross (SMA10 > SMA20)
if all([sma_10.is_ready(), sma_20.is_ready()]):
if sma_10.get_value() > sma_20.get_value():
print("Golden Cross detected!")
```
### Performance Characteristics
- **Time Complexity**: O(1) per update
- **Space Complexity**: O(period)
- **Memory Usage**: ~8 bytes per period (for float values)
## ExponentialMovingAverageState (EMA)
Exponential Moving Average that gives more weight to recent prices.
### Features
- **Exponential Weighting**: Recent prices have more influence
- **O(1) Memory**: Only stores current EMA value and multiplier
- **Responsive**: Reacts faster to price changes than SMA
### Mathematical Formula
```
EMA = (Price × α) + (Previous_EMA × (1 - α))
Where:
- α = 2 / (period + 1) (smoothing factor)
- Price is the current price
- Previous_EMA is the previous EMA value
```
### Class Definition
```python
class ExponentialMovingAverageState(IndicatorState):
def __init__(self, period: int):
super().__init__(period)
self.multiplier = 2.0 / (period + 1)
self.ema_value = 0.0
self.is_first_value = True
def update(self, value: float):
if self.is_first_value:
self.ema_value = value
self.is_first_value = False
else:
self.ema_value = (value * self.multiplier) + (self.ema_value * (1 - self.multiplier))
self.data_count += 1
def get_value(self) -> float:
return self.ema_value
```
### Usage Examples
#### Basic Usage
```python
# Create 12-period EMA
ema_12 = ExponentialMovingAverageState(period=12)
# Update with price data
for price in price_data:
ema_12.update(price)
print(f"EMA(12): {ema_12.get_value():.2f}")
```
#### MACD Calculation
```python
# MACD uses EMA12 and EMA26
ema_12 = ExponentialMovingAverageState(period=12)
ema_26 = ExponentialMovingAverageState(period=26)
macd_values = []
for price in price_data:
ema_12.update(price)
ema_26.update(price)
if ema_26.is_ready(): # EMA26 takes longer to be ready
macd = ema_12.get_value() - ema_26.get_value()
macd_values.append(macd)
print(f"MACD: {macd:.4f}")
```
### Performance Characteristics
- **Time Complexity**: O(1) per update
- **Space Complexity**: O(1)
- **Memory Usage**: ~24 bytes (constant)
## Comparison: SMA vs EMA
| Aspect | SMA | EMA |
|--------|-----|-----|
| **Responsiveness** | Slower | Faster |
| **Memory Usage** | O(period) | O(1) |
| **Smoothness** | Smoother | More volatile |
| **Lag** | Higher lag | Lower lag |
| **Noise Filtering** | Better | Moderate |
### When to Use SMA
- **Trend Identification**: Better for identifying long-term trends
- **Support/Resistance**: More reliable for support and resistance levels
- **Noise Reduction**: Better at filtering out market noise
- **Memory Constraints**: When memory usage is not a concern
### When to Use EMA
- **Quick Signals**: When you need faster response to price changes
- **Memory Efficiency**: When memory usage is critical
- **Short-term Trading**: Better for short-term trading strategies
- **Real-time Systems**: Ideal for high-frequency trading systems
## Advanced Usage Patterns
### Moving Average Crossover Strategy
```python
class MovingAverageCrossover:
def __init__(self, fast_period: int, slow_period: int):
self.fast_ma = MovingAverageState(fast_period)
self.slow_ma = MovingAverageState(slow_period)
self.previous_fast = 0.0
self.previous_slow = 0.0
def update(self, price: float):
self.previous_fast = self.fast_ma.get_value() if self.fast_ma.is_ready() else 0.0
self.previous_slow = self.slow_ma.get_value() if self.slow_ma.is_ready() else 0.0
self.fast_ma.update(price)
self.slow_ma.update(price)
def get_signal(self) -> str:
if not (self.fast_ma.is_ready() and self.slow_ma.is_ready()):
return "HOLD"
current_fast = self.fast_ma.get_value()
current_slow = self.slow_ma.get_value()
# Golden Cross: Fast MA crosses above Slow MA
if self.previous_fast <= self.previous_slow and current_fast > current_slow:
return "BUY"
# Death Cross: Fast MA crosses below Slow MA
if self.previous_fast >= self.previous_slow and current_fast < current_slow:
return "SELL"
return "HOLD"
# Usage
crossover = MovingAverageCrossover(fast_period=10, slow_period=20)
for price in price_stream:
crossover.update(price)
signal = crossover.get_signal()
if signal != "HOLD":
print(f"Signal: {signal} at price {price}")
```
### Adaptive Moving Average
```python
class AdaptiveMovingAverage:
def __init__(self, min_period: int = 5, max_period: int = 50):
self.min_period = min_period
self.max_period = max_period
self.sma_fast = MovingAverageState(min_period)
self.sma_slow = MovingAverageState(max_period)
self.current_ma = MovingAverageState(min_period)
def update(self, price: float):
self.sma_fast.update(price)
self.sma_slow.update(price)
if self.sma_slow.is_ready():
# Calculate volatility-based period
volatility = abs(self.sma_fast.get_value() - self.sma_slow.get_value())
normalized_vol = min(volatility / price, 0.1) # Cap at 10%
# Adjust period based on volatility
adaptive_period = int(self.min_period + (normalized_vol * (self.max_period - self.min_period)))
# Update current MA with adaptive period
if adaptive_period != self.current_ma.period:
self.current_ma = MovingAverageState(adaptive_period)
self.current_ma.update(price)
def get_value(self) -> float:
return self.current_ma.get_value()
def is_ready(self) -> bool:
return self.current_ma.is_ready()
```
## Error Handling and Edge Cases
### Robust Implementation
```python
class RobustMovingAverage(MovingAverageState):
def __init__(self, period: int):
if period <= 0:
raise ValueError("Period must be positive")
super().__init__(period)
def update(self, value: float):
# Validate input
if value is None:
self.logger.warning("Received None value, skipping update")
return
if math.isnan(value) or math.isinf(value):
self.logger.warning(f"Received invalid value: {value}, skipping update")
return
try:
super().update(value)
except Exception as e:
self.logger.error(f"Error updating moving average: {e}")
def get_value(self) -> float:
try:
return super().get_value()
except Exception as e:
self.logger.error(f"Error getting moving average value: {e}")
return 0.0
```
### Handling Missing Data
```python
def update_with_gap_handling(ma: MovingAverageState, value: float, timestamp: int, last_timestamp: int):
"""Update moving average with gap handling for missing data."""
# Define maximum acceptable gap (e.g., 5 minutes)
max_gap = 5 * 60 * 1000 # 5 minutes in milliseconds
if last_timestamp and (timestamp - last_timestamp) > max_gap:
# Large gap detected - reset the moving average
ma.reset()
print(f"Gap detected, resetting moving average")
ma.update(value)
```
## Integration with Strategies
### Strategy Implementation Example
```python
class MovingAverageStrategy(IncStrategyBase):
def __init__(self, name: str, params: dict = None):
super().__init__(name, params)
# Initialize moving averages
self.sma_short = MovingAverageState(self.params.get('short_period', 10))
self.sma_long = MovingAverageState(self.params.get('long_period', 20))
self.ema_signal = ExponentialMovingAverageState(self.params.get('signal_period', 5))
def _process_aggregated_data(self, timestamp: int, ohlcv: tuple) -> IncStrategySignal:
open_price, high, low, close, volume = ohlcv
# Update all moving averages
self.sma_short.update(close)
self.sma_long.update(close)
self.ema_signal.update(close)
# Wait for all indicators to be ready
if not all([self.sma_short.is_ready(), self.sma_long.is_ready(), self.ema_signal.is_ready()]):
return IncStrategySignal.HOLD()
# Get current values
sma_short_val = self.sma_short.get_value()
sma_long_val = self.sma_long.get_value()
ema_signal_val = self.ema_signal.get_value()
# Generate signals
if sma_short_val > sma_long_val and close > ema_signal_val:
confidence = min(0.9, (sma_short_val - sma_long_val) / sma_long_val * 10)
return IncStrategySignal.BUY(confidence=confidence)
elif sma_short_val < sma_long_val and close < ema_signal_val:
confidence = min(0.9, (sma_long_val - sma_short_val) / sma_long_val * 10)
return IncStrategySignal.SELL(confidence=confidence)
return IncStrategySignal.HOLD()
```
## Performance Optimization Tips
### 1. Choose the Right Moving Average
```python
# For memory-constrained environments
ema = ExponentialMovingAverageState(period=20) # O(1) memory
# For better smoothing and trend identification
sma = MovingAverageState(period=20) # O(period) memory
```
### 2. Batch Processing
```python
# Process multiple prices efficiently
def batch_update_moving_averages(mas: list, prices: list):
for price in prices:
for ma in mas:
ma.update(price)
# Return all values at once
return [ma.get_value() for ma in mas if ma.is_ready()]
```
### 3. Avoid Unnecessary Calculations
```python
# Cache ready state to avoid repeated checks
class CachedMovingAverage(MovingAverageState):
def __init__(self, period: int):
super().__init__(period)
self._is_ready_cached = False
def update(self, value: float):
super().update(value)
if not self._is_ready_cached:
self._is_ready_cached = self.data_count >= self.period
def is_ready(self) -> bool:
return self._is_ready_cached
```
---
*Moving averages are the foundation of many trading strategies. Choose SMA for smoother, more reliable signals, or EMA for faster response to price changes.*

View File

@@ -0,0 +1,615 @@
# Oscillator Indicators
## Overview
Oscillator indicators help identify overbought and oversold conditions in the market. IncrementalTrader provides RSI (Relative Strength Index) implementations that measure the speed and magnitude of price changes.
## RSIState
Full RSI implementation using Wilder's smoothing method for accurate calculation.
### Features
- **Wilder's Smoothing**: Uses the traditional RSI calculation method
- **Overbought/Oversold**: Clear signals for market extremes
- **Momentum Measurement**: Indicates price momentum strength
- **Divergence Detection**: Helps identify potential trend reversals
### Mathematical Formula
```
RS = Average Gain / Average Loss
RSI = 100 - (100 / (1 + RS))
Where:
- Average Gain = Wilder's smoothing of positive price changes
- Average Loss = Wilder's smoothing of negative price changes
- Wilder's smoothing: ((previous_average × (period - 1)) + current_value) / period
```
### Class Definition
```python
from IncrementalTrader.strategies.indicators import RSIState
class RSIState(IndicatorState):
def __init__(self, period: int):
super().__init__(period)
self.gains = []
self.losses = []
self.avg_gain = 0.0
self.avg_loss = 0.0
self.previous_close = None
self.is_first_calculation = True
def update(self, value: float):
if self.previous_close is not None:
change = value - self.previous_close
gain = max(change, 0.0)
loss = max(-change, 0.0)
if self.is_first_calculation and len(self.gains) >= self.period:
# Initial calculation using simple average
self.avg_gain = sum(self.gains[-self.period:]) / self.period
self.avg_loss = sum(self.losses[-self.period:]) / self.period
self.is_first_calculation = False
elif not self.is_first_calculation:
# Wilder's smoothing
self.avg_gain = ((self.avg_gain * (self.period - 1)) + gain) / self.period
self.avg_loss = ((self.avg_loss * (self.period - 1)) + loss) / self.period
self.gains.append(gain)
self.losses.append(loss)
# Keep only necessary history
if len(self.gains) > self.period:
self.gains.pop(0)
self.losses.pop(0)
self.previous_close = value
self.data_count += 1
def get_value(self) -> float:
if not self.is_ready() or self.avg_loss == 0:
return 50.0 # Neutral RSI
rs = self.avg_gain / self.avg_loss
rsi = 100.0 - (100.0 / (1.0 + rs))
return rsi
def is_ready(self) -> bool:
return self.data_count > self.period and not self.is_first_calculation
```
### Usage Examples
#### Basic RSI Usage
```python
# Create 14-period RSI
rsi_14 = RSIState(period=14)
# Price data
prices = [44, 44.34, 44.09, 44.15, 43.61, 44.33, 44.83, 45.85, 47.25, 47.92, 46.23, 44.18, 46.57, 46.61, 46.5]
for price in prices:
rsi_14.update(price)
if rsi_14.is_ready():
rsi_value = rsi_14.get_value()
print(f"Price: {price:.2f}, RSI(14): {rsi_value:.2f}")
```
#### RSI Trading Signals
```python
class RSISignals:
def __init__(self, period: int = 14, overbought: float = 70.0, oversold: float = 30.0):
self.rsi = RSIState(period)
self.overbought = overbought
self.oversold = oversold
self.previous_rsi = None
def update(self, price: float):
self.rsi.update(price)
def get_signal(self) -> str:
if not self.rsi.is_ready():
return "HOLD"
current_rsi = self.rsi.get_value()
# Oversold bounce signal
if (self.previous_rsi is not None and
self.previous_rsi <= self.oversold and
current_rsi > self.oversold):
signal = "BUY"
# Overbought pullback signal
elif (self.previous_rsi is not None and
self.previous_rsi >= self.overbought and
current_rsi < self.overbought):
signal = "SELL"
else:
signal = "HOLD"
self.previous_rsi = current_rsi
return signal
def get_condition(self) -> str:
"""Get current market condition based on RSI."""
if not self.rsi.is_ready():
return "UNKNOWN"
rsi_value = self.rsi.get_value()
if rsi_value >= self.overbought:
return "OVERBOUGHT"
elif rsi_value <= self.oversold:
return "OVERSOLD"
else:
return "NEUTRAL"
# Usage
rsi_signals = RSISignals(period=14, overbought=70, oversold=30)
for price in prices:
rsi_signals.update(price)
signal = rsi_signals.get_signal()
condition = rsi_signals.get_condition()
if signal != "HOLD":
print(f"RSI Signal: {signal}, Condition: {condition}, Price: {price:.2f}")
```
### Performance Characteristics
- **Time Complexity**: O(1) per update (after initial period)
- **Space Complexity**: O(period)
- **Memory Usage**: ~16 bytes per period + constant overhead
## SimpleRSIState
Simplified RSI implementation using exponential smoothing for memory efficiency.
### Features
- **O(1) Memory**: Constant memory usage regardless of period
- **Exponential Smoothing**: Uses EMA-based calculation
- **Fast Computation**: No need to maintain gain/loss history
- **Approximate RSI**: Close approximation to traditional RSI
### Mathematical Formula
```
Gain = max(price_change, 0)
Loss = max(-price_change, 0)
EMA_Gain = EMA(Gain, period)
EMA_Loss = EMA(Loss, period)
RSI = 100 - (100 / (1 + EMA_Gain / EMA_Loss))
```
### Class Definition
```python
class SimpleRSIState(IndicatorState):
def __init__(self, period: int):
super().__init__(period)
self.alpha = 2.0 / (period + 1)
self.ema_gain = 0.0
self.ema_loss = 0.0
self.previous_close = None
self.is_first_value = True
def update(self, value: float):
if self.previous_close is not None:
change = value - self.previous_close
gain = max(change, 0.0)
loss = max(-change, 0.0)
if self.is_first_value:
self.ema_gain = gain
self.ema_loss = loss
self.is_first_value = False
else:
self.ema_gain = (gain * self.alpha) + (self.ema_gain * (1 - self.alpha))
self.ema_loss = (loss * self.alpha) + (self.ema_loss * (1 - self.alpha))
self.previous_close = value
self.data_count += 1
def get_value(self) -> float:
if not self.is_ready() or self.ema_loss == 0:
return 50.0 # Neutral RSI
rs = self.ema_gain / self.ema_loss
rsi = 100.0 - (100.0 / (1.0 + rs))
return rsi
def is_ready(self) -> bool:
return self.data_count > 1 and not self.is_first_value
```
### Usage Examples
#### Memory-Efficient RSI
```python
# Create memory-efficient RSI
simple_rsi = SimpleRSIState(period=14)
# Process large amounts of data with constant memory
for i, price in enumerate(large_price_dataset):
simple_rsi.update(price)
if i % 1000 == 0 and simple_rsi.is_ready(): # Print every 1000 updates
print(f"RSI after {i} updates: {simple_rsi.get_value():.2f}")
```
#### RSI Divergence Detection
```python
class RSIDivergence:
def __init__(self, period: int = 14, lookback: int = 20):
self.rsi = SimpleRSIState(period)
self.lookback = lookback
self.price_history = []
self.rsi_history = []
def update(self, price: float):
self.rsi.update(price)
if self.rsi.is_ready():
self.price_history.append(price)
self.rsi_history.append(self.rsi.get_value())
# Keep only recent history
if len(self.price_history) > self.lookback:
self.price_history.pop(0)
self.rsi_history.pop(0)
def detect_bullish_divergence(self) -> bool:
"""Detect bullish divergence: price makes lower low, RSI makes higher low."""
if len(self.price_history) < self.lookback:
return False
# Find recent lows
price_low_idx = self.price_history.index(min(self.price_history[-10:]))
rsi_low_idx = self.rsi_history.index(min(self.rsi_history[-10:]))
# Check for divergence pattern
if (price_low_idx < len(self.price_history) - 3 and
rsi_low_idx < len(self.rsi_history) - 3):
recent_price_low = min(self.price_history[-3:])
recent_rsi_low = min(self.rsi_history[-3:])
# Bullish divergence: price lower low, RSI higher low
if (recent_price_low < self.price_history[price_low_idx] and
recent_rsi_low > self.rsi_history[rsi_low_idx]):
return True
return False
def detect_bearish_divergence(self) -> bool:
"""Detect bearish divergence: price makes higher high, RSI makes lower high."""
if len(self.price_history) < self.lookback:
return False
# Find recent highs
price_high_idx = self.price_history.index(max(self.price_history[-10:]))
rsi_high_idx = self.rsi_history.index(max(self.rsi_history[-10:]))
# Check for divergence pattern
if (price_high_idx < len(self.price_history) - 3 and
rsi_high_idx < len(self.rsi_history) - 3):
recent_price_high = max(self.price_history[-3:])
recent_rsi_high = max(self.rsi_history[-3:])
# Bearish divergence: price higher high, RSI lower high
if (recent_price_high > self.price_history[price_high_idx] and
recent_rsi_high < self.rsi_history[rsi_high_idx]):
return True
return False
# Usage
divergence_detector = RSIDivergence(period=14, lookback=20)
for price in price_data:
divergence_detector.update(price)
if divergence_detector.detect_bullish_divergence():
print(f"Bullish RSI divergence detected at price {price:.2f}")
if divergence_detector.detect_bearish_divergence():
print(f"Bearish RSI divergence detected at price {price:.2f}")
```
### Performance Characteristics
- **Time Complexity**: O(1) per update
- **Space Complexity**: O(1)
- **Memory Usage**: ~32 bytes (constant)
## Comparison: RSIState vs SimpleRSIState
| Aspect | RSIState | SimpleRSIState |
|--------|----------|----------------|
| **Memory Usage** | O(period) | O(1) |
| **Calculation Method** | Wilder's Smoothing | Exponential Smoothing |
| **Accuracy** | Higher (traditional) | Good (approximation) |
| **Responsiveness** | Standard | Slightly more responsive |
| **Historical Compatibility** | Traditional RSI | Modern approximation |
### When to Use RSIState
- **Precise Calculations**: When you need exact traditional RSI values
- **Backtesting**: For historical analysis and strategy validation
- **Research**: When studying exact RSI behavior and patterns
- **Small Periods**: When period is small (< 20) and memory isn't an issue
### When to Use SimpleRSIState
- **Memory Efficiency**: When processing large amounts of data
- **Real-time Systems**: For high-frequency trading applications
- **Approximate Analysis**: When close approximation is sufficient
- **Large Periods**: When using large RSI periods (> 50)
## Advanced Usage Patterns
### Multi-Timeframe RSI Analysis
```python
class MultiTimeframeRSI:
def __init__(self):
self.rsi_short = SimpleRSIState(period=7) # Short-term momentum
self.rsi_medium = SimpleRSIState(period=14) # Standard RSI
self.rsi_long = SimpleRSIState(period=21) # Long-term momentum
def update(self, price: float):
self.rsi_short.update(price)
self.rsi_medium.update(price)
self.rsi_long.update(price)
def get_momentum_regime(self) -> str:
"""Determine current momentum regime."""
if not all([self.rsi_short.is_ready(), self.rsi_medium.is_ready(), self.rsi_long.is_ready()]):
return "UNKNOWN"
short_rsi = self.rsi_short.get_value()
medium_rsi = self.rsi_medium.get_value()
long_rsi = self.rsi_long.get_value()
# All timeframes bullish
if all(rsi > 50 for rsi in [short_rsi, medium_rsi, long_rsi]):
return "STRONG_BULLISH"
# All timeframes bearish
elif all(rsi < 50 for rsi in [short_rsi, medium_rsi, long_rsi]):
return "STRONG_BEARISH"
# Mixed signals
elif short_rsi > 50 and medium_rsi > 50:
return "BULLISH"
elif short_rsi < 50 and medium_rsi < 50:
return "BEARISH"
else:
return "MIXED"
def get_overbought_oversold_consensus(self) -> str:
"""Get consensus on overbought/oversold conditions."""
if not all([self.rsi_short.is_ready(), self.rsi_medium.is_ready(), self.rsi_long.is_ready()]):
return "UNKNOWN"
rsi_values = [self.rsi_short.get_value(), self.rsi_medium.get_value(), self.rsi_long.get_value()]
overbought_count = sum(1 for rsi in rsi_values if rsi >= 70)
oversold_count = sum(1 for rsi in rsi_values if rsi <= 30)
if overbought_count >= 2:
return "OVERBOUGHT"
elif oversold_count >= 2:
return "OVERSOLD"
else:
return "NEUTRAL"
# Usage
multi_rsi = MultiTimeframeRSI()
for price in price_data:
multi_rsi.update(price)
regime = multi_rsi.get_momentum_regime()
consensus = multi_rsi.get_overbought_oversold_consensus()
print(f"Price: {price:.2f}, Momentum: {regime}, Condition: {consensus}")
```
### RSI with Dynamic Thresholds
```python
class AdaptiveRSI:
def __init__(self, period: int = 14, lookback: int = 50):
self.rsi = SimpleRSIState(period)
self.lookback = lookback
self.rsi_history = []
def update(self, price: float):
self.rsi.update(price)
if self.rsi.is_ready():
self.rsi_history.append(self.rsi.get_value())
# Keep only recent history
if len(self.rsi_history) > self.lookback:
self.rsi_history.pop(0)
def get_adaptive_thresholds(self) -> tuple:
"""Calculate adaptive overbought/oversold thresholds."""
if len(self.rsi_history) < 20:
return 70.0, 30.0 # Default thresholds
# Calculate percentiles for adaptive thresholds
sorted_rsi = sorted(self.rsi_history)
# Use 80th and 20th percentiles as adaptive thresholds
overbought_threshold = sorted_rsi[int(len(sorted_rsi) * 0.8)]
oversold_threshold = sorted_rsi[int(len(sorted_rsi) * 0.2)]
# Ensure minimum separation
if overbought_threshold - oversold_threshold < 20:
mid = (overbought_threshold + oversold_threshold) / 2
overbought_threshold = mid + 10
oversold_threshold = mid - 10
return overbought_threshold, oversold_threshold
def get_adaptive_signal(self) -> str:
"""Get signal using adaptive thresholds."""
if not self.rsi.is_ready() or len(self.rsi_history) < 2:
return "HOLD"
current_rsi = self.rsi.get_value()
previous_rsi = self.rsi_history[-2]
overbought, oversold = self.get_adaptive_thresholds()
# Adaptive oversold bounce
if previous_rsi <= oversold and current_rsi > oversold:
return "BUY"
# Adaptive overbought pullback
elif previous_rsi >= overbought and current_rsi < overbought:
return "SELL"
return "HOLD"
# Usage
adaptive_rsi = AdaptiveRSI(period=14, lookback=50)
for price in price_data:
adaptive_rsi.update(price)
signal = adaptive_rsi.get_adaptive_signal()
overbought, oversold = adaptive_rsi.get_adaptive_thresholds()
if signal != "HOLD":
print(f"Adaptive RSI Signal: {signal}, Thresholds: OB={overbought:.1f}, OS={oversold:.1f}")
```
## Integration with Strategies
### RSI Mean Reversion Strategy
```python
class RSIMeanReversionStrategy(IncStrategyBase):
def __init__(self, name: str, params: dict = None):
super().__init__(name, params)
# Initialize RSI
self.rsi = RSIState(self.params.get('rsi_period', 14))
# RSI parameters
self.overbought = self.params.get('overbought', 70.0)
self.oversold = self.params.get('oversold', 30.0)
self.exit_neutral = self.params.get('exit_neutral', 50.0)
# State tracking
self.previous_rsi = None
self.position_type = None
def _process_aggregated_data(self, timestamp: int, ohlcv: tuple) -> IncStrategySignal:
open_price, high, low, close, volume = ohlcv
# Update RSI
self.rsi.update(close)
# Wait for RSI to be ready
if not self.rsi.is_ready():
return IncStrategySignal.HOLD()
current_rsi = self.rsi.get_value()
# Entry signals
if self.previous_rsi is not None:
# Oversold bounce (mean reversion up)
if (self.previous_rsi <= self.oversold and
current_rsi > self.oversold and
self.position_type != "LONG"):
confidence = min(0.9, (self.oversold - self.previous_rsi) / 20.0)
self.position_type = "LONG"
return IncStrategySignal.BUY(
confidence=confidence,
metadata={
'rsi': current_rsi,
'previous_rsi': self.previous_rsi,
'signal_type': 'oversold_bounce'
}
)
# Overbought pullback (mean reversion down)
elif (self.previous_rsi >= self.overbought and
current_rsi < self.overbought and
self.position_type != "SHORT"):
confidence = min(0.9, (self.previous_rsi - self.overbought) / 20.0)
self.position_type = "SHORT"
return IncStrategySignal.SELL(
confidence=confidence,
metadata={
'rsi': current_rsi,
'previous_rsi': self.previous_rsi,
'signal_type': 'overbought_pullback'
}
)
# Exit signals (return to neutral)
elif (self.position_type == "LONG" and current_rsi >= self.exit_neutral):
self.position_type = None
return IncStrategySignal.SELL(confidence=0.5, metadata={'signal_type': 'exit_long'})
elif (self.position_type == "SHORT" and current_rsi <= self.exit_neutral):
self.position_type = None
return IncStrategySignal.BUY(confidence=0.5, metadata={'signal_type': 'exit_short'})
self.previous_rsi = current_rsi
return IncStrategySignal.HOLD()
```
## Performance Optimization Tips
### 1. Choose the Right RSI Implementation
```python
# For memory-constrained environments
rsi = SimpleRSIState(period=14) # O(1) memory
# For precise traditional RSI
rsi = RSIState(period=14) # O(period) memory
```
### 2. Batch Processing for Multiple RSIs
```python
def update_multiple_rsis(rsis: list, price: float):
"""Efficiently update multiple RSI indicators."""
for rsi in rsis:
rsi.update(price)
return [rsi.get_value() for rsi in rsis if rsi.is_ready()]
```
### 3. Cache RSI Values for Complex Calculations
```python
class CachedRSI:
def __init__(self, period: int):
self.rsi = SimpleRSIState(period)
self._cached_value = 50.0
self._cache_valid = False
def update(self, price: float):
self.rsi.update(price)
self._cache_valid = False
def get_value(self) -> float:
if not self._cache_valid:
self._cached_value = self.rsi.get_value()
self._cache_valid = True
return self._cached_value
```
---
*RSI indicators are essential for identifying momentum and overbought/oversold conditions. Use RSIState for traditional analysis or SimpleRSIState for memory efficiency in high-frequency applications.*

View File

@@ -0,0 +1,577 @@
# Trend Indicators
## Overview
Trend indicators help identify the direction and strength of market trends. IncrementalTrader provides Supertrend implementations that combine price action with volatility to generate clear trend signals.
## SupertrendState
Individual Supertrend indicator that tracks trend direction and provides support/resistance levels.
### Features
- **Trend Direction**: Clear bullish/bearish trend identification
- **Dynamic Support/Resistance**: Adaptive levels based on volatility
- **ATR-Based**: Uses Average True Range for volatility adjustment
- **Real-time Updates**: Incremental calculation for live trading
### Mathematical Formula
```
Basic Upper Band = (High + Low) / 2 + (Multiplier × ATR)
Basic Lower Band = (High + Low) / 2 - (Multiplier × ATR)
Final Upper Band = Basic Upper Band < Previous Final Upper Band OR Previous Close > Previous Final Upper Band
? Basic Upper Band : Previous Final Upper Band
Final Lower Band = Basic Lower Band > Previous Final Lower Band OR Previous Close < Previous Final Lower Band
? Basic Lower Band : Previous Final Lower Band
Supertrend = Close <= Final Lower Band ? Final Lower Band : Final Upper Band
Trend = Close <= Final Lower Band ? DOWN : UP
```
### Class Definition
```python
from IncrementalTrader.strategies.indicators import SupertrendState
class SupertrendState(OHLCIndicatorState):
def __init__(self, period: int, multiplier: float):
super().__init__(period)
self.multiplier = multiplier
self.atr = SimpleATRState(period)
# Supertrend state
self.supertrend_value = 0.0
self.trend = 1 # 1 for up, -1 for down
self.final_upper_band = 0.0
self.final_lower_band = 0.0
self.previous_close = 0.0
def _process_ohlc_data(self, high: float, low: float, close: float):
# Update ATR
self.atr.update_ohlc(high, low, close)
if not self.atr.is_ready():
return
# Calculate basic bands
hl2 = (high + low) / 2.0
atr_value = self.atr.get_value()
basic_upper_band = hl2 + (self.multiplier * atr_value)
basic_lower_band = hl2 - (self.multiplier * atr_value)
# Calculate final bands
if self.data_count == 1:
self.final_upper_band = basic_upper_band
self.final_lower_band = basic_lower_band
else:
# Final upper band logic
if basic_upper_band < self.final_upper_band or self.previous_close > self.final_upper_band:
self.final_upper_band = basic_upper_band
# Final lower band logic
if basic_lower_band > self.final_lower_band or self.previous_close < self.final_lower_band:
self.final_lower_band = basic_lower_band
# Determine trend and supertrend value
if close <= self.final_lower_band:
self.trend = -1 # Downtrend
self.supertrend_value = self.final_lower_band
else:
self.trend = 1 # Uptrend
self.supertrend_value = self.final_upper_band
self.previous_close = close
def get_value(self) -> float:
return self.supertrend_value
def get_trend(self) -> int:
"""Get current trend direction: 1 for up, -1 for down."""
return self.trend
def is_bullish(self) -> bool:
"""Check if current trend is bullish."""
return self.trend == 1
def is_bearish(self) -> bool:
"""Check if current trend is bearish."""
return self.trend == -1
```
### Usage Examples
#### Basic Supertrend Usage
```python
# Create Supertrend with 10-period ATR and 3.0 multiplier
supertrend = SupertrendState(period=10, multiplier=3.0)
# OHLC data: (high, low, close)
ohlc_data = [
(105.0, 102.0, 104.0),
(106.0, 103.0, 105.5),
(107.0, 104.0, 106.0),
(108.0, 105.0, 107.5)
]
for high, low, close in ohlc_data:
supertrend.update_ohlc(high, low, close)
if supertrend.is_ready():
trend_direction = "BULLISH" if supertrend.is_bullish() else "BEARISH"
print(f"Supertrend: {supertrend.get_value():.2f}, Trend: {trend_direction}")
```
#### Trend Change Detection
```python
class SupertrendSignals:
def __init__(self, period: int = 10, multiplier: float = 3.0):
self.supertrend = SupertrendState(period, multiplier)
self.previous_trend = None
def update(self, high: float, low: float, close: float):
self.supertrend.update_ohlc(high, low, close)
def get_signal(self) -> str:
if not self.supertrend.is_ready():
return "HOLD"
current_trend = self.supertrend.get_trend()
# Check for trend change
if self.previous_trend is not None and self.previous_trend != current_trend:
if current_trend == 1:
signal = "BUY" # Trend changed to bullish
else:
signal = "SELL" # Trend changed to bearish
else:
signal = "HOLD"
self.previous_trend = current_trend
return signal
def get_support_resistance(self) -> float:
"""Get current support/resistance level."""
return self.supertrend.get_value()
# Usage
signals = SupertrendSignals(period=10, multiplier=3.0)
for high, low, close in ohlc_data:
signals.update(high, low, close)
signal = signals.get_signal()
support_resistance = signals.get_support_resistance()
if signal != "HOLD":
print(f"Signal: {signal} at {close:.2f}, S/R: {support_resistance:.2f}")
```
### Performance Characteristics
- **Time Complexity**: O(1) per update
- **Space Complexity**: O(ATR_period)
- **Memory Usage**: ~8 bytes per ATR period + constant overhead
## SupertrendCollection
Collection of multiple Supertrend indicators for meta-trend analysis.
### Features
- **Multiple Timeframes**: Combines different Supertrend configurations
- **Consensus Signals**: Requires agreement among multiple indicators
- **Trend Strength**: Measures trend strength through consensus
- **Flexible Configuration**: Customizable periods and multipliers
### Class Definition
```python
class SupertrendCollection:
def __init__(self, configs: list):
"""
Initialize with list of (period, multiplier) tuples.
Example: [(10, 3.0), (14, 2.0), (21, 1.5)]
"""
self.supertrendss = []
for period, multiplier in configs:
self.supertrendss.append(SupertrendState(period, multiplier))
self.configs = configs
def update_ohlc(self, high: float, low: float, close: float):
"""Update all Supertrend indicators."""
for st in self.supertrendss:
st.update_ohlc(high, low, close)
def is_ready(self) -> bool:
"""Check if all indicators are ready."""
return all(st.is_ready() for st in self.supertrendss)
def get_consensus_trend(self) -> int:
"""Get consensus trend: 1 for bullish, -1 for bearish, 0 for mixed."""
if not self.is_ready():
return 0
trends = [st.get_trend() for st in self.supertrendss]
bullish_count = sum(1 for trend in trends if trend == 1)
bearish_count = sum(1 for trend in trends if trend == -1)
if bullish_count > bearish_count:
return 1
elif bearish_count > bullish_count:
return -1
else:
return 0
def get_trend_strength(self) -> float:
"""Get trend strength as percentage of indicators agreeing."""
if not self.is_ready():
return 0.0
consensus_trend = self.get_consensus_trend()
if consensus_trend == 0:
return 0.0
trends = [st.get_trend() for st in self.supertrendss]
agreeing_count = sum(1 for trend in trends if trend == consensus_trend)
return agreeing_count / len(trends)
def get_supertrend_values(self) -> list:
"""Get all Supertrend values."""
return [st.get_value() for st in self.supertrendss if st.is_ready()]
def get_average_supertrend(self) -> float:
"""Get average Supertrend value."""
values = self.get_supertrend_values()
return sum(values) / len(values) if values else 0.0
```
### Usage Examples
#### Multi-Timeframe Trend Analysis
```python
# Create collection with different configurations
configs = [
(10, 3.0), # Fast Supertrend
(14, 2.5), # Medium Supertrend
(21, 2.0) # Slow Supertrend
]
supertrend_collection = SupertrendCollection(configs)
for high, low, close in ohlc_data:
supertrend_collection.update_ohlc(high, low, close)
if supertrend_collection.is_ready():
consensus = supertrend_collection.get_consensus_trend()
strength = supertrend_collection.get_trend_strength()
avg_supertrend = supertrend_collection.get_average_supertrend()
trend_name = {1: "BULLISH", -1: "BEARISH", 0: "MIXED"}[consensus]
print(f"Consensus: {trend_name}, Strength: {strength:.1%}, Avg S/R: {avg_supertrend:.2f}")
```
#### Meta-Trend Strategy
```python
class MetaTrendStrategy:
def __init__(self):
# Multiple Supertrend configurations
self.supertrend_collection = SupertrendCollection([
(10, 3.0), # Fast
(14, 2.5), # Medium
(21, 2.0), # Slow
(28, 1.5) # Very slow
])
self.previous_consensus = None
def update(self, high: float, low: float, close: float):
self.supertrend_collection.update_ohlc(high, low, close)
def get_meta_signal(self) -> dict:
if not self.supertrend_collection.is_ready():
return {"signal": "HOLD", "confidence": 0.0, "strength": 0.0}
current_consensus = self.supertrend_collection.get_consensus_trend()
strength = self.supertrend_collection.get_trend_strength()
# Check for consensus change
signal = "HOLD"
if self.previous_consensus is not None and self.previous_consensus != current_consensus:
if current_consensus == 1:
signal = "BUY"
elif current_consensus == -1:
signal = "SELL"
# Calculate confidence based on strength and consensus
confidence = strength if current_consensus != 0 else 0.0
self.previous_consensus = current_consensus
return {
"signal": signal,
"confidence": confidence,
"strength": strength,
"consensus": current_consensus,
"avg_supertrend": self.supertrend_collection.get_average_supertrend()
}
# Usage
meta_strategy = MetaTrendStrategy()
for high, low, close in ohlc_data:
meta_strategy.update(high, low, close)
result = meta_strategy.get_meta_signal()
if result["signal"] != "HOLD":
print(f"Meta Signal: {result['signal']}, Confidence: {result['confidence']:.1%}")
```
### Performance Characteristics
- **Time Complexity**: O(n) per update (where n is number of Supertrends)
- **Space Complexity**: O(sum of all ATR periods)
- **Memory Usage**: Scales with number of indicators
## Advanced Usage Patterns
### Adaptive Supertrend
```python
class AdaptiveSupertrend:
def __init__(self, base_period: int = 14, base_multiplier: float = 2.0):
self.base_period = base_period
self.base_multiplier = base_multiplier
# Volatility measurement for adaptation
self.atr_short = SimpleATRState(period=5)
self.atr_long = SimpleATRState(period=20)
# Current adaptive Supertrend
self.current_supertrend = SupertrendState(base_period, base_multiplier)
# Adaptation parameters
self.min_multiplier = 1.0
self.max_multiplier = 4.0
def update_ohlc(self, high: float, low: float, close: float):
# Update volatility measurements
self.atr_short.update_ohlc(high, low, close)
self.atr_long.update_ohlc(high, low, close)
# Calculate adaptive multiplier
if self.atr_long.is_ready() and self.atr_short.is_ready():
volatility_ratio = self.atr_short.get_value() / self.atr_long.get_value()
# Adjust multiplier based on volatility
adaptive_multiplier = self.base_multiplier * volatility_ratio
adaptive_multiplier = max(self.min_multiplier, min(self.max_multiplier, adaptive_multiplier))
# Update Supertrend if multiplier changed significantly
if abs(adaptive_multiplier - self.current_supertrend.multiplier) > 0.1:
self.current_supertrend = SupertrendState(self.base_period, adaptive_multiplier)
# Update current Supertrend
self.current_supertrend.update_ohlc(high, low, close)
def get_value(self) -> float:
return self.current_supertrend.get_value()
def get_trend(self) -> int:
return self.current_supertrend.get_trend()
def is_ready(self) -> bool:
return self.current_supertrend.is_ready()
def get_current_multiplier(self) -> float:
return self.current_supertrend.multiplier
# Usage
adaptive_st = AdaptiveSupertrend(base_period=14, base_multiplier=2.0)
for high, low, close in ohlc_data:
adaptive_st.update_ohlc(high, low, close)
if adaptive_st.is_ready():
trend = "BULLISH" if adaptive_st.get_trend() == 1 else "BEARISH"
multiplier = adaptive_st.get_current_multiplier()
print(f"Adaptive Supertrend: {adaptive_st.get_value():.2f}, "
f"Trend: {trend}, Multiplier: {multiplier:.2f}")
```
### Supertrend with Stop Loss Management
```python
class SupertrendStopLoss:
def __init__(self, period: int = 14, multiplier: float = 2.0, buffer_percent: float = 0.5):
self.supertrend = SupertrendState(period, multiplier)
self.buffer_percent = buffer_percent / 100.0
self.current_position = None # "LONG", "SHORT", or None
self.entry_price = 0.0
self.stop_loss = 0.0
def update(self, high: float, low: float, close: float):
previous_trend = self.supertrend.get_trend() if self.supertrend.is_ready() else None
self.supertrend.update_ohlc(high, low, close)
if not self.supertrend.is_ready():
return
current_trend = self.supertrend.get_trend()
supertrend_value = self.supertrend.get_value()
# Check for trend change (entry signal)
if previous_trend is not None and previous_trend != current_trend:
if current_trend == 1: # Bullish trend
self.enter_long(close, supertrend_value)
else: # Bearish trend
self.enter_short(close, supertrend_value)
# Update stop loss for existing position
if self.current_position:
self.update_stop_loss(supertrend_value)
def enter_long(self, price: float, supertrend_value: float):
self.current_position = "LONG"
self.entry_price = price
self.stop_loss = supertrend_value * (1 - self.buffer_percent)
print(f"LONG entry at {price:.2f}, Stop: {self.stop_loss:.2f}")
def enter_short(self, price: float, supertrend_value: float):
self.current_position = "SHORT"
self.entry_price = price
self.stop_loss = supertrend_value * (1 + self.buffer_percent)
print(f"SHORT entry at {price:.2f}, Stop: {self.stop_loss:.2f}")
def update_stop_loss(self, supertrend_value: float):
if self.current_position == "LONG":
new_stop = supertrend_value * (1 - self.buffer_percent)
if new_stop > self.stop_loss: # Only move stop up
self.stop_loss = new_stop
elif self.current_position == "SHORT":
new_stop = supertrend_value * (1 + self.buffer_percent)
if new_stop < self.stop_loss: # Only move stop down
self.stop_loss = new_stop
def check_stop_loss(self, current_price: float) -> bool:
"""Check if stop loss is hit."""
if not self.current_position:
return False
if self.current_position == "LONG" and current_price <= self.stop_loss:
print(f"LONG stop loss hit at {current_price:.2f}")
self.current_position = None
return True
elif self.current_position == "SHORT" and current_price >= self.stop_loss:
print(f"SHORT stop loss hit at {current_price:.2f}")
self.current_position = None
return True
return False
# Usage
st_stop_loss = SupertrendStopLoss(period=14, multiplier=2.0, buffer_percent=0.5)
for high, low, close in ohlc_data:
st_stop_loss.update(high, low, close)
# Check stop loss on each update
if st_stop_loss.check_stop_loss(close):
print("Position closed due to stop loss")
```
## Integration with Strategies
### Supertrend Strategy Example
```python
class SupertrendStrategy(IncStrategyBase):
def __init__(self, name: str, params: dict = None):
super().__init__(name, params)
# Initialize Supertrend collection
configs = self.params.get('supertrend_configs', [(10, 3.0), (14, 2.5), (21, 2.0)])
self.supertrend_collection = SupertrendCollection(configs)
# Strategy parameters
self.min_strength = self.params.get('min_strength', 0.75)
self.previous_consensus = None
def _process_aggregated_data(self, timestamp: int, ohlcv: tuple) -> IncStrategySignal:
open_price, high, low, close, volume = ohlcv
# Update Supertrend collection
self.supertrend_collection.update_ohlc(high, low, close)
# Wait for indicators to be ready
if not self.supertrend_collection.is_ready():
return IncStrategySignal.HOLD()
# Get consensus and strength
current_consensus = self.supertrend_collection.get_consensus_trend()
strength = self.supertrend_collection.get_trend_strength()
# Check for strong consensus change
if (self.previous_consensus is not None and
self.previous_consensus != current_consensus and
strength >= self.min_strength):
if current_consensus == 1:
# Strong bullish consensus
return IncStrategySignal.BUY(
confidence=strength,
metadata={
'consensus': current_consensus,
'strength': strength,
'avg_supertrend': self.supertrend_collection.get_average_supertrend()
}
)
elif current_consensus == -1:
# Strong bearish consensus
return IncStrategySignal.SELL(
confidence=strength,
metadata={
'consensus': current_consensus,
'strength': strength,
'avg_supertrend': self.supertrend_collection.get_average_supertrend()
}
)
self.previous_consensus = current_consensus
return IncStrategySignal.HOLD()
```
## Performance Optimization Tips
### 1. Choose Appropriate Configurations
```python
# For fast signals (more noise)
fast_configs = [(7, 3.0), (10, 2.5)]
# For balanced signals
balanced_configs = [(10, 3.0), (14, 2.5), (21, 2.0)]
# For slow, reliable signals
slow_configs = [(14, 2.0), (21, 1.5), (28, 1.0)]
```
### 2. Optimize Memory Usage
```python
# Use SimpleATRState for memory efficiency
class MemoryEfficientSupertrend(SupertrendState):
def __init__(self, period: int, multiplier: float):
super().__init__(period, multiplier)
# Replace ATRState with SimpleATRState
self.atr = SimpleATRState(period)
```
### 3. Batch Processing
```python
def update_multiple_supertrends(supertrends: list, high: float, low: float, close: float):
"""Efficiently update multiple Supertrend indicators."""
for st in supertrends:
st.update_ohlc(high, low, close)
return [(st.get_value(), st.get_trend()) for st in supertrends if st.is_ready()]
```
---
*Supertrend indicators provide clear trend direction and dynamic support/resistance levels. Use single Supertrend for simple trend following or SupertrendCollection for robust meta-trend analysis.*

View File

@@ -0,0 +1,546 @@
# Volatility Indicators
## Overview
Volatility indicators measure the rate of price change and market uncertainty. IncrementalTrader provides Average True Range (ATR) implementations that help assess market volatility and set appropriate stop-loss levels.
## ATRState (Average True Range)
Full ATR implementation that maintains a moving average of True Range values.
### Features
- **True Range Calculation**: Accounts for gaps between trading sessions
- **Volatility Measurement**: Provides absolute volatility measurement
- **Stop-Loss Guidance**: Helps set dynamic stop-loss levels
- **Trend Strength**: Indicates trend strength through volatility
### Mathematical Formula
```
True Range = max(
High - Low,
|High - Previous_Close|,
|Low - Previous_Close|
)
ATR = Moving_Average(True_Range, period)
```
### Class Definition
```python
from IncrementalTrader.strategies.indicators import ATRState
class ATRState(OHLCIndicatorState):
def __init__(self, period: int):
super().__init__(period)
self.true_ranges = []
self.tr_sum = 0.0
self.previous_close = None
def _process_ohlc_data(self, high: float, low: float, close: float):
# Calculate True Range
if self.previous_close is not None:
tr = max(
high - low,
abs(high - self.previous_close),
abs(low - self.previous_close)
)
else:
tr = high - low
# Update True Range moving average
self.true_ranges.append(tr)
self.tr_sum += tr
if len(self.true_ranges) > self.period:
old_tr = self.true_ranges.pop(0)
self.tr_sum -= old_tr
self.previous_close = close
def get_value(self) -> float:
if not self.is_ready():
return 0.0
return self.tr_sum / len(self.true_ranges)
```
### Usage Examples
#### Basic ATR Calculation
```python
# Create 14-period ATR
atr_14 = ATRState(period=14)
# OHLC data: (high, low, close)
ohlc_data = [
(105.0, 102.0, 104.0),
(106.0, 103.0, 105.5),
(107.0, 104.0, 106.0),
(108.0, 105.0, 107.5)
]
for high, low, close in ohlc_data:
atr_14.update_ohlc(high, low, close)
if atr_14.is_ready():
print(f"ATR(14): {atr_14.get_value():.2f}")
```
#### Dynamic Stop-Loss with ATR
```python
class ATRStopLoss:
def __init__(self, atr_period: int = 14, atr_multiplier: float = 2.0):
self.atr = ATRState(atr_period)
self.atr_multiplier = atr_multiplier
def update(self, high: float, low: float, close: float):
self.atr.update_ohlc(high, low, close)
def get_stop_loss(self, entry_price: float, position_type: str) -> float:
if not self.atr.is_ready():
return entry_price * 0.95 if position_type == "LONG" else entry_price * 1.05
atr_value = self.atr.get_value()
if position_type == "LONG":
return entry_price - (atr_value * self.atr_multiplier)
else: # SHORT
return entry_price + (atr_value * self.atr_multiplier)
def get_position_size(self, account_balance: float, risk_percent: float, entry_price: float, position_type: str) -> float:
"""Calculate position size based on ATR risk."""
if not self.atr.is_ready():
return 0.0
risk_amount = account_balance * (risk_percent / 100)
stop_loss = self.get_stop_loss(entry_price, position_type)
risk_per_share = abs(entry_price - stop_loss)
if risk_per_share == 0:
return 0.0
return risk_amount / risk_per_share
# Usage
atr_stop = ATRStopLoss(atr_period=14, atr_multiplier=2.0)
for high, low, close in ohlc_stream:
atr_stop.update(high, low, close)
# Calculate stop loss for a long position
entry_price = close
stop_loss = atr_stop.get_stop_loss(entry_price, "LONG")
position_size = atr_stop.get_position_size(10000, 2.0, entry_price, "LONG")
print(f"Entry: {entry_price:.2f}, Stop: {stop_loss:.2f}, Size: {position_size:.0f}")
```
### Performance Characteristics
- **Time Complexity**: O(1) per update
- **Space Complexity**: O(period)
- **Memory Usage**: ~8 bytes per period + constant overhead
## SimpleATRState
Simplified ATR implementation using exponential smoothing instead of simple moving average.
### Features
- **O(1) Memory**: Constant memory usage regardless of period
- **Exponential Smoothing**: Uses Wilder's smoothing method
- **Faster Computation**: No need to maintain historical True Range values
- **Traditional ATR**: Follows Wilder's original ATR calculation
### Mathematical Formula
```
True Range = max(
High - Low,
|High - Previous_Close|,
|Low - Previous_Close|
)
ATR = (Previous_ATR × (period - 1) + True_Range) / period
```
### Class Definition
```python
class SimpleATRState(OHLCIndicatorState):
def __init__(self, period: int):
super().__init__(period)
self.atr_value = 0.0
self.previous_close = None
self.is_first_value = True
def _process_ohlc_data(self, high: float, low: float, close: float):
# Calculate True Range
if self.previous_close is not None:
tr = max(
high - low,
abs(high - self.previous_close),
abs(low - self.previous_close)
)
else:
tr = high - low
# Update ATR using Wilder's smoothing
if self.is_first_value:
self.atr_value = tr
self.is_first_value = False
else:
self.atr_value = ((self.atr_value * (self.period - 1)) + tr) / self.period
self.previous_close = close
def get_value(self) -> float:
return self.atr_value
```
### Usage Examples
#### Memory-Efficient ATR
```python
# Create memory-efficient ATR
simple_atr = SimpleATRState(period=14)
# Process large amounts of data with constant memory
for i, (high, low, close) in enumerate(large_ohlc_dataset):
simple_atr.update_ohlc(high, low, close)
if i % 1000 == 0: # Print every 1000 updates
print(f"ATR after {i} updates: {simple_atr.get_value():.4f}")
```
#### Volatility Breakout Strategy
```python
class VolatilityBreakout:
def __init__(self, atr_period: int = 14, breakout_multiplier: float = 1.5):
self.atr = SimpleATRState(atr_period)
self.breakout_multiplier = breakout_multiplier
self.previous_close = None
def update(self, high: float, low: float, close: float):
self.atr.update_ohlc(high, low, close)
self.previous_close = close
def get_breakout_levels(self, current_close: float) -> tuple:
"""Get upper and lower breakout levels."""
if not self.atr.is_ready() or self.previous_close is None:
return current_close * 1.01, current_close * 0.99
atr_value = self.atr.get_value()
breakout_distance = atr_value * self.breakout_multiplier
upper_breakout = self.previous_close + breakout_distance
lower_breakout = self.previous_close - breakout_distance
return upper_breakout, lower_breakout
def check_breakout(self, current_high: float, current_low: float, current_close: float) -> str:
"""Check if current price breaks out of volatility range."""
upper_level, lower_level = self.get_breakout_levels(current_close)
if current_high > upper_level:
return "BULLISH_BREAKOUT"
elif current_low < lower_level:
return "BEARISH_BREAKOUT"
return "NO_BREAKOUT"
# Usage
breakout_detector = VolatilityBreakout(atr_period=14, breakout_multiplier=1.5)
for high, low, close in ohlc_data:
breakout_detector.update(high, low, close)
breakout_signal = breakout_detector.check_breakout(high, low, close)
if breakout_signal != "NO_BREAKOUT":
print(f"Breakout detected: {breakout_signal} at {close:.2f}")
```
### Performance Characteristics
- **Time Complexity**: O(1) per update
- **Space Complexity**: O(1)
- **Memory Usage**: ~32 bytes (constant)
## Comparison: ATRState vs SimpleATRState
| Aspect | ATRState | SimpleATRState |
|--------|----------|----------------|
| **Memory Usage** | O(period) | O(1) |
| **Calculation Method** | Simple Moving Average | Exponential Smoothing |
| **Accuracy** | Higher (true SMA) | Good (Wilder's method) |
| **Responsiveness** | Moderate | Slightly more responsive |
| **Historical Compatibility** | Modern | Traditional (Wilder's) |
### When to Use ATRState
- **Precise Calculations**: When you need exact simple moving average of True Range
- **Backtesting**: For historical analysis where memory isn't constrained
- **Research**: When studying exact ATR behavior
- **Small Periods**: When period is small (< 20) and memory isn't an issue
### When to Use SimpleATRState
- **Memory Efficiency**: When processing large amounts of data
- **Real-time Systems**: For high-frequency trading applications
- **Traditional Analysis**: When following Wilder's original methodology
- **Large Periods**: When using large ATR periods (> 50)
## Advanced Usage Patterns
### Multi-Timeframe ATR Analysis
```python
class MultiTimeframeATR:
def __init__(self):
self.atr_short = SimpleATRState(period=7) # Short-term volatility
self.atr_medium = SimpleATRState(period=14) # Medium-term volatility
self.atr_long = SimpleATRState(period=28) # Long-term volatility
def update(self, high: float, low: float, close: float):
self.atr_short.update_ohlc(high, low, close)
self.atr_medium.update_ohlc(high, low, close)
self.atr_long.update_ohlc(high, low, close)
def get_volatility_regime(self) -> str:
"""Determine current volatility regime."""
if not all([self.atr_short.is_ready(), self.atr_medium.is_ready(), self.atr_long.is_ready()]):
return "UNKNOWN"
short_atr = self.atr_short.get_value()
medium_atr = self.atr_medium.get_value()
long_atr = self.atr_long.get_value()
# Compare short-term to long-term volatility
volatility_ratio = short_atr / long_atr if long_atr > 0 else 1.0
if volatility_ratio > 1.5:
return "HIGH_VOLATILITY"
elif volatility_ratio < 0.7:
return "LOW_VOLATILITY"
else:
return "NORMAL_VOLATILITY"
def get_adaptive_stop_multiplier(self) -> float:
"""Get adaptive stop-loss multiplier based on volatility regime."""
regime = self.get_volatility_regime()
if regime == "HIGH_VOLATILITY":
return 2.5 # Wider stops in high volatility
elif regime == "LOW_VOLATILITY":
return 1.5 # Tighter stops in low volatility
else:
return 2.0 # Standard stops in normal volatility
# Usage
multi_atr = MultiTimeframeATR()
for high, low, close in ohlc_data:
multi_atr.update(high, low, close)
regime = multi_atr.get_volatility_regime()
stop_multiplier = multi_atr.get_adaptive_stop_multiplier()
print(f"Volatility Regime: {regime}, Stop Multiplier: {stop_multiplier:.1f}")
```
### ATR-Based Position Sizing
```python
class ATRPositionSizer:
def __init__(self, atr_period: int = 14):
self.atr = SimpleATRState(atr_period)
self.price_history = []
def update(self, high: float, low: float, close: float):
self.atr.update_ohlc(high, low, close)
self.price_history.append(close)
# Keep only recent price history
if len(self.price_history) > 100:
self.price_history.pop(0)
def calculate_position_size(self, account_balance: float, risk_percent: float,
entry_price: float, stop_loss_atr_multiplier: float = 2.0) -> dict:
"""Calculate position size based on ATR risk management."""
if not self.atr.is_ready():
return {"position_size": 0, "risk_amount": 0, "stop_loss": entry_price * 0.95}
atr_value = self.atr.get_value()
risk_amount = account_balance * (risk_percent / 100)
# Calculate stop loss based on ATR
stop_loss = entry_price - (atr_value * stop_loss_atr_multiplier)
risk_per_share = entry_price - stop_loss
# Calculate position size
if risk_per_share > 0:
position_size = risk_amount / risk_per_share
else:
position_size = 0
return {
"position_size": position_size,
"risk_amount": risk_amount,
"stop_loss": stop_loss,
"atr_value": atr_value,
"risk_per_share": risk_per_share
}
def get_volatility_percentile(self) -> float:
"""Get current ATR percentile compared to recent history."""
if not self.atr.is_ready() or len(self.price_history) < 20:
return 50.0 # Default to median
current_atr = self.atr.get_value()
# Calculate ATR for recent periods
recent_atrs = []
for i in range(len(self.price_history) - 14):
if i + 14 < len(self.price_history):
# Simplified ATR calculation for comparison
price_range = max(self.price_history[i:i+14]) - min(self.price_history[i:i+14])
recent_atrs.append(price_range)
if not recent_atrs:
return 50.0
# Calculate percentile
sorted_atrs = sorted(recent_atrs)
position = sum(1 for atr in sorted_atrs if atr <= current_atr)
percentile = (position / len(sorted_atrs)) * 100
return percentile
# Usage
position_sizer = ATRPositionSizer(atr_period=14)
for high, low, close in ohlc_data:
position_sizer.update(high, low, close)
# Calculate position for a potential trade
trade_info = position_sizer.calculate_position_size(
account_balance=10000,
risk_percent=2.0,
entry_price=close,
stop_loss_atr_multiplier=2.0
)
volatility_percentile = position_sizer.get_volatility_percentile()
print(f"Price: {close:.2f}, Position Size: {trade_info['position_size']:.0f}, "
f"ATR Percentile: {volatility_percentile:.1f}%")
```
## Integration with Strategies
### ATR-Enhanced Strategy Example
```python
class ATRTrendStrategy(IncStrategyBase):
def __init__(self, name: str, params: dict = None):
super().__init__(name, params)
# Initialize indicators
self.atr = SimpleATRState(self.params.get('atr_period', 14))
self.sma = MovingAverageState(self.params.get('sma_period', 20))
# ATR parameters
self.atr_stop_multiplier = self.params.get('atr_stop_multiplier', 2.0)
self.atr_entry_multiplier = self.params.get('atr_entry_multiplier', 0.5)
def _process_aggregated_data(self, timestamp: int, ohlcv: tuple) -> IncStrategySignal:
open_price, high, low, close, volume = ohlcv
# Update indicators
self.atr.update_ohlc(high, low, close)
self.sma.update(close)
# Wait for indicators to be ready
if not all([self.atr.is_ready(), self.sma.is_ready()]):
return IncStrategySignal.HOLD()
atr_value = self.atr.get_value()
sma_value = self.sma.get_value()
# Calculate dynamic entry threshold based on ATR
entry_threshold = atr_value * self.atr_entry_multiplier
# Generate signals based on trend and volatility
if close > sma_value + entry_threshold:
# Strong uptrend with sufficient volatility
confidence = min(0.9, (close - sma_value) / atr_value * 0.1)
# Calculate stop loss
stop_loss = close - (atr_value * self.atr_stop_multiplier)
return IncStrategySignal.BUY(
confidence=confidence,
metadata={
'atr_value': atr_value,
'sma_value': sma_value,
'stop_loss': stop_loss,
'entry_threshold': entry_threshold
}
)
elif close < sma_value - entry_threshold:
# Strong downtrend with sufficient volatility
confidence = min(0.9, (sma_value - close) / atr_value * 0.1)
# Calculate stop loss
stop_loss = close + (atr_value * self.atr_stop_multiplier)
return IncStrategySignal.SELL(
confidence=confidence,
metadata={
'atr_value': atr_value,
'sma_value': sma_value,
'stop_loss': stop_loss,
'entry_threshold': entry_threshold
}
)
return IncStrategySignal.HOLD()
```
## Performance Optimization Tips
### 1. Choose the Right ATR Implementation
```python
# For memory-constrained environments
atr = SimpleATRState(period=14) # O(1) memory
# For precise calculations
atr = ATRState(period=14) # O(period) memory
```
### 2. Batch Processing for Multiple ATRs
```python
def update_multiple_atrs(atrs: list, high: float, low: float, close: float):
"""Efficiently update multiple ATR indicators."""
for atr in atrs:
atr.update_ohlc(high, low, close)
return [atr.get_value() for atr in atrs if atr.is_ready()]
```
### 3. Cache ATR Values for Complex Calculations
```python
class CachedATR:
def __init__(self, period: int):
self.atr = SimpleATRState(period)
self._cached_value = 0.0
self._cache_valid = False
def update_ohlc(self, high: float, low: float, close: float):
self.atr.update_ohlc(high, low, close)
self._cache_valid = False
def get_value(self) -> float:
if not self._cache_valid:
self._cached_value = self.atr.get_value()
self._cache_valid = True
return self._cached_value
```
---
*ATR indicators are essential for risk management and volatility analysis. Use ATRState for precise calculations or SimpleATRState for memory efficiency in high-frequency applications.*