""" Market Data Integration for Chart Layers This module provides seamless integration between database market data and indicator layer calculations, handling data format conversions, validation, and optimization for real-time chart updates. """ import pandas as pd from datetime import datetime, timezone, timedelta from typing import List, Dict, Any, Optional, Union, Tuple from decimal import Decimal from dataclasses import dataclass from database.operations import get_database_operations, DatabaseOperationError from data.common.data_types import OHLCVCandle from data.common.indicators import TechnicalIndicators, IndicatorResult from components.charts.config.indicator_defs import convert_database_candles_to_ohlcv from utils.logger import get_logger # Initialize logger logger = get_logger() @dataclass class DataIntegrationConfig: """Configuration for market data integration""" default_days_back: int = 7 min_candles_required: int = 50 max_candles_limit: int = 1000 cache_timeout_minutes: int = 5 enable_data_validation: bool = True enable_sparse_data_handling: bool = True class MarketDataIntegrator: """ Integrates market data from database with indicator calculations. This class handles: - Fetching market data from database - Converting to indicator-compatible formats - Caching for performance - Data validation and error handling - Sparse data handling (gaps in time series) """ def __init__(self, config: DataIntegrationConfig = None): """ Initialize market data integrator. Args: config: Integration configuration """ self.config = config or DataIntegrationConfig() self.logger = logger self.db_ops = get_database_operations(self.logger) self.indicators = TechnicalIndicators() # Simple in-memory cache for recent data self._cache: Dict[str, Dict[str, Any]] = {} def get_market_data_for_indicators( self, symbol: str, timeframe: str, days_back: Optional[int] = None, exchange: str = "okx" ) -> Tuple[List[Dict[str, Any]], List[OHLCVCandle]]: """ Fetch and prepare market data for indicator calculations. Args: symbol: Trading pair (e.g., 'BTC-USDT') timeframe: Timeframe (e.g., '1h', '1d') days_back: Number of days to look back exchange: Exchange name Returns: Tuple of (raw_candles, ohlcv_candles) for different use cases """ try: # Use default or provided days_back days_back = days_back or self.config.default_days_back # Check cache first cache_key = f"{symbol}_{timeframe}_{days_back}_{exchange}" cached_data = self._get_cached_data(cache_key) if cached_data: self.logger.debug(f"Data Integration: Using cached data for {cache_key}") return cached_data['raw_candles'], cached_data['ohlcv_candles'] # Fetch from database end_time = datetime.now(timezone.utc) start_time = end_time - timedelta(days=days_back) raw_candles = self.db_ops.market_data.get_candles( symbol=symbol, timeframe=timeframe, start_time=start_time, end_time=end_time, exchange=exchange ) if not raw_candles: self.logger.warning(f"Data Integration: No market data found for {symbol} {timeframe}") return [], [] # Validate data if enabled if self.config.enable_data_validation: raw_candles = self._validate_and_clean_data(raw_candles) # Handle sparse data if enabled if self.config.enable_sparse_data_handling: raw_candles = self._handle_sparse_data(raw_candles, timeframe) # Convert to OHLCV format for indicators ohlcv_candles = convert_database_candles_to_ohlcv(raw_candles) # Cache the results self._cache_data(cache_key, { 'raw_candles': raw_candles, 'ohlcv_candles': ohlcv_candles, 'timestamp': datetime.now(timezone.utc) }) self.logger.debug(f"Data Integration: Fetched {len(raw_candles)} candles for {symbol} {timeframe}") return raw_candles, ohlcv_candles except DatabaseOperationError as e: self.logger.error(f"Data Integration: Database error fetching market data: {e}") return [], [] except Exception as e: self.logger.error(f"Data Integration: Unexpected error fetching market data: {e}") return [], [] def calculate_indicators_for_symbol( self, symbol: str, timeframe: str, indicator_configs: List[Dict[str, Any]], days_back: Optional[int] = None, exchange: str = "okx" ) -> Dict[str, List[IndicatorResult]]: """ Calculate multiple indicators for a symbol. Args: symbol: Trading pair timeframe: Timeframe indicator_configs: List of indicator configurations days_back: Number of days to look back exchange: Exchange name Returns: Dictionary mapping indicator names to their results """ try: # Get market data raw_candles, ohlcv_candles = self.get_market_data_for_indicators( symbol, timeframe, days_back, exchange ) if not ohlcv_candles: self.logger.warning(f"Data Integration: No data available for indicator calculations: {symbol} {timeframe}") return {} # Check minimum data requirements if len(ohlcv_candles) < self.config.min_candles_required: self.logger.warning( f"Insufficient data for reliable indicators: {len(ohlcv_candles)} < {self.config.min_candles_required}" ) # Calculate indicators results = {} for config in indicator_configs: indicator_name = config.get('name', 'unknown') indicator_type = config.get('type', 'unknown') parameters = config.get('parameters', {}) try: indicator_results = self._calculate_single_indicator( indicator_type, ohlcv_candles, parameters ) if indicator_results: results[indicator_name] = indicator_results self.logger.debug(f"Calculated {indicator_name}: {len(indicator_results)} points") else: self.logger.warning(f"Data Integration: No results for indicator {indicator_name}") except Exception as e: self.logger.error(f"Data Integration: Error calculating indicator {indicator_name}: {e}") continue return results except Exception as e: self.logger.error(f"Data Integration: Error calculating indicators for {symbol}: {e}") return {} def get_latest_market_data( self, symbol: str, timeframe: str, limit: int = 100, exchange: str = "okx" ) -> Tuple[List[Dict[str, Any]], List[OHLCVCandle]]: """ Get the most recent market data for real-time updates. Args: symbol: Trading pair timeframe: Timeframe limit: Maximum number of candles to fetch exchange: Exchange name Returns: Tuple of (raw_candles, ohlcv_candles) """ try: # Calculate time range based on limit and timeframe end_time = datetime.now(timezone.utc) # Estimate time range based on timeframe timeframe_minutes = self._parse_timeframe_to_minutes(timeframe) start_time = end_time - timedelta(minutes=timeframe_minutes * limit * 2) # Buffer for sparse data raw_candles = self.db_ops.market_data.get_candles( symbol=symbol, timeframe=timeframe, start_time=start_time, end_time=end_time, exchange=exchange ) # Limit to most recent candles if len(raw_candles) > limit: raw_candles = raw_candles[-limit:] # Convert to OHLCV format ohlcv_candles = convert_database_candles_to_ohlcv(raw_candles) self.logger.debug(f"Fetched latest {len(raw_candles)} candles for {symbol} {timeframe}") return raw_candles, ohlcv_candles except Exception as e: self.logger.error(f"Data Integration: Error fetching latest market data: {e}") return [], [] def check_data_availability( self, symbol: str, timeframe: str, exchange: str = "okx" ) -> Dict[str, Any]: """ Check data availability and quality for a symbol/timeframe. Args: symbol: Trading pair timeframe: Timeframe exchange: Exchange name Returns: Dictionary with availability information """ try: # Get latest candle latest_candle = self.db_ops.market_data.get_latest_candle(symbol, timeframe, exchange) if not latest_candle: return { 'available': False, 'latest_timestamp': None, 'data_age_minutes': None, 'sufficient_for_indicators': False, 'message': f"No data available for {symbol} {timeframe}" } # Calculate data age latest_time = latest_candle['timestamp'] if latest_time.tzinfo is None: latest_time = latest_time.replace(tzinfo=timezone.utc) data_age = datetime.now(timezone.utc) - latest_time data_age_minutes = data_age.total_seconds() / 60 # Check if we have sufficient data for indicators end_time = datetime.now(timezone.utc) start_time = end_time - timedelta(days=1) # Check last day recent_candles = self.db_ops.market_data.get_candles( symbol=symbol, timeframe=timeframe, start_time=start_time, end_time=end_time, exchange=exchange ) sufficient_data = len(recent_candles) >= self.config.min_candles_required return { 'available': True, 'latest_timestamp': latest_time, 'data_age_minutes': data_age_minutes, 'recent_candle_count': len(recent_candles), 'sufficient_for_indicators': sufficient_data, 'is_recent': data_age_minutes < 60, # Less than 1 hour old 'message': f"Latest: {latest_time.strftime('%Y-%m-%d %H:%M:%S UTC')}, {len(recent_candles)} recent candles" } except Exception as e: self.logger.error(f"Data Integration: Error checking data availability: {e}") return { 'available': False, 'latest_timestamp': None, 'data_age_minutes': None, 'sufficient_for_indicators': False, 'message': f"Error checking data: {str(e)}" } def _calculate_single_indicator( self, indicator_type: str, candles: List[OHLCVCandle], parameters: Dict[str, Any] ) -> List[IndicatorResult]: """Calculate a single indicator with given parameters.""" try: if indicator_type == 'sma': period = parameters.get('period', 20) return self.indicators.sma(candles, period) elif indicator_type == 'ema': period = parameters.get('period', 20) return self.indicators.ema(candles, period) elif indicator_type == 'rsi': period = parameters.get('period', 14) return self.indicators.rsi(candles, period) elif indicator_type == 'macd': fast = parameters.get('fast_period', 12) slow = parameters.get('slow_period', 26) signal = parameters.get('signal_period', 9) return self.indicators.macd(candles, fast, slow, signal) elif indicator_type == 'bollinger_bands': period = parameters.get('period', 20) std_dev = parameters.get('std_dev', 2) return self.indicators.bollinger_bands(candles, period, std_dev) else: self.logger.warning(f"Unknown indicator type: {indicator_type}") return [] except Exception as e: self.logger.error(f"Data Integration: Error calculating {indicator_type}: {e}") return [] def _validate_and_clean_data(self, candles: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Validate and clean market data.""" cleaned_candles = [] for i, candle in enumerate(candles): try: # Check required fields required_fields = ['timestamp', 'open', 'high', 'low', 'close', 'volume'] if not all(field in candle for field in required_fields): self.logger.warning(f"Data Integration: Missing fields in candle {i}") continue # Validate OHLC relationships o, h, l, c = float(candle['open']), float(candle['high']), float(candle['low']), float(candle['close']) if not (h >= max(o, c) and l <= min(o, c)): self.logger.warning(f"Data Integration: Invalid OHLC relationship in candle {i}") continue # Validate positive values if any(val <= 0 for val in [o, h, l, c]): self.logger.warning(f"Data Integration: Non-positive price in candle {i}") continue cleaned_candles.append(candle) except (ValueError, TypeError) as e: self.logger.warning(f"Data Integration: Error validating candle {i}: {e}") continue removed_count = len(candles) - len(cleaned_candles) if removed_count > 0: self.logger.info(f"Data Integration: Removed {removed_count} invalid candles during validation") return cleaned_candles def _handle_sparse_data(self, candles: List[Dict[str, Any]], timeframe: str) -> List[Dict[str, Any]]: """Handle sparse data by detecting and logging gaps.""" if len(candles) < 2: return candles # Calculate expected interval timeframe_minutes = self._parse_timeframe_to_minutes(timeframe) expected_interval = timedelta(minutes=timeframe_minutes) gaps_detected = 0 for i in range(1, len(candles)): prev_time = candles[i-1]['timestamp'] curr_time = candles[i]['timestamp'] if isinstance(prev_time, str): prev_time = datetime.fromisoformat(prev_time.replace('Z', '+00:00')) if isinstance(curr_time, str): curr_time = datetime.fromisoformat(curr_time.replace('Z', '+00:00')) actual_interval = curr_time - prev_time if actual_interval > expected_interval * 1.5: # Allow 50% tolerance gaps_detected += 1 return candles def _parse_timeframe_to_minutes(self, timeframe: str) -> int: """Parse timeframe string to minutes.""" timeframe_map = { '1s': 1/60, '5s': 5/60, '10s': 10/60, '15s': 15/60, '30s': 30/60, '1m': 1, '5m': 5, '15m': 15, '30m': 30, '1h': 60, '2h': 120, '4h': 240, '6h': 360, '12h': 720, '1d': 1440, '3d': 4320, '1w': 10080 } return timeframe_map.get(timeframe, 60) # Default to 1 hour def _get_cached_data(self, cache_key: str) -> Optional[Dict[str, Any]]: """Get data from cache if still valid.""" if cache_key not in self._cache: return None cached_item = self._cache[cache_key] cache_age = datetime.now(timezone.utc) - cached_item['timestamp'] if cache_age.total_seconds() > self.config.cache_timeout_minutes * 60: del self._cache[cache_key] return None return cached_item def _cache_data(self, cache_key: str, data: Dict[str, Any]) -> None: """Cache data with timestamp.""" # Simple cache size management if len(self._cache) > 50: # Limit cache size # Remove oldest entries oldest_key = min(self._cache.keys(), key=lambda k: self._cache[k]['timestamp']) del self._cache[oldest_key] self._cache[cache_key] = data def clear_cache(self) -> None: """Clear the data cache.""" self._cache.clear() self.logger.debug("Data Integration: Data cache cleared") def get_indicator_data( self, main_df: pd.DataFrame, main_timeframe: str, indicator_configs: List['IndicatorLayerConfig'], indicator_manager: 'IndicatorManager', symbol: str, exchange: str = "okx" ) -> Dict[str, pd.DataFrame]: """ Get indicator data for chart display. Returns a dict mapping indicator IDs to DataFrames. """ indicator_data_map = {} if main_df.empty: return indicator_data_map for config in indicator_configs: indicator_id = config.id indicator = indicator_manager.load_indicator(indicator_id) if not indicator: logger.warning(f"Data Integrator: Could not load indicator with ID: {indicator_id}") continue try: # Determine the timeframe and data to use target_timeframe = indicator.timeframe if target_timeframe and target_timeframe != main_timeframe: # Custom timeframe: fetch new data days_back = (main_df.index.max() - main_df.index.min()).days + 2 # Add buffer raw_candles, _ = self.get_market_data_for_indicators( symbol=symbol, timeframe=target_timeframe, days_back=days_back, exchange=exchange ) if not raw_candles: self.logger.warning(f"No data for indicator '{indicator.name}' on timeframe {target_timeframe}") continue from components.charts.utils import prepare_chart_data indicator_df = prepare_chart_data(raw_candles) else: # Use main chart's dataframe indicator_df = main_df # Calculate the indicator (now returns DataFrame) result_df = self.indicators.calculate( indicator.type, indicator_df, **indicator.parameters ) if result_df is not None and not result_df.empty: # Ensure timezone consistency before reindexing if result_df.index.tz is None: result_df = result_df.tz_localize('UTC') result_df = result_df.tz_convert(main_df.index.tz) # Align data to main_df's index to handle different timeframes if not result_df.index.equals(main_df.index): aligned_df = result_df.reindex(main_df.index, method='ffill') indicator_data_map[indicator.id] = aligned_df else: indicator_data_map[indicator.id] = result_df else: self.logger.warning(f"No data returned for indicator '{indicator.name}'") except Exception as e: self.logger.error(f"Error calculating indicator '{indicator.name}': {e}", exc_info=True) return indicator_data_map # Convenience functions for common operations def get_market_data_integrator(config: DataIntegrationConfig = None) -> MarketDataIntegrator: """Get a configured market data integrator instance.""" return MarketDataIntegrator(config) def fetch_indicator_data( symbol: str, timeframe: str, indicator_configs: List[Dict[str, Any]], days_back: int = 7, exchange: str = "okx" ) -> Dict[str, List[IndicatorResult]]: """ Convenience function to fetch and calculate indicators. Args: symbol: Trading pair timeframe: Timeframe indicator_configs: List of indicator configurations days_back: Number of days to look back exchange: Exchange name Returns: Dictionary mapping indicator names to results """ integrator = get_market_data_integrator() return integrator.calculate_indicators_for_symbol( symbol, timeframe, indicator_configs, days_back, exchange ) def check_symbol_data_quality( symbol: str, timeframe: str, exchange: str = "okx" ) -> Dict[str, Any]: """ Convenience function to check data quality for a symbol. Args: symbol: Trading pair timeframe: Timeframe exchange: Exchange name Returns: Data quality information """ integrator = get_market_data_integrator() return integrator.check_data_availability(symbol, timeframe, exchange)