# Enhanced Data Collector System

This documentation describes the enhanced data collector system, featuring a modular architecture, centralized management, and robust health monitoring.

## Table of Contents

- [Overview](#overview)
- [System Architecture](#system-architecture)
- [Core Components](#core-components)
- [Exchange Factory](#exchange-factory)
- [Health Monitoring](#health-monitoring)
- [API Reference](#api-reference)
- [Troubleshooting](#troubleshooting)

## Overview

### Key Features

- **Modular Exchange Integration**: Easily add new exchanges without impacting core logic
- **Centralized Management**: `CollectorManager` for system-wide control
- **Robust Health Monitoring**: Automatic restarts and failure detection
- **Factory Pattern**: Standardized creation of collector instances
- **Asynchronous Operations**: High-performance data collection
- **Comprehensive Logging**: Detailed component-level logging

### Supported Exchanges

- **OKX**: Full implementation with WebSocket support
- **Binance (Future)**: Planned support
- **Coinbase (Future)**: Planned support

For exchange-specific documentation, see [Exchange Implementations (`./exchanges/`)](./exchanges/).

## System Architecture

```
┌──────────────────────────────────────────────────────────────┐
│                    TCP Dashboard Platform                    │
│                                                              │
│  ┌────────────────────────────────────────────────────────┐  │
│  │                    CollectorManager                    │  │
│  │  • Centralized start/stop/status control               │  │
│  │                                                        │  │
│  │  ┌──────────────────────────────────────────────────┐  │  │
│  │  │              Global Health Monitor               │  │  │
│  │  │  • System-wide health checks                     │  │  │
│  │  │  • Auto-restart coordination                     │  │  │
│  │  │  • Performance analytics                         │  │  │
│  │  └──────────────────────────────────────────────────┘  │  │
│  │                            │                           │  │
│  │ ┌──────────────┐  ┌──────────────┐  ┌────────────────┐ │  │
│  │ │OKX Collector │  │Binance Coll. │  │Custom Collector│ │  │
│  │ │• Health Mon  │  │• Health Mon  │  │• Health Monitor│ │  │
│  │ │• Auto-restart│  │• Auto-restart│  │• Auto-restart  │ │  │
│  │ │• Data Valid  │  │• Data Valid  │  │• Data Validate │ │  │
│  │ └──────────────┘  └──────────────┘  └────────────────┘ │  │
│  └────────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────┘
```

## Core Components

### 1. `BaseDataCollector`

An abstract base class that defines the common interface for all exchange collectors.

**Key Responsibilities:**
- Standardized `start`, `stop`, `restart` methods
- Built-in health monitoring with heartbeat and data silence detection
- Automatic reconnect and restart logic
- Asynchronous message handling

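As a rough illustration of the shared lifecycle interface, the sketch below shows how an abstract base class can fix `start`, `stop`, and `restart` while leaving the connection details to subclasses. `SketchBaseCollector`, `DummyCollector`, and the `_connect`/`_disconnect` hooks are hypothetical names chosen for this example; the real `BaseDataCollector` may be structured differently.

```python
import abc
import asyncio


class SketchBaseCollector(abc.ABC):
    """Hypothetical stand-in for BaseDataCollector (not the project's class)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.running = False
        self.restart_count = 0

    @abc.abstractmethod
    async def _connect(self) -> None:
        """Open the exchange connection (exchange-specific, assumed hook)."""

    @abc.abstractmethod
    async def _disconnect(self) -> None:
        """Close the exchange connection (exchange-specific, assumed hook)."""

    async def start(self) -> None:
        await self._connect()
        self.running = True

    async def stop(self) -> None:
        await self._disconnect()
        self.running = False

    async def restart(self) -> None:
        # A restart is a stop followed by a start, counted for health reporting.
        await self.stop()
        await self.start()
        self.restart_count += 1

    def get_status(self) -> dict:
        return {
            "name": self.name,
            "running": self.running,
            "restart_count": self.restart_count,
        }


class DummyCollector(SketchBaseCollector):
    """Trivial subclass that only yields to the event loop."""

    async def _connect(self) -> None:
        await asyncio.sleep(0)

    async def _disconnect(self) -> None:
        await asyncio.sleep(0)


async def demo() -> dict:
    collector = DummyCollector("dummy")
    await collector.start()
    await collector.restart()
    return collector.get_status()


status = asyncio.run(demo())
```

Keeping the lifecycle methods in the base class means every exchange collector reports status in the same shape, which is what lets a manager treat them uniformly.
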
### 2. `CollectorManager`

A singleton class that manages all active data collectors in the system.

**Key Responsibilities:**
- Centralized `start` and `stop` for all collectors
- System-wide status aggregation
- Global health monitoring
- Coordination of restart policies

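The centralized start/stop responsibility can be sketched with `asyncio.gather`, which runs the per-collector coroutines concurrently. `SketchManager` and `StubCollector` are hypothetical stand-ins written for this example, and the singleton behavior mentioned above is deliberately left out to keep the sketch short.

```python
import asyncio


class StubCollector:
    """Hypothetical collector exposing the async start/stop interface."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.running = False

    async def start(self) -> None:
        self.running = True

    async def stop(self) -> None:
        self.running = False

    def get_status(self) -> dict:
        return {"running": self.running}


class SketchManager:
    """Hypothetical stand-in for CollectorManager (no singleton logic)."""

    def __init__(self) -> None:
        self._collectors = {}  # collector name -> collector instance

    def add_collector(self, collector: StubCollector) -> None:
        self._collectors[collector.name] = collector

    async def start_all(self) -> None:
        # Start every collector concurrently rather than one after another.
        await asyncio.gather(*(c.start() for c in self._collectors.values()))

    async def stop_all(self) -> None:
        await asyncio.gather(*(c.stop() for c in self._collectors.values()))

    def list_collectors(self) -> list:
        return sorted(self._collectors)

    def get_status(self) -> dict:
        return {name: c.get_status() for name, c in self._collectors.items()}


async def demo():
    manager = SketchManager()
    manager.add_collector(StubCollector("okx:BTC-USDT"))
    manager.add_collector(StubCollector("okx:ETH-USDT"))
    await manager.start_all()
    while_running = manager.get_status()
    await manager.stop_all()
    return while_running, manager.get_status(), manager.list_collectors()


running_status, stopped_status, names = asyncio.run(demo())
```
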
### 3. Exchange-Specific Collectors

Concrete implementations of `BaseDataCollector` for each exchange (e.g., `OKXCollector`).

**Key Responsibilities:**
- Handle exchange-specific WebSocket protocols
- Parse and standardize incoming data
- Implement exchange-specific authentication
- Define subscription messages for different data types

For more details, see [OKX Collector Documentation (`./exchanges/okx.md`)](./exchanges/okx.md).

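To make "define subscription messages" and "parse and standardize incoming data" more concrete, here is a minimal sketch. The message shapes are modeled on OKX's public v5 WebSocket trades channel as best understood here and should be checked against the OKX documentation; `StandardTrade`, `build_subscribe_message`, and `parse_trades` are hypothetical names, not part of `OKXCollector`.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class StandardTrade:
    """Hypothetical exchange-independent trade record."""
    exchange: str
    symbol: str
    price: float
    size: float
    side: str
    timestamp_ms: int


def build_subscribe_message(symbol: str, channels: list) -> str:
    """Build one subscribe request covering several channels (assumed format)."""
    args = [{"channel": channel, "instId": symbol} for channel in channels]
    return json.dumps({"op": "subscribe", "args": args})


def parse_trades(raw: str) -> list:
    """Convert a raw trades push into StandardTrade records.

    Messages for other channels or control events yield an empty list.
    """
    message = json.loads(raw)
    if message.get("arg", {}).get("channel") != "trades":
        return []
    return [
        StandardTrade(
            exchange="okx",
            symbol=item["instId"],
            price=float(item["px"]),   # prices arrive as strings
            size=float(item["sz"]),
            side=item["side"],
            timestamp_ms=int(item["ts"]),
        )
        for item in message.get("data", [])
    ]


# Illustrative payload, not captured from a live connection.
sample = json.dumps({
    "arg": {"channel": "trades", "instId": "BTC-USDT"},
    "data": [{"instId": "BTC-USDT", "px": "42219.9", "sz": "0.12",
              "side": "buy", "ts": "1630048897897"}],
})
trades = parse_trades(sample)
subscribe = build_subscribe_message("BTC-USDT", ["trades", "books"])
```
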
## Exchange Factory

The `ExchangeFactory` provides a standardized way to create data collectors, decoupling the client code from specific implementations.

### Features

- **Simplified Creation**: Single function to create any supported collector
- **Configuration Driven**: Uses `ExchangeCollectorConfig` for flexible setup
- **Validation**: Validates configuration before creating a collector
- **Extensible**: Easily register new exchange collectors

### Usage

```python
import asyncio

from data.exchanges import ExchangeFactory, ExchangeCollectorConfig
from data.common import DataType

# Create config for OKX collector
config = ExchangeCollectorConfig(
    exchange="okx",
    symbol="BTC-USDT",
    data_types=[DataType.TRADE, DataType.ORDERBOOK],
    auto_restart=True,
)


async def main() -> None:
    # Create collector using the factory
    try:
        collector = ExchangeFactory.create_collector(config)
        # Use the collector; start() is a coroutine, so it must be awaited
        await collector.start()
    except ValueError as e:
        print(f"Error creating collector: {e}")


asyncio.run(main())

# Create multiple collectors
configs = [...]  # placeholder for a list of ExchangeCollectorConfig objects
collectors = ExchangeFactory.create_multiple_collectors(configs)
```

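Internally, a factory with the validation and extensibility features listed earlier is often built around a registry that maps exchange names to collector classes. The sketch below shows that general pattern only; `SketchFactory`, `SketchConfig`, `FakeOKXCollector`, and the `register` method are hypothetical, and the actual `ExchangeFactory` may register and validate differently.

```python
from dataclasses import dataclass, field


@dataclass
class SketchConfig:
    """Hypothetical stand-in for ExchangeCollectorConfig."""
    exchange: str
    symbol: str
    data_types: list = field(default_factory=list)
    auto_restart: bool = True


class SketchFactory:
    """Hypothetical registry-based factory (not the project's class)."""

    _registry = {}  # exchange name -> collector class

    @classmethod
    def register(cls, exchange: str, collector_cls) -> None:
        cls._registry[exchange.lower()] = collector_cls

    @classmethod
    def get_supported_exchanges(cls) -> list:
        return sorted(cls._registry)

    @classmethod
    def create_collector(cls, config: SketchConfig):
        # Validate before instantiating anything.
        key = config.exchange.lower()
        if key not in cls._registry:
            raise ValueError(f"Exchange not supported: {config.exchange}")
        if not config.symbol:
            raise ValueError("symbol must not be empty")
        return cls._registry[key](config)


class FakeOKXCollector:
    """Placeholder collector used only to demonstrate registration."""

    def __init__(self, config: SketchConfig) -> None:
        self.config = config


SketchFactory.register("okx", FakeOKXCollector)
collector = SketchFactory.create_collector(SketchConfig("okx", "BTC-USDT"))

try:
    SketchFactory.create_collector(SketchConfig("kraken", "BTC-USD"))
    error_message = None
except ValueError as exc:
    error_message = str(exc)
```

With a registry, adding an exchange is a single `register` call, and client code never imports a concrete collector class.
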
## Health Monitoring

Health monitoring operates at two levels: each collector monitors itself, and the `CollectorManager` monitors the system as a whole.

### 1. Collector-Level Monitoring

Each `BaseDataCollector` instance has its own health monitoring.

**Key Metrics:**
- **Heartbeat**: Regular internal signal to confirm the collector is responsive
- **Data Silence**: Tracks time since last message to detect frozen connections
- **Restart Count**: Number of automatic restarts
- **Connection Status**: Tracks WebSocket connection state

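The collector-level metrics can be pictured as a small tracker that records timestamps and compares them against timeouts. This is a sketch under stated assumptions: `HealthTracker`, its method names, and the 30 and 60 second timeouts are hypothetical, and the clock is injectable only so the example runs without real waiting.

```python
import time


class HealthTracker:
    """Hypothetical per-collector health tracker (not the project's class)."""

    def __init__(self, heartbeat_timeout=30.0, silence_timeout=60.0,
                 clock=time.monotonic):
        # Timeout values are illustrative assumptions, not project defaults.
        self._clock = clock
        self.heartbeat_timeout = heartbeat_timeout
        self.silence_timeout = silence_timeout
        self.last_heartbeat = clock()
        self.last_message = clock()
        self.restart_count = 0
        self.connected = False

    def record_heartbeat(self) -> None:
        self.last_heartbeat = self._clock()

    def record_message(self) -> None:
        self.last_message = self._clock()

    def get_health_status(self) -> dict:
        now = self._clock()
        issues = []
        if not self.connected:
            issues.append("disconnected")
        if now - self.last_heartbeat > self.heartbeat_timeout:
            issues.append("heartbeat timeout")
        if now - self.last_message > self.silence_timeout:
            issues.append("data silence")
        return {
            "healthy": not issues,
            "issues": issues,
            "restart_count": self.restart_count,
        }


# Drive the tracker with a fake clock instead of sleeping.
fake_now = [0.0]
tracker = HealthTracker(clock=lambda: fake_now[0])
tracker.connected = True

fake_now[0] = 45.0
tracker.record_heartbeat()
health_ok = tracker.get_health_status()      # last message 45 s ago, within limit

fake_now[0] = 70.0
health_silent = tracker.get_health_status()  # last message 70 s ago, over limit
```

Separating heartbeat from data silence matters because a connection can stay open and responsive while the exchange has silently stopped sending data.
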
### 2. Manager-Level Monitoring

The `CollectorManager` provides a global view of system health.

**Key Metrics:**
- **Aggregate Status**: Overview of all collectors (running, stopped, failed)
- **System Uptime**: Total uptime for the collector system
- **Failed Collectors**: List of collectors that failed to restart
- **Resource Usage**: (Future) System-level CPU and memory monitoring

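Aggregating these manager-level metrics amounts to counting collector states and listing the failures. The function below is a hypothetical illustration: `aggregate_status`, the `"state"` key, and the returned field names are assumptions, not the schema returned by `manager.get_status()`.

```python
from collections import Counter


def aggregate_status(collector_statuses: dict, started_at: float, now: float) -> dict:
    """Summarize per-collector states into one system-wide view.

    `collector_statuses` maps a collector name to a dict with a "state" of
    "running", "stopped", or "failed" (an assumed shape for this sketch).
    """
    counts = Counter(status["state"] for status in collector_statuses.values())
    failed = sorted(
        name for name, status in collector_statuses.items()
        if status["state"] == "failed"
    )
    return {
        "total": len(collector_statuses),
        "running": counts.get("running", 0),
        "stopped": counts.get("stopped", 0),
        "failed": counts.get("failed", 0),
        "failed_collectors": failed,
        "uptime_seconds": now - started_at,
    }


statuses = {
    "okx:BTC-USDT": {"state": "running"},
    "okx:ETH-USDT": {"state": "failed"},
    "okx:SOL-USDT": {"state": "stopped"},
}
summary = aggregate_status(statuses, started_at=100.0, now=160.0)
```
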
### Health Status API

```python
# Get status of a single collector
status = collector.get_status()
health = collector.get_health_status()

# Get status of the entire system
system_status = manager.get_status()
```

For detailed status schemas, refer to the [Reference Documentation (`../../reference/README.md`)](../../reference/README.md).

## API Reference

### `BaseDataCollector`
- `async start()`
- `async stop()`
- `async restart()`
- `get_status() -> dict`
- `get_health_status() -> dict`

### `CollectorManager`
- `add_collector(collector)`
- `async start_all()`
- `async stop_all()`
- `get_status() -> dict`
- `list_collectors() -> list`

### `ExchangeFactory`
- `create_collector(config) -> BaseDataCollector`
- `create_multiple_collectors(configs) -> list`
- `get_supported_exchanges() -> list`

## Troubleshooting

### Common Issues

1. **Collector fails to start**
   - **Cause**: Invalid symbol, incorrect API keys, or network issues.
   - **Solution**: Check the logs for error messages. Verify the configuration and network connectivity.

2. **Collector stops receiving data**
   - **Cause**: The WebSocket connection dropped, or the exchange is experiencing issues.
   - **Solution**: The health monitor should restart the collector automatically. If it does not, check the logs for reconnect errors.

3. **"Exchange not supported" error**
   - **Cause**: Trying to create a collector for an exchange that is not registered in the factory.
   - **Solution**: Implement the collector and register it in `data/exchanges/__init__.py`.

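For the second issue, when diagnosing why automatic restarts are not recovering a collector, it can help to know what a bounded retry looks like. The sketch below retries with exponential backoff; it is a generic pattern, not the system's actual restart policy, and `restart_with_backoff`, `FlakyCollector`, and the delay values are hypothetical.

```python
import asyncio


async def restart_with_backoff(restart, max_attempts=3, base_delay=0.01):
    """Await `restart()` until it succeeds or `max_attempts` is reached.

    Returns the attempt number that succeeded and re-raises the last
    ConnectionError if every attempt fails.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            await restart()
            return attempt
        except ConnectionError:
            if attempt == max_attempts:
                raise
            # Double the wait after each failure: base, 2x base, 4x base, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


class FlakyCollector:
    """Simulated collector whose first `failures` restarts raise."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    async def restart(self) -> None:
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("simulated reconnect failure")


flaky = FlakyCollector(failures=2)
attempts_used = asyncio.run(restart_with_backoff(flaky.restart))
```

If the logs show every attempt failing, the cause is more likely configuration or exchange-side than a transient network drop.
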
### Best Practices

- Use the `CollectorManager` for lifecycle management.
- Always validate configurations before creating collectors.
- Monitor system status regularly using `manager.get_status()`.
- Refer to logs for detailed error analysis.

---
*Back to [Modules Documentation (`../README.md`)](../README.md)*