TCPDashboard/data/collector/collector_connection_manager.py

149 lines
6.3 KiB
Python
Raw Normal View History

"""
Module for managing network connection and reconnection logic for data collectors.
This module encapsulates the complexities of connecting, disconnecting,
and handling reconnection attempts to a data source, promoting a clean
separation of concerns within the data collector architecture.
"""
import asyncio
from typing import List, Any
from datetime import datetime
# from ..base_collector import DataType # Import from base_collector for now, will refactor later
from .collector_state_telemetry import CollectorStatus, CollectorStateAndTelemetry
from data.common.data_types import DataType
class ConnectionManager:
"""
Manages the connection, disconnection, and reconnection logic for a data collector.
"""
def __init__(self,
exchange_name: str,
component_name: str,
max_reconnect_attempts: int = 5,
reconnect_delay: float = 5.0,
logger=None,
state_telemetry: CollectorStateAndTelemetry = None):
self.exchange_name = exchange_name
self.component_name = component_name
self._max_reconnect_attempts = max_reconnect_attempts
self._reconnect_delay = reconnect_delay
self.logger = logger
self._state_telemetry = state_telemetry
self._connection = None # Placeholder for the actual connection object
self._reconnect_attempts = 0
def _log_debug(self, message: str) -> None:
if self._state_telemetry:
self._state_telemetry._log_debug(f"{self.component_name}: {message}")
elif self.logger:
self.logger.debug(f"{self.component_name}: {message}")
def _log_info(self, message: str) -> None:
if self._state_telemetry:
self._state_telemetry._log_info(f"{self.component_name}: {message}")
elif self.logger:
self.logger.info(f"{self.component_name}: {message}")
def _log_warning(self, message: str) -> None:
if self._state_telemetry:
self._state_telemetry._log_warning(f"{self.component_name}: {message}")
elif self.logger:
self.logger.warning(f"{self.component_name}: {message}")
def _log_error(self, message: str, exc_info: bool = False) -> None:
if self._state_telemetry:
self._state_telemetry._log_error(f"{self.component_name}: {message}", exc_info=exc_info)
elif self.logger:
self.logger.error(f"{self.component_name}: {message}", exc_info=exc_info)
async def connect(self, connect_logic: callable) -> bool:
"""
Establish connection to the data source using provided logic.
Args:
connect_logic: A callable (async function) that performs the actual connection.
Returns:
True if connection successful, False otherwise
"""
self._log_info(f"Connecting to {self.exchange_name} data source")
try:
success = await connect_logic()
if success:
self._connection = True # Indicate connection is established
self._state_telemetry.set_connection_uptime_start()
self._log_info(f"Successfully connected to {self.exchange_name}")
return True
else:
self._log_error(f"Failed to connect to {self.exchange_name}")
return False
except Exception as e:
self._log_error(f"Error during connection to {self.exchange_name}: {e}", exc_info=True)
return False
async def disconnect(self, disconnect_logic: callable) -> None:
"""
Disconnect from the data source using provided logic.
Args:
disconnect_logic: A callable (async function) that performs the actual disconnection.
"""
self._log_info(f"Disconnecting from {self.exchange_name} data source")
try:
if self._connection:
await disconnect_logic()
self._connection = None
self._log_info(f"Disconnected from {self.exchange_name}")
except Exception as e:
self._log_error(f"Error during disconnection from {self.exchange_name}: {e}", exc_info=True)
async def handle_connection_error(self, connect_logic: callable, subscribe_logic: callable, symbols: List[str], data_types: List[DataType]) -> bool:
"""
Handle connection errors and attempt reconnection.
Args:
connect_logic: Callable for connecting.
subscribe_logic: Callable for subscribing.
symbols: List of symbols to re-subscribe to.
data_types: List of data types to re-subscribe to.
Returns:
True if reconnection successful, False if max attempts exceeded
"""
self._reconnect_attempts += 1
if self._reconnect_attempts > self._max_reconnect_attempts:
self._log_error(f"Max reconnection attempts ({self._max_reconnect_attempts}) exceeded for {self.exchange_name}")
if self._state_telemetry:
self._state_telemetry.update_status(CollectorStatus.ERROR)
self._state_telemetry.set_should_be_running(False)
return False
if self._state_telemetry:
self._state_telemetry.update_status(CollectorStatus.RECONNECTING)
self._log_warning(f"Connection lost. Attempting reconnection {self._reconnect_attempts}/{self._max_reconnect_attempts} for {self.exchange_name}")
# Disconnect and wait before retrying
await self.disconnect(lambda: None) # Pass a no-op disconnect for internal use, actual disconnect handled by caller
await asyncio.sleep(self._reconnect_delay)
# Attempt to reconnect
try:
if await self.connect(connect_logic):
if await subscribe_logic(symbols, data_types):
self._log_info(f"Reconnection successful for {self.exchange_name}")
if self._state_telemetry:
self._state_telemetry.update_status(CollectorStatus.RUNNING)
self._reconnect_attempts = 0
return True
except Exception as e:
self._log_error(f"Reconnection attempt failed for {self.exchange_name}: {e}", exc_info=True)
return False