- Enhanced the `UserIndicator` class to include an optional `timeframe` attribute for custom indicator timeframes. - Updated the `get_indicator_data` method in `MarketDataIntegrator` to fetch and calculate indicators based on the specified timeframe, ensuring proper data alignment and handling. - Modified the `ChartBuilder` to pass the correct DataFrame for plotting indicators with different timeframes. - Added UI elements in the indicator modal for selecting timeframes, improving user experience. - Updated relevant JSON templates to include the new `timeframe` field for all indicators. - Refactored the `prepare_chart_data` function to ensure it returns a DataFrame with a `DatetimeIndex` for consistent calculations. This commit enhances the flexibility and usability of the indicator system, allowing users to analyze data across various timeframes.
513 lines
19 KiB
Python
513 lines
19 KiB
Python
"""
|
|
Technical Indicators Module for OHLCV Data
|
|
|
|
This module provides technical indicator calculations optimized for sparse OHLCV data
|
|
as produced by the TCP Trading Platform's aggregation strategy.
|
|
|
|
IMPORTANT: Handles Sparse Data
|
|
- Missing candles (time gaps) are normal in this system
|
|
- Indicators properly handle gaps without interpolation
|
|
- Uses pandas for efficient vectorized calculations
|
|
- Follows right-aligned timestamp convention
|
|
|
|
Supported Indicators:
|
|
- Simple Moving Average (SMA)
|
|
- Exponential Moving Average (EMA)
|
|
- Relative Strength Index (RSI)
|
|
- Moving Average Convergence Divergence (MACD)
|
|
- Bollinger Bands
|
|
"""
|
|
|
|
from datetime import datetime, timedelta
|
|
from decimal import Decimal
|
|
from typing import Dict, List, Optional, Any, Union, Tuple
|
|
import pandas as pd
|
|
import numpy as np
|
|
from dataclasses import dataclass
|
|
|
|
from .data_types import OHLCVCandle
|
|
|
|
|
|
@dataclass
|
|
class IndicatorResult:
|
|
"""
|
|
Container for technical indicator calculation results.
|
|
|
|
Attributes:
|
|
timestamp: Candle timestamp (right-aligned)
|
|
symbol: Trading symbol
|
|
timeframe: Candle timeframe
|
|
values: Dictionary of indicator values
|
|
metadata: Additional calculation metadata
|
|
"""
|
|
timestamp: datetime
|
|
symbol: str
|
|
timeframe: str
|
|
values: Dict[str, float]
|
|
metadata: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
class TechnicalIndicators:
|
|
"""
|
|
Technical indicator calculator for OHLCV candle data.
|
|
|
|
This class provides vectorized technical indicator calculations
|
|
designed to handle sparse data efficiently. All calculations use
|
|
pandas for performance and handle missing data appropriately.
|
|
|
|
SPARSE DATA HANDLING:
|
|
- Gaps in timestamps are preserved (no interpolation)
|
|
- Indicators calculate only on available data points
|
|
- Periods with insufficient data return NaN
|
|
- Results maintain original timestamp alignment
|
|
"""
|
|
|
|
def __init__(self, logger=None):
|
|
"""
|
|
Initialize technical indicators calculator.
|
|
|
|
Args:
|
|
logger: Optional logger instance
|
|
"""
|
|
self.logger = logger
|
|
|
|
if self.logger:
|
|
self.logger.info("TechnicalIndicators: Initialized indicator calculator")
|
|
|
|
def _prepare_dataframe_from_list(self, candles: List[OHLCVCandle]) -> pd.DataFrame:
|
|
"""
|
|
Convert OHLCV candles to pandas DataFrame for efficient calculations.
|
|
|
|
Args:
|
|
candles: List of OHLCV candles (can be sparse)
|
|
|
|
Returns:
|
|
DataFrame with OHLCV data, sorted by timestamp
|
|
"""
|
|
if not candles:
|
|
return pd.DataFrame()
|
|
|
|
# Convert to DataFrame
|
|
data = []
|
|
for candle in candles:
|
|
data.append({
|
|
'timestamp': candle.end_time, # Right-aligned timestamp
|
|
'symbol': candle.symbol,
|
|
'timeframe': candle.timeframe,
|
|
'open': float(candle.open),
|
|
'high': float(candle.high),
|
|
'low': float(candle.low),
|
|
'close': float(candle.close),
|
|
'volume': float(candle.volume),
|
|
'trade_count': candle.trade_count
|
|
})
|
|
|
|
df = pd.DataFrame(data)
|
|
|
|
# Sort by timestamp to ensure proper order
|
|
df = df.sort_values('timestamp').reset_index(drop=True)
|
|
|
|
# Set timestamp as index for time-series operations
|
|
df.set_index('timestamp', inplace=True)
|
|
|
|
return df
|
|
|
|
def sma(self, df: pd.DataFrame, period: int,
|
|
price_column: str = 'close') -> List[IndicatorResult]:
|
|
"""
|
|
Calculate Simple Moving Average (SMA).
|
|
|
|
Args:
|
|
df: DataFrame with OHLCV data
|
|
period: Number of periods for moving average
|
|
price_column: Price column to use ('open', 'high', 'low', 'close')
|
|
|
|
Returns:
|
|
List of indicator results with SMA values
|
|
"""
|
|
if df.empty or len(df) < period:
|
|
return []
|
|
|
|
# Calculate SMA using pandas rolling window
|
|
df['sma'] = df[price_column].rolling(window=period, min_periods=period).mean()
|
|
|
|
# Convert results back to IndicatorResult objects
|
|
results = []
|
|
for timestamp, row in df.iterrows():
|
|
if not pd.isna(row['sma']):
|
|
result = IndicatorResult(
|
|
timestamp=timestamp,
|
|
symbol=row['symbol'],
|
|
timeframe=row['timeframe'],
|
|
values={'sma': row['sma']},
|
|
metadata={'period': period, 'price_column': price_column}
|
|
)
|
|
results.append(result)
|
|
|
|
return results
|
|
|
|
def ema(self, df: pd.DataFrame, period: int,
|
|
price_column: str = 'close') -> List[IndicatorResult]:
|
|
"""
|
|
Calculate Exponential Moving Average (EMA).
|
|
|
|
Args:
|
|
df: DataFrame with OHLCV data
|
|
period: Number of periods for moving average
|
|
price_column: Price column to use ('open', 'high', 'low', 'close')
|
|
|
|
Returns:
|
|
List of indicator results with EMA values
|
|
"""
|
|
if df.empty or len(df) < period:
|
|
return []
|
|
|
|
# Calculate EMA using pandas exponential weighted moving average
|
|
df['ema'] = df[price_column].ewm(span=period, adjust=False).mean()
|
|
|
|
# Convert results back to IndicatorResult objects
|
|
results = []
|
|
for i, (timestamp, row) in enumerate(df.iterrows()):
|
|
# Only return results after minimum period
|
|
if i >= period - 1 and not pd.isna(row['ema']):
|
|
result = IndicatorResult(
|
|
timestamp=timestamp,
|
|
symbol=row['symbol'],
|
|
timeframe=row['timeframe'],
|
|
values={'ema': row['ema']},
|
|
metadata={'period': period, 'price_column': price_column}
|
|
)
|
|
results.append(result)
|
|
|
|
return results
|
|
|
|
def rsi(self, df: pd.DataFrame, period: int = 14,
|
|
price_column: str = 'close') -> List[IndicatorResult]:
|
|
"""
|
|
Calculate Relative Strength Index (RSI).
|
|
|
|
Args:
|
|
df: DataFrame with OHLCV data
|
|
period: Number of periods for RSI calculation (default 14)
|
|
price_column: Price column to use ('open', 'high', 'low', 'close')
|
|
|
|
Returns:
|
|
List of indicator results with RSI values
|
|
"""
|
|
if df.empty or len(df) < period + 1:
|
|
return []
|
|
|
|
# Calculate price changes
|
|
df['price_change'] = df[price_column].diff()
|
|
|
|
# Separate gains and losses
|
|
df['gain'] = df['price_change'].where(df['price_change'] > 0, 0)
|
|
df['loss'] = (-df['price_change']).where(df['price_change'] < 0, 0)
|
|
|
|
# Calculate average gain and loss using EMA
|
|
df['avg_gain'] = df['gain'].ewm(span=period, adjust=False).mean()
|
|
df['avg_loss'] = df['loss'].ewm(span=period, adjust=False).mean()
|
|
|
|
# Calculate RS and RSI
|
|
df['rs'] = df['avg_gain'] / df['avg_loss']
|
|
df['rsi'] = 100 - (100 / (1 + df['rs']))
|
|
|
|
# Handle division by zero
|
|
df['rsi'] = df['rsi'].fillna(50) # Neutral RSI when no losses
|
|
|
|
# Convert results back to IndicatorResult objects
|
|
results = []
|
|
for i, (timestamp, row) in enumerate(df.iterrows()):
|
|
# Only return results after minimum period
|
|
if i >= period and not pd.isna(row['rsi']):
|
|
result = IndicatorResult(
|
|
timestamp=timestamp,
|
|
symbol=row['symbol'],
|
|
timeframe=row['timeframe'],
|
|
values={'rsi': row['rsi']},
|
|
metadata={'period': period, 'price_column': price_column}
|
|
)
|
|
results.append(result)
|
|
|
|
return results
|
|
|
|
def macd(self, df: pd.DataFrame,
|
|
fast_period: int = 12, slow_period: int = 26, signal_period: int = 9,
|
|
price_column: str = 'close') -> List[IndicatorResult]:
|
|
"""
|
|
Calculate Moving Average Convergence Divergence (MACD).
|
|
|
|
Args:
|
|
df: DataFrame with OHLCV data
|
|
fast_period: Fast EMA period (default 12)
|
|
slow_period: Slow EMA period (default 26)
|
|
signal_period: Signal line EMA period (default 9)
|
|
price_column: Price column to use ('open', 'high', 'low', 'close')
|
|
|
|
Returns:
|
|
List of indicator results with MACD, signal, and histogram values
|
|
"""
|
|
if df.empty or len(df) < slow_period:
|
|
return []
|
|
|
|
# Calculate fast and slow EMAs
|
|
df['ema_fast'] = df[price_column].ewm(span=fast_period, adjust=False).mean()
|
|
df['ema_slow'] = df[price_column].ewm(span=slow_period, adjust=False).mean()
|
|
|
|
# Calculate MACD line
|
|
df['macd'] = df['ema_fast'] - df['ema_slow']
|
|
|
|
# Calculate signal line (EMA of MACD)
|
|
df['signal'] = df['macd'].ewm(span=signal_period, adjust=False).mean()
|
|
|
|
# Calculate histogram
|
|
df['histogram'] = df['macd'] - df['signal']
|
|
|
|
# Convert results back to IndicatorResult objects
|
|
results = []
|
|
for i, (timestamp, row) in enumerate(df.iterrows()):
|
|
# Only return results after minimum period
|
|
if i >= slow_period - 1:
|
|
if not (pd.isna(row['macd']) or pd.isna(row['signal']) or pd.isna(row['histogram'])):
|
|
result = IndicatorResult(
|
|
timestamp=timestamp,
|
|
symbol=row['symbol'],
|
|
timeframe=row['timeframe'],
|
|
values={
|
|
'macd': row['macd'],
|
|
'signal': row['signal'],
|
|
'histogram': row['histogram']
|
|
},
|
|
metadata={
|
|
'fast_period': fast_period,
|
|
'slow_period': slow_period,
|
|
'signal_period': signal_period,
|
|
'price_column': price_column
|
|
}
|
|
)
|
|
results.append(result)
|
|
|
|
return results
|
|
|
|
def bollinger_bands(self, df: pd.DataFrame, period: int = 20,
|
|
std_dev: float = 2.0, price_column: str = 'close') -> List[IndicatorResult]:
|
|
"""
|
|
Calculate Bollinger Bands.
|
|
|
|
Args:
|
|
df: DataFrame with OHLCV data
|
|
period: Number of periods for moving average (default 20)
|
|
std_dev: Number of standard deviations (default 2.0)
|
|
price_column: Price column to use ('open', 'high', 'low', 'close')
|
|
|
|
Returns:
|
|
List of indicator results with upper band, middle band (SMA), and lower band
|
|
"""
|
|
if df.empty or len(df) < period:
|
|
return []
|
|
|
|
# Calculate middle band (SMA)
|
|
df['middle_band'] = df[price_column].rolling(window=period, min_periods=period).mean()
|
|
|
|
# Calculate standard deviation
|
|
df['std'] = df[price_column].rolling(window=period, min_periods=period).std()
|
|
|
|
# Calculate upper and lower bands
|
|
df['upper_band'] = df['middle_band'] + (std_dev * df['std'])
|
|
df['lower_band'] = df['middle_band'] - (std_dev * df['std'])
|
|
|
|
# Calculate bandwidth and %B
|
|
df['bandwidth'] = (df['upper_band'] - df['lower_band']) / df['middle_band']
|
|
df['percent_b'] = (df[price_column] - df['lower_band']) / (df['upper_band'] - df['lower_band'])
|
|
|
|
# Convert results back to IndicatorResult objects
|
|
results = []
|
|
for timestamp, row in df.iterrows():
|
|
if not pd.isna(row['middle_band']):
|
|
result = IndicatorResult(
|
|
timestamp=timestamp,
|
|
symbol=row['symbol'],
|
|
timeframe=row['timeframe'],
|
|
values={
|
|
'upper_band': row['upper_band'],
|
|
'middle_band': row['middle_band'],
|
|
'lower_band': row['lower_band'],
|
|
'bandwidth': row['bandwidth'],
|
|
'percent_b': row['percent_b']
|
|
},
|
|
metadata={
|
|
'period': period,
|
|
'std_dev': std_dev,
|
|
'price_column': price_column
|
|
}
|
|
)
|
|
results.append(result)
|
|
|
|
return results
|
|
|
|
def calculate_multiple_indicators(self, candles: List[OHLCVCandle],
|
|
indicators_config: Dict[str, Dict[str, Any]]) -> Dict[str, List[IndicatorResult]]:
|
|
"""
|
|
Calculate multiple indicators at once for efficiency.
|
|
|
|
Args:
|
|
candles: List of OHLCV candles
|
|
indicators_config: Configuration for indicators to calculate
|
|
Example: {
|
|
'sma_20': {'type': 'sma', 'period': 20},
|
|
'ema_12': {'type': 'ema', 'period': 12},
|
|
'rsi_14': {'type': 'rsi', 'period': 14},
|
|
'macd': {'type': 'macd'},
|
|
'bb_20': {'type': 'bollinger_bands', 'period': 20}
|
|
}
|
|
|
|
Returns:
|
|
Dictionary mapping indicator names to their results
|
|
"""
|
|
results = {}
|
|
|
|
for indicator_name, config in indicators_config.items():
|
|
indicator_type = config.get('type')
|
|
|
|
try:
|
|
if indicator_type == 'sma':
|
|
period = config.get('period', 20)
|
|
price_column = config.get('price_column', 'close')
|
|
results[indicator_name] = self.sma(candles, period, price_column)
|
|
|
|
elif indicator_type == 'ema':
|
|
period = config.get('period', 20)
|
|
price_column = config.get('price_column', 'close')
|
|
results[indicator_name] = self.ema(candles, period, price_column)
|
|
|
|
elif indicator_type == 'rsi':
|
|
period = config.get('period', 14)
|
|
price_column = config.get('price_column', 'close')
|
|
results[indicator_name] = self.rsi(candles, period, price_column)
|
|
|
|
elif indicator_type == 'macd':
|
|
fast_period = config.get('fast_period', 12)
|
|
slow_period = config.get('slow_period', 26)
|
|
signal_period = config.get('signal_period', 9)
|
|
price_column = config.get('price_column', 'close')
|
|
results[indicator_name] = self.macd(candles, fast_period, slow_period, signal_period, price_column)
|
|
|
|
elif indicator_type == 'bollinger_bands':
|
|
period = config.get('period', 20)
|
|
std_dev = config.get('std_dev', 2.0)
|
|
price_column = config.get('price_column', 'close')
|
|
results[indicator_name] = self.bollinger_bands(candles, period, std_dev, price_column)
|
|
|
|
else:
|
|
if self.logger:
|
|
self.logger.warning(f"TechnicalIndicators: Unknown indicator type: {indicator_type}")
|
|
results[indicator_name] = []
|
|
|
|
except Exception as e:
|
|
if self.logger:
|
|
self.logger.error(f"TechnicalIndicators: Error calculating {indicator_name}: {e}")
|
|
results[indicator_name] = []
|
|
|
|
return results
|
|
|
|
def calculate(self, indicator_type: str, candles: Union[pd.DataFrame, List[OHLCVCandle]], **kwargs) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Calculate a single indicator with dynamic dispatch.
|
|
|
|
Args:
|
|
indicator_type: Name of the indicator (e.g., 'sma', 'ema')
|
|
candles: List of OHLCV candles or a pre-prepared DataFrame
|
|
**kwargs: Indicator-specific parameters (e.g., period=20)
|
|
|
|
Returns:
|
|
A dictionary containing the indicator results, or None if the type is unknown.
|
|
"""
|
|
# Get the indicator calculation method
|
|
indicator_method = getattr(self, indicator_type, None)
|
|
if not indicator_method:
|
|
if self.logger:
|
|
self.logger.error(f"TechnicalIndicators: Unknown indicator type '{indicator_type}'")
|
|
return None
|
|
|
|
try:
|
|
# Prepare DataFrame if input is a list of candles
|
|
if isinstance(candles, list):
|
|
df = self._prepare_dataframe_from_list(candles)
|
|
elif isinstance(candles, pd.DataFrame):
|
|
df = candles
|
|
else:
|
|
raise TypeError("Input 'candles' must be a list of OHLCVCandle objects or a pandas DataFrame.")
|
|
|
|
if df.empty:
|
|
return {'data': [], 'metadata': {}}
|
|
|
|
# Call the indicator method
|
|
raw_result = indicator_method(df, **kwargs)
|
|
|
|
# Extract metadata from the first result if available
|
|
metadata = raw_result[0].metadata if raw_result else {}
|
|
|
|
# The methods return List[IndicatorResult], let's package that
|
|
if raw_result:
|
|
return {
|
|
"data": raw_result,
|
|
"metadata": metadata
|
|
}
|
|
return None
|
|
|
|
except Exception as e:
|
|
if self.logger:
|
|
self.logger.error(f"TechnicalIndicators: Error calculating {indicator_type}: {e}")
|
|
return None
|
|
|
|
|
|
def create_default_indicators_config() -> Dict[str, Dict[str, Any]]:
|
|
"""
|
|
Create default configuration for common technical indicators.
|
|
|
|
Returns:
|
|
Dictionary with default indicator configurations
|
|
"""
|
|
return {
|
|
'sma_20': {'type': 'sma', 'period': 20},
|
|
'sma_50': {'type': 'sma', 'period': 50},
|
|
'ema_12': {'type': 'ema', 'period': 12},
|
|
'ema_26': {'type': 'ema', 'period': 26},
|
|
'rsi_14': {'type': 'rsi', 'period': 14},
|
|
'macd_default': {'type': 'macd'},
|
|
'bollinger_bands_20': {'type': 'bollinger_bands', 'period': 20}
|
|
}
|
|
|
|
|
|
def validate_indicator_config(config: Dict[str, Any]) -> bool:
|
|
"""
|
|
Validate technical indicator configuration.
|
|
|
|
Args:
|
|
config: Indicator configuration dictionary
|
|
|
|
Returns:
|
|
True if configuration is valid, False otherwise
|
|
"""
|
|
required_fields = ['type']
|
|
|
|
# Check required fields
|
|
for field in required_fields:
|
|
if field not in config:
|
|
return False
|
|
|
|
# Validate indicator type
|
|
valid_types = ['sma', 'ema', 'rsi', 'macd', 'bollinger_bands']
|
|
if config['type'] not in valid_types:
|
|
return False
|
|
|
|
# Validate period fields
|
|
if 'period' in config and (not isinstance(config['period'], int) or config['period'] <= 0):
|
|
return False
|
|
|
|
# Validate standard deviation for Bollinger Bands
|
|
if config['type'] == 'bollinger_bands' and 'std_dev' in config:
|
|
if not isinstance(config['std_dev'], (int, float)) or config['std_dev'] <= 0:
|
|
return False
|
|
|
|
return True |