TCPDashboard/tests/data/collector/test_collector_state_telemetry.py
Vasily.onl 60434afd5d Refactor BaseDataCollector to utilize CollectorStateAndTelemetry for improved state management
- Introduced a new `CollectorStateAndTelemetry` class to encapsulate the status, health checks, and statistics of the data collector, promoting modularity and separation of concerns.
- Updated `BaseDataCollector` to replace direct status management with calls to the new telemetry class, enhancing maintainability and readability.
- Refactored logging methods to utilize the telemetry class, ensuring consistent logging practices.
- Modified the `OKXCollector` to integrate with the new telemetry system for improved status reporting and error handling.
- Added comprehensive tests for the `CollectorStateAndTelemetry` class to ensure functionality and reliability.

These changes streamline the data collector's architecture, aligning with project standards for maintainability and performance.
2025-06-09 17:27:29 +08:00

185 lines
7.9 KiB
Python

import pytest
import asyncio
from datetime import datetime, timedelta, timezone
from unittest.mock import Mock
from data.collector.collector_state_telemetry import CollectorStatus, CollectorStateAndTelemetry
class TestCollectorStateAndTelemetry:
@pytest.fixture
def mock_logger(self):
return Mock()
@pytest.fixture
def telemetry(self, mock_logger):
return CollectorStateAndTelemetry(
exchange_name="test_exchange",
component_name="test_collector",
health_check_interval=30.0,
max_silence_duration=timedelta(minutes=5),
logger=mock_logger,
log_errors_only=False
)
def test_initial_state(self, telemetry):
assert telemetry.status == CollectorStatus.STOPPED
assert telemetry._running is False
assert telemetry._should_be_running is False
assert telemetry._stats['messages_received'] == 0
assert telemetry._stats['errors'] == 0
assert telemetry._last_heartbeat is not None
assert telemetry._last_data_received is None
def test_update_status(self, telemetry):
telemetry.update_status(CollectorStatus.RUNNING)
assert telemetry.status == CollectorStatus.RUNNING
telemetry.logger.debug.assert_called_with("Collector status updated to: running")
def test_set_running_state(self, telemetry):
telemetry.set_running_state(True)
assert telemetry._running is True
telemetry.logger.debug.assert_called_with("Collector internal running state set to: True")
def test_set_should_be_running(self, telemetry):
telemetry.set_should_be_running(True)
assert telemetry._should_be_running is True
telemetry.logger.debug.assert_called_with("Collector desired running state set to: True")
def test_update_heartbeat(self, telemetry):
old_heartbeat = telemetry._last_heartbeat
telemetry.update_heartbeat()
assert telemetry._last_heartbeat >= old_heartbeat
assert telemetry._last_heartbeat.date() == datetime.now(timezone.utc).date()
telemetry.logger.debug.assert_called_with("Heartbeat updated")
def test_update_data_received_timestamp(self, telemetry):
old_timestamp = telemetry._last_data_received
telemetry.update_data_received_timestamp()
assert telemetry._last_data_received is not None
assert telemetry._last_data_received >= (old_timestamp if old_timestamp else datetime.min.replace(tzinfo=timezone.utc))
telemetry.logger.debug.assert_called_with("Last data received timestamp updated")
def test_increment_messages_received(self, telemetry):
telemetry.increment_messages_received()
assert telemetry._stats['messages_received'] == 1
telemetry.logger.debug.assert_called_with("Messages received: 1")
def test_increment_messages_processed(self, telemetry):
telemetry.increment_messages_processed()
assert telemetry._stats['messages_processed'] == 1
telemetry.logger.debug.assert_called_with("Messages processed: 1")
def test_increment_errors(self, telemetry):
error_msg = "Test Error"
telemetry.increment_errors(error_msg)
assert telemetry._stats['errors'] == 1
assert telemetry._stats['last_error'] == error_msg
telemetry.logger.error.assert_called_with(f"Error count: 1, Last error: {error_msg}", exc_info=False)
def test_increment_restarts(self, telemetry):
telemetry.increment_restarts()
assert telemetry._stats['restarts'] == 1
assert telemetry._stats['last_restart_time'] is not None
telemetry.logger.info.assert_called_with("Collector restarts: 1")
def test_set_connection_uptime_start(self, telemetry):
telemetry.set_connection_uptime_start()
assert telemetry._stats['connection_uptime'] is not None
telemetry.logger.debug.assert_called_with("Connection uptime start set")
def test_get_status(self, telemetry):
telemetry.set_connection_uptime_start()
telemetry.update_heartbeat()
telemetry.update_data_received_timestamp()
telemetry.update_status(CollectorStatus.RUNNING)
telemetry.set_should_be_running(True)
telemetry.set_running_state(True)
status = telemetry.get_status()
assert status['exchange'] == "test_exchange"
assert status['status'] == "running"
assert status['should_be_running'] is True
assert status['auto_restart'] is True
assert status['health']['time_since_heartbeat'] is not None
assert status['health']['time_since_data'] is not None
assert status['statistics']['uptime_seconds'] is not None
assert status['statistics']['reconnect_attempts'] == 0
def test_get_health_status_healthy(self, telemetry):
telemetry.set_running_state(True)
telemetry.set_should_be_running(True)
telemetry.update_status(CollectorStatus.RUNNING)
telemetry.update_heartbeat()
telemetry.update_data_received_timestamp()
health = telemetry.get_health_status()
assert health['is_healthy'] is True
assert len(health['issues']) == 0
assert health['status'] == "running"
@pytest.mark.asyncio
async def test_get_health_status_unhealthy_no_heartbeat(self, telemetry):
telemetry.set_running_state(True)
telemetry.set_should_be_running(True)
telemetry.update_status(CollectorStatus.RUNNING)
# Simulate no heartbeat for a long time
telemetry._last_heartbeat = datetime.now(timezone.utc) - timedelta(seconds=telemetry.health_check_interval * 3)
health = telemetry.get_health_status()
assert health['is_healthy'] is False
assert "No heartbeat for" in health['issues'][0]
@pytest.mark.asyncio
async def test_get_health_status_unhealthy_no_data(self, telemetry):
telemetry.set_running_state(True)
telemetry.set_should_be_running(True)
telemetry.update_status(CollectorStatus.RUNNING)
telemetry.update_heartbeat()
# Simulate no data for a long time
telemetry._last_data_received = datetime.now(timezone.utc) - (telemetry._max_silence_duration + timedelta(minutes=1))
health = telemetry.get_health_status()
assert health['is_healthy'] is False
assert "No data for" in health['issues'][0]
def test_get_health_status_error_status(self, telemetry):
telemetry.update_status(CollectorStatus.ERROR)
health = telemetry.get_health_status()
assert health['is_healthy'] is False
assert health['issues'][0] == "Status: error"
def test_logging_methods_no_logger(self):
telemetry_no_logger = CollectorStateAndTelemetry(
exchange_name="test",
component_name="test",
logger=None
)
# Should not raise an error, calls should just be no-ops
telemetry_no_logger._log_debug("test")
telemetry_no_logger._log_info("test")
telemetry_no_logger._log_warning("test")
telemetry_no_logger._log_error("test")
telemetry_no_logger._log_critical("test")
def test_logging_methods_log_errors_only(self, mock_logger):
telemetry_errors_only = CollectorStateAndTelemetry(
exchange_name="test",
component_name="test",
logger=mock_logger,
log_errors_only=True
)
telemetry_errors_only._log_debug("debug msg")
telemetry_errors_only._log_info("info msg")
telemetry_errors_only._log_warning("warning msg")
mock_logger.debug.assert_not_called()
mock_logger.info.assert_not_called()
mock_logger.warning.assert_not_called()
telemetry_errors_only._log_error("error msg")
telemetry_errors_only._log_critical("critical msg")
mock_logger.error.assert_called_once_with("error msg", exc_info=False)
mock_logger.critical.assert_called_once_with("critical msg", exc_info=False)