diff --git a/data/exchanges/okx/websocket.py b/data/exchanges/okx/websocket.py index c10e8b0..21eb7e1 100644 --- a/data/exchanges/okx/websocket.py +++ b/data/exchanges/okx/websocket.py @@ -546,34 +546,34 @@ class OKXWebSocketClient: # Use lock to prevent concurrent reconnection attempts try: - # Use asyncio.wait_for to prevent hanging on lock acquisition - async with asyncio.wait_for(self._reconnection_lock.acquire(), timeout=5.0): - try: - # Double-check we still need to reconnect - if (self._connection_state == ConnectionState.DISCONNECTED and - self._reconnect_attempts < self.max_reconnect_attempts and - not self._tasks_stopping): - - self._reconnect_attempts += 1 + # Properly acquire lock with timeout + await asyncio.wait_for(self._reconnection_lock.acquire(), timeout=5.0) + try: + # Double-check we still need to reconnect + if (self._connection_state == ConnectionState.DISCONNECTED and + self._reconnect_attempts < self.max_reconnect_attempts and + not self._tasks_stopping): + + self._reconnect_attempts += 1 + if self.logger: + self.logger.info(f"{self.component_name}: Attempting automatic reconnection ({self._reconnect_attempts}/{self.max_reconnect_attempts})") + + # Attempt reconnection (this will handle task cleanup) + if await self.reconnect(): if self.logger: - self.logger.info(f"{self.component_name}: Attempting automatic reconnection ({self._reconnect_attempts}/{self.max_reconnect_attempts})") - - # Attempt reconnection (this will handle task cleanup) - if await self.reconnect(): - if self.logger: - self.logger.info(f"{self.component_name}: Automatic reconnection successful") - # Exit this handler as reconnect will start new tasks - break - else: - if self.logger: - self.logger.error(f"{self.component_name}: Automatic reconnection failed") - break + self.logger.info(f"{self.component_name}: Automatic reconnection successful") + # Exit this handler as reconnect will start new tasks + break else: if self.logger: - self.logger.error(f"{self.component_name}: Max reconnection attempts exceeded or shutdown in progress") + self.logger.error(f"{self.component_name}: Automatic reconnection failed") break - finally: - self._reconnection_lock.release() + else: + if self.logger: + self.logger.error(f"{self.component_name}: Max reconnection attempts exceeded or shutdown in progress") + break + finally: + self._reconnection_lock.release() except asyncio.TimeoutError: if self.logger: self.logger.warning(f"{self.component_name}: Timeout acquiring reconnection lock")