diff --git a/data/base_collector.py b/data/base_collector.py index 1cf26fb..6263642 100644 --- a/data/base_collector.py +++ b/data/base_collector.py @@ -117,7 +117,9 @@ class BaseDataCollector(ABC): data_types: Optional[List[DataType]] = None, component_name: Optional[str] = None, auto_restart: bool = True, - health_check_interval: float = 30.0): + health_check_interval: float = 30.0, + logger = None, + log_errors_only: bool = False): """ Initialize the base data collector. @@ -128,16 +130,21 @@ class BaseDataCollector(ABC): component_name: Name for logging (default: based on exchange_name) auto_restart: Enable automatic restart on failures (default: True) health_check_interval: Seconds between health checks (default: 30.0) + logger: Logger instance. If None, no logging will be performed. + log_errors_only: If True and logger is provided, only log error-level messages """ self.exchange_name = exchange_name.lower() self.symbols = set(symbols) self.data_types = data_types or [DataType.CANDLE] self.auto_restart = auto_restart self.health_check_interval = health_check_interval + self.log_errors_only = log_errors_only - # Initialize logger - component = component_name or f"{self.exchange_name}_collector" - self.logger = get_logger(component, verbose=True) + # Initialize logger based on parameters + if logger is not None: + self.logger = logger + else: + self.logger = None # Collector state self.status = CollectorStatus.STOPPED @@ -174,7 +181,39 @@ class BaseDataCollector(ABC): 'last_restart_time': None } - self.logger.info(f"Initialized {self.exchange_name} data collector for symbols: {', '.join(symbols)}") + # Log initialization if logger is available + if self.logger: + component = component_name or f"{self.exchange_name}_collector" + self.component_name = component + if not self.log_errors_only: + self.logger.info(f"{self.component_name}: Initialized {self.exchange_name} data collector for symbols: {', '.join(symbols)}") + else: + self.component_name = component_name 
or f"{self.exchange_name}_collector" + + def _log_debug(self, message: str) -> None: + """Log debug message if logger is available and not in errors-only mode.""" + if self.logger and not self.log_errors_only: + self.logger.debug(message) + + def _log_info(self, message: str) -> None: + """Log info message if logger is available and not in errors-only mode.""" + if self.logger and not self.log_errors_only: + self.logger.info(message) + + def _log_warning(self, message: str) -> None: + """Log warning message if logger is available and not in errors-only mode.""" + if self.logger and not self.log_errors_only: + self.logger.warning(message) + + def _log_error(self, message: str, exc_info: bool = False) -> None: + """Log error message if logger is available (always logs errors regardless of log_errors_only).""" + if self.logger: + self.logger.error(message, exc_info=exc_info) + + def _log_critical(self, message: str, exc_info: bool = False) -> None: + """Log critical message if logger is available (always logs critical regardless of log_errors_only).""" + if self.logger: + self.logger.critical(message, exc_info=exc_info) @abstractmethod async def connect(self) -> bool: @@ -239,186 +278,189 @@ class BaseDataCollector(ABC): Returns: True if started successfully, False otherwise """ + # Check if already running or starting if self.status in [CollectorStatus.RUNNING, CollectorStatus.STARTING]: - self.logger.warning("Data collector is already running or starting") + self._log_warning("Data collector is already running or starting") return True - self.logger.info(f"Starting {self.exchange_name} data collector") + self._log_info(f"Starting {self.exchange_name} data collector") self.status = CollectorStatus.STARTING self._should_be_running = True try: # Connect to data source if not await self.connect(): + self._log_error("Failed to connect to data source") self.status = CollectorStatus.ERROR - self.logger.error("Failed to connect to data source") return False # Subscribe to 
data streams if not await self.subscribe_to_data(list(self.symbols), self.data_types): + self._log_error("Failed to subscribe to data streams") self.status = CollectorStatus.ERROR - self.logger.error("Failed to subscribe to data streams") await self.disconnect() return False - # Start message processing + # Start background tasks self._running = True self.status = CollectorStatus.RUNNING - self._stats['connection_uptime'] = datetime.now(timezone.utc) - self._last_heartbeat = datetime.now(timezone.utc) - # Create background task for message processing + # Start message processing task message_task = asyncio.create_task(self._message_loop()) self._tasks.add(message_task) message_task.add_done_callback(self._tasks.discard) - # Start health monitoring - if self.auto_restart: + # Start health monitoring task + if self.health_check_interval > 0: health_task = asyncio.create_task(self._health_monitor()) self._tasks.add(health_task) health_task.add_done_callback(self._tasks.discard) - self.logger.info(f"{self.exchange_name} data collector started successfully") + self._log_info(f"{self.exchange_name} data collector started successfully") return True except Exception as e: + self._log_error(f"Failed to start data collector: {e}") self.status = CollectorStatus.ERROR - self._stats['last_error'] = str(e) - self.logger.error(f"Failed to start data collector: {e}") - await self.disconnect() + self._should_be_running = False return False async def stop(self, force: bool = False) -> None: """ - Stop the data collector. + Stop the data collector and cleanup resources. 
Args: - force: If True, don't restart automatically even if auto_restart is enabled + force: Force stop even if not graceful """ if self.status == CollectorStatus.STOPPED: - self.logger.warning("Data collector is already stopped") + self._log_warning("Data collector is already stopped") return - self.logger.info(f"Stopping {self.exchange_name} data collector") + self._log_info(f"Stopping {self.exchange_name} data collector") self.status = CollectorStatus.STOPPING - self._running = False - - if force: - self._should_be_running = False + self._should_be_running = False try: + # Stop background tasks + self._running = False + # Cancel all tasks for task in list(self._tasks): - task.cancel() + if not task.done(): + task.cancel() + if not force: + try: + await task + except asyncio.CancelledError: + pass - # Wait for tasks to complete - if self._tasks: - await asyncio.gather(*self._tasks, return_exceptions=True) + self._tasks.clear() # Unsubscribe and disconnect await self.unsubscribe_from_data(list(self.symbols), self.data_types) await self.disconnect() self.status = CollectorStatus.STOPPED - self.logger.info(f"{self.exchange_name} data collector stopped") + self._log_info(f"{self.exchange_name} data collector stopped") except Exception as e: + self._log_error(f"Error stopping data collector: {e}") self.status = CollectorStatus.ERROR - self._stats['last_error'] = str(e) - self.logger.error(f"Error stopping data collector: {e}") async def restart(self) -> bool: """ Restart the data collector. 
Returns: - True if restart successful, False otherwise + True if restarted successfully, False otherwise """ - self.logger.info(f"Restarting {self.exchange_name} data collector") + self._log_info(f"Restarting {self.exchange_name} data collector") self._stats['restarts'] += 1 self._stats['last_restart_time'] = datetime.now(timezone.utc) - # Stop without disabling auto-restart - await self.stop(force=False) + # Stop first + await self.stop() - # Wait a bit before restart - await asyncio.sleep(2.0) - - # Reset reconnection attempts - self._reconnect_attempts = 0 + # Wait a bit before restarting + await asyncio.sleep(self._reconnect_delay) # Start again return await self.start() async def _message_loop(self) -> None: """Main message processing loop.""" - self.logger.debug("Starting message processing loop") - - while self._running: - try: - # This should be implemented by subclasses to handle their specific message loop - await self._handle_messages() - - # Update heartbeat - self._last_heartbeat = datetime.now(timezone.utc) - - except asyncio.CancelledError: - self.logger.debug("Message loop cancelled") - break - except Exception as e: - self._stats['errors'] += 1 - self._stats['last_error'] = str(e) - self.logger.error(f"Error in message loop: {e}") - - # Attempt reconnection if connection lost - if not await self._handle_connection_error(): + try: + self._log_debug("Starting message processing loop") + + while self._running: + try: + await self._handle_messages() + except asyncio.CancelledError: break - - await asyncio.sleep(1) # Brief pause before retrying + except Exception as e: + self._stats['errors'] += 1 + self._stats['last_error'] = str(e) + self._log_error(f"Error processing messages: {e}") + + # Small delay to prevent tight error loops + await asyncio.sleep(0.1) + + except asyncio.CancelledError: + self._log_debug("Message loop cancelled") + raise + except Exception as e: + self._log_error(f"Error in message loop: {e}") + self.status = CollectorStatus.ERROR 
async def _health_monitor(self) -> None: """Monitor collector health and restart if needed.""" - self.logger.debug("Starting health monitor") - - while self._running and self.auto_restart: - try: - await asyncio.sleep(self.health_check_interval) - - # Check if we should be running but aren't - if self._should_be_running and not self._running: - self.logger.warning("Collector should be running but isn't - restarting") - await self.restart() - continue - - # Check heartbeat freshness - time_since_heartbeat = datetime.now(timezone.utc) - self._last_heartbeat - if time_since_heartbeat > timedelta(seconds=self.health_check_interval * 2): - self.logger.warning(f"No heartbeat for {time_since_heartbeat.total_seconds():.1f}s - restarting") - self.status = CollectorStatus.UNHEALTHY - await self.restart() - continue - - # Check data freshness (if we've received data before) - if self._last_data_received: - time_since_data = datetime.now(timezone.utc) - self._last_data_received - if time_since_data > self._max_silence_duration: - self.logger.warning(f"No data received for {time_since_data.total_seconds():.1f}s - restarting") - self.status = CollectorStatus.UNHEALTHY - await self.restart() + try: + self._log_debug("Starting health monitor") + + while self._running: + try: + await asyncio.sleep(self.health_check_interval) + + current_time = datetime.now(timezone.utc) + + # Check if collector should be running but isn't + if self._should_be_running and self.status != CollectorStatus.RUNNING: + self._log_warning("Collector should be running but isn't - restarting") + if self.auto_restart: + asyncio.create_task(self.restart()) continue - - # Check if status indicates failure - if self.status in [CollectorStatus.ERROR, CollectorStatus.UNHEALTHY]: - self.logger.warning(f"Collector in {self.status.value} status - restarting") - await self.restart() - continue - - except asyncio.CancelledError: - self.logger.debug("Health monitor cancelled") - break - except Exception as e: - 
self.logger.error(f"Error in health monitor: {e}") - await asyncio.sleep(self.health_check_interval) + + # Check heartbeat + time_since_heartbeat = current_time - self._last_heartbeat + if time_since_heartbeat > timedelta(seconds=self.health_check_interval * 2): + self._log_warning(f"No heartbeat for {time_since_heartbeat.total_seconds():.1f}s - restarting") + if self.auto_restart: + asyncio.create_task(self.restart()) + continue + + # Check data reception + if self._last_data_received: + time_since_data = current_time - self._last_data_received + if time_since_data > self._max_silence_duration: + self._log_warning(f"No data received for {time_since_data.total_seconds():.1f}s - restarting") + if self.auto_restart: + asyncio.create_task(self.restart()) + continue + + # Check for error status + if self.status == CollectorStatus.ERROR: + self._log_warning(f"Collector in {self.status.value} status - restarting") + if self.auto_restart: + asyncio.create_task(self.restart()) + + except asyncio.CancelledError: + break + + except asyncio.CancelledError: + self._log_debug("Health monitor cancelled") + raise + except Exception as e: + self._log_error(f"Error in health monitor: {e}") @abstractmethod async def _handle_messages(self) -> None: @@ -435,78 +477,84 @@ class BaseDataCollector(ABC): Returns: True if reconnection successful, False if max attempts exceeded """ - if self._reconnect_attempts >= self._max_reconnect_attempts: - self.logger.error(f"Max reconnection attempts ({self._max_reconnect_attempts}) exceeded") + self._reconnect_attempts += 1 + + if self._reconnect_attempts > self._max_reconnect_attempts: + self._log_error(f"Max reconnection attempts ({self._max_reconnect_attempts}) exceeded") self.status = CollectorStatus.ERROR + self._should_be_running = False return False - self._reconnect_attempts += 1 self.status = CollectorStatus.RECONNECTING + self._log_warning(f"Connection lost. 
Attempting reconnection {self._reconnect_attempts}/{self._max_reconnect_attempts}") - self.logger.warning(f"Connection lost. Attempting reconnection {self._reconnect_attempts}/{self._max_reconnect_attempts}") - + # Disconnect and wait before retrying + await self.disconnect() await asyncio.sleep(self._reconnect_delay) + # Attempt to reconnect try: if await self.connect(): if await self.subscribe_to_data(list(self.symbols), self.data_types): + self._log_info("Reconnection successful") self.status = CollectorStatus.RUNNING self._reconnect_attempts = 0 - self._stats['connection_uptime'] = datetime.now(timezone.utc) - self.logger.info("Reconnection successful") return True - - return False - + except Exception as e: - self._stats['last_error'] = str(e) - self.logger.error(f"Reconnection attempt failed: {e}") - return False + self._log_error(f"Reconnection attempt failed: {e}") + + return False def add_data_callback(self, data_type: DataType, callback: Callable[[MarketDataPoint], None]) -> None: """ - Add a callback function to be called when data of specified type is received. + Add a callback function for specific data type. Args: - data_type: Type of data to register callback for - callback: Function to call with MarketDataPoint data + data_type: Type of data to monitor + callback: Function to call when data is received """ - self._data_callbacks[data_type].append(callback) - self.logger.debug(f"Added callback for {data_type.value} data") + if callback not in self._data_callbacks[data_type]: + self._data_callbacks[data_type].append(callback) + self._log_debug(f"Added callback for {data_type.value} data") def remove_data_callback(self, data_type: DataType, callback: Callable[[MarketDataPoint], None]) -> None: """ - Remove a data callback. + Remove a callback function for specific data type. 
Args: - data_type: Type of data to remove callback for - callback: Callback function to remove + data_type: Type of data to stop monitoring + callback: Function to remove """ if callback in self._data_callbacks[data_type]: self._data_callbacks[data_type].remove(callback) - self.logger.debug(f"Removed callback for {data_type.value} data") + self._log_debug(f"Removed callback for {data_type.value} data") async def _notify_callbacks(self, data_point: MarketDataPoint) -> None: """ - Notify all registered callbacks for the data type. + Notify all registered callbacks for a data point. Args: - data_point: Market data to send to callbacks + data_point: Market data to distribute """ - # Update data received timestamp - self._last_data_received = datetime.now(timezone.utc) - self._stats['last_message_time'] = self._last_data_received - callbacks = self._data_callbacks.get(data_point.data_type, []) for callback in callbacks: try: + # Handle both sync and async callbacks if asyncio.iscoroutinefunction(callback): await callback(data_point) else: callback(data_point) + except Exception as e: - self.logger.error(f"Error in data callback: {e}") + self._log_error(f"Error in data callback: {e}") + + # Update statistics + self._stats['messages_processed'] += 1 + self._stats['last_message_time'] = data_point.timestamp + self._last_data_received = datetime.now(timezone.utc) + self._last_heartbeat = datetime.now(timezone.utc) def get_status(self) -> Dict[str, Any]: """ @@ -601,7 +649,13 @@ class BaseDataCollector(ABC): """ if symbol not in self.symbols: self.symbols.add(symbol) - self.logger.info(f"Added symbol: {symbol}") + self._log_info(f"Added symbol: {symbol}") + + # If collector is running, subscribe to new symbol + if self.status == CollectorStatus.RUNNING: + # Note: This needs to be called from an async context + # Users should handle this appropriately + pass def remove_symbol(self, symbol: str) -> None: """ @@ -612,7 +666,13 @@ class BaseDataCollector(ABC): """ if symbol in 
self.symbols: self.symbols.remove(symbol) - self.logger.info(f"Removed symbol: {symbol}") + self._log_info(f"Removed symbol: {symbol}") + + # If collector is running, unsubscribe from symbol + if self.status == CollectorStatus.RUNNING: + # Note: This needs to be called from an async context + # Users should handle this appropriately + pass def validate_ohlcv_data(self, data: Dict[str, Any], symbol: str, timeframe: str) -> OHLCVData: """ diff --git a/data/collector_manager.py b/data/collector_manager.py index 79af2aa..c5e2e4e 100644 --- a/data/collector_manager.py +++ b/data/collector_manager.py @@ -51,7 +51,9 @@ class CollectorManager: def __init__(self, manager_name: str = "collector_manager", global_health_check_interval: float = 60.0, - restart_delay: float = 5.0): + restart_delay: float = 5.0, + logger = None, + log_errors_only: bool = False): """ Initialize the collector manager. @@ -59,13 +61,19 @@ class CollectorManager: manager_name: Name for logging global_health_check_interval: Seconds between global health checks restart_delay: Delay between restart attempts + logger: Logger instance. If None, no logging will be performed. 
+ log_errors_only: If True and logger is provided, only log error-level messages """ self.manager_name = manager_name self.global_health_check_interval = global_health_check_interval self.restart_delay = restart_delay + self.log_errors_only = log_errors_only - # Initialize logger - self.logger = get_logger(f"data_collector_manager", verbose=True) + # Initialize logger based on parameters + if logger is not None: + self.logger = logger + else: + self.logger = None # Manager state self.status = ManagerStatus.STOPPED @@ -91,7 +99,33 @@ class CollectorManager: 'uptime_start': None } - self.logger.info(f"Initialized collector manager: {manager_name}") + if self.logger and not self.log_errors_only: + self.logger.info(f"Initialized collector manager: {manager_name}") + + def _log_debug(self, message: str) -> None: + """Log debug message if logger is available and not in errors-only mode.""" + if self.logger and not self.log_errors_only: + self.logger.debug(message) + + def _log_info(self, message: str) -> None: + """Log info message if logger is available and not in errors-only mode.""" + if self.logger and not self.log_errors_only: + self.logger.info(message) + + def _log_warning(self, message: str) -> None: + """Log warning message if logger is available and not in errors-only mode.""" + if self.logger and not self.log_errors_only: + self.logger.warning(message) + + def _log_error(self, message: str, exc_info: bool = False) -> None: + """Log error message if logger is available (always logs errors regardless of log_errors_only).""" + if self.logger: + self.logger.error(message, exc_info=exc_info) + + def _log_critical(self, message: str, exc_info: bool = False) -> None: + """Log critical message if logger is available (always logs critical regardless of log_errors_only).""" + if self.logger: + self.logger.critical(message, exc_info=exc_info) def add_collector(self, collector: BaseDataCollector, @@ -131,8 +165,8 @@ class CollectorManager: self._stats['total_collectors'] 
= len(self._collectors) - self.logger.info(f"Added collector: {collector_name} ({collector.exchange_name}) - " - f"Symbols: {', '.join(collector.symbols)} - Enabled: {config.enabled}") + self._log_info(f"Added collector: {collector_name} ({collector.exchange_name}) - " + f"Symbols: {', '.join(collector.symbols)} - Enabled: {config.enabled}") def remove_collector(self, collector_name: str) -> bool: """ @@ -145,7 +179,7 @@ class CollectorManager: True if removed successfully, False if not found """ if collector_name not in self._collectors: - self.logger.warning(f"Collector not found: {collector_name}") + self._log_warning(f"Collector not found: {collector_name}") return False # Stop the collector first (only if event loop is running) @@ -156,7 +190,7 @@ class CollectorManager: asyncio.create_task(collector.stop(force=True)) except RuntimeError: # No event loop running, just log - self.logger.info(f"Collector {collector_name} will be removed without stopping (no event loop)") + self._log_info(f"Collector {collector_name} will be removed without stopping (no event loop)") # Remove from management del self._collectors[collector_name] @@ -165,7 +199,7 @@ class CollectorManager: self._stats['total_collectors'] = len(self._collectors) - self.logger.info(f"Removed collector: {collector_name}") + self._log_info(f"Removed collector: {collector_name}") return True def enable_collector(self, collector_name: str) -> bool: @@ -179,7 +213,7 @@ class CollectorManager: True if enabled successfully, False if not found """ if collector_name not in self._collectors: - self.logger.warning(f"Collector not found: {collector_name}") + self._log_warning(f"Collector not found: {collector_name}") return False self._enabled_collectors.add(collector_name) @@ -191,9 +225,9 @@ class CollectorManager: asyncio.create_task(self._start_collector(collector_name)) except RuntimeError: # No event loop running, will be started when manager starts - self.logger.debug(f"Collector {collector_name} enabled 
but will start when manager starts") + self._log_debug(f"Collector {collector_name} enabled but will start when manager starts") - self.logger.info(f"Enabled collector: {collector_name}") + self._log_info(f"Enabled collector: {collector_name}") return True def disable_collector(self, collector_name: str) -> bool: @@ -232,10 +266,10 @@ class CollectorManager: True if started successfully, False otherwise """ if self.status in [ManagerStatus.RUNNING, ManagerStatus.STARTING]: - self.logger.warning("Collector manager is already running or starting") + self._log_warning("Collector manager is already running or starting") return True - self.logger.info("Starting collector manager") + self._log_info("Starting collector manager") self.status = ManagerStatus.STARTING try: @@ -253,7 +287,7 @@ class CollectorManager: try: await asyncio.wait_for(asyncio.gather(*start_tasks, return_exceptions=True), timeout=30.0) except asyncio.TimeoutError: - self.logger.warning("Some collectors took too long to start") + self._log_warning("Some collectors took too long to start") # Start global health monitoring health_task = asyncio.create_task(self._global_health_monitor()) @@ -261,21 +295,21 @@ class CollectorManager: health_task.add_done_callback(self._tasks.discard) self.status = ManagerStatus.RUNNING - self.logger.info(f"Collector manager started - Managing {len(self._enabled_collectors)} collectors") + self._log_info(f"Collector manager started - Managing {len(self._enabled_collectors)} collectors") return True except Exception as e: self.status = ManagerStatus.ERROR - self.logger.error(f"Failed to start collector manager: {e}") + self._log_error(f"Failed to start collector manager: {e}") return False async def stop(self) -> None: """Stop the collector manager and all collectors.""" if self.status == ManagerStatus.STOPPED: - self.logger.warning("Collector manager is already stopped") + self._log_warning("Collector manager is already stopped") return - self.logger.info("Stopping 
collector manager") + self._log_info("Stopping collector manager") self.status = ManagerStatus.STOPPING self._running = False @@ -298,14 +332,14 @@ class CollectorManager: try: await asyncio.wait_for(asyncio.gather(*stop_tasks, return_exceptions=True), timeout=30.0) except asyncio.TimeoutError: - self.logger.warning("Some collectors took too long to stop") + self._log_warning("Some collectors took too long to stop") self.status = ManagerStatus.STOPPED - self.logger.info("Collector manager stopped") + self._log_info("Collector manager stopped") except Exception as e: self.status = ManagerStatus.ERROR - self.logger.error(f"Error stopping collector manager: {e}") + self._log_error(f"Error stopping collector manager: {e}") async def restart_collector(self, collector_name: str) -> bool: """ @@ -318,23 +352,23 @@ class CollectorManager: True if restarted successfully, False otherwise """ if collector_name not in self._collectors: - self.logger.warning(f"Collector not found: {collector_name}") + self._log_warning(f"Collector not found: {collector_name}") return False collector = self._collectors[collector_name] - self.logger.info(f"Restarting collector: {collector_name}") + self._log_info(f"Restarting collector: {collector_name}") try: success = await collector.restart() if success: self._stats['restarts_performed'] += 1 - self.logger.info(f"Successfully restarted collector: {collector_name}") + self._log_info(f"Successfully restarted collector: {collector_name}") else: - self.logger.error(f"Failed to restart collector: {collector_name}") + self._log_error(f"Failed to restart collector: {collector_name}") return success except Exception as e: - self.logger.error(f"Error restarting collector {collector_name}: {e}") + self._log_error(f"Error restarting collector {collector_name}: {e}") return False async def _start_collector(self, collector_name: str) -> bool: @@ -348,7 +382,7 @@ class CollectorManager: True if started successfully, False otherwise """ if collector_name not 
in self._collectors: - self.logger.warning(f"Collector not found: {collector_name}") + self._log_warning(f"Collector not found: {collector_name}") return False collector = self._collectors[collector_name] @@ -356,18 +390,18 @@ class CollectorManager: try: success = await collector.start() if success: - self.logger.info(f"Started collector: {collector_name}") + self._log_info(f"Started collector: {collector_name}") else: - self.logger.error(f"Failed to start collector: {collector_name}") + self._log_error(f"Failed to start collector: {collector_name}") return success except Exception as e: - self.logger.error(f"Error starting collector {collector_name}: {e}") + self._log_error(f"Error starting collector {collector_name}: {e}") return False async def _global_health_monitor(self) -> None: """Global health monitoring for all collectors.""" - self.logger.debug("Starting global health monitor") + self._log_debug("Starting global health monitor") while self._running: try: @@ -388,25 +422,25 @@ class CollectorManager: running_count += 1 elif not health_status['is_healthy']: failed_count += 1 - self.logger.warning(f"Collector {collector_name} is unhealthy: {health_status['issues']}") + self._log_warning(f"Collector {collector_name} is unhealthy: {health_status['issues']}") # Auto-restart if needed and not already restarting if (collector.auto_restart and collector.status not in [CollectorStatus.STARTING, CollectorStatus.STOPPING]): - self.logger.info(f"Auto-restarting unhealthy collector: {collector_name}") + self._log_info(f"Auto-restarting unhealthy collector: {collector_name}") asyncio.create_task(self.restart_collector(collector_name)) # Update global statistics self._stats['running_collectors'] = running_count self._stats['failed_collectors'] = failed_count - self.logger.debug(f"Health check complete - Running: {running_count}, Failed: {failed_count}") + self._log_debug(f"Health check complete - Running: {running_count}, Failed: {failed_count}") except 
asyncio.CancelledError: - self.logger.debug("Global health monitor cancelled") + self._log_debug("Global health monitor cancelled") break except Exception as e: - self.logger.error(f"Error in global health monitor: {e}") + self._log_error(f"Error in global health monitor: {e}") await asyncio.sleep(self.global_health_check_interval) def get_status(self) -> Dict[str, Any]: diff --git a/data/common/aggregation.py b/data/common/aggregation.py index 3b3748d..bb803df 100644 --- a/data/common/aggregation.py +++ b/data/common/aggregation.py @@ -30,7 +30,6 @@ from .data_types import ( CandleProcessingConfig, ProcessingStats ) -from utils.logger import get_logger class TimeframeBucket: @@ -183,7 +182,8 @@ class RealTimeCandleProcessor: symbol: str, exchange: str, config: Optional[CandleProcessingConfig] = None, - component_name: str = "realtime_candle_processor"): + component_name: str = "realtime_candle_processor", + logger = None): """ Initialize real-time candle processor. @@ -197,7 +197,7 @@ class RealTimeCandleProcessor: self.exchange = exchange self.config = config or CandleProcessingConfig() self.component_name = component_name - self.logger = get_logger(self.component_name) + self.logger = logger # Current buckets for each timeframe self.current_buckets: Dict[str, TimeframeBucket] = {} @@ -208,12 +208,14 @@ class RealTimeCandleProcessor: # Statistics self.stats = ProcessingStats(active_timeframes=len(self.config.timeframes)) - self.logger.info(f"Initialized real-time candle processor for {symbol} on {exchange} with timeframes: {self.config.timeframes}") + if self.logger: + self.logger.info(f"{self.component_name}: Initialized real-time candle processor for {symbol} on {exchange} with timeframes: {self.config.timeframes}") def add_candle_callback(self, callback: Callable[[OHLCVCandle], None]) -> None: """Add callback function to receive completed candles.""" self.candle_callbacks.append(callback) - self.logger.debug(f"Added candle callback: {callback.__name__ if 
hasattr(callback, '__name__') else str(callback)}") + if self.logger: + self.logger.debug(f"{self.component_name}: Added candle callback: {callback.__name__ if hasattr(callback, '__name__') else str(callback)}") def process_trade(self, trade: StandardizedTrade) -> List[OHLCVCandle]: """ @@ -250,7 +252,8 @@ class RealTimeCandleProcessor: return completed_candles except Exception as e: - self.logger.error(f"Error processing trade for {self.symbol}: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error processing trade for {self.symbol}: {e}") self.stats.errors_count += 1 return [] @@ -292,12 +295,14 @@ class RealTimeCandleProcessor: # Add trade to current bucket if not current_bucket.add_trade(trade): # This should never happen if logic is correct - self.logger.warning(f"Trade {trade.timestamp} could not be added to bucket {current_bucket.start_time}-{current_bucket.end_time}") + if self.logger: + self.logger.warning(f"{self.component_name}: Trade {trade.timestamp} could not be added to bucket {current_bucket.start_time}-{current_bucket.end_time}") return completed_candle except Exception as e: - self.logger.error(f"Error processing trade for timeframe {timeframe}: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error processing trade for timeframe {timeframe}: {e}") self.stats.errors_count += 1 return None @@ -353,7 +358,8 @@ class RealTimeCandleProcessor: for callback in self.candle_callbacks: callback(candle) except Exception as e: - self.logger.error(f"Error in candle callback: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error in candle callback: {e}") self.stats.errors_count += 1 def get_current_candles(self, incomplete: bool = True) -> List[OHLCVCandle]: @@ -408,7 +414,8 @@ class BatchCandleProcessor: symbol: str, exchange: str, timeframes: List[str], - component_name: str = "batch_candle_processor"): + component_name: str = "batch_candle_processor", + logger = None): """ Initialize batch candle 
processor. @@ -422,12 +429,13 @@ class BatchCandleProcessor: self.exchange = exchange self.timeframes = timeframes self.component_name = component_name - self.logger = get_logger(self.component_name) + self.logger = logger # Statistics self.stats = ProcessingStats(active_timeframes=len(timeframes)) - self.logger.info(f"Initialized batch candle processor for {symbol} on {exchange}") + if self.logger: + self.logger.info(f"{self.component_name}: Initialized batch candle processor for {symbol} on {exchange}") def process_trades_to_candles(self, trades: Iterator[StandardizedTrade]) -> List[OHLCVCandle]: """ @@ -469,11 +477,13 @@ class BatchCandleProcessor: if all_candles: self.stats.last_candle_time = max(candle.end_time for candle in all_candles) - self.logger.info(f"Batch processed {self.stats.trades_processed} trades to {len(all_candles)} candles") + if self.logger: + self.logger.info(f"{self.component_name}: Batch processed {self.stats.trades_processed} trades to {len(all_candles)} candles") return all_candles except Exception as e: - self.logger.error(f"Error in batch processing trades to candles: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error in batch processing trades to candles: {e}") self.stats.errors_count += 1 return [] diff --git a/data/common/transformation.py b/data/common/transformation.py index cf7bf8d..25b412b 100644 --- a/data/common/transformation.py +++ b/data/common/transformation.py @@ -12,7 +12,6 @@ from abc import ABC, abstractmethod from .data_types import StandardizedTrade, OHLCVCandle, DataValidationResult from .aggregation import BatchCandleProcessor -from utils.logger import get_logger class BaseDataTransformer(ABC): @@ -25,7 +24,8 @@ class BaseDataTransformer(ABC): def __init__(self, exchange_name: str, - component_name: str = "base_data_transformer"): + component_name: str = "base_data_transformer", + logger = None): """ Initialize base data transformer. 
@@ -35,9 +35,10 @@ class BaseDataTransformer(ABC): """ self.exchange_name = exchange_name self.component_name = component_name - self.logger = get_logger(self.component_name) + self.logger = logger - self.logger.info(f"Initialized base data transformer for {exchange_name}") + if self.logger: + self.logger.info(f"{self.component_name}: Initialized base data transformer for {exchange_name}") # Abstract methods that must be implemented by subclasses @@ -87,7 +88,8 @@ class BaseDataTransformer(ABC): return dt except Exception as e: - self.logger.error(f"Error converting timestamp {timestamp}: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error converting timestamp {timestamp}: {e}") # Return current time as fallback return datetime.now(timezone.utc) @@ -107,7 +109,8 @@ class BaseDataTransformer(ABC): return None return Decimal(str(value)) except Exception as e: - self.logger.warning(f"Failed to convert {field_name} '{value}' to Decimal: {e}") + if self.logger: + self.logger.warning(f"{self.component_name}: Failed to convert {field_name} '{value}' to Decimal: {e}") return None def normalize_trade_side(self, side: str) -> str: @@ -125,10 +128,11 @@ class BaseDataTransformer(ABC): # Handle common variations if normalized in ['buy', 'bid', 'b', '1']: return 'buy' - elif normalized in ['sell', 'ask', 's', '0']: + elif normalized in ['sell', 'ask', 's', '0']: return 'sell' else: - self.logger.warning(f"Unknown trade side: {side}, defaulting to 'buy'") + if self.logger: + self.logger.warning(f"{self.component_name}: Unknown trade side: {side}, defaulting to 'buy'") return 'buy' def validate_symbol_format(self, symbol: str) -> str: @@ -165,7 +169,8 @@ class BaseDataTransformer(ABC): Returns: StandardizedTrade or None if transformation failed """ - self.logger.warning("transform_database_record not implemented for this exchange") + if self.logger: + self.logger.warning(f"{self.component_name}: transform_database_record not implemented for this exchange") 
return None def get_transformer_info(self) -> Dict[str, Any]: @@ -201,7 +206,8 @@ class UnifiedDataTransformer: def __init__(self, exchange_transformer: BaseDataTransformer, - component_name: str = "unified_data_transformer"): + component_name: str = "unified_data_transformer", + logger = None): """ Initialize unified data transformer. @@ -211,9 +217,10 @@ class UnifiedDataTransformer: """ self.exchange_transformer = exchange_transformer self.component_name = component_name - self.logger = get_logger(self.component_name) + self.logger = logger - self.logger.info(f"Initialized unified data transformer with {exchange_transformer.exchange_name} transformer") + if self.logger: + self.logger.info(f"{self.component_name}: Initialized unified data transformer with {exchange_transformer.exchange_name} transformer") def transform_trade_data(self, raw_data: Dict[str, Any], symbol: str) -> Optional[StandardizedTrade]: """ @@ -229,7 +236,8 @@ class UnifiedDataTransformer: try: return self.exchange_transformer.transform_trade_data(raw_data, symbol) except Exception as e: - self.logger.error(f"Error in trade transformation: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error in trade transformation: {e}") return None def transform_orderbook_data(self, raw_data: Dict[str, Any], symbol: str) -> Optional[Dict[str, Any]]: @@ -246,7 +254,8 @@ class UnifiedDataTransformer: try: return self.exchange_transformer.transform_orderbook_data(raw_data, symbol) except Exception as e: - self.logger.error(f"Error in orderbook transformation: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error in orderbook transformation: {e}") return None def transform_ticker_data(self, raw_data: Dict[str, Any], symbol: str) -> Optional[Dict[str, Any]]: @@ -263,7 +272,8 @@ class UnifiedDataTransformer: try: return self.exchange_transformer.transform_ticker_data(raw_data, symbol) except Exception as e: - self.logger.error(f"Error in ticker transformation: {e}") + if 
self.logger: + self.logger.error(f"{self.component_name}: Error in ticker transformation: {e}") return None def process_trades_to_candles(self, @@ -296,11 +306,13 @@ class UnifiedDataTransformer: candles = processor.process_trades_to_candles(trades) - self.logger.info(f"Processed {processor.get_stats()['trades_processed']} trades to {len(candles)} candles") + if self.logger: + self.logger.info(f"{self.component_name}: Processed {processor.get_stats()['trades_processed']} trades to {len(candles)} candles") return candles except Exception as e: - self.logger.error(f"Error processing trades to candles: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error processing trades to candles: {e}") return [] def batch_transform_trades(self, @@ -327,10 +339,12 @@ class UnifiedDataTransformer: else: errors += 1 except Exception as e: - self.logger.error(f"Error transforming trade: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error transforming trade: {e}") errors += 1 - self.logger.info(f"Batch transformed {len(transformed_trades)} trades successfully, {errors} errors") + if self.logger: + self.logger.info(f"{self.component_name}: Batch transformed {len(transformed_trades)} trades successfully, {errors} errors") return transformed_trades def get_transformer_info(self) -> Dict[str, Any]: @@ -457,8 +471,7 @@ def batch_create_standardized_trades(raw_trades: List[Dict[str, Any]], trades.append(trade) except Exception as e: # Log error but continue processing - logger = get_logger("batch_transform") - logger.warning(f"Failed to transform trade: {e}") + print(f"Failed to transform trade: {e}") return trades diff --git a/data/common/validation.py b/data/common/validation.py index a86eb6f..e820ea8 100644 --- a/data/common/validation.py +++ b/data/common/validation.py @@ -12,7 +12,6 @@ from typing import Dict, List, Optional, Any, Union, Pattern from abc import ABC, abstractmethod from .data_types import DataValidationResult, 
StandardizedTrade, TradeSide -from utils.logger import get_logger class ValidationResult: @@ -35,17 +34,19 @@ class BaseDataValidator(ABC): def __init__(self, exchange_name: str, - component_name: str = "base_data_validator"): + component_name: str = "base_data_validator", + logger = None): """ Initialize base data validator. Args: exchange_name: Name of the exchange (e.g., 'okx', 'binance') component_name: Name for logging + logger: Logger instance. If None, no logging will be performed. """ self.exchange_name = exchange_name self.component_name = component_name - self.logger = get_logger(self.component_name) + self.logger = logger # Common validation patterns self._numeric_pattern = re.compile(r'^-?\d*\.?\d+$') @@ -64,7 +65,8 @@ class BaseDataValidator(ABC): self._min_timestamp = 1000000000000 # 2001-09-09 (reasonable minimum) self._max_timestamp = 9999999999999 # 2286-11-20 (reasonable maximum) - self.logger.debug(f"Initialized base data validator for {exchange_name}") + if self.logger: + self.logger.debug(f"{self.component_name}: Initialized {exchange_name} data validator") # Abstract methods that must be implemented by subclasses diff --git a/data/exchanges/okx/collector.py b/data/exchanges/okx/collector.py index 455d942..e3333b0 100644 --- a/data/exchanges/okx/collector.py +++ b/data/exchanges/okx/collector.py @@ -23,7 +23,6 @@ from .websocket import ( from .data_processor import OKXDataProcessor from database.connection import get_db_manager, get_raw_data_manager from database.models import MarketData, RawTrade -from utils.logger import get_logger @dataclass @@ -52,7 +51,9 @@ class OKXCollector(BaseDataCollector): component_name: Optional[str] = None, auto_restart: bool = True, health_check_interval: float = 30.0, - store_raw_data: bool = True): + store_raw_data: bool = True, + logger = None, + log_errors_only: bool = False): """ Initialize OKX collector for a single trading pair. 
@@ -63,6 +64,8 @@ class OKXCollector(BaseDataCollector): auto_restart: Enable automatic restart on failures health_check_interval: Seconds between health checks store_raw_data: Whether to store raw data for debugging + logger: Logger instance for conditional logging (None for no logging) + log_errors_only: If True and logger provided, only log error-level messages """ # Default data types if not specified if data_types is None: @@ -79,7 +82,9 @@ class OKXCollector(BaseDataCollector): data_types=data_types, component_name=component_name, auto_restart=auto_restart, - health_check_interval=health_check_interval + health_check_interval=health_check_interval, + logger=logger, + log_errors_only=log_errors_only ) # OKX-specific settings @@ -90,7 +95,7 @@ class OKXCollector(BaseDataCollector): self._ws_client: Optional[OKXWebSocketClient] = None # Data processor using new common framework - self._data_processor = OKXDataProcessor(symbol, component_name=f"{component_name}_processor") + self._data_processor = OKXDataProcessor(symbol, component_name=f"{component_name}_processor", logger=logger) # Add callbacks for processed data self._data_processor.add_trade_callback(self._on_trade_processed) @@ -113,8 +118,9 @@ class OKXCollector(BaseDataCollector): DataType.TICKER: OKXChannelType.TICKERS.value } - self.logger.info(f"Initialized OKX collector for {symbol} with data types: {[dt.value for dt in data_types]}") - self.logger.info(f"Using common data processing framework") + if logger: + logger.info(f"{component_name}: Initialized OKX collector for {symbol} with data types: {[dt.value for dt in data_types]}") + logger.info(f"{component_name}: Using common data processing framework") async def connect(self) -> bool: """ @@ -124,7 +130,8 @@ class OKXCollector(BaseDataCollector): True if connection successful, False otherwise """ try: - self.logger.info(f"Connecting OKX collector for {self.symbol}") + if self.logger: + self.logger.info(f"{self.component_name}: Connecting OKX 
collector for {self.symbol}") # Initialize database managers self._db_manager = get_db_manager() @@ -146,29 +153,35 @@ class OKXCollector(BaseDataCollector): # Connect to WebSocket if not await self._ws_client.connect(use_public=True): - self.logger.error("Failed to connect to OKX WebSocket") + if self.logger: + self.logger.error(f"{self.component_name}: Failed to connect to OKX WebSocket") return False - self.logger.info(f"Successfully connected OKX collector for {self.symbol}") + if self.logger: + self.logger.info(f"{self.component_name}: Successfully connected OKX collector for {self.symbol}") return True except Exception as e: - self.logger.error(f"Error connecting OKX collector for {self.symbol}: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error connecting OKX collector for {self.symbol}: {e}") return False async def disconnect(self) -> None: """Disconnect from OKX WebSocket API.""" try: - self.logger.info(f"Disconnecting OKX collector for {self.symbol}") + if self.logger: + self.logger.info(f"{self.component_name}: Disconnecting OKX collector for {self.symbol}") if self._ws_client: await self._ws_client.disconnect() self._ws_client = None - self.logger.info(f"Disconnected OKX collector for {self.symbol}") + if self.logger: + self.logger.info(f"{self.component_name}: Disconnected OKX collector for {self.symbol}") except Exception as e: - self.logger.error(f"Error disconnecting OKX collector for {self.symbol}: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error disconnecting OKX collector for {self.symbol}: {e}") async def subscribe_to_data(self, symbols: List[str], data_types: List[DataType]) -> bool: """ @@ -182,12 +195,14 @@ class OKXCollector(BaseDataCollector): True if subscription successful, False otherwise """ if not self._ws_client or not self._ws_client.is_connected: - self.logger.error("WebSocket client not connected") + if self.logger: + self.logger.error(f"{self.component_name}: WebSocket client not 
connected") return False # Validate symbol if self.symbol not in symbols: - self.logger.warning(f"Symbol {self.symbol} not in subscription list: {symbols}") + if self.logger: + self.logger.warning(f"{self.component_name}: Symbol {self.symbol} not in subscription list: {symbols}") return False try: @@ -202,25 +217,31 @@ class OKXCollector(BaseDataCollector): enabled=True ) subscriptions.append(subscription) - self.logger.debug(f"Added subscription: {channel} for {self.symbol}") + if self.logger: + self.logger.debug(f"{self.component_name}: Added subscription: {channel} for {self.symbol}") else: - self.logger.warning(f"Unsupported data type: {data_type}") + if self.logger: + self.logger.warning(f"{self.component_name}: Unsupported data type: {data_type}") if not subscriptions: - self.logger.warning("No valid subscriptions to create") + if self.logger: + self.logger.warning(f"{self.component_name}: No valid subscriptions to create") return False # Subscribe to channels success = await self._ws_client.subscribe(subscriptions) if success: - self.logger.info(f"Successfully subscribed to {len(subscriptions)} channels for {self.symbol}") + if self.logger: + self.logger.info(f"{self.component_name}: Successfully subscribed to {len(subscriptions)} channels for {self.symbol}") return True else: - self.logger.error(f"Failed to subscribe to channels for {self.symbol}") + if self.logger: + self.logger.error(f"{self.component_name}: Failed to subscribe to channels for {self.symbol}") return False except Exception as e: - self.logger.error(f"Error subscribing to data for {self.symbol}: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error subscribing to data for {self.symbol}: {e}") return False async def unsubscribe_from_data(self, symbols: List[str], data_types: List[DataType]) -> bool: @@ -235,7 +256,8 @@ class OKXCollector(BaseDataCollector): True if unsubscription successful, False otherwise """ if not self._ws_client or not self._ws_client.is_connected: 
- self.logger.warning("WebSocket client not connected") + if self.logger: + self.logger.warning(f"{self.component_name}: WebSocket client not connected") return True # Consider it successful if not connected try: @@ -257,14 +279,17 @@ class OKXCollector(BaseDataCollector): # Unsubscribe from channels success = await self._ws_client.unsubscribe(subscriptions) if success: - self.logger.info(f"Successfully unsubscribed from {len(subscriptions)} channels for {self.symbol}") + if self.logger: + self.logger.info(f"{self.component_name}: Successfully unsubscribed from {len(subscriptions)} channels for {self.symbol}") return True else: - self.logger.error(f"Failed to unsubscribe from channels for {self.symbol}") + if self.logger: + self.logger.error(f"{self.component_name}: Failed to unsubscribe from channels for {self.symbol}") return False except Exception as e: - self.logger.error(f"Error unsubscribing from data for {self.symbol}: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error unsubscribing from data for {self.symbol}: {e}") return False async def _process_message(self, message: Any) -> Optional[MarketDataPoint]: @@ -278,7 +303,8 @@ class OKXCollector(BaseDataCollector): MarketDataPoint if processing successful, None otherwise """ if not isinstance(message, dict): - self.logger.warning(f"Received non-dict message: {type(message)}") + if self.logger: + self.logger.warning(f"{self.component_name}: Received non-dict message: {type(message)}") return None try: @@ -291,11 +317,13 @@ class OKXCollector(BaseDataCollector): if not success: self._error_count += 1 - self.logger.error(f"Message processing failed: {errors}") + if self.logger: + self.logger.error(f"{self.component_name}: Message processing failed: {errors}") return None if errors: - self.logger.warning(f"Message processing warnings: {errors}") + if self.logger: + self.logger.warning(f"{self.component_name}: Message processing warnings: {errors}") # Store raw data if enabled (for 
debugging/compliance) if self.store_raw_data and 'data' in message and 'arg' in message: @@ -310,7 +338,8 @@ class OKXCollector(BaseDataCollector): except Exception as e: self._error_count += 1 - self.logger.error(f"Error processing message: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error processing message: {e}") return None async def _handle_messages(self) -> None: @@ -340,10 +369,12 @@ class OKXCollector(BaseDataCollector): raw_data=data_point.data ) session.add(raw_trade) - self.logger.debug(f"Stored raw data: {data_point.data_type.value} for {data_point.symbol}") + if self.logger: + self.logger.debug(f"{self.component_name}: Stored raw data: {data_point.data_type.value} for {data_point.symbol}") except Exception as e: - self.logger.error(f"Error storing raw market data: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error storing raw market data: {e}") async def _store_completed_candle(self, candle: OHLCVCandle) -> None: """ @@ -371,10 +402,12 @@ class OKXCollector(BaseDataCollector): trades_count=candle.trade_count ) session.add(market_data) - self.logger.info(f"Stored completed candle: {candle.symbol} {candle.timeframe} at {candle.start_time}") + if self.logger: + self.logger.info(f"{self.component_name}: Stored completed candle: {candle.symbol} {candle.timeframe} at {candle.start_time}") except Exception as e: - self.logger.error(f"Error storing completed candle: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error storing completed candle: {e}") async def _store_raw_data(self, channel: str, raw_message: Dict[str, Any]) -> None: """ @@ -399,7 +432,8 @@ class OKXCollector(BaseDataCollector): ) except Exception as e: - self.logger.error(f"Error storing raw WebSocket data: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error storing raw WebSocket data: {e}") def _on_message(self, message: Dict[str, Any]) -> None: """ @@ -412,7 +446,8 @@ class 
OKXCollector(BaseDataCollector): # Process message asynchronously asyncio.create_task(self._process_message(message)) except Exception as e: - self.logger.error(f"Error handling WebSocket message: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error handling WebSocket message: {e}") def _on_trade_processed(self, trade: StandardizedTrade) -> None: """ @@ -422,7 +457,8 @@ class OKXCollector(BaseDataCollector): trade: Processed standardized trade """ self._processed_trades += 1 - self.logger.debug(f"Processed trade: {trade.symbol} {trade.side} {trade.size}@{trade.price}") + if self.logger: + self.logger.debug(f"{self.component_name}: Processed trade: {trade.symbol} {trade.side} {trade.size}@{trade.price}") def _on_candle_processed(self, candle: OHLCVCandle) -> None: """ @@ -432,7 +468,8 @@ class OKXCollector(BaseDataCollector): candle: Completed OHLCV candle """ self._processed_candles += 1 - self.logger.info(f"Completed candle: {candle.symbol} {candle.timeframe} O:{candle.open} H:{candle.high} L:{candle.low} C:{candle.close} V:{candle.volume}") + if self.logger: + self.logger.info(f"{self.component_name}: Completed candle: {candle.symbol} {candle.timeframe} O:{candle.open} H:{candle.high} L:{candle.low} C:{candle.close} V:{candle.volume}") # Store completed candle in market_data table if candle.is_complete: diff --git a/data/exchanges/okx/data_processor.py b/data/exchanges/okx/data_processor.py index 4069f96..dc7c0b0 100644 --- a/data/exchanges/okx/data_processor.py +++ b/data/exchanges/okx/data_processor.py @@ -24,7 +24,6 @@ from ...common import ( UnifiedDataTransformer, create_standardized_trade ) -from utils.logger import get_logger class OKXMessageType(Enum): @@ -81,9 +80,9 @@ class OKXDataValidator(BaseDataValidator): symbol patterns, and data structures. 
""" - def __init__(self, component_name: str = "okx_data_validator"): + def __init__(self, component_name: str = "okx_data_validator", logger = None): """Initialize OKX data validator.""" - super().__init__("okx", component_name) + super().__init__("okx", component_name, logger) # OKX-specific patterns self._symbol_pattern = re.compile(r'^[A-Z0-9]+-[A-Z0-9]+$') # BTC-USDT, ETH-USDC @@ -95,7 +94,8 @@ class OKXDataValidator(BaseDataValidator): 'candle1m', 'candle5m', 'candle15m', 'candle1H', 'candle4H', 'candle1D' } - self.logger.debug("Initialized OKX data validator") + if self.logger: + self.logger.debug("Initialized OKX data validator") def validate_symbol_format(self, symbol: str) -> ValidationResult: """Validate OKX symbol format (e.g., BTC-USDT).""" @@ -423,9 +423,9 @@ class OKXDataTransformer(BaseDataTransformer): This class handles transformation of OKX data formats to standardized formats. """ - def __init__(self, component_name: str = "okx_data_transformer"): + def __init__(self, component_name: str = "okx_data_transformer", logger = None): """Initialize OKX data transformer.""" - super().__init__("okx", component_name) + super().__init__("okx", component_name, logger) def transform_trade_data(self, raw_data: Dict[str, Any], symbol: str) -> Optional[StandardizedTrade]: """Transform OKX trade data to standardized format.""" @@ -442,7 +442,8 @@ class OKXDataTransformer(BaseDataTransformer): is_milliseconds=True ) except Exception as e: - self.logger.error(f"Error transforming OKX trade data: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error transforming OKX trade data: {e}") return None def transform_orderbook_data(self, raw_data: Dict[str, Any], symbol: str) -> Optional[Dict[str, Any]]: @@ -458,7 +459,8 @@ class OKXDataTransformer(BaseDataTransformer): 'raw_data': raw_data } except Exception as e: - self.logger.error(f"Error transforming OKX orderbook data: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error 
transforming OKX orderbook data: {e}") return None def transform_ticker_data(self, raw_data: Dict[str, Any], symbol: str) -> Optional[Dict[str, Any]]: @@ -497,7 +499,8 @@ class OKXDataTransformer(BaseDataTransformer): return ticker_data except Exception as e: - self.logger.error(f"Error transforming OKX ticker data: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error transforming OKX ticker data: {e}") return None @@ -512,7 +515,8 @@ class OKXDataProcessor: def __init__(self, symbol: str, config: Optional[CandleProcessingConfig] = None, - component_name: str = "okx_data_processor"): + component_name: str = "okx_data_processor", + logger = None): """ Initialize OKX data processor. @@ -523,17 +527,17 @@ class OKXDataProcessor: """ self.symbol = symbol self.component_name = component_name - self.logger = get_logger(self.component_name) + self.logger = logger # Core components using common utilities - self.validator = OKXDataValidator(f"{component_name}_validator") - self.transformer = OKXDataTransformer(f"{component_name}_transformer") - self.unified_transformer = UnifiedDataTransformer(self.transformer, f"{component_name}_unified") + self.validator = OKXDataValidator(f"{component_name}_validator", self.logger) + self.transformer = OKXDataTransformer(f"{component_name}_transformer", self.logger) + self.unified_transformer = UnifiedDataTransformer(self.transformer, f"{component_name}_unified", self.logger) # Real-time candle processing using common utilities self.config = config or CandleProcessingConfig() self.candle_processor = RealTimeCandleProcessor( - symbol, "okx", self.config, f"{component_name}_candles" + symbol, "okx", self.config, f"{component_name}_candles", self.logger ) # Callbacks @@ -543,7 +547,8 @@ class OKXDataProcessor: # Connect candle processor callbacks self.candle_processor.add_candle_callback(self._emit_candle_to_callbacks) - self.logger.info(f"Initialized OKX data processor for {symbol} with real-time candle processing") + 
if self.logger: + self.logger.info(f"{self.component_name}: Initialized OKX data processor for {symbol} with real-time candle processing") def add_trade_callback(self, callback: callable) -> None: """Add callback for processed trades.""" @@ -571,12 +576,14 @@ class OKXDataProcessor: validation_result = self.validator.validate_websocket_message(message) if not validation_result.is_valid: - self.logger.error(f"Message validation failed: {validation_result.errors}") + if self.logger: + self.logger.error(f"{self.component_name}: Message validation failed: {validation_result.errors}") return False, [], validation_result.errors # Log warnings if any if validation_result.warnings: - self.logger.warning(f"Message validation warnings: {validation_result.warnings}") + if self.logger: + self.logger.warning(f"{self.component_name}: Message validation warnings: {validation_result.warnings}") # Process data if it's a data message if 'data' in message and 'arg' in message: @@ -586,8 +593,9 @@ class OKXDataProcessor: return True, [], [] except Exception as e: - error_msg = f"Exception during message validation and processing: {str(e)}" - self.logger.error(error_msg) + error_msg = f"{self.component_name}: Exception during message validation and processing: {str(e)}" + if self.logger: + self.logger.error(error_msg) return False, [], [error_msg] def _process_data_message(self, message: Dict[str, Any], expected_symbol: Optional[str] = None) -> Tuple[bool, List[MarketDataPoint], List[str]]: @@ -626,7 +634,8 @@ class OKXDataProcessor: continue if validation_result.warnings: - self.logger.warning(f"Data validation warnings: {validation_result.warnings}") + if self.logger: + self.logger.warning(f"{self.component_name}: Data validation warnings: {validation_result.warnings}") # Create MarketDataPoint using sanitized data sanitized_data = validation_result.sanitized_data or data_item @@ -650,13 +659,14 @@ class OKXDataProcessor: self._process_real_time_trade(sanitized_data) except Exception 
as e: - self.logger.error(f"Error processing data item: {e}") - errors.append(f"Error processing data item: {str(e)}") + if self.logger: + self.logger.error(f"{self.component_name}: Error processing data item: {e}") + errors.append(f"{self.component_name}: Error processing data item: {str(e)}") return len(errors) == 0, market_data_points, errors except Exception as e: - error_msg = f"Exception during data message processing: {str(e)}" + error_msg = f"{self.component_name}: Exception during data message processing: {str(e)}" errors.append(error_msg) return False, [], errors @@ -675,12 +685,14 @@ class OKXDataProcessor: try: callback(standardized_trade) except Exception as e: - self.logger.error(f"Error in trade callback: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error in trade callback: {e}") # Note: Candle callbacks are handled by _emit_candle_to_callbacks except Exception as e: - self.logger.error(f"Error processing real-time trade: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error processing real-time trade: {e}") def _emit_candle_to_callbacks(self, candle: OHLCVCandle) -> None: """Emit candle to all registered callbacks.""" @@ -688,7 +700,8 @@ class OKXDataProcessor: try: callback(candle) except Exception as e: - self.logger.error(f"Error in candle callback: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error in candle callback: {e}") def _channel_to_data_type(self, channel: str) -> Optional[DataType]: """Convert OKX channel name to DataType enum.""" diff --git a/data/exchanges/okx/websocket.py b/data/exchanges/okx/websocket.py index 7bcf4a4..d146cc9 100644 --- a/data/exchanges/okx/websocket.py +++ b/data/exchanges/okx/websocket.py @@ -17,8 +17,6 @@ from dataclasses import dataclass import websockets from websockets.exceptions import ConnectionClosed, InvalidHandshake, InvalidURI -from utils.logger import get_logger - class OKXChannelType(Enum): """OKX WebSocket channel types.""" @@ -91,7 
+89,8 @@ class OKXWebSocketClient: ping_interval: float = 25.0, pong_timeout: float = 10.0, max_reconnect_attempts: int = 5, - reconnect_delay: float = 5.0): + reconnect_delay: float = 5.0, + logger = None): """ Initialize OKX WebSocket client. @@ -109,7 +108,7 @@ class OKXWebSocketClient: self.reconnect_delay = reconnect_delay # Initialize logger - self.logger = get_logger(self.component_name, verbose=True) + self.logger = logger # Connection management self._websocket: Optional[Any] = None # Changed to Any to handle different websocket types @@ -138,7 +137,8 @@ class OKXWebSocketClient: 'last_message_time': None } - self.logger.info(f"Initialized OKX WebSocket client: {component_name}") + if self.logger: + self.logger.info(f"{self.component_name}: Initialized OKX WebSocket client") @property def is_connected(self) -> bool: @@ -184,7 +184,8 @@ class OKXWebSocketClient: True if connection successful, False otherwise """ if self.is_connected: - self.logger.warning("Already connected to OKX WebSocket") + if self.logger: + self.logger.warning("Already connected to OKX WebSocket") return True url = self.PUBLIC_WS_URL if use_public else self.PRIVATE_WS_URL @@ -194,7 +195,8 @@ class OKXWebSocketClient: self._connection_state = ConnectionState.CONNECTING try: - self.logger.info(f"Connecting to OKX WebSocket (attempt {attempt + 1}/{self.max_reconnect_attempts}): {url}") + if self.logger: + self.logger.info(f"{self.component_name}: Connecting to OKX WebSocket (attempt {attempt + 1}/{self.max_reconnect_attempts}): {url}") # Create SSL context for secure connection ssl_context = ssl.create_default_context() @@ -219,25 +221,30 @@ class OKXWebSocketClient: # Start background tasks await self._start_background_tasks() - self.logger.info("Successfully connected to OKX WebSocket") + if self.logger: + self.logger.info(f"{self.component_name}: Successfully connected to OKX WebSocket") return True except (InvalidURI, InvalidHandshake) as e: - self.logger.error(f"Invalid WebSocket 
configuration: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Invalid WebSocket configuration: {e}") self._connection_state = ConnectionState.ERROR return False except Exception as e: attempt_num = attempt + 1 - self.logger.error(f"Connection attempt {attempt_num} failed: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Connection attempt {attempt_num} failed: {e}") if attempt_num < self.max_reconnect_attempts: # Exponential backoff with jitter delay = self.reconnect_delay * (2 ** attempt) + (0.1 * attempt) - self.logger.info(f"Retrying connection in {delay:.1f} seconds...") + if self.logger: + self.logger.info(f"{self.component_name}: Retrying connection in {delay:.1f} seconds...") await asyncio.sleep(delay) else: - self.logger.error(f"All {self.max_reconnect_attempts} connection attempts failed") + if self.logger: + self.logger.error(f"{self.component_name}: All {self.max_reconnect_attempts} connection attempts failed") self._connection_state = ConnectionState.ERROR return False @@ -248,7 +255,8 @@ class OKXWebSocketClient: if not self._websocket: return - self.logger.info("Disconnecting from OKX WebSocket") + if self.logger: + self.logger.info(f"{self.component_name}: Disconnecting from OKX WebSocket") self._connection_state = ConnectionState.DISCONNECTED # Cancel background tasks @@ -258,12 +266,14 @@ class OKXWebSocketClient: try: await self._websocket.close() except Exception as e: - self.logger.warning(f"Error closing WebSocket: {e}") + if self.logger: + self.logger.warning(f"{self.component_name}: Error closing WebSocket: {e}") self._websocket = None self._is_authenticated = False - self.logger.info("Disconnected from OKX WebSocket") + if self.logger: + self.logger.info(f"{self.component_name}: Disconnected from OKX WebSocket") async def subscribe(self, subscriptions: List[OKXSubscription]) -> bool: """ @@ -276,7 +286,8 @@ class OKXWebSocketClient: True if subscription successful, False otherwise """ if not 
self.is_connected: - self.logger.error("Cannot subscribe: WebSocket not connected") + if self.logger: + self.logger.error("Cannot subscribe: WebSocket not connected") return False try: @@ -295,11 +306,13 @@ class OKXWebSocketClient: key = f"{sub.channel}:{sub.inst_id}" self._subscriptions[key] = sub - self.logger.info(f"Subscribed to {len(subscriptions)} channels") + if self.logger: + self.logger.info(f"{self.component_name}: Subscribed to {len(subscriptions)} channels") return True except Exception as e: - self.logger.error(f"Failed to subscribe to channels: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Failed to subscribe to channels: {e}") return False async def unsubscribe(self, subscriptions: List[OKXSubscription]) -> bool: @@ -313,7 +326,8 @@ class OKXWebSocketClient: True if unsubscription successful, False otherwise """ if not self.is_connected: - self.logger.error("Cannot unsubscribe: WebSocket not connected") + if self.logger: + self.logger.error("Cannot unsubscribe: WebSocket not connected") return False try: @@ -332,11 +346,13 @@ class OKXWebSocketClient: key = f"{sub.channel}:{sub.inst_id}" self._subscriptions.pop(key, None) - self.logger.info(f"Unsubscribed from {len(subscriptions)} channels") + if self.logger: + self.logger.info(f"{self.component_name}: Unsubscribed from {len(subscriptions)} channels") return True except Exception as e: - self.logger.error(f"Failed to unsubscribe from channels: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Failed to unsubscribe from channels: {e}") return False def add_message_callback(self, callback: Callable[[Dict[str, Any]], None]) -> None: @@ -347,7 +363,8 @@ class OKXWebSocketClient: callback: Function to call when message received """ self._message_callbacks.append(callback) - self.logger.debug(f"Added message callback: {callback.__name__}") + if self.logger: + self.logger.debug(f"{self.component_name}: Added message callback: {callback.__name__}") def 
remove_message_callback(self, callback: Callable[[Dict[str, Any]], None]) -> None: """ @@ -358,7 +375,8 @@ class OKXWebSocketClient: """ if callback in self._message_callbacks: self._message_callbacks.remove(callback) - self.logger.debug(f"Removed message callback: {callback.__name__}") + if self.logger: + self.logger.debug(f"{self.component_name}: Removed message callback: {callback.__name__}") async def _start_background_tasks(self) -> None: """Start background tasks for ping and message handling.""" @@ -368,7 +386,8 @@ class OKXWebSocketClient: # Start message handler task self._message_handler_task = asyncio.create_task(self._message_handler()) - self.logger.debug("Started background tasks") + if self.logger: + self.logger.debug(f"{self.component_name}: Started background tasks") async def _stop_background_tasks(self) -> None: """Stop background tasks.""" @@ -385,7 +404,8 @@ class OKXWebSocketClient: self._ping_task = None self._message_handler_task = None - self.logger.debug("Stopped background tasks") + if self.logger: + self.logger.debug(f"{self.component_name}: Stopped background tasks") async def _ping_loop(self) -> None: """Background task for sending ping messages.""" @@ -401,7 +421,8 @@ class OKXWebSocketClient: # Check for pong timeout if (self._last_ping_time > self._last_pong_time and current_time - self._last_ping_time > self.pong_timeout): - self.logger.warning("Pong timeout - connection may be stale") + if self.logger: + self.logger.warning(f"{self.component_name}: Pong timeout - connection may be stale") # Don't immediately disconnect, let connection error handling deal with it await asyncio.sleep(1) # Check every second @@ -409,7 +430,8 @@ class OKXWebSocketClient: except asyncio.CancelledError: break except Exception as e: - self.logger.error(f"Error in ping loop: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error in ping loop: {e}") await asyncio.sleep(5) async def _message_handler(self) -> None: @@ -432,32 +454,38 @@ 
class OKXWebSocketClient: await self._process_message(message) except ConnectionClosed as e: - self.logger.warning(f"WebSocket connection closed: {e}") + if self.logger: + self.logger.warning(f"{self.component_name}: WebSocket connection closed: {e}") self._connection_state = ConnectionState.DISCONNECTED # Attempt automatic reconnection if enabled if self._reconnect_attempts < self.max_reconnect_attempts: self._reconnect_attempts += 1 - self.logger.info(f"Attempting automatic reconnection ({self._reconnect_attempts}/{self.max_reconnect_attempts})") + if self.logger: + self.logger.info(f"{self.component_name}: Attempting automatic reconnection ({self._reconnect_attempts}/{self.max_reconnect_attempts})") # Stop current tasks await self._stop_background_tasks() # Attempt reconnection if await self.reconnect(): - self.logger.info("Automatic reconnection successful") + if self.logger: + self.logger.info(f"{self.component_name}: Automatic reconnection successful") continue else: - self.logger.error("Automatic reconnection failed") + if self.logger: + self.logger.error(f"{self.component_name}: Automatic reconnection failed") break else: - self.logger.error("Max reconnection attempts exceeded") + if self.logger: + self.logger.error(f"{self.component_name}: Max reconnection attempts exceeded") break except asyncio.CancelledError: break except Exception as e: - self.logger.error(f"Error in message handler: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error in message handler: {e}") await asyncio.sleep(1) async def _send_message(self, message: Dict[str, Any]) -> None: @@ -474,14 +502,17 @@ class OKXWebSocketClient: message_str = json.dumps(message) await self._websocket.send(message_str) self._stats['messages_sent'] += 1 - self.logger.debug(f"Sent message: {message}") + if self.logger: + self.logger.debug(f"{self.component_name}: Sent message: {message}") except ConnectionClosed as e: - self.logger.error(f"Connection closed while sending message: {e}") 
+ if self.logger: + self.logger.error(f"{self.component_name}: Connection closed while sending message: {e}") self._connection_state = ConnectionState.DISCONNECTED raise OKXConnectionError(f"Connection closed: {e}") except Exception as e: - self.logger.error(f"Failed to send message: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Failed to send message: {e}") raise OKXConnectionError(f"Failed to send message: {e}") async def _send_ping(self) -> None: @@ -493,14 +524,17 @@ class OKXWebSocketClient: # OKX expects a simple "ping" string, not JSON await self._websocket.send("ping") self._stats['pings_sent'] += 1 - self.logger.debug("Sent ping to OKX") + if self.logger: + self.logger.debug(f"{self.component_name}: Sent ping to OKX") except ConnectionClosed as e: - self.logger.error(f"Connection closed while sending ping: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Connection closed while sending ping: {e}") self._connection_state = ConnectionState.DISCONNECTED raise OKXConnectionError(f"Connection closed: {e}") except Exception as e: - self.logger.error(f"Failed to send ping: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Failed to send ping: {e}") raise OKXConnectionError(f"Failed to send ping: {e}") async def _process_message(self, message: str) -> None: @@ -519,7 +553,8 @@ class OKXWebSocketClient: if message.strip() == "pong": self._last_pong_time = time.time() self._stats['pongs_received'] += 1 - self.logger.debug("Received pong from OKX") + if self.logger: + self.logger.debug(f"{self.component_name}: Received pong from OKX") return # Parse JSON message for all other responses @@ -529,21 +564,25 @@ class OKXWebSocketClient: if data.get('event') == 'pong': self._last_pong_time = time.time() self._stats['pongs_received'] += 1 - self.logger.debug("Received pong from OKX (JSON format)") + if self.logger: + self.logger.debug(f"{self.component_name}: Received pong from OKX (JSON format)") return # 
Handle subscription confirmations if data.get('event') == 'subscribe': - self.logger.info(f"Subscription confirmed: {data}") + if self.logger: + self.logger.info(f"{self.component_name}: Subscription confirmed: {data}") return if data.get('event') == 'unsubscribe': - self.logger.info(f"Unsubscription confirmed: {data}") + if self.logger: + self.logger.info(f"{self.component_name}: Unsubscription confirmed: {data}") return # Handle error messages if data.get('event') == 'error': - self.logger.error(f"OKX error: {data}") + if self.logger: + self.logger.error(f"{self.component_name}: OKX error: {data}") return # Process data messages @@ -553,19 +592,23 @@ class OKXWebSocketClient: try: callback(data) except Exception as e: - self.logger.error(f"Error in message callback {callback.__name__}: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error in message callback {callback.__name__}: {e}") except json.JSONDecodeError as e: # Check if it's a simple string response we haven't handled if message.strip() in ["ping", "pong"]: - self.logger.debug(f"Received simple message: {message.strip()}") + if self.logger: + self.logger.debug(f"{self.component_name}: Received simple message: {message.strip()}") if message.strip() == "pong": self._last_pong_time = time.time() self._stats['pongs_received'] += 1 else: - self.logger.error(f"Failed to parse JSON message: {e}, message: {message}") + if self.logger: + self.logger.error(f"{self.component_name}: Failed to parse JSON message: {e}, message: {message}") except Exception as e: - self.logger.error(f"Error processing message: {e}") + if self.logger: + self.logger.error(f"{self.component_name}: Error processing message: {e}") def get_stats(self) -> Dict[str, Any]: """Get connection statistics.""" @@ -588,7 +631,8 @@ class OKXWebSocketClient: Returns: True if reconnection successful, False otherwise """ - self.logger.info("Attempting to reconnect to OKX WebSocket") + if self.logger: + 
self.logger.info(f"{self.component_name}: Attempting to reconnect to OKX WebSocket") self._connection_state = ConnectionState.RECONNECTING self._stats['reconnections'] += 1 @@ -605,7 +649,8 @@ class OKXWebSocketClient: # Re-subscribe to previous subscriptions if self._subscriptions: subscriptions = list(self._subscriptions.values()) - self.logger.info(f"Re-subscribing to {len(subscriptions)} channels") + if self.logger: + self.logger.info(f"{self.component_name}: Re-subscribing to {len(subscriptions)} channels") await self.subscribe(subscriptions) return success diff --git a/docs/components/logging.md b/docs/components/logging.md index 47b4e68..28f5c16 100644 --- a/docs/components/logging.md +++ b/docs/components/logging.md @@ -1,6 +1,6 @@ # Unified Logging System -The TCP Dashboard project uses a unified logging system that provides consistent, centralized logging across all components. +The TCP Dashboard project uses a unified logging system that provides consistent, centralized logging across all components with advanced conditional logging capabilities. ## Features @@ -11,6 +11,315 @@ The TCP Dashboard project uses a unified logging system that provides consistent - **Verbose console logging**: Configurable console output with proper log level handling - **Automatic log cleanup**: Built-in functionality to remove old log files automatically - **Error handling**: Graceful fallback to console logging if file logging fails +- **Conditional logging**: Components can operate with or without loggers +- **Error-only logging**: Option to log only error-level messages +- **Hierarchical logging**: Parent components can pass loggers to children +- **Logger inheritance**: Consistent logging across component hierarchies + +## Conditional Logging System + +The TCP Dashboard implements a sophisticated conditional logging system that allows components to work with or without loggers, providing maximum flexibility for different deployment scenarios. + +### Key Concepts + +1. 
**Optional Logging**: Components accept `logger=None` and function normally without logging +2. **Error-Only Mode**: Components can log only error-level messages with `log_errors_only=True` +3. **Logger Inheritance**: Parent components pass their logger to child components +4. **Hierarchical Structure**: Log files are organized by component hierarchy + +### Usage Patterns + +#### 1. No Logging +```python +from data.collector_manager import CollectorManager +from data.exchanges.okx.collector import OKXCollector + +# Components work without any logging +manager = CollectorManager(logger=None) +collector = OKXCollector("BTC-USDT", logger=None) + +# No log files created, no console output +# Components function normally without exceptions +``` + +#### 2. Normal Logging +```python +from utils.logger import get_logger +from data.collector_manager import CollectorManager + +# Create logger for the manager +logger = get_logger('production_manager') + +# Manager logs all activities +manager = CollectorManager(logger=logger) + +# Child components inherit the logger +collector = manager.add_okx_collector("BTC-USDT") # Uses manager's logger +``` + +#### 3. Error-Only Logging +```python +from utils.logger import get_logger +from data.exchanges.okx.collector import OKXCollector + +# Create logger but only log errors +logger = get_logger('critical_only') + +# Only error and critical messages are logged +collector = OKXCollector( + "BTC-USDT", + logger=logger, + log_errors_only=True +) + +# Debug, info, warning messages are suppressed +# Error and critical messages are always logged +``` + +#### 4. 
Hierarchical Logging +```python +from utils.logger import get_logger +from data.collector_manager import CollectorManager + +# Top-level application logger +app_logger = get_logger('tcp_dashboard') + +# Production manager with its own logger +prod_logger = get_logger('production_manager') +manager = CollectorManager(logger=prod_logger) + +# Individual collectors with specific loggers +btc_logger = get_logger('btc_collector') +btc_collector = OKXCollector("BTC-USDT", logger=btc_logger) + +eth_collector = OKXCollector("ETH-USDT", logger=None) # No logging + +# Results in organized log structure: +# logs/tcp_dashboard/ +# logs/production_manager/ +# logs/btc_collector/ +# (no logs for ETH collector) +``` + +#### 5. Mixed Configuration +```python +from utils.logger import get_logger +from data.collector_manager import CollectorManager + +# System logger for normal operations +system_logger = get_logger('system') + +# Critical logger for error-only components +critical_logger = get_logger('critical_only') + +manager = CollectorManager(logger=system_logger) + +# Different logging strategies for different collectors +btc_collector = OKXCollector("BTC-USDT", logger=system_logger) # Full logging +eth_collector = OKXCollector("ETH-USDT", logger=critical_logger, log_errors_only=True) # Errors only +ada_collector = OKXCollector("ADA-USDT", logger=None) # No logging + +manager.add_collector(btc_collector) +manager.add_collector(eth_collector) +manager.add_collector(ada_collector) +``` + +### Implementation Details + +#### Component Constructor Pattern +All major components follow this pattern: +```python +class ComponentExample: + def __init__(self, logger=None, log_errors_only=False): + self.logger = logger + self.log_errors_only = log_errors_only + + # Conditional logging helpers + self._log_debug = self._create_conditional_logger('debug') + self._log_info = self._create_conditional_logger('info') + self._log_warning = self._create_conditional_logger('warning') + 
self._log_error = self._create_conditional_logger('error') + self._log_critical = self._create_conditional_logger('critical') + + def _create_conditional_logger(self, level): + """Create conditional logging function based on configuration.""" + if not self.logger: + return lambda msg: None # No-op if no logger + + log_func = getattr(self.logger, level) + + if level in ['debug', 'info', 'warning'] and self.log_errors_only: + return lambda msg: None # Suppress non-error messages + + return log_func # Normal logging +``` + +#### Supported Components + +The following components support conditional logging: + +1. **BaseDataCollector** (`data/base_collector.py`) + - Parameters: `logger=None, log_errors_only=False` + - Conditional logging for all collector operations + +2. **CollectorManager** (`data/collector_manager.py`) + - Parameters: `logger=None, log_errors_only=False` + - Manages multiple collectors with consistent logging + +3. **OKXCollector** (`data/exchanges/okx/collector.py`) + - Parameters: `logger=None, log_errors_only=False` + - Exchange-specific data collection with conditional logging + +4. **BaseDataValidator** (`data/common/validation.py`) + - Parameters: `logger=None` + - Data validation with optional logging + +5. **OKXDataTransformer** (`data/exchanges/okx/data_processor.py`) + - Parameters: `logger=None` + - Data processing with conditional logging + +### Best Practices for Conditional Logging + +#### 1. Logger Inheritance +```python +# Parent component creates logger +parent_logger = get_logger('parent_system') +parent = ParentComponent(logger=parent_logger) + +# Pass logger to children for consistent hierarchy +child1 = ChildComponent(logger=parent_logger) +child2 = ChildComponent(logger=parent_logger, log_errors_only=True) +child3 = ChildComponent(logger=None) # No logging +``` + +#### 2. 
Environment-Based Configuration +```python +import os +from utils.logger import get_logger + +def create_system_logger(): + """Create logger based on environment.""" + env = os.getenv('ENVIRONMENT', 'development') + + if env == 'production': + return get_logger('production_system', log_level='INFO', verbose=False) + elif env == 'testing': + return None # No logging during tests + else: + return get_logger('dev_system', log_level='DEBUG', verbose=True) + +# Use in components +system_logger = create_system_logger() +manager = CollectorManager(logger=system_logger) +``` + +#### 3. Conditional Error-Only Mode +```python +def create_collector_with_logging_strategy(symbol, strategy='normal'): + """Create collector with different logging strategies.""" + base_logger = get_logger(f'collector_{symbol.lower().replace("-", "_")}') + + if strategy == 'silent': + return OKXCollector(symbol, logger=None) + elif strategy == 'errors_only': + return OKXCollector(symbol, logger=base_logger, log_errors_only=True) + else: + return OKXCollector(symbol, logger=base_logger) + +# Usage +btc_collector = create_collector_with_logging_strategy('BTC-USDT', 'normal') +eth_collector = create_collector_with_logging_strategy('ETH-USDT', 'errors_only') +ada_collector = create_collector_with_logging_strategy('ADA-USDT', 'silent') +``` + +#### 4. Performance Optimization +```python +class OptimizedComponent: + def __init__(self, logger=None, log_errors_only=False): + self.logger = logger + self.log_errors_only = log_errors_only + + # Pre-compute logging capabilities for performance + self.can_log_debug = logger and not log_errors_only + self.can_log_info = logger and not log_errors_only + self.can_log_warning = logger and not log_errors_only + self.can_log_error = logger is not None + self.can_log_critical = logger is not None + + def process_data(self, data): + if self.can_log_debug: + self.logger.debug(f"Processing {len(data)} records") + + # ... processing logic ... 
+ + if self.can_log_info: + self.logger.info("Data processing completed") +``` + +### Migration Guide + +#### From Standard Logging +```python +# Old approach +import logging +logger = logging.getLogger(__name__) + +class OldComponent: + def __init__(self): + self.logger = logger + +# New conditional approach +from utils.logger import get_logger + +class NewComponent: + def __init__(self, logger=None, log_errors_only=False): + self.logger = logger + self.log_errors_only = log_errors_only + + # Add conditional logging helpers + self._setup_conditional_logging() +``` + +#### Gradual Adoption +1. **Phase 1**: Add optional logger parameters to new components +2. **Phase 2**: Update existing components to support conditional logging +3. **Phase 3**: Implement hierarchical logging structure +4. **Phase 4**: Add error-only logging mode + +### Testing Conditional Logging + +#### Test Script Example +```python +# test_conditional_logging.py +from utils.logger import get_logger +from data.collector_manager import CollectorManager +from data.exchanges.okx.collector import OKXCollector + +def test_no_logging(): + """Test components work without loggers.""" + manager = CollectorManager(logger=None) + collector = OKXCollector("BTC-USDT", logger=None) + print("✓ No logging test passed") + +def test_with_logging(): + """Test components work with loggers.""" + logger = get_logger('test_system') + manager = CollectorManager(logger=logger) + collector = OKXCollector("BTC-USDT", logger=logger) + print("✓ With logging test passed") + +def test_error_only(): + """Test error-only logging mode.""" + logger = get_logger('test_errors') + collector = OKXCollector("BTC-USDT", logger=logger, log_errors_only=True) + print("✓ Error-only logging test passed") + +if __name__ == "__main__": + test_no_logging() + test_with_logging() + test_error_only() + print("✅ All conditional logging tests passed!") +``` ## Log Format diff --git a/docs/logging_system.md b/docs/logging_system.md new file mode 
100644 index 0000000..7dc0b94 --- /dev/null +++ b/docs/logging_system.md @@ -0,0 +1,292 @@ +# Conditional Logging System + +## Overview + +The TCP Dashboard project implements a sophisticated conditional logging system that provides fine-grained control over logging behavior across all components. This system supports hierarchical logging, conditional logging, and error-only logging modes. + +## Key Features + +### 1. Conditional Logging +- **No Logger**: If no logger instance is passed to a component's constructor, that component performs no logging operations +- **Logger Provided**: If a logger instance is passed, the component uses it for logging +- **Error-Only Mode**: If `log_errors_only=True` is set, only error and critical level messages are logged + +### 2. Logger Inheritance +- Components that receive a logger pass the same logger instance down to child components +- This creates a hierarchical logging structure that follows the component hierarchy + +### 3. Hierarchical File Organization +- Log files are organized based on component hierarchy +- Each major component gets its own log directory +- Child components log to their parent's log file + +## Component Hierarchy + +``` +Top-level Application (individual logger) +├── ProductionManager (individual logger) +│ ├── DataSaver (receives logger from ProductionManager) +│ ├── DataValidator (receives logger from ProductionManager) +│ ├── DatabaseConnection (receives logger from ProductionManager) +│ └── CollectorManager (individual logger) +│ ├── OKX collector BTC-USDT (individual logger) +│ │ ├── DataAggregator (receives logger from OKX collector) +│ │ ├── DataTransformer (receives logger from OKX collector) +│ │ └── DataProcessor (receives logger from OKX collector) +│ └── Another collector... 
+``` + +## Usage Examples + +### Basic Usage + +```python +from utils.logger import get_logger +from data.exchanges.okx.collector import OKXCollector + +# Create a logger for the collector +collector_logger = get_logger('okx_collector_btc_usdt', verbose=True) + +# Create collector with logger - all child components will use this logger +collector = OKXCollector( + symbol='BTC-USDT', + logger=collector_logger +) + +# Child components (data processor, validator, transformer) will automatically +# receive and use the same logger instance +``` + +### No Logging Mode + +```python +# Create collector without logger - no logging will be performed +collector = OKXCollector( + symbol='BTC-USDT', + logger=None # or simply omit the parameter +) + +# No log files will be created, no console output +``` + +### Error-Only Logging Mode + +```python +from utils.logger import get_logger +from data.collector_manager import CollectorManager + +# Create logger for manager +manager_logger = get_logger('collector_manager', verbose=True) + +# Create manager with error-only logging +manager = CollectorManager( + manager_name="production_manager", + logger=manager_logger, + log_errors_only=True # Only errors and critical messages will be logged +) + +# Manager will only log errors, but child collectors can have their own loggers +``` + +### Hierarchical Logging Setup + +```python +from utils.logger import get_logger +from data.collector_manager import CollectorManager +from data.exchanges.okx.collector import OKXCollector + +# Create manager with its own logger +manager_logger = get_logger('collector_manager', verbose=True) +manager = CollectorManager(logger=manager_logger) + +# Create individual collectors with their own loggers +btc_logger = get_logger('okx_collector_btc_usdt', verbose=True) +eth_logger = get_logger('okx_collector_eth_usdt', verbose=True) + +btc_collector = OKXCollector('BTC-USDT', logger=btc_logger) +eth_collector = OKXCollector('ETH-USDT', logger=eth_logger) + +# Add 
collectors to manager +manager.add_collector(btc_collector) +manager.add_collector(eth_collector) + +# Result: +# - Manager logs to: logs/collector_manager/YYYY-MM-DD.txt +# - BTC collector logs to: logs/okx_collector_btc_usdt/YYYY-MM-DD.txt +# - ETH collector logs to: logs/okx_collector_eth_usdt/YYYY-MM-DD.txt +# - All child components of each collector log to their parent's file +``` + +## Implementation Details + +### Base Classes + +All base classes support conditional logging: + +```python +class BaseDataCollector: + def __init__(self, ..., logger=None, log_errors_only=False): + self.logger = logger + self.log_errors_only = log_errors_only + + def _log_debug(self, message: str) -> None: + if self.logger and not self.log_errors_only: + self.logger.debug(message) + + def _log_error(self, message: str, exc_info: bool = False) -> None: + if self.logger: + self.logger.error(message, exc_info=exc_info) +``` + +### Child Component Pattern + +Child components receive logger from parent: + +```python +class OKXCollector(BaseDataCollector): + def __init__(self, symbol: str, logger=None): + super().__init__(..., logger=logger) + + # Pass logger to child components + self._data_processor = OKXDataProcessor( + symbol, + logger=self.logger # Pass parent's logger + ) +``` + +### Conditional Logging Helpers + +All components use helper methods for conditional logging: + +```python +def _log_debug(self, message: str) -> None: + """Log debug message if logger is available and not in errors-only mode.""" + if self.logger and not self.log_errors_only: + self.logger.debug(message) + +def _log_info(self, message: str) -> None: + """Log info message if logger is available and not in errors-only mode.""" + if self.logger and not self.log_errors_only: + self.logger.info(message) + +def _log_warning(self, message: str) -> None: + """Log warning message if logger is available and not in errors-only mode.""" + if self.logger and not self.log_errors_only: + self.logger.warning(message) + 
+def _log_error(self, message: str, exc_info: bool = False) -> None: + """Log error message if logger is available (always logs errors).""" + if self.logger: + self.logger.error(message, exc_info=exc_info) + +def _log_critical(self, message: str, exc_info: bool = False) -> None: + """Log critical message if logger is available (always logs critical).""" + if self.logger: + self.logger.critical(message, exc_info=exc_info) +``` + +## Log File Structure + +``` +logs/ +├── collector_manager/ +│ └── 2024-01-15.txt +├── okx_collector_btc_usdt/ +│ └── 2024-01-15.txt +├── okx_collector_eth_usdt/ +│ └── 2024-01-15.txt +└── production_manager/ + └── 2024-01-15.txt +``` + +## Configuration Options + +### Logger Parameters + +- `logger`: Logger instance or None +- `log_errors_only`: Boolean flag for error-only mode +- `verbose`: Console output (when creating new loggers) +- `clean_old_logs`: Automatic cleanup of old log files +- `max_log_files`: Maximum number of log files to keep + +### Environment Variables + +```bash +# Enable verbose console logging +VERBOSE_LOGGING=true + +# Enable console output +LOG_TO_CONSOLE=true +``` + +## Best Practices + +### 1. Component Design +- Always accept `logger=None` parameter in constructors +- Pass logger to all child components +- Use conditional logging helper methods +- Never assume logger is available + +### 2. Error Handling +- Always log errors regardless of `log_errors_only` setting +- Use appropriate log levels +- Include context in error messages + +### 3. Performance +- Conditional logging has minimal performance impact +- Logger checks are fast boolean operations +- No string formatting when logging is disabled + +### 4. Testing +- Test components with and without loggers +- Verify error-only mode works correctly +- Check that child components receive loggers properly + +## Migration Guide + +### Updating Existing Components + +1. 
**Add logger parameter to constructor**: +```python +def __init__(self, ..., logger=None, log_errors_only=False): +``` + +2. **Add conditional logging helpers**: +```python +def _log_debug(self, message: str) -> None: + if self.logger and not self.log_errors_only: + self.logger.debug(message) +``` + +3. **Update all logging calls**: +```python +# Before +self.logger.info("Message") + +# After +self._log_info("Message") +``` + +4. **Pass logger to child components**: +```python +child = ChildComponent(logger=self.logger) +``` + +### Testing Changes + +```python +# Test without logger +component = MyComponent(logger=None) +# Should work without errors, no logging + +# Test with logger +logger = get_logger('test_component') +component = MyComponent(logger=logger) +# Should log normally + +# Test error-only mode +component = MyComponent(logger=logger, log_errors_only=True) +# Should only log errors +``` + +This conditional logging system provides maximum flexibility while maintaining clean, maintainable code that works in all scenarios. 
\ No newline at end of file diff --git a/scripts/production_clean.py b/scripts/production_clean.py index 100e450..8415c65 100644 --- a/scripts/production_clean.py +++ b/scripts/production_clean.py @@ -58,11 +58,11 @@ class ProductionManager: self.config_path = config_path self.config = self._load_config() - # Configure clean logging - minimal console output, detailed file logs + # Configure clean logging - minimal console output, error-only file logs self.logger = get_logger("production_manager", verbose=False) - # Core components - self.collector_manager = CollectorManager() + # Core components with error-only logging + self.collector_manager = CollectorManager(logger=self.logger, log_errors_only=True) self.collectors: List[OKXCollector] = [] # Runtime state @@ -73,7 +73,7 @@ class ProductionManager: 'uptime_seconds': 0 } - self.logger.info(f"🚀 Production Manager initialized") + self.logger.info(f"🚀 Production Manager initialized with error-only logging") self.logger.info(f"📁 Config: {config_path}") def _load_config(self) -> dict: @@ -110,21 +110,24 @@ class ProductionManager: auto_save_candles=True ) - # Create custom data processor with 1m/5m timeframes + # Create custom data processor with error-only logging data_processor = OKXDataProcessor( symbol=symbol, config=candle_config, - component_name=f"okx_processor_{symbol.replace('-', '_').lower()}" + component_name=f"okx_processor_{symbol.replace('-', '_').lower()}", + logger=self.logger ) - # Create OKX collector with custom processor + # Create OKX collector with error-only logging collector = OKXCollector( symbol=symbol, data_types=data_types, component_name=f"okx_collector_{symbol.replace('-', '_').lower()}", auto_restart=self.config.get('data_collection', {}).get('auto_restart', True), health_check_interval=self.config.get('data_collection', {}).get('health_check_interval', 30.0), - store_raw_data=self.config.get('data_collection', {}).get('store_raw_data', True) + 
store_raw_data=self.config.get('data_collection', {}).get('store_raw_data', True), + logger=self.logger, + log_errors_only=True ) # Replace the default data processor with our custom one @@ -139,9 +142,9 @@ class ProductionManager: self.collectors.append(collector) self.statistics['collectors_created'] += 1 - self.logger.info(f"✅ Collector created for {symbol} with 1m/5m timeframes") + self.logger.info(f"✅ Collector created for {symbol} with 1m/5m timeframes and error-only logging") - self.logger.info(f"🎉 All {len(self.collectors)} collectors created successfully") + self.logger.info(f"🎉 All {len(self.collectors)} collectors created successfully with error-only logging") self.logger.info(f"📊 Collectors configured with 1m and 5m aggregation timeframes") return True @@ -191,11 +194,9 @@ class ProductionManager: self.logger.error(f"❌ Error during shutdown: {e}") -async def run_clean_production(duration_hours: float = 8.0): +async def run_clean_production(duration_hours: Optional[float] = None): """Run production collector with clean output.""" - duration_seconds = int(duration_hours * 3600) - # Global state for signal handling shutdown_event = asyncio.Event() manager = None @@ -212,7 +213,10 @@ async def run_clean_production(duration_hours: float = 8.0): # Header print("🚀 OKX PRODUCTION DATA COLLECTOR") print("="*50) - print(f"⏱️ Duration: {duration_hours} hours") + if duration_hours: + print(f"⏱️ Duration: {duration_hours} hours") + else: + print(f"⏱️ Duration: Indefinite (until stopped)") print(f"📊 Timeframes: 1m and 5m candles") print(f"💾 Database: Raw trades + aggregated candles") print(f"📝 Logs: logs/ directory") @@ -238,6 +242,8 @@ async def run_clean_production(duration_hours: float = 8.0): print("✅ Data collection active!") print(f"📈 Collecting: {len(manager.collectors)} trading pairs") print(f"📊 Monitor: python scripts/monitor_clean.py") + if not duration_hours: + print("⏹️ Stop: Ctrl+C") print("-" * 50) # Main monitoring loop @@ -252,17 +258,22 @@ async def 
run_clean_production(duration_hours: float = 8.0): except asyncio.TimeoutError: pass - # Check duration + # Check duration if specified current_time = time.time() - if current_time - start_time >= duration_seconds: - print(f"⏰ Completed {duration_hours} hour run") - break + if duration_hours: + duration_seconds = int(duration_hours * 3600) + if current_time - start_time >= duration_seconds: + print(f"⏰ Completed {duration_hours} hour run") + break # Periodic status update if current_time - last_update >= update_interval: elapsed_hours = (current_time - start_time) / 3600 - remaining_hours = duration_hours - elapsed_hours - print(f"⏱️ Runtime: {elapsed_hours:.1f}h | Remaining: {remaining_hours:.1f}h") + if duration_hours: + remaining_hours = duration_hours - elapsed_hours + print(f"⏱️ Runtime: {elapsed_hours:.1f}h | Remaining: {remaining_hours:.1f}h") + else: + print(f"⏱️ Runtime: {elapsed_hours:.1f}h | Mode: Continuous") last_update = current_time # Final summary @@ -292,6 +303,9 @@ def main(): formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: + # Run indefinitely (until stopped with Ctrl+C) + python scripts/production_clean.py + # Run for 8 hours python scripts/production_clean.py --hours 8 @@ -303,13 +317,13 @@ Examples: parser.add_argument( '--hours', type=float, - default=8.0, - help='Collection duration in hours (default: 8.0)' + default=None, + help='Collection duration in hours (default: indefinite until stopped manually)' ) args = parser.parse_args() - if args.hours <= 0: + if args.hours is not None and args.hours <= 0: print("❌ Duration must be positive") sys.exit(1)