From 24b6a3feed684a5db24476700c44f3cb92d620e2 Mon Sep 17 00:00:00 2001 From: "Vasily.onl" Date: Mon, 2 Jun 2025 13:42:00 +0800 Subject: [PATCH] Add technical indicators module for OHLCV data analysis - Introduced `indicators.py` containing implementations for SMA, EMA, RSI, MACD, and Bollinger Bands, optimized for handling sparse OHLCV data. - Added `IndicatorResult` dataclass to encapsulate results of indicator calculations. - Implemented methods for calculating multiple indicators efficiently with JSON configuration support and validation. - Updated `__init__.py` to include new indicators in the module's exports. - Enhanced documentation to cover the new technical indicators module, including usage examples and integration details. - Added comprehensive unit tests to ensure accuracy and robustness of the indicators module. --- data/common/__init__.py | 15 +- data/common/indicators.py | 468 ++++++++++++++++++++++++ docs/README.md | 7 + docs/components/README.md | 12 + docs/components/technical-indicators.md | 319 ++++++++++++++++ tasks/tasks-crypto-bot-prd.md | 5 +- tests/test_indicators.py | 360 ++++++++++++++++++ 7 files changed, 1184 insertions(+), 2 deletions(-) create mode 100644 data/common/indicators.py create mode 100644 docs/components/technical-indicators.md create mode 100644 tests/test_indicators.py diff --git a/data/common/__init__.py b/data/common/__init__.py index 143ad08..d41ce5c 100644 --- a/data/common/__init__.py +++ b/data/common/__init__.py @@ -29,6 +29,13 @@ from .validation import ( ValidationResult ) +from .indicators import ( + TechnicalIndicators, + IndicatorResult, + create_default_indicators_config, + validate_indicator_config +) + __all__ = [ # Data types 'StandardizedTrade', @@ -48,5 +55,11 @@ __all__ = [ # Validation 'BaseDataValidator', - 'ValidationResult' + 'ValidationResult', + + # Technical Indicators + 'TechnicalIndicators', + 'IndicatorResult', + 'create_default_indicators_config', + 'validate_indicator_config' ] \ No newline 
"""
Technical Indicators Module for OHLCV Data

This module provides technical indicator calculations optimized for sparse OHLCV data
as produced by the TCP Trading Platform's aggregation strategy.

IMPORTANT: Handles Sparse Data
- Missing candles (time gaps) are normal in this system
- Indicators properly handle gaps without interpolation
- Uses pandas for efficient vectorized calculations
- Follows right-aligned timestamp convention

Supported Indicators:
- Simple Moving Average (SMA)
- Exponential Moving Average (EMA)
- Relative Strength Index (RSI)
- Moving Average Convergence Divergence (MACD)
- Bollinger Bands
"""

import numbers
from collections.abc import Mapping
from dataclasses import dataclass
from datetime import datetime, timedelta
from decimal import Decimal
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd

if TYPE_CHECKING:
    # OHLCVCandle is referenced only in (quoted) annotations, so it is imported
    # for type checkers only; candles are accessed by attribute at runtime.
    from .data_types import OHLCVCandle


# Indicator types understood by calculate_multiple_indicators().
_INDICATOR_TYPES = ('sma', 'ema', 'rsi', 'macd', 'bollinger_bands')

# Configuration keys that must hold a positive integer when present.
_PERIOD_FIELDS = ('period', 'fast_period', 'slow_period', 'signal_period')

# Numeric columns produced by TechnicalIndicators.prepare_dataframe().
_NUMERIC_COLUMNS = ('open', 'high', 'low', 'close', 'volume', 'trade_count')


@dataclass
class IndicatorResult:
    """
    Container for technical indicator calculation results.

    Attributes:
        timestamp: Candle timestamp (right-aligned)
        symbol: Trading symbol
        timeframe: Candle timeframe
        values: Dictionary of indicator values
        metadata: Additional calculation metadata
    """
    timestamp: datetime
    symbol: str
    timeframe: str
    values: Dict[str, float]
    metadata: Optional[Dict[str, Any]] = None


def _is_positive_int(value: Any) -> bool:
    """Return True for integers greater than zero (bool is rejected)."""
    return (
        isinstance(value, numbers.Integral)
        and not isinstance(value, bool)
        and value > 0
    )


def _is_positive_number(value: Any) -> bool:
    """Return True for real numbers greater than zero (bool and NaN are rejected)."""
    return (
        isinstance(value, numbers.Real)
        and not isinstance(value, bool)
        and value > 0
    )


def _results_from_frame(df: pd.DataFrame, values: Dict[str, pd.Series],
                        metadata: Dict[str, Any], first_row: float = 0,
                        required: Optional[List[str]] = None) -> List[IndicatorResult]:
    """
    Turn per-row indicator series into IndicatorResult objects.

    Args:
        df: Prepared candle frame (timestamp index, 'symbol' and 'timeframe' columns)
        values: Indicator series keyed by output name, aligned with df
        metadata: Calculation parameters; each result receives its own copy
        first_row: Rows positioned before this offset are skipped (warm-up)
        required: Names of series that must be non-NaN for a row to be emitted
            (defaults to every series in values)

    Returns:
        One IndicatorResult per retained row, in frame order
    """
    # Comparison rather than slicing so a non-integer offset behaves exactly
    # like the `i >= offset` test it replaces.
    keep = np.arange(len(df)) >= first_row
    for name in (values if required is None else required):
        keep &= values[name].notna().to_numpy()

    if not keep.any():
        return []

    names = list(values)
    # tolist() yields plain Python floats, one list per indicator output.
    columns = [values[name].to_numpy()[keep].tolist() for name in names]

    return [
        IndicatorResult(
            timestamp=timestamp,
            symbol=symbol,
            timeframe=timeframe,
            values=dict(zip(names, row)),
            metadata=dict(metadata)
        )
        for timestamp, symbol, timeframe, row in zip(
            df.index[keep],
            df['symbol'].to_numpy()[keep],
            df['timeframe'].to_numpy()[keep],
            zip(*columns)
        )
    ]


class TechnicalIndicators:
    """
    Technical indicator calculator for OHLCV candle data.

    Every public indicator method converts the candles to a pandas DataFrame
    and computes the indicator with vectorized pandas operations.

    SPARSE DATA HANDLING:
    - Gaps in timestamps are preserved (no interpolation)
    - Indicators calculate only on available data points
    - Rows without enough history are omitted from the results
    - Results keep the timestamp of the candle they were computed for
    """

    def __init__(self, logger=None):
        """
        Initialize technical indicators calculator.

        Args:
            logger: Optional logger instance
        """
        self.logger = logger

        if self.logger:
            self.logger.info("TechnicalIndicators: Initialized indicator calculator")

    def prepare_dataframe(self, candles: List["OHLCVCandle"]) -> pd.DataFrame:
        """
        Convert OHLCV candles to pandas DataFrame for efficient calculations.

        Args:
            candles: List of OHLCV candles (can be sparse)

        Returns:
            DataFrame indexed by right-aligned timestamp (candle.end_time) and
            sorted ascending; an empty DataFrame when no candles are given
        """
        if not candles:
            return pd.DataFrame()

        rows = [
            {
                'timestamp': candle.end_time,  # Right-aligned timestamp
                'symbol': candle.symbol,
                'timeframe': candle.timeframe,
                'open': float(candle.open),
                'high': float(candle.high),
                'low': float(candle.low),
                'close': float(candle.close),
                'volume': float(candle.volume),
                'trade_count': candle.trade_count
            }
            for candle in candles
        ]

        # Stable sort: candles sharing a timestamp keep their input order, so
        # repeated runs over the same input give the same frame.
        return (
            pd.DataFrame(rows)
            .sort_values('timestamp', kind='mergesort')
            .set_index('timestamp')
        )

    def sma(self, candles: List["OHLCVCandle"], period: int,
            price_column: str = 'close') -> List[IndicatorResult]:
        """
        Calculate Simple Moving Average (SMA).

        Args:
            candles: List of OHLCV candles
            period: Number of periods for moving average
            price_column: Price column to use ('open', 'high', 'low', 'close')

        Returns:
            List of indicator results with SMA values; empty when fewer than
            `period` candles are available
        """
        return self._sma_from_frame(self.prepare_dataframe(candles), period, price_column)

    def ema(self, candles: List["OHLCVCandle"], period: int,
            price_column: str = 'close') -> List[IndicatorResult]:
        """
        Calculate Exponential Moving Average (EMA).

        Args:
            candles: List of OHLCV candles
            period: Number of periods for moving average
            price_column: Price column to use ('open', 'high', 'low', 'close')

        Returns:
            List of indicator results with EMA values, starting at the
            `period`-th candle
        """
        return self._ema_from_frame(self.prepare_dataframe(candles), period, price_column)

    def rsi(self, candles: List["OHLCVCandle"], period: int = 14,
            price_column: str = 'close') -> List[IndicatorResult]:
        """
        Calculate Relative Strength Index (RSI).

        Args:
            candles: List of OHLCV candles
            period: Number of periods for RSI calculation (default 14)
            price_column: Price column to use ('open', 'high', 'low', 'close')

        Returns:
            List of indicator results with RSI values; empty when fewer than
            `period + 1` candles are available
        """
        return self._rsi_from_frame(self.prepare_dataframe(candles), period, price_column)

    def macd(self, candles: List["OHLCVCandle"],
             fast_period: int = 12, slow_period: int = 26, signal_period: int = 9,
             price_column: str = 'close') -> List[IndicatorResult]:
        """
        Calculate Moving Average Convergence Divergence (MACD).

        Args:
            candles: List of OHLCV candles
            fast_period: Fast EMA period (default 12)
            slow_period: Slow EMA period (default 26)
            signal_period: Signal line EMA period (default 9)
            price_column: Price column to use ('open', 'high', 'low', 'close')

        Returns:
            List of indicator results with MACD, signal, and histogram values;
            empty when fewer than `slow_period + signal_period` candles are available
        """
        return self._macd_from_frame(
            self.prepare_dataframe(candles),
            fast_period, slow_period, signal_period, price_column
        )

    def bollinger_bands(self, candles: List["OHLCVCandle"], period: int = 20,
                        std_dev: float = 2.0, price_column: str = 'close') -> List[IndicatorResult]:
        """
        Calculate Bollinger Bands.

        Args:
            candles: List of OHLCV candles
            period: Number of periods for moving average (default 20)
            std_dev: Number of standard deviations for bands (default 2.0)
            price_column: Price column to use ('open', 'high', 'low', 'close')

        Returns:
            List of indicator results with upper band, middle band (SMA), lower
            band, bandwidth and %B. %B is NaN when the bands coincide (zero
            standard deviation), and bandwidth is not finite when the middle
            band is zero.
        """
        return self._bollinger_from_frame(
            self.prepare_dataframe(candles), period, std_dev, price_column
        )

    def calculate_multiple_indicators(self, candles: List["OHLCVCandle"],
                                      indicators_config: Dict[str, Dict[str, Any]]) -> Dict[str, List[IndicatorResult]]:
        """
        Calculate multiple indicators at once for efficiency.

        The candles are converted to a DataFrame once and shared by every
        indicator. A failure in one indicator (bad parameters, malformed config
        entry, unknown type) is logged and yields an empty list for that entry
        without affecting the others.

        Args:
            candles: List of OHLCV candles
            indicators_config: Configuration for indicators to calculate
                Example: {
                    'sma_20': {'type': 'sma', 'period': 20},
                    'ema_12': {'type': 'ema', 'period': 12},
                    'rsi_14': {'type': 'rsi', 'period': 14},
                    'macd': {'type': 'macd'},
                    'bb_20': {'type': 'bollinger_bands', 'period': 20}
                }

        Returns:
            Dictionary mapping indicator names to their results
        """
        results: Dict[str, List[IndicatorResult]] = {}

        try:
            df = self.prepare_dataframe(candles)
        except Exception as e:
            # Best-effort batch API: unusable candle data fails every indicator
            # the same way each individual calculation would have.
            for indicator_name in indicators_config:
                if self.logger:
                    self.logger.error(f"TechnicalIndicators: Error calculating {indicator_name}: {e}")
                results[indicator_name] = []
            return results

        for indicator_name, config in indicators_config.items():
            try:
                results[indicator_name] = self._calculate_from_config(df, config)
            except Exception as e:
                if self.logger:
                    self.logger.error(f"TechnicalIndicators: Error calculating {indicator_name}: {e}")
                results[indicator_name] = []

        return results

    def _calculate_from_config(self, df: pd.DataFrame,
                               config: Dict[str, Any]) -> List[IndicatorResult]:
        """Run the indicator described by one config entry against a prepared frame."""
        indicator_type = config.get('type')
        price_column = config.get('price_column', 'close')

        if indicator_type == 'sma':
            return self._sma_from_frame(df, config.get('period', 20), price_column)

        if indicator_type == 'ema':
            return self._ema_from_frame(df, config.get('period', 20), price_column)

        if indicator_type == 'rsi':
            return self._rsi_from_frame(df, config.get('period', 14), price_column)

        if indicator_type == 'macd':
            return self._macd_from_frame(
                df,
                config.get('fast_period', 12),
                config.get('slow_period', 26),
                config.get('signal_period', 9),
                price_column
            )

        if indicator_type == 'bollinger_bands':
            return self._bollinger_from_frame(
                df, config.get('period', 20), config.get('std_dev', 2.0), price_column
            )

        if self.logger:
            self.logger.warning(f"TechnicalIndicators: Unknown indicator type: {indicator_type}")
        return []

    def _sma_from_frame(self, df: pd.DataFrame, period: int,
                        price_column: str) -> List[IndicatorResult]:
        """Compute SMA results from a prepared frame without modifying it."""
        if df.empty or len(df) < period:
            return []

        sma = df[price_column].rolling(window=period, min_periods=period).mean()

        return _results_from_frame(
            df, {'sma': sma},
            {'period': period, 'price_column': price_column}
        )

    def _ema_from_frame(self, df: pd.DataFrame, period: int,
                        price_column: str) -> List[IndicatorResult]:
        """Compute EMA results from a prepared frame without modifying it."""
        if df.empty or len(df) < period:
            return []

        ema = df[price_column].ewm(span=period, adjust=False).mean()

        # The recursive EMA has a value from the first row; rows before the
        # period-th candle are warm-up and are not reported.
        return _results_from_frame(
            df, {'ema': ema},
            {'period': period, 'price_column': price_column},
            first_row=period - 1
        )

    def _rsi_from_frame(self, df: pd.DataFrame, period: int,
                        price_column: str) -> List[IndicatorResult]:
        """Compute RSI results from a prepared frame without modifying it."""
        if df.empty or len(df) < period + 1:
            return []

        change = df[price_column].diff()

        # Split moves into gains and losses (both non-negative).
        gain = change.where(change > 0, 0)
        loss = (-change).where(change < 0, 0)

        # NOTE: smoothing uses a span-based EMA (alpha = 2 / (period + 1)),
        # not Wilder's alpha = 1 / period, so values can differ from charting
        # tools that implement Wilder's RSI.
        avg_gain = gain.ewm(span=period, adjust=False).mean()
        avg_loss = loss.ewm(span=period, adjust=False).mean()

        rs = avg_gain / avg_loss
        rsi = 100 - (100 / (1 + rs))

        # Gains with no losses give rs = inf and therefore RSI = 100 already.
        # Only the 0 / 0 case (no movement at all) is NaN; report it as neutral.
        rsi = rsi.fillna(50)

        # The first row has no price change, so results start one row later.
        return _results_from_frame(
            df, {'rsi': rsi},
            {'period': period, 'price_column': price_column},
            first_row=period
        )

    def _macd_from_frame(self, df: pd.DataFrame, fast_period: int, slow_period: int,
                         signal_period: int, price_column: str) -> List[IndicatorResult]:
        """Compute MACD results from a prepared frame without modifying it."""
        if df.empty or len(df) < slow_period + signal_period:
            return []

        prices = df[price_column]
        ema_fast = prices.ewm(span=fast_period, adjust=False).mean()
        ema_slow = prices.ewm(span=slow_period, adjust=False).mean()

        macd_line = ema_fast - ema_slow
        signal_line = macd_line.ewm(span=signal_period, adjust=False).mean()
        histogram = macd_line - signal_line

        return _results_from_frame(
            df,
            {'macd': macd_line, 'signal': signal_line, 'histogram': histogram},
            {
                'fast_period': fast_period,
                'slow_period': slow_period,
                'signal_period': signal_period,
                'price_column': price_column
            },
            first_row=slow_period + signal_period - 1
        )

    def _bollinger_from_frame(self, df: pd.DataFrame, period: int, std_dev: float,
                              price_column: str) -> List[IndicatorResult]:
        """Compute Bollinger Band results from a prepared frame without modifying it."""
        if df.empty or len(df) < period:
            return []

        prices = df[price_column]
        window = prices.rolling(window=period, min_periods=period)

        middle_band = window.mean()
        deviation = window.std()  # pandas default: sample standard deviation

        upper_band = middle_band + (std_dev * deviation)
        lower_band = middle_band - (std_dev * deviation)

        bandwidth = (upper_band - lower_band) / middle_band
        percent_b = (prices - lower_band) / (upper_band - lower_band)

        # Only the middle band gates a row: bandwidth and %B may legitimately
        # be NaN/inf (see bollinger_bands docstring) and are still reported.
        return _results_from_frame(
            df,
            {
                'upper_band': upper_band,
                'middle_band': middle_band,
                'lower_band': lower_band,
                'bandwidth': bandwidth,
                'percent_b': percent_b
            },
            {'period': period, 'std_dev': std_dev, 'price_column': price_column},
            required=['middle_band']
        )


def create_default_indicators_config() -> Dict[str, Dict[str, Any]]:
    """
    Create default configuration for common technical indicators.

    Returns:
        Dictionary with default indicator configurations (a new dict per call)
    """
    return {
        'sma_20': {'type': 'sma', 'period': 20},
        'sma_50': {'type': 'sma', 'period': 50},
        'ema_12': {'type': 'ema', 'period': 12},
        'ema_26': {'type': 'ema', 'period': 26},
        'rsi_14': {'type': 'rsi', 'period': 14},
        'macd_default': {'type': 'macd'},
        'bollinger_bands_20': {'type': 'bollinger_bands', 'period': 20}
    }


def validate_indicator_config(config: Dict[str, Any]) -> bool:
    """
    Validate technical indicator configuration.

    Checks that the config is a mapping with a supported 'type', that any
    period field present ('period', 'fast_period', 'slow_period',
    'signal_period') is a positive integer, that 'price_column' (if present)
    names a numeric candle column, and that 'std_dev' (if present on a
    Bollinger Bands config) is a positive number.

    Args:
        config: Indicator configuration dictionary

    Returns:
        True if configuration is valid, False otherwise
    """
    if not isinstance(config, Mapping):
        return False

    if config.get('type') not in _INDICATOR_TYPES:
        return False

    for field in _PERIOD_FIELDS:
        if field in config and not _is_positive_int(config[field]):
            return False

    if 'price_column' in config and config['price_column'] not in _NUMERIC_COLUMNS:
        return False

    if config['type'] == 'bollinger_bands' and 'std_dev' in config:
        if not _is_positive_number(config['std_dev']):
            return False

    return True
Optimized for sparse OHLCV data handling + - Vectorized calculations using pandas and numpy + - JSON configuration support with validation + - Integration with aggregation strategy + - **[Logging System](components/logging.md)** - *Unified logging framework* - Multi-level logging with automatic cleanup - Console and file output with formatting diff --git a/docs/components/README.md b/docs/components/README.md index a0971da..050b2aa 100644 --- a/docs/components/README.md +++ b/docs/components/README.md @@ -29,6 +29,18 @@ This section contains detailed technical documentation for all system components - Database health monitoring and performance statistics - Migration guide from direct SQL to repository pattern +### Technical Analysis + +- **[Technical Indicators](technical-indicators.md)** - *Comprehensive technical analysis module* + - **Five Core Indicators**: SMA, EMA, RSI, MACD, and Bollinger Bands + - **Sparse Data Handling**: Optimized for the platform's aggregation strategy + - **Vectorized Calculations**: High-performance pandas and numpy implementation + - **Flexible Configuration**: JSON-based parameter configuration with validation + - **Integration Ready**: Seamless integration with OHLCV data and real-time processing + - Batch processing for multiple indicators + - Support for different price columns (open, high, low, close) + - Comprehensive unit testing and documentation + ### Logging & Monitoring - **[Enhanced Logging System](logging.md)** - *Unified logging framework* diff --git a/docs/components/technical-indicators.md b/docs/components/technical-indicators.md new file mode 100644 index 0000000..818cc49 --- /dev/null +++ b/docs/components/technical-indicators.md @@ -0,0 +1,319 @@ +# Technical Indicators Module + +The Technical Indicators module provides comprehensive technical analysis capabilities for the TCP Trading Platform. It's designed to handle sparse OHLCV data efficiently and integrates seamlessly with the platform's aggregation strategy. 
+ +## Overview + +The module implements five core technical indicators commonly used in trading: + +- **Simple Moving Average (SMA)** - Average price over a specified period +- **Exponential Moving Average (EMA)** - Weighted average giving more importance to recent prices +- **Relative Strength Index (RSI)** - Momentum oscillator measuring speed and change of price movements +- **Moving Average Convergence Divergence (MACD)** - Trend-following momentum indicator +- **Bollinger Bands** - Volatility indicator with upper and lower bands around a moving average + +## Key Features + +### Sparse Data Handling +- **No Interpolation**: Preserves gaps in timestamp data without artificial interpolation +- **Efficient Processing**: Uses pandas for vectorized calculations +- **Right-Aligned Timestamps**: Follows the platform's aggregation strategy convention +- **Robust Error Handling**: Gracefully handles insufficient data and edge cases + +### Performance Optimized +- **Vectorized Calculations**: Leverages pandas and numpy for fast computation +- **Batch Processing**: Calculate multiple indicators in a single call +- **In-Memory Processing**: Builds a pandas DataFrame from the candles passed in, so memory use scales with the number of candles supplied + +### Flexible Configuration +- **JSON Configuration**: Define indicator parameters via configuration files +- **Multiple Price Columns**: Calculate indicators on open, high, low, or close prices +- **Custom Parameters**: Adjust periods, standard deviations, and other parameters +- **Validation**: Built-in configuration validation + +## Usage Examples + +### Basic Usage + +```python +from data.common.indicators import TechnicalIndicators +from data.common.data_types import OHLCVCandle + +# Initialize indicators calculator +indicators = TechnicalIndicators() + +# Calculate Simple Moving Average +sma_results = indicators.sma(candles, period=20) + +# Calculate Exponential Moving Average +ema_results = indicators.ema(candles, period=12) + +# Calculate RSI +rsi_results = 
indicators.rsi(candles, period=14) + +# Calculate MACD +macd_results = indicators.macd(candles, fast_period=12, slow_period=26, signal_period=9) + +# Calculate Bollinger Bands +bb_results = indicators.bollinger_bands(candles, period=20, std_dev=2.0) +``` + +### Multiple Indicators + +```python +# Define configuration for multiple indicators +config = { + 'sma_20': {'type': 'sma', 'period': 20}, + 'sma_50': {'type': 'sma', 'period': 50}, + 'ema_12': {'type': 'ema', 'period': 12}, + 'rsi_14': {'type': 'rsi', 'period': 14}, + 'macd': {'type': 'macd'}, + 'bb_20': {'type': 'bollinger_bands', 'period': 20} +} + +# Calculate all indicators at once +results = indicators.calculate_multiple_indicators(candles, config) + +# Access individual indicator results +sma_20_values = results['sma_20'] +rsi_values = results['rsi_14'] +macd_values = results['macd'] +``` + +### Using Different Price Columns + +```python +# Calculate SMA on high prices instead of close +sma_high = indicators.sma(candles, period=20, price_column='high') + +# Calculate EMA on low prices +ema_low = indicators.ema(candles, period=12, price_column='low') + +# Calculate RSI on open prices +rsi_open = indicators.rsi(candles, period=14, price_column='open') +``` + +### Default Configuration + +```python +from data.common.indicators import create_default_indicators_config + +# Get default configuration +default_config = create_default_indicators_config() + +# Calculate using defaults +results = indicators.calculate_multiple_indicators(candles, default_config) +``` + +## Indicator Details + +### Simple Moving Average (SMA) + +Calculates the arithmetic mean of prices over a specified period. + +**Parameters:** +- `period`: Number of periods (default: 20) +- `price_column`: Price column to use (default: 'close') + +**Returns:** +- `sma`: Simple moving average value + +### Exponential Moving Average (EMA) + +Calculates exponentially weighted moving average, giving more weight to recent prices. 
+ +**Parameters:** +- `period`: Number of periods (default: 20) +- `price_column`: Price column to use (default: 'close') + +**Returns:** +- `ema`: Exponential moving average value + +### Relative Strength Index (RSI) + +Momentum oscillator that measures the speed and change of price movements. + +**Parameters:** +- `period`: Number of periods (default: 14) +- `price_column`: Price column to use (default: 'close') + +**Returns:** +- `rsi`: RSI value (0-100 range) + +### MACD (Moving Average Convergence Divergence) + +Trend-following momentum indicator showing the relationship between two moving averages. + +**Parameters:** +- `fast_period`: Fast EMA period (default: 12) +- `slow_period`: Slow EMA period (default: 26) +- `signal_period`: Signal line EMA period (default: 9) +- `price_column`: Price column to use (default: 'close') + +**Returns:** +- `macd`: MACD line (fast EMA - slow EMA) +- `signal`: Signal line (EMA of MACD) +- `histogram`: MACD histogram (MACD - Signal) + +### Bollinger Bands + +Volatility indicator consisting of a moving average and two standard deviation bands. + +**Parameters:** +- `period`: Number of periods for moving average (default: 20) +- `std_dev`: Number of standard deviations (default: 2.0) +- `price_column`: Price column to use (default: 'close') + +**Returns:** +- `upper_band`: Upper Bollinger Band +- `middle_band`: Middle band (SMA) +- `lower_band`: Lower Bollinger Band +- `bandwidth`: Band width relative to middle band +- `percent_b`: %B indicator (position within bands) + +## Data Structures + +### IndicatorResult + +Container for technical indicator calculation results. 
+ +```python +@dataclass +class IndicatorResult: + timestamp: datetime # Right-aligned candle timestamp + symbol: str # Trading symbol (e.g., 'BTC-USDT') + timeframe: str # Candle timeframe (e.g., '1m', '5m') + values: Dict[str, float] # Indicator values + metadata: Optional[Dict[str, Any]] = None # Calculation metadata +``` + +### Configuration Format + +Indicator configurations use a standardized JSON format: + +```json +{ + "indicator_name": { + "type": "sma|ema|rsi|macd|bollinger_bands", + "period": 20, + "price_column": "close" + } +} +``` +Parameters specific to an indicator type (for example `std_dev`, or `fast_period`, `slow_period` and `signal_period`) are added alongside `period`. + +## Integration with TCP Platform + +### Aggregation Strategy Compatibility + +The indicators module is designed to work seamlessly with the TCP platform's aggregation strategy: + +- **Right-Aligned Timestamps**: Uses `end_time` from OHLCV candles +- **Sparse Data Support**: Handles missing candles without interpolation +- **No Future Leakage**: Each value is computed from the current and earlier candles only; the module does not filter on `is_complete`, so pass completed candles only +- **Time Boundary Respect**: Maintains proper temporal ordering + +### Real-Time Processing + +```python +from data.common.aggregation import RealTimeCandleProcessor +from data.common.indicators import TechnicalIndicators + +# Set up real-time processing +candle_processor = RealTimeCandleProcessor(symbol='BTC-USDT', exchange='okx') +indicators = TechnicalIndicators() + +# Process incoming trades and calculate indicators +def on_new_candle(candle): + # Get recent candles for indicator calculation + recent_candles = get_recent_candles(symbol='BTC-USDT', count=50) + + # Calculate indicators + sma_results = indicators.sma(recent_candles, period=20) + rsi_results = indicators.rsi(recent_candles, period=14) + + # Use indicator values for trading decisions + if sma_results and rsi_results: + latest_sma = sma_results[-1].values['sma'] + latest_rsi = rsi_results[-1].values['rsi'] + + # Trading logic here... 
+``` + +### Database Integration + +```python +from database.models import IndicatorData + +# Store indicator results in database +def store_indicators(indicator_results, indicator_type): + for result in indicator_results: + indicator_data = IndicatorData( + symbol=result.symbol, + timeframe=result.timeframe, + timestamp=result.timestamp, + indicator_type=indicator_type, + values=result.values, + metadata=result.metadata + ) + session.add(indicator_data) + session.commit() +``` + +## Performance Considerations + +### Memory Usage +- Process indicators in batches for large datasets +- Use appropriate period lengths to balance accuracy and performance +- Consider data retention policies for historical indicator values + +### Calculation Frequency +- Calculate indicators only when new complete candles are available +- Cache recent indicator values to avoid recalculation +- Use incremental updates for real-time scenarios + +### Optimization Tips +- Use `calculate_multiple_indicators()` for efficiency when computing multiple indicators +- Limit the number of historical candles to what's actually needed +- Consider using different timeframes for different indicators + +## Error Handling + +The module handles errors as follows: + +- **Insufficient Data**: Returns empty results when not enough data is available +- **Invalid Configuration**: `validate_indicator_config()` checks a configuration on request; `calculate_multiple_indicators()` logs unknown types and failed calculations and returns an empty list for that indicator +- **Data Quality Issues**: Rows whose indicator value is NaN are omitted from the results +- **Type Errors**: Candle fields are converted with `float()`; a failed conversion raises from the single-indicator methods and is logged by `calculate_multiple_indicators()` + +## Testing + +The module includes comprehensive unit tests covering: + +- All indicator calculations with known expected values +- Sparse data handling scenarios +- Edge cases (insufficient data, invalid parameters) +- Configuration validation +- Multiple indicator batch processing + +Run tests with: +```bash +uv run pytest tests/test_indicators.py -v +``` + +## Future Enhancements + +Potential future additions to the indicators 
module: + +- **Additional Indicators**: Stochastic, Williams %R, Commodity Channel Index +- **Custom Indicators**: Framework for user-defined indicators +- **Performance Metrics**: Calculation timing and memory usage statistics +- **Streaming Updates**: Incremental indicator updates for real-time scenarios +- **Parallel Processing**: Multi-threaded calculation for large datasets + +## See Also + +- [Aggregation Strategy Documentation](aggregation-strategy.md) +- [Data Types Documentation](data-types.md) +- [Database Schema Documentation](database-schema.md) +- [API Reference](api-reference.md) \ No newline at end of file diff --git a/tasks/tasks-crypto-bot-prd.md b/tasks/tasks-crypto-bot-prd.md index 801a2a9..dbf37d9 100644 --- a/tasks/tasks-crypto-bot-prd.md +++ b/tasks/tasks-crypto-bot-prd.md @@ -15,6 +15,7 @@ - `data/__init__.py` - Data collection package initialization - `data/okx_collector.py` - OKX API integration for real-time market data collection - `data/aggregator.py` - OHLCV candle aggregation and processing +- `data/common/indicators.py` - Technical indicators module with SMA, EMA, RSI, MACD, and Bollinger Bands calculations optimized for sparse OHLCV data - `strategies/base_strategy.py` - Base strategy class and interface - `strategies/ema_crossover.py` - Example EMA crossover strategy implementation - `components/dashboard.py` - Dashboard UI components and layouts @@ -37,8 +38,10 @@ - `tests/test_base_collector.py` - Comprehensive unit tests for the BaseDataCollector abstract class (13 tests) - `tests/test_collector_manager.py` - Comprehensive unit tests for the CollectorManager with health monitoring (14 tests) - `tests/test_logging_enhanced.py` - Comprehensive unit tests for enhanced logging features (16 tests) +- `tests/test_indicators.py` - Comprehensive unit tests for technical indicators module (18 tests) - `docs/setup.md` - Comprehensive setup guide for new machines and environments - `docs/logging.md` - Complete documentation for the enhanced 
unified logging system +- `docs/components/technical-indicators.md` - Complete documentation for the technical indicators module with usage examples and integration guide ## Tasks @@ -62,7 +65,7 @@ - [x] 2.3 Build data validation and error handling for market data - [x] 2.4 Implement Redis channels for real-time data distribution - [x] 2.5 Create data storage layer for OHLCV data in PostgreSQL - - [ ] 2.6 Add technical indicators calculation (SMA, EMA, RSI, MACD, Bollinger Bands) + - [x] 2.6 Add technical indicators calculation (SMA, EMA, RSI, MACD, Bollinger Bands) - [ ] 2.7 Implement data recovery and reconnection logic for API failures - [ ] 2.8 Create data collection service with proper logging - [ ] 2.9 Unit test data collection and aggregation logic diff --git a/tests/test_indicators.py b/tests/test_indicators.py new file mode 100644 index 0000000..3d7772e --- /dev/null +++ b/tests/test_indicators.py @@ -0,0 +1,360 @@ +""" +Unit tests for technical indicators module. + +Tests verify that all technical indicators work correctly with sparse OHLCV data +and handle edge cases appropriately. 
+""" + +import pytest +from datetime import datetime, timezone, timedelta +from decimal import Decimal +import pandas as pd +import numpy as np + +from data.common.indicators import ( + TechnicalIndicators, + IndicatorResult, + create_default_indicators_config, + validate_indicator_config +) +from data.common.data_types import OHLCVCandle + + +class TestTechnicalIndicators: + """Test suite for TechnicalIndicators class.""" + + @pytest.fixture + def sample_candles(self): + """Create sample OHLCV candles for testing.""" + candles = [] + base_time = datetime(2024, 1, 1, 9, 0, 0, tzinfo=timezone.utc) + + # Create 30 candles with realistic price movement + prices = [100.0, 101.0, 102.5, 101.8, 103.0, 104.2, 103.8, 105.0, 104.5, 106.0, + 107.5, 108.0, 107.2, 109.0, 108.5, 110.0, 109.8, 111.0, 110.5, 112.0, + 111.8, 113.0, 112.5, 114.0, 113.2, 115.0, 114.8, 116.0, 115.5, 117.0] + + for i, price in enumerate(prices): + candle = OHLCVCandle( + symbol='BTC-USDT', + timeframe='1m', + start_time=base_time + timedelta(minutes=i), + end_time=base_time + timedelta(minutes=i+1), + open=Decimal(str(price - 0.2)), + high=Decimal(str(price + 0.5)), + low=Decimal(str(price - 0.5)), + close=Decimal(str(price)), + volume=Decimal('1000'), + trade_count=10, + exchange='test', + is_complete=True + ) + candles.append(candle) + + return candles + + @pytest.fixture + def sparse_candles(self): + """Create sparse OHLCV candles (with gaps) for testing.""" + candles = [] + base_time = datetime(2024, 1, 1, 9, 0, 0, tzinfo=timezone.utc) + + # Create candles with time gaps (sparse data) + gap_minutes = [0, 1, 3, 5, 8, 10, 15, 18, 22, 25] + prices = [100.0, 101.0, 102.0, 103.0, 104.0, 105.0, 106.0, 107.0, 108.0, 109.0] + + for i, (gap, price) in enumerate(zip(gap_minutes, prices)): + candle = OHLCVCandle( + symbol='BTC-USDT', + timeframe='1m', + start_time=base_time + timedelta(minutes=gap), + end_time=base_time + timedelta(minutes=gap+1), + open=Decimal(str(price - 0.2)), + high=Decimal(str(price + 
0.5)), + low=Decimal(str(price - 0.5)), + close=Decimal(str(price)), + volume=Decimal('1000'), + trade_count=10, + exchange='test', + is_complete=True + ) + candles.append(candle) + + return candles + + @pytest.fixture + def indicators(self): + """Create TechnicalIndicators instance.""" + return TechnicalIndicators() + + def test_initialization(self, indicators): + """Test TechnicalIndicators initialization.""" + assert indicators is not None + assert indicators.logger is None + + def test_prepare_dataframe(self, indicators, sample_candles): + """Test DataFrame preparation from OHLCV candles.""" + df = indicators.prepare_dataframe(sample_candles) + + assert not df.empty + assert len(df) == len(sample_candles) + assert list(df.columns) == ['symbol', 'timeframe', 'open', 'high', 'low', 'close', 'volume', 'trade_count'] + assert df.index.name == 'timestamp' + + # Check that timestamps are sorted + assert df.index.is_monotonic_increasing + + def test_prepare_dataframe_empty(self, indicators): + """Test DataFrame preparation with empty candles list.""" + df = indicators.prepare_dataframe([]) + assert df.empty + + def test_sma_calculation(self, indicators, sample_candles): + """Test Simple Moving Average calculation.""" + period = 5 + results = indicators.sma(sample_candles, period) + + # Should have results starting from period 5 + assert len(results) == len(sample_candles) - period + 1 + + # Check first result + first_result = results[0] + assert isinstance(first_result, IndicatorResult) + assert first_result.symbol == 'BTC-USDT' + assert first_result.timeframe == '1m' + assert 'sma' in first_result.values + assert first_result.metadata['period'] == period + + # Verify SMA calculation manually for first result + first_5_closes = [float(candle.close) for candle in sample_candles[:5]] + expected_sma = sum(first_5_closes) / len(first_5_closes) + assert abs(first_result.values['sma'] - expected_sma) < 0.001 + + def test_sma_insufficient_data(self, indicators, 
sample_candles): + """Test SMA with insufficient data.""" + period = 50 # More than available candles + results = indicators.sma(sample_candles, period) + assert len(results) == 0 + + def test_ema_calculation(self, indicators, sample_candles): + """Test Exponential Moving Average calculation.""" + period = 10 + results = indicators.ema(sample_candles, period) + + # Should have results starting from period 10 + assert len(results) == len(sample_candles) - period + 1 + + # Check first result + first_result = results[0] + assert isinstance(first_result, IndicatorResult) + assert 'ema' in first_result.values + assert first_result.metadata['period'] == period + + # EMA should be between the range of input prices + min_price = min(float(c.close) for c in sample_candles[:period]) + max_price = max(float(c.close) for c in sample_candles[:period]) + assert min_price <= first_result.values['ema'] <= max_price + + def test_rsi_calculation(self, indicators, sample_candles): + """Test Relative Strength Index calculation.""" + period = 14 + results = indicators.rsi(sample_candles, period) + + # Should have results starting from period 15 (period + 1 for price change calculation) + assert len(results) == len(sample_candles) - period + + # Check first result + first_result = results[0] + assert isinstance(first_result, IndicatorResult) + assert 'rsi' in first_result.values + assert 0 <= first_result.values['rsi'] <= 100 # RSI should be between 0 and 100 + assert first_result.metadata['period'] == period + + def test_macd_calculation(self, indicators, sample_candles): + """Test MACD calculation.""" + fast_period = 12 + slow_period = 26 + signal_period = 9 + results = indicators.macd(sample_candles, fast_period, slow_period, signal_period) + + # MACD needs slow_period + signal_period data points + expected_count = len(sample_candles) - slow_period - signal_period + 1 + assert len(results) == max(0, expected_count) + + if results: # Only test if we have results + first_result = 
results[0] + assert isinstance(first_result, IndicatorResult) + assert 'macd' in first_result.values + assert 'signal' in first_result.values + assert 'histogram' in first_result.values + + # Histogram should equal MACD - Signal + expected_histogram = first_result.values['macd'] - first_result.values['signal'] + assert abs(first_result.values['histogram'] - expected_histogram) < 0.001 + + def test_bollinger_bands_calculation(self, indicators, sample_candles): + """Test Bollinger Bands calculation.""" + period = 20 + std_dev = 2.0 + results = indicators.bollinger_bands(sample_candles, period, std_dev) + + # Should have results starting from period 20 + assert len(results) == len(sample_candles) - period + 1 + + # Check first result + first_result = results[0] + assert isinstance(first_result, IndicatorResult) + assert 'upper_band' in first_result.values + assert 'middle_band' in first_result.values + assert 'lower_band' in first_result.values + assert 'bandwidth' in first_result.values + assert 'percent_b' in first_result.values + + # Upper band should be greater than middle band, which should be greater than lower band + assert first_result.values['upper_band'] > first_result.values['middle_band'] + assert first_result.values['middle_band'] > first_result.values['lower_band'] + + def test_sparse_data_handling(self, indicators, sparse_candles): + """Test indicators with sparse data (time gaps).""" + period = 5 + sma_results = indicators.sma(sparse_candles, period) + + # Should handle sparse data without issues + assert len(sma_results) > 0 + + # Check that timestamps are preserved correctly + for result in sma_results: + assert result.timestamp is not None + assert isinstance(result.timestamp, datetime) + + def test_calculate_multiple_indicators(self, indicators, sample_candles): + """Test calculating multiple indicators at once.""" + config = { + 'sma_10': {'type': 'sma', 'period': 10}, + 'ema_12': {'type': 'ema', 'period': 12}, + 'rsi_14': {'type': 'rsi', 
'period': 14}, + 'macd': {'type': 'macd'}, + 'bb_20': {'type': 'bollinger_bands', 'period': 20} + } + + results = indicators.calculate_multiple_indicators(sample_candles, config) + + assert len(results) == len(config) + assert 'sma_10' in results + assert 'ema_12' in results + assert 'rsi_14' in results + assert 'macd' in results + assert 'bb_20' in results + + # Check that each indicator has appropriate results + assert len(results['sma_10']) > 0 + assert len(results['ema_12']) > 0 + + def test_invalid_indicator_config(self, indicators, sample_candles): + """Test handling of invalid indicator configuration.""" + config = { + 'invalid_indicator': {'type': 'unknown_type', 'period': 10} + } + + results = indicators.calculate_multiple_indicators(sample_candles, config) + + assert 'invalid_indicator' in results + assert len(results['invalid_indicator']) == 0 # Should return empty list + + def test_different_price_columns(self, indicators, sample_candles): + """Test indicators with different price columns.""" + # Test SMA with 'high' price column + sma_high = indicators.sma(sample_candles, 5, price_column='high') + sma_close = indicators.sma(sample_candles, 5, price_column='close') + + assert len(sma_high) == len(sma_close) + # High prices should generally give higher SMA values + assert sma_high[0].values['sma'] >= sma_close[0].values['sma'] + + +class TestIndicatorHelperFunctions: + """Test helper functions for indicators.""" + + def test_create_default_indicators_config(self): + """Test default indicators configuration creation.""" + config = create_default_indicators_config() + + assert isinstance(config, dict) + assert 'sma_20' in config + assert 'ema_12' in config + assert 'rsi_14' in config + assert 'macd_default' in config + assert 'bollinger_bands_20' in config + + # Check structure of configurations + assert config['sma_20']['type'] == 'sma' + assert config['sma_20']['period'] == 20 + assert config['macd_default']['type'] == 'macd' + + def 
test_validate_indicator_config_valid(self): + """Test validation of valid indicator configurations.""" + valid_configs = [ + {'type': 'sma', 'period': 20}, + {'type': 'ema', 'period': 12}, + {'type': 'rsi', 'period': 14}, + {'type': 'macd'}, + {'type': 'bollinger_bands', 'period': 20, 'std_dev': 2.0} + ] + + for config in valid_configs: + assert validate_indicator_config(config) == True + + def test_validate_indicator_config_invalid(self): + """Test validation of invalid indicator configurations.""" + invalid_configs = [ + {}, # Missing type + {'type': 'unknown'}, # Invalid type + {'type': 'sma', 'period': -5}, # Invalid period + {'type': 'sma', 'period': 'not_a_number'}, # Invalid period type + {'type': 'bollinger_bands', 'std_dev': -1.0}, # Invalid std_dev + ] + + for config in invalid_configs: + assert validate_indicator_config(config) == False + + +class TestIndicatorResultDataClass: + """Test IndicatorResult dataclass.""" + + def test_indicator_result_creation(self): + """Test IndicatorResult creation and attributes.""" + timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + values = {'sma': 100.5, 'ema': 101.2} + metadata = {'period': 20} + + result = IndicatorResult( + timestamp=timestamp, + symbol='BTC-USDT', + timeframe='1m', + values=values, + metadata=metadata + ) + + assert result.timestamp == timestamp + assert result.symbol == 'BTC-USDT' + assert result.timeframe == '1m' + assert result.values == values + assert result.metadata == metadata + + def test_indicator_result_without_metadata(self): + """Test IndicatorResult creation without metadata.""" + timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + values = {'rsi': 65.5} + + result = IndicatorResult( + timestamp=timestamp, + symbol='ETH-USDT', + timeframe='5m', + values=values + ) + + assert result.metadata is None + + +if __name__ == '__main__': + pytest.main([__file__]) \ No newline at end of file