# Enhanced Data Collector System

This documentation describes the enhanced data collector system, featuring a modular architecture, centralized management, and robust health monitoring.

## Table of Contents

- [Overview](#overview)
- [System Architecture](#system-architecture)
- [Core Components](#core-components)
- [Exchange Factory](#exchange-factory)
- [Health Monitoring](#health-monitoring)
- [API Reference](#api-reference)
- [Troubleshooting](#troubleshooting)

## Overview

### Key Features

- **Modular Exchange Integration**: Easily add new exchanges without impacting core logic
- **Centralized Management**: `CollectorManager` for system-wide control
- **Robust Health Monitoring**: Automatic restarts and failure detection
- **Factory Pattern**: Standardized creation of collector instances
- **Asynchronous Operations**: High-performance data collection
- **Comprehensive Logging**: Detailed component-level logging

### Supported Exchanges

- **OKX**: Full implementation with WebSocket support
- **Binance (Future)**: Planned support
- **Coinbase (Future)**: Planned support

For exchange-specific documentation, see [Exchange Implementations (`./exchanges/`)](./exchanges/).

## System Architecture

```
┌─────────────────────────────────────────────────────────────┐
│                   TCP Dashboard Platform                    │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                 CollectorManager                    │    │
│  │  • Centralized start/stop/status control            │    │
│  │                                                     │    │
│  │  ┌─────────────────────────────────────────────────┐│    │
│  │  │            Global Health Monitor                ││    │
│  │  │  • System-wide health checks                    ││    │
│  │  │  • Auto-restart coordination                    ││    │
│  │  │  • Performance analytics                        ││    │
│  │  └─────────────────────────────────────────────────┘│    │
│  │                         │                           │    │
│  │  ┌─────────────┐ ┌─────────────┐ ┌────────────────┐ │    │
│  │  │OKX Collector│ │Binance Coll.│ │Custom Collector│ │    │
│  │  │• Health Mon │ │• Health Mon │ │• Health Monitor│ │    │
│  │  │• Auto-restart│ │• Auto-restart│ │• Auto-restart  │ │  │
│  │  │• Data Valid │ │• Data Valid │ │• Data Validate │ │    │
│  │  └─────────────┘ └─────────────┘ └────────────────┘ │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘
```

## Core Components

### 1. `BaseDataCollector`

An abstract base class that defines the common interface for all exchange collectors. It now orchestrates specialized components for connection management, state and telemetry, and callback dispatching.

**Key Responsibilities:**

- Standardized `start`, `stop`, `restart` methods.
- Orchestrates connection handling via `ConnectionManager`.
- Delegates state, health, and statistics management to `CollectorStateAndTelemetry`.
- Utilizes `CallbackDispatcher` for managing and notifying data subscribers.
- Defines abstract methods for exchange-specific implementations (e.g., `_actual_connect`, `_actual_disconnect`, `_subscribe_channels`, `_process_message`).

### 2. `CollectorStateAndTelemetry`

Manages the operational state, health, and performance statistics of a data collector.

**Key Responsibilities:**

- Tracks `CollectorStatus` (e.g., `RUNNING`, `STOPPED`, `ERROR`).
- Monitors health metrics like heartbeat and data silence.
- Collects and provides operational statistics (e.g., messages processed, errors).
- Provides centralized logging functionality for the collector.

### 3. `ConnectionManager`

Handles the WebSocket connection lifecycle and resilience for a data collector.

**Key Responsibilities:**

- Establishes and terminates WebSocket connections.
- Manages automatic reconnection attempts with exponential backoff.
- Handles connection-related errors and ensures robust connectivity.
- Tracks WebSocket connection state and statistics.

### 4. `CallbackDispatcher`

Manages and dispatches real-time data to registered callbacks.

**Key Responsibilities:**

- Registers and unregisters data callbacks for different `DataType`s.
- Notifies all subscribed listeners when new data points are received.
- Ensures efficient and reliable distribution of processed market data.

### 5. `CollectorLifecycleManager`

Manages the lifecycle of individual data collectors, including adding, removing, starting, stopping, and restarting them.

**Key Responsibilities:**

- Handles `add_collector`, `remove_collector`, `enable_collector`, `disable_collector`.
- Manages `_start_collector`, `restart_collector`, `restart_all_collectors` operations.

### 6. `ManagerHealthMonitor`

Encapsulates the logic for global system health monitoring.

**Key Responsibilities:**

- Implements the `_global_health_monitor` logic.
- Provides system-wide health checks and auto-restart coordination.

### 7. `ManagerStatsTracker`

Manages the collection and retrieval of performance statistics for the `CollectorManager`.

**Key Responsibilities:**

- Updates and provides statistics via `get_status`.
- Utilizes `CachedStatsManager` for optimized, periodic updates of statistics.

### 8. `ManagerLogger`

Centralizes all logging operations for the `CollectorManager`.

**Key Responsibilities:**

- Provides wrapper methods for logging at different levels (`_log_debug`, `_log_info`, `_log_warning`, `_log_error`, `_log_critical`).
- Ensures consistent log formatting and includes `exc_info=True` for error logs.

### 9. `CollectorManager`

A singleton class that orchestrates all active data collectors and their associated components. It now delegates specific responsibilities to dedicated component classes.

**Key Responsibilities:**

- Centralized control and coordination of `CollectorLifecycleManager`.
- Aggregation of system-wide status from `ManagerHealthMonitor` and `ManagerStatsTracker`.
- Unified logging through `ManagerLogger`.
- Overall system-wide status aggregation.

### 10. Exchange-Specific Collectors

Concrete implementations of `BaseDataCollector` for each exchange (e.g., `OKXCollector`).

**Key Responsibilities:**

- Handle exchange-specific WebSocket protocols
- Parse and standardize incoming data
- Implement exchange-specific authentication
- Define subscription messages for different data types

For more details, see [OKX Collector Documentation (`./exchanges/okx.md`)](./exchanges/okx.md).

### 11. `ServiceConfig`

Handles the loading, creation, and validation of service configurations.

**Key Responsibilities:**

- Manages `_load_config` and `_create_default_config` logic.
- Implements schema validation for configuration files and file permission validation.

### 12. `CollectorFactory`

Encapsulates the logic for creating individual data collector instances.

**Key Responsibilities:**

- Manages the `_create_collector` logic, decoupling collector creation from the `DataCollectionService`.

### 13. `AsyncTaskManager`

Provides a comprehensive utility for managing and tracking asynchronous tasks.

**Key Responsibilities:**

- Manages `asyncio.Task` instances, preventing potential memory leaks and ensuring proper task lifecycle.
- Used by `CollectorManager` and `DataCollectionService` for robust asynchronous operations.

## Exchange Factory

The `ExchangeFactory` provides a standardized way to create data collectors, decoupling the client code from specific implementations.

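As a rough illustration of the registry idea behind a factory like this, the sketch below maps exchange names to collector constructors and rejects unknown exchanges with a `ValueError`. `DemoFactory`, `DemoCollector`, and `CollectorConfig` are hypothetical stand-ins introduced only for this example; the project's actual `ExchangeFactory` and `ExchangeCollectorConfig` may be organized differently.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class CollectorConfig:
    """Hypothetical stand-in for ExchangeCollectorConfig."""
    exchange: str
    symbol: str
    data_types: List[str] = field(default_factory=list)


class DemoCollector:
    """Hypothetical collector; a real one would subclass BaseDataCollector."""

    def __init__(self, config: CollectorConfig) -> None:
        self.config = config


class DemoFactory:
    """Registry-based factory: maps an exchange name to a constructor."""

    _registry: Dict[str, Callable[[CollectorConfig], DemoCollector]] = {}

    @classmethod
    def register(cls, exchange: str,
                 builder: Callable[[CollectorConfig], DemoCollector]) -> None:
        # Store names lower-cased so lookups are case-insensitive.
        cls._registry[exchange.lower()] = builder

    @classmethod
    def get_supported_exchanges(cls) -> List[str]:
        return sorted(cls._registry)

    @classmethod
    def create_collector(cls, config: CollectorConfig) -> DemoCollector:
        # Validate the configuration before building anything.
        if not config.symbol:
            raise ValueError("symbol must not be empty")
        builder = cls._registry.get(config.exchange.lower())
        if builder is None:
            raise ValueError(f"Exchange not supported: {config.exchange}")
        return builder(config)


# Registering a new exchange is a single call; client code never
# imports the concrete collector class directly.
DemoFactory.register("okx", DemoCollector)
```

With this shape, adding an exchange means registering one more builder, and callers only ever depend on the factory and the config type.
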
### Features

- **Simplified Creation**: Single function to create any supported collector
- **Configuration Driven**: Uses `ExchangeCollectorConfig` for flexible setup
- **Validation**: Validates configuration before creating a collector
- **Extensible**: Easily register new exchange collectors

### Usage

```python
import asyncio

from data.exchanges import ExchangeFactory, ExchangeCollectorConfig
from data.common import DataType


async def main():
    # Create config for OKX collector
    config = ExchangeCollectorConfig(
        exchange="okx",
        symbol="BTC-USDT",
        data_types=[DataType.TRADE, DataType.ORDERBOOK],
        auto_restart=True
    )

    # Create collector using the factory
    try:
        collector = ExchangeFactory.create_collector(config)
        # Use the collector (start() is a coroutine, so it must be awaited)
        await collector.start()
    except ValueError as e:
        print(f"Error creating collector: {e}")

    # Create multiple collectors
    configs = [...]  # placeholder: supply ExchangeCollectorConfig objects here
    collectors = ExchangeFactory.create_multiple_collectors(configs)


asyncio.run(main())
```

## Health Monitoring

The system includes a robust, two-level health monitoring system, now enhanced with cached statistics management.

### 1. Collector-Level Monitoring

Each `BaseDataCollector` instance has its own health monitoring.

**Key Metrics:**

- **Heartbeat**: Regular internal signal to confirm the collector is responsive
- **Data Silence**: Tracks time since last message to detect frozen connections
- **Restart Count**: Number of automatic restarts
- **Connection Status**: Tracks WebSocket connection state

### 2. Manager-Level Monitoring

The `CollectorManager` provides a global view of system health, leveraging `ManagerHealthMonitor` and `ManagerStatsTracker`.

**Key Metrics:**

- **Aggregate Status**: Overview of all collectors (running, stopped, failed)
- **System Uptime**: Total uptime for the collector system
- **Failed Collectors**: List of collectors that failed to restart
- **Resource Usage**: System-level CPU and memory monitoring

### Health Status API

```python
# Get status of a single collector
status = collector.get_status()
health = collector.get_health_status()

# Get status of the entire system
system_status = manager.get_status()
```

For detailed status schemas, refer to the [Reference Documentation (`../../reference/README.md`)](../../reference/README.md).

## API Reference

### `BaseDataCollector`

- `async start()`
- `async stop()`
- `async restart()`
- `get_status() -> dict`
- `get_health_status() -> dict`

### `CollectorManager`

- `add_collector(collector)`
- `remove_collector(collector_id)`
- `enable_collector(collector_id)`
- `disable_collector(collector_id)`
- `restart_collector(collector_id)`
- `async start_all()`
- `async stop_all()`
- `get_status() -> dict`
- `list_collectors() -> list`

### `DataCollectionService`

- `async run()`
- `async stop()`

### `ExchangeFactory`

- `create_collector(config) -> BaseDataCollector`
- `create_multiple_collectors(configs) -> list`
- `get_supported_exchanges() -> list`

### `CollectorLifecycleManager`

- `add_collector(collector)`
- `remove_collector(collector_id)`
- `enable_collector(collector_id)`
- `disable_collector(collector_id)`
- `_start_collector(collector)`
- `restart_collector(collector_id)`
- `restart_all_collectors()`

### `ManagerHealthMonitor`

- `_global_health_monitor()`

### `ManagerStatsTracker`

- `get_status() -> dict`
- `_update_stats()`

### `ManagerLogger`

- `_log_debug(message, exc_info=False)`
- `_log_info(message, exc_info=False)`
- `_log_warning(message, exc_info=False)`
- `_log_error(message, exc_info=True)`
- `_log_critical(message, exc_info=True)`

### `ServiceConfig`

- `_load_config(config_path)`
- `_create_default_config()`
- `validate_permissions(file_path)`

### `CollectorFactory`

- `_create_collector(config)`

### `AsyncTaskManager`

- `add_task(task)`
- `remove_task(task_id)`
- `cancel_all_tasks()`
- `wait_for_all_tasks()`

## Troubleshooting

### Common Issues

1. **Collector fails to start**
   - **Cause**: Invalid symbol, incorrect API keys, or network issues.
   - **Solution**: Check logs for error messages. Verify configuration and network connectivity.

2. **Collector stops receiving data**
   - **Cause**: The WebSocket connection dropped, or the exchange has issues.
   - **Solution**: The health monitor should restart the collector automatically. If it does not, check logs for reconnect errors.

3. **"Exchange not supported" error**
   - **Cause**: Trying to create a collector for an exchange not registered in the factory.
   - **Solution**: Implement the collector and register it in `data/exchanges/__init__.py`.

4. **Error information leakage in logs/responses**
   - **Cause**: Raw exception details being exposed.
   - **Solution**: Ensure error messages are sanitized using `_sanitize_error` before logging them or returning them to external callers.

5. **Configuration file permission issues**
   - **Cause**: Improper file permissions preventing the service from reading configuration.
   - **Solution**: Verify file permissions for configuration files. The `ServiceConfig` now includes validation for this.

### Best Practices

- Use the `CollectorManager` for lifecycle management, delegating to its components.
- Always validate configurations before creating collectors, leveraging `ServiceConfig`.
- Monitor system status regularly using `manager.get_status()`.
- Refer to logs for detailed error analysis, paying attention to `exc_info=True` for critical errors.
- Ensure `AsyncTaskManager` is used for all long-running asynchronous operations to prevent resource leaks.

---

*Back to [Modules Documentation (`../README.md`)](../README.md)*