indicators refactor

This commit is contained in:
Ajasra 2025-06-07 14:01:20 +08:00
parent b29af1e0e6
commit d92a48cd7e
8 changed files with 528 additions and 190 deletions

View File

@ -0,0 +1,106 @@
"""
Base classes and interfaces for technical indicators.
This module provides the foundation for all technical indicators
with common functionality and type definitions.
"""
from abc import ABC, abstractmethod
from typing import List, Dict, Any
import pandas as pd
from utils.logger import get_logger
from .result import IndicatorResult
from ..data_types import OHLCVCandle
class BaseIndicator(ABC):
"""
Abstract base class for all technical indicators.
Provides common functionality and enforces consistent interface
across all indicator implementations.
"""
def __init__(self, logger=None):
"""
Initialize base indicator.
Args:
logger: Optional logger instance
"""
if logger is None:
self.logger = get_logger(__name__)
self.logger = logger
def prepare_dataframe(self, candles: List[OHLCVCandle]) -> pd.DataFrame:
"""
Convert OHLCV candles to pandas DataFrame for calculations.
Args:
candles: List of OHLCV candles (can be sparse)
Returns:
DataFrame with OHLCV data, sorted by timestamp
"""
if not candles:
return pd.DataFrame()
# Convert to DataFrame
data = []
for candle in candles:
data.append({
'timestamp': candle.end_time, # Right-aligned timestamp
'symbol': candle.symbol,
'timeframe': candle.timeframe,
'open': float(candle.open),
'high': float(candle.high),
'low': float(candle.low),
'close': float(candle.close),
'volume': float(candle.volume),
'trade_count': candle.trade_count
})
df = pd.DataFrame(data)
# Sort by timestamp to ensure proper order
df = df.sort_values('timestamp').reset_index(drop=True)
# Set timestamp as index for time-series operations
df.set_index('timestamp', inplace=True)
return df
@abstractmethod
def calculate(self, df: pd.DataFrame, **kwargs) -> List[IndicatorResult]:
"""
Calculate the indicator values.
Args:
df: DataFrame with OHLCV data
**kwargs: Additional parameters specific to each indicator
Returns:
List of indicator results
"""
pass
def validate_dataframe(self, df: pd.DataFrame, min_periods: int) -> bool:
"""
Validate that DataFrame has sufficient data for calculation.
Args:
df: DataFrame to validate
min_periods: Minimum number of periods required
Returns:
True if DataFrame is valid, False otherwise
"""
if df.empty or len(df) < min_periods:
if self.logger:
self.logger.warning(
f"Insufficient data: got {len(df)} periods, need {min_periods}"
)
return False
return True

View File

@ -0,0 +1,20 @@
"""
Technical indicator implementations package.
This package contains individual implementations of technical indicators,
each in its own module for better maintainability and separation of concerns.
"""
from .sma import SMAIndicator
from .ema import EMAIndicator
from .rsi import RSIIndicator
from .macd import MACDIndicator
from .bollinger import BollingerBandsIndicator
__all__ = [
'SMAIndicator',
'EMAIndicator',
'RSIIndicator',
'MACDIndicator',
'BollingerBandsIndicator'
]

View File

@ -0,0 +1,81 @@
"""
Bollinger Bands indicator implementation.
"""
from typing import List
import pandas as pd
from ..base import BaseIndicator
from ..result import IndicatorResult
class BollingerBandsIndicator(BaseIndicator):
"""
Bollinger Bands technical indicator.
Calculates a set of lines plotted two standard deviations away from a simple moving average.
Handles sparse data appropriately without interpolation.
"""
def calculate(self, df: pd.DataFrame, period: int = 20,
std_dev: float = 2.0, price_column: str = 'close') -> List[IndicatorResult]:
"""
Calculate Bollinger Bands.
Args:
df: DataFrame with OHLCV data
period: Number of periods for moving average (default 20)
std_dev: Number of standard deviations (default 2.0)
price_column: Price column to use ('open', 'high', 'low', 'close')
Returns:
List of indicator results with upper band, middle band (SMA), and lower band
"""
# Validate input data
if not self.validate_dataframe(df, period):
return []
try:
# Calculate middle band (SMA)
df['middle_band'] = df[price_column].rolling(window=period, min_periods=period).mean()
# Calculate standard deviation
df['std'] = df[price_column].rolling(window=period, min_periods=period).std()
# Calculate upper and lower bands
df['upper_band'] = df['middle_band'] + (std_dev * df['std'])
df['lower_band'] = df['middle_band'] - (std_dev * df['std'])
# Calculate bandwidth and %B
df['bandwidth'] = (df['upper_band'] - df['lower_band']) / df['middle_band']
df['percent_b'] = (df[price_column] - df['lower_band']) / (df['upper_band'] - df['lower_band'])
# Convert results to IndicatorResult objects
results = []
for timestamp, row in df.iterrows():
if not pd.isna(row['middle_band']):
result = IndicatorResult(
timestamp=timestamp,
symbol=row['symbol'],
timeframe=row['timeframe'],
values={
'upper_band': row['upper_band'],
'middle_band': row['middle_band'],
'lower_band': row['lower_band'],
'bandwidth': row['bandwidth'],
'percent_b': row['percent_b']
},
metadata={
'period': period,
'std_dev': std_dev,
'price_column': price_column
}
)
results.append(result)
return results
except Exception as e:
if self.logger:
self.logger.error(f"Error calculating Bollinger Bands: {e}")
return []

View File

@ -0,0 +1,60 @@
"""
Exponential Moving Average (EMA) indicator implementation.
"""
from typing import List
import pandas as pd
from ..base import BaseIndicator
from ..result import IndicatorResult
class EMAIndicator(BaseIndicator):
"""
Exponential Moving Average (EMA) technical indicator.
Calculates weighted moving average giving more weight to recent prices.
Handles sparse data appropriately without interpolation.
"""
def calculate(self, df: pd.DataFrame, period: int = 20,
price_column: str = 'close') -> List[IndicatorResult]:
"""
Calculate Exponential Moving Average (EMA).
Args:
df: DataFrame with OHLCV data
period: Number of periods for moving average (default: 20)
price_column: Price column to use ('open', 'high', 'low', 'close')
Returns:
List of indicator results with EMA values
"""
# Validate input data
if not self.validate_dataframe(df, period):
return []
try:
# Calculate EMA using pandas exponential weighted moving average
df['ema'] = df[price_column].ewm(span=period, adjust=False).mean()
# Convert results to IndicatorResult objects
results = []
for i, (timestamp, row) in enumerate(df.iterrows()):
# Only return results after minimum period
if i >= period - 1 and not pd.isna(row['ema']):
result = IndicatorResult(
timestamp=timestamp,
symbol=row['symbol'],
timeframe=row['timeframe'],
values={'ema': row['ema']},
metadata={'period': period, 'price_column': price_column}
)
results.append(result)
return results
except Exception as e:
if self.logger:
self.logger.error(f"Error calculating EMA: {e}")
return []

View File

@ -0,0 +1,84 @@
"""
Moving Average Convergence Divergence (MACD) indicator implementation.
"""
from typing import List
import pandas as pd
from ..base import BaseIndicator
from ..result import IndicatorResult
class MACDIndicator(BaseIndicator):
"""
Moving Average Convergence Divergence (MACD) technical indicator.
Calculates trend-following momentum indicator that shows the relationship
between two moving averages of a security's price.
Handles sparse data appropriately without interpolation.
"""
def calculate(self, df: pd.DataFrame, fast_period: int = 12,
slow_period: int = 26, signal_period: int = 9,
price_column: str = 'close') -> List[IndicatorResult]:
"""
Calculate Moving Average Convergence Divergence (MACD).
Args:
df: DataFrame with OHLCV data
fast_period: Fast EMA period (default 12)
slow_period: Slow EMA period (default 26)
signal_period: Signal line EMA period (default 9)
price_column: Price column to use ('open', 'high', 'low', 'close')
Returns:
List of indicator results with MACD, signal, and histogram values
"""
# Validate input data
if not self.validate_dataframe(df, slow_period):
return []
try:
# Calculate fast and slow EMAs
df['ema_fast'] = df[price_column].ewm(span=fast_period, adjust=False).mean()
df['ema_slow'] = df[price_column].ewm(span=slow_period, adjust=False).mean()
# Calculate MACD line
df['macd'] = df['ema_fast'] - df['ema_slow']
# Calculate signal line (EMA of MACD)
df['signal'] = df['macd'].ewm(span=signal_period, adjust=False).mean()
# Calculate histogram
df['histogram'] = df['macd'] - df['signal']
# Convert results to IndicatorResult objects
results = []
for i, (timestamp, row) in enumerate(df.iterrows()):
# Only return results after minimum period
if i >= slow_period - 1:
if not (pd.isna(row['macd']) or pd.isna(row['signal']) or pd.isna(row['histogram'])):
result = IndicatorResult(
timestamp=timestamp,
symbol=row['symbol'],
timeframe=row['timeframe'],
values={
'macd': row['macd'],
'signal': row['signal'],
'histogram': row['histogram']
},
metadata={
'fast_period': fast_period,
'slow_period': slow_period,
'signal_period': signal_period,
'price_column': price_column
}
)
results.append(result)
return results
except Exception as e:
if self.logger:
self.logger.error(f"Error calculating MACD: {e}")
return []

View File

@ -0,0 +1,75 @@
"""
Relative Strength Index (RSI) indicator implementation.
"""
from typing import List
import pandas as pd
from ..base import BaseIndicator
from ..result import IndicatorResult
class RSIIndicator(BaseIndicator):
"""
Relative Strength Index (RSI) technical indicator.
Measures momentum by comparing the magnitude of recent gains to recent losses.
Handles sparse data appropriately without interpolation.
"""
def calculate(self, df: pd.DataFrame, period: int = 14,
price_column: str = 'close') -> List[IndicatorResult]:
"""
Calculate Relative Strength Index (RSI).
Args:
df: DataFrame with OHLCV data
period: Number of periods for RSI calculation (default: 14)
price_column: Price column to use ('open', 'high', 'low', 'close')
Returns:
List of indicator results with RSI values
"""
# Validate input data
if not self.validate_dataframe(df, period + 1): # Need extra period for diff
return []
try:
# Calculate price changes
df['price_change'] = df[price_column].diff()
# Separate gains and losses
df['gain'] = df['price_change'].where(df['price_change'] > 0, 0)
df['loss'] = (-df['price_change']).where(df['price_change'] < 0, 0)
# Calculate average gain and loss using EMA
df['avg_gain'] = df['gain'].ewm(span=period, adjust=False).mean()
df['avg_loss'] = df['loss'].ewm(span=period, adjust=False).mean()
# Calculate RS and RSI
df['rs'] = df['avg_gain'] / df['avg_loss']
df['rsi'] = 100 - (100 / (1 + df['rs']))
# Handle division by zero
df['rsi'] = df['rsi'].fillna(50) # Neutral RSI when no losses
# Convert results to IndicatorResult objects
results = []
for i, (timestamp, row) in enumerate(df.iterrows()):
# Only return results after minimum period
if i >= period and not pd.isna(row['rsi']):
result = IndicatorResult(
timestamp=timestamp,
symbol=row['symbol'],
timeframe=row['timeframe'],
values={'rsi': row['rsi']},
metadata={'period': period, 'price_column': price_column}
)
results.append(result)
return results
except Exception as e:
if self.logger:
self.logger.error(f"Error calculating RSI: {e}")
return []

View File

@ -0,0 +1,59 @@
"""
Simple Moving Average (SMA) indicator implementation.
"""
from typing import List
import pandas as pd
from ..base import BaseIndicator
from ..result import IndicatorResult
class SMAIndicator(BaseIndicator):
"""
Simple Moving Average (SMA) technical indicator.
Calculates the unweighted mean of previous n periods.
Handles sparse data appropriately without interpolation.
"""
def calculate(self, df: pd.DataFrame, period: int = 20,
price_column: str = 'close') -> List[IndicatorResult]:
"""
Calculate Simple Moving Average (SMA).
Args:
df: DataFrame with OHLCV data
period: Number of periods for moving average (default: 20)
price_column: Price column to use ('open', 'high', 'low', 'close')
Returns:
List of indicator results with SMA values
"""
# Validate input data
if not self.validate_dataframe(df, period):
return []
try:
# Calculate SMA using pandas rolling window
df['sma'] = df[price_column].rolling(window=period, min_periods=period).mean()
# Convert results to IndicatorResult objects
results = []
for timestamp, row in df.iterrows():
if not pd.isna(row['sma']):
result = IndicatorResult(
timestamp=timestamp,
symbol=row['symbol'],
timeframe=row['timeframe'],
values={'sma': row['sma']},
metadata={'period': period, 'price_column': price_column}
)
results.append(result)
return results
except Exception as e:
if self.logger:
self.logger.error(f"Error calculating SMA: {e}")
return []

View File

@ -25,6 +25,14 @@ import numpy as np
from .result import IndicatorResult
from ..data_types import OHLCVCandle
from .base import BaseIndicator
from .implementations import (
SMAIndicator,
EMAIndicator,
RSIIndicator,
MACDIndicator,
BollingerBandsIndicator
)
class TechnicalIndicators:
@ -51,6 +59,13 @@ class TechnicalIndicators:
"""
self.logger = logger
# Initialize individual indicator calculators
self._sma = SMAIndicator(logger)
self._ema = EMAIndicator(logger)
self._rsi = RSIIndicator(logger)
self._macd = MACDIndicator(logger)
self._bollinger = BollingerBandsIndicator(logger)
if self.logger:
self.logger.info("TechnicalIndicators: Initialized indicator calculator")
@ -66,31 +81,8 @@ class TechnicalIndicators:
"""
if not candles:
return pd.DataFrame()
# Convert to DataFrame
data = []
for candle in candles:
data.append({
'timestamp': candle.end_time, # Right-aligned timestamp
'symbol': candle.symbol,
'timeframe': candle.timeframe,
'open': float(candle.open),
'high': float(candle.high),
'low': float(candle.low),
'close': float(candle.close),
'volume': float(candle.volume),
'trade_count': candle.trade_count
})
df = pd.DataFrame(data)
# Sort by timestamp to ensure proper order
df = df.sort_values('timestamp').reset_index(drop=True)
# Set timestamp as index for time-series operations
df.set_index('timestamp', inplace=True)
return df
return self._sma.prepare_dataframe(candles)
def sma(self, df: pd.DataFrame, period: int,
price_column: str = 'close') -> List[IndicatorResult]:
@ -105,26 +97,7 @@ class TechnicalIndicators:
Returns:
List of indicator results with SMA values
"""
if df.empty or len(df) < period:
return []
# Calculate SMA using pandas rolling window
df['sma'] = df[price_column].rolling(window=period, min_periods=period).mean()
# Convert results back to IndicatorResult objects
results = []
for timestamp, row in df.iterrows():
if not pd.isna(row['sma']):
result = IndicatorResult(
timestamp=timestamp,
symbol=row['symbol'],
timeframe=row['timeframe'],
values={'sma': row['sma']},
metadata={'period': period, 'price_column': price_column}
)
results.append(result)
return results
return self._sma.calculate(df, period=period, price_column=price_column)
def ema(self, df: pd.DataFrame, period: int,
price_column: str = 'close') -> List[IndicatorResult]:
@ -139,27 +112,7 @@ class TechnicalIndicators:
Returns:
List of indicator results with EMA values
"""
if df.empty or len(df) < period:
return []
# Calculate EMA using pandas exponential weighted moving average
df['ema'] = df[price_column].ewm(span=period, adjust=False).mean()
# Convert results back to IndicatorResult objects
results = []
for i, (timestamp, row) in enumerate(df.iterrows()):
# Only return results after minimum period
if i >= period - 1 and not pd.isna(row['ema']):
result = IndicatorResult(
timestamp=timestamp,
symbol=row['symbol'],
timeframe=row['timeframe'],
values={'ema': row['ema']},
metadata={'period': period, 'price_column': price_column}
)
results.append(result)
return results
return self._ema.calculate(df, period=period, price_column=price_column)
def rsi(self, df: pd.DataFrame, period: int = 14,
price_column: str = 'close') -> List[IndicatorResult]:
@ -174,42 +127,7 @@ class TechnicalIndicators:
Returns:
List of indicator results with RSI values
"""
if df.empty or len(df) < period + 1:
return []
# Calculate price changes
df['price_change'] = df[price_column].diff()
# Separate gains and losses
df['gain'] = df['price_change'].where(df['price_change'] > 0, 0)
df['loss'] = (-df['price_change']).where(df['price_change'] < 0, 0)
# Calculate average gain and loss using EMA
df['avg_gain'] = df['gain'].ewm(span=period, adjust=False).mean()
df['avg_loss'] = df['loss'].ewm(span=period, adjust=False).mean()
# Calculate RS and RSI
df['rs'] = df['avg_gain'] / df['avg_loss']
df['rsi'] = 100 - (100 / (1 + df['rs']))
# Handle division by zero
df['rsi'] = df['rsi'].fillna(50) # Neutral RSI when no losses
# Convert results back to IndicatorResult objects
results = []
for i, (timestamp, row) in enumerate(df.iterrows()):
# Only return results after minimum period
if i >= period and not pd.isna(row['rsi']):
result = IndicatorResult(
timestamp=timestamp,
symbol=row['symbol'],
timeframe=row['timeframe'],
values={'rsi': row['rsi']},
metadata={'period': period, 'price_column': price_column}
)
results.append(result)
return results
return self._rsi.calculate(df, period=period, price_column=price_column)
def macd(self, df: pd.DataFrame,
fast_period: int = 12, slow_period: int = 26, signal_period: int = 9,
@ -227,47 +145,13 @@ class TechnicalIndicators:
Returns:
List of indicator results with MACD, signal, and histogram values
"""
if df.empty or len(df) < slow_period:
return []
# Calculate fast and slow EMAs
df['ema_fast'] = df[price_column].ewm(span=fast_period, adjust=False).mean()
df['ema_slow'] = df[price_column].ewm(span=slow_period, adjust=False).mean()
# Calculate MACD line
df['macd'] = df['ema_fast'] - df['ema_slow']
# Calculate signal line (EMA of MACD)
df['signal'] = df['macd'].ewm(span=signal_period, adjust=False).mean()
# Calculate histogram
df['histogram'] = df['macd'] - df['signal']
# Convert results back to IndicatorResult objects
results = []
for i, (timestamp, row) in enumerate(df.iterrows()):
# Only return results after minimum period
if i >= slow_period - 1:
if not (pd.isna(row['macd']) or pd.isna(row['signal']) or pd.isna(row['histogram'])):
result = IndicatorResult(
timestamp=timestamp,
symbol=row['symbol'],
timeframe=row['timeframe'],
values={
'macd': row['macd'],
'signal': row['signal'],
'histogram': row['histogram']
},
metadata={
'fast_period': fast_period,
'slow_period': slow_period,
'signal_period': signal_period,
'price_column': price_column
}
)
results.append(result)
return results
return self._macd.calculate(
df,
fast_period=fast_period,
slow_period=slow_period,
signal_period=signal_period,
price_column=price_column
)
def bollinger_bands(self, df: pd.DataFrame, period: int = 20,
std_dev: float = 2.0, price_column: str = 'close') -> List[IndicatorResult]:
@ -283,47 +167,12 @@ class TechnicalIndicators:
Returns:
List of indicator results with upper band, middle band (SMA), and lower band
"""
if df.empty or len(df) < period:
return []
# Calculate middle band (SMA)
df['middle_band'] = df[price_column].rolling(window=period, min_periods=period).mean()
# Calculate standard deviation
df['std'] = df[price_column].rolling(window=period, min_periods=period).std()
# Calculate upper and lower bands
df['upper_band'] = df['middle_band'] + (std_dev * df['std'])
df['lower_band'] = df['middle_band'] - (std_dev * df['std'])
# Calculate bandwidth and %B
df['bandwidth'] = (df['upper_band'] - df['lower_band']) / df['middle_band']
df['percent_b'] = (df[price_column] - df['lower_band']) / (df['upper_band'] - df['lower_band'])
# Convert results back to IndicatorResult objects
results = []
for timestamp, row in df.iterrows():
if not pd.isna(row['middle_band']):
result = IndicatorResult(
timestamp=timestamp,
symbol=row['symbol'],
timeframe=row['timeframe'],
values={
'upper_band': row['upper_band'],
'middle_band': row['middle_band'],
'lower_band': row['lower_band'],
'bandwidth': row['bandwidth'],
'percent_b': row['percent_b']
},
metadata={
'period': period,
'std_dev': std_dev,
'price_column': price_column
}
)
results.append(result)
return results
return self._bollinger.calculate(
df,
period=period,
std_dev=std_dev,
price_column=price_column
)
def calculate_multiple_indicators(self, df: pd.DataFrame,
indicators_config: Dict[str, Dict[str, Any]]) -> Dict[str, List[IndicatorResult]]:
@ -370,22 +219,26 @@ class TechnicalIndicators:
slow_period = config.get('slow_period', 26)
signal_period = config.get('signal_period', 9)
price_column = config.get('price_column', 'close')
results[indicator_name] = self.macd(df, fast_period, slow_period, signal_period, price_column)
results[indicator_name] = self.macd(
df, fast_period, slow_period, signal_period, price_column
)
elif indicator_type == 'bollinger_bands':
period = config.get('period', 20)
std_dev = config.get('std_dev', 2.0)
price_column = config.get('price_column', 'close')
results[indicator_name] = self.bollinger_bands(df, period, std_dev, price_column)
results[indicator_name] = self.bollinger_bands(
df, period, std_dev, price_column
)
else:
if self.logger:
self.logger.warning(f"TechnicalIndicators: Unknown indicator type: {indicator_type}")
self.logger.warning(f"Unknown indicator type: {indicator_type}")
results[indicator_name] = []
except Exception as e:
if self.logger:
self.logger.error(f"TechnicalIndicators: Error calculating {indicator_name}: {e}")
self.logger.error(f"Error calculating {indicator_name}: {e}")
results[indicator_name] = []
return results
@ -406,7 +259,7 @@ class TechnicalIndicators:
indicator_method = getattr(self, indicator_type, None)
if not indicator_method:
if self.logger:
self.logger.error(f"TechnicalIndicators: Unknown indicator type '{indicator_type}'")
self.logger.error(f"Unknown indicator type '{indicator_type}'")
return None
try:
@ -429,5 +282,5 @@ class TechnicalIndicators:
except Exception as e:
if self.logger:
self.logger.error(f"TechnicalIndicators: Error calculating {indicator_type}: {e}")
self.logger.error(f"Error calculating {indicator_type}: {e}")
return None