TCPDashboard/docs/modules/data_collectors.md
2025-06-06 20:33:29 +08:00


Enhanced Data Collector System

This document describes the enhanced data collector system, featuring a modular architecture, centralized management, and robust health monitoring.

Overview

Key Features

  • Modular Exchange Integration: Easily add new exchanges without impacting core logic
  • Centralized Management: CollectorManager for system-wide control
  • Robust Health Monitoring: Automatic restarts and failure detection
  • Factory Pattern: Standardized creation of collector instances
  • Asynchronous Operations: Non-blocking, async/await-based data collection
  • Comprehensive Logging: Detailed component-level logging

Supported Exchanges

  • OKX: Full implementation with WebSocket support
  • Binance (Future): Planned support
  • Coinbase (Future): Planned support

For exchange-specific documentation, see Exchange Implementations (./exchanges/).

System Architecture

┌─────────────────────────────────────────────────────────────────┐
│                     TCP Dashboard Platform                      │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │                    CollectorManager                     │    │
│  │  • Centralized start/stop/status control                │    │
│  │                                                         │    │
│  │  ┌───────────────────────────────────────────────────┐  │    │
│  │  │               Global Health Monitor               │  │    │
│  │  │  • System-wide health checks                      │  │    │
│  │  │  • Auto-restart coordination                      │  │    │
│  │  │  • Performance analytics                          │  │    │
│  │  └───────────────────────────────────────────────────┘  │    │
│  │                            │                            │    │
│  │  ┌───────────────┐ ┌───────────────┐ ┌───────────────┐  │    │
│  │  │ OKX Collector │ │ Binance Coll. │ │ Custom Coll.  │  │    │
│  │  │ • Health Mon  │ │ • Health Mon  │ │ • Health Mon  │  │    │
│  │  │ • Auto-restart│ │ • Auto-restart│ │ • Auto-restart│  │    │
│  │  │ • Data Valid  │ │ • Data Valid  │ │ • Data Valid  │  │    │
│  │  └───────────────┘ └───────────────┘ └───────────────┘  │    │
│  └─────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────┘

Core Components

1. BaseDataCollector

An abstract base class that defines the common interface for all exchange collectors.

Key Responsibilities:

  • Standardized start(), stop(), and restart() methods
  • Built-in health monitoring with heartbeat and data silence detection
  • Automatic reconnect and restart logic
  • Asynchronous message handling
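
The lifecycle contract above can be illustrated with a minimal sketch. It assumes an asyncio-based design and uses hypothetical names (BaseDataCollectorSketch, connect, disconnect, DummyCollector) that do not come from the project's code. Only start, stop, restart, and get_status are taken from the API reference. Health tracking is left out to keep the example short.

```python
import asyncio
from abc import ABC, abstractmethod


class BaseDataCollectorSketch(ABC):
    """Hypothetical, simplified stand-in for BaseDataCollector (lifecycle only)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.running = False
        self.restart_count = 0

    @abstractmethod
    async def connect(self) -> None:
        """Open the exchange connection (exchange-specific; hypothetical hook)."""

    @abstractmethod
    async def disconnect(self) -> None:
        """Close the exchange connection (exchange-specific; hypothetical hook)."""

    async def start(self) -> None:
        await self.connect()
        self.running = True

    async def stop(self) -> None:
        await self.disconnect()
        self.running = False

    async def restart(self) -> None:
        # A restart is a stop followed by a start; the counter feeds status reporting.
        await self.stop()
        await self.start()
        self.restart_count += 1

    def get_status(self) -> dict:
        return {"name": self.name, "running": self.running, "restart_count": self.restart_count}


class DummyCollector(BaseDataCollectorSketch):
    """Concrete subclass that only counts connect() calls instead of opening a socket."""

    def __init__(self, name: str) -> None:
        super().__init__(name)
        self.connects = 0

    async def connect(self) -> None:
        self.connects += 1

    async def disconnect(self) -> None:
        pass


async def demo() -> dict:
    collector = DummyCollector("dummy")
    await collector.start()
    await collector.restart()
    return collector.get_status()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Subclasses implement only the exchange-specific hooks, and the shared lifecycle logic stays in the base class.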

2. CollectorManager

A singleton class that manages all active data collectors in the system.

Key Responsibilities:

  • Centralized start and stop for all collectors
  • System-wide status aggregation
  • Global health monitoring
  • Coordination of restart policies
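
The following is a minimal sketch of how a singleton manager could coordinate collectors. It assumes each collector exposes async start()/stop() and a running flag. The class names CollectorManagerSketch and FakeCollector are hypothetical, and the method names mirror the CollectorManager API reference. The singleton is implemented with __new__. This is one common Python approach and not necessarily the project's.

```python
import asyncio


class CollectorManagerSketch:
    """Hypothetical singleton manager; method names mirror the CollectorManager API reference."""

    _instance = None

    def __new__(cls):
        # __new__-based singleton: every call returns the same object.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._collectors = []
        return cls._instance

    def add_collector(self, collector) -> None:
        self._collectors.append(collector)

    def list_collectors(self) -> list:
        return list(self._collectors)

    async def start_all(self) -> None:
        # Start every registered collector concurrently.
        await asyncio.gather(*(c.start() for c in self._collectors))

    async def stop_all(self) -> None:
        await asyncio.gather(*(c.stop() for c in self._collectors))

    def get_status(self) -> dict:
        return {
            "total": len(self._collectors),
            "running": sum(1 for c in self._collectors if c.running),
        }


class FakeCollector:
    """Minimal object exposing the start/stop/running surface the manager relies on."""

    def __init__(self) -> None:
        self.running = False

    async def start(self) -> None:
        self.running = True

    async def stop(self) -> None:
        self.running = False
```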

3. Exchange-Specific Collectors

Concrete implementations of BaseDataCollector for each exchange (e.g., OKXCollector).

Key Responsibilities:

  • Handle exchange-specific WebSocket protocols
  • Parse and standardize incoming data
  • Implement exchange-specific authentication
  • Define subscription messages for different data types
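
The parsing and subscription duties can be illustrated with a sketch for OKX trade data. The message shapes reflect our understanding of OKX's v5 public WebSocket API: an "op": "subscribe" request with channel/instId arguments, and trade pushes whose px, sz, and ts fields arrive as strings. Check these shapes against OKX's documentation. StandardTrade, build_subscribe_message, and parse_okx_trades are hypothetical names.

```python
import json
from dataclasses import dataclass
from decimal import Decimal


@dataclass
class StandardTrade:
    """Hypothetical exchange-neutral trade record."""
    exchange: str
    symbol: str
    trade_id: str
    price: Decimal
    size: Decimal
    side: str
    timestamp_ms: int


def build_subscribe_message(symbol: str, channels: list[str]) -> str:
    # Assumed OKX v5 shape: {"op": "subscribe", "args": [{"channel": ..., "instId": ...}]}
    args = [{"channel": ch, "instId": symbol} for ch in channels]
    return json.dumps({"op": "subscribe", "args": args})


def parse_okx_trades(raw: str) -> list[StandardTrade]:
    """Convert an assumed OKX 'trades' push message into StandardTrade records."""
    msg = json.loads(raw)
    if msg.get("arg", {}).get("channel") != "trades" or "data" not in msg:
        return []  # ignore subscribe acks, errors, and other channels
    return [
        StandardTrade(
            exchange="okx",
            symbol=item["instId"],
            trade_id=item["tradeId"],
            price=Decimal(item["px"]),  # numeric fields assumed to arrive as strings
            size=Decimal(item["sz"]),
            side=item["side"],
            timestamp_ms=int(item["ts"]),
        )
        for item in msg["data"]
    ]
```

Decimal keeps the exchange's string-encoded prices exact. A float conversion could silently round them.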

For more details, see OKX Collector Documentation (./exchanges/okx.md).

Exchange Factory

The ExchangeFactory provides a standardized way to create data collectors, decoupling the client code from specific implementations.

Features

  • Simplified Creation: Single function to create any supported collector
  • Configuration Driven: Uses ExchangeCollectorConfig for flexible setup
  • Validation: Validates configuration before creating a collector
  • Extensible: Easily register new exchange collectors
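
One common way to get these properties is a class-level registry keyed by exchange name. The sketch below assumes that approach. ExchangeFactorySketch, CollectorConfigSketch, register, and OKXCollectorStub are hypothetical names. The names create_collector, create_multiple_collectors, and get_supported_exchanges, and the ValueError on bad input, follow this page.

```python
from dataclasses import dataclass, field


@dataclass
class CollectorConfigSketch:
    """Hypothetical subset of the ExchangeCollectorConfig fields used on this page."""
    exchange: str
    symbol: str
    data_types: list = field(default_factory=list)
    auto_restart: bool = True


class ExchangeFactorySketch:
    """Registry-based factory; a sketch, not the project's ExchangeFactory."""

    _registry: dict = {}  # exchange name -> collector class, shared by the class

    @classmethod
    def register(cls, exchange: str, collector_cls) -> None:
        cls._registry[exchange.lower()] = collector_cls

    @classmethod
    def get_supported_exchanges(cls) -> list:
        return sorted(cls._registry)

    @classmethod
    def create_collector(cls, config: CollectorConfigSketch):
        # Validate before constructing anything.
        key = config.exchange.lower()
        if key not in cls._registry:
            raise ValueError(f"Exchange not supported: {config.exchange}")
        if not config.symbol:
            raise ValueError("symbol must be non-empty")
        return cls._registry[key](config)

    @classmethod
    def create_multiple_collectors(cls, configs: list) -> list:
        return [cls.create_collector(c) for c in configs]


class OKXCollectorStub:
    """Placeholder collector class used only to demonstrate registration."""

    def __init__(self, config: CollectorConfigSketch) -> None:
        self.config = config


ExchangeFactorySketch.register("okx", OKXCollectorStub)
```

Adding an exchange then means writing its collector class and making one register call. Nothing in the factory itself changes.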

Usage

import asyncio

from data.exchanges import ExchangeFactory, ExchangeCollectorConfig
from data.common import DataType


async def main():
    # Create config for OKX collector
    config = ExchangeCollectorConfig(
        exchange="okx",
        symbol="BTC-USDT",
        data_types=[DataType.TRADE, DataType.ORDERBOOK],
        auto_restart=True
    )

    # Create collector using the factory; an invalid config raises ValueError
    try:
        collector = ExchangeFactory.create_collector(config)
        # start() is a coroutine, so it must be awaited inside an async function
        await collector.start()
    except ValueError as e:
        print(f"Error creating collector: {e}")

    # Create multiple collectors from a list of ExchangeCollectorConfig objects
    configs = [...]
    collectors = ExchangeFactory.create_multiple_collectors(configs)


asyncio.run(main())

Health Monitoring

Health monitoring operates at two levels: each collector monitors itself, and the CollectorManager monitors the system as a whole.

1. Collector-Level Monitoring

Each BaseDataCollector instance has its own health monitoring.

Key Metrics:

  • Heartbeat: Regular internal signal to confirm the collector is responsive
  • Data Silence: Tracks time since last message to detect frozen connections
  • Restart Count: Number of automatic restarts
  • Connection Status: Tracks WebSocket connection state
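
The heartbeat and data-silence checks reduce to comparing timestamps against timeouts. The following sketch assumes monotonic-clock timestamps and illustrative thresholds of 30 s and 60 s; neither value comes from the project. CollectorHealthSketch and its methods are hypothetical. The clock is injectable, so the logic can be tested without sleeping.

```python
import time
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class CollectorHealthSketch:
    """Hypothetical per-collector health tracker; thresholds are illustrative only."""
    heartbeat_timeout_s: float = 30.0
    data_silence_timeout_s: float = 60.0
    clock: Callable[[], float] = time.monotonic
    connected: bool = False
    restart_count: int = 0
    last_heartbeat: float = field(init=False)
    last_message: float = field(init=False)

    def __post_init__(self) -> None:
        # Start both timers at construction time.
        now = self.clock()
        self.last_heartbeat = now
        self.last_message = now

    def record_heartbeat(self) -> None:
        self.last_heartbeat = self.clock()

    def record_message(self) -> None:
        self.last_message = self.clock()

    def problems(self) -> list[str]:
        """Return the list of detected issues; empty means healthy."""
        now = self.clock()
        issues = []
        if not self.connected:
            issues.append("disconnected")
        if now - self.last_heartbeat > self.heartbeat_timeout_s:
            issues.append("heartbeat timeout")
        if now - self.last_message > self.data_silence_timeout_s:
            issues.append("data silence")
        return issues

    def is_healthy(self) -> bool:
        return not self.problems()
```

A health loop could poll problems() periodically and call restart() when the list is non-empty, incrementing restart_count.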

2. Manager-Level Monitoring

The CollectorManager provides a global view of system health.

Key Metrics:

  • Aggregate Status: Overview of all collectors (running, stopped, failed)
  • System Uptime: Total uptime for the collector system
  • Failed Collectors: List of collectors that failed to restart
  • Resource Usage: (Future) System-level CPU and memory monitoring
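
The following sketch shows how per-collector states might be rolled up into the manager-level view. The status schema is hypothetical, including the state strings "running", "stopped", and "failed" and the output keys. The authoritative schemas live in the Reference Documentation.

```python
from collections import Counter


def aggregate_status(collector_statuses: dict, started_at: float, now: float) -> dict:
    """Summarise per-collector states into a system-wide view (hypothetical schema).

    collector_statuses maps a collector name to "running", "stopped", or "failed".
    """
    counts = Counter(collector_statuses.values())  # missing states count as 0
    return {
        "total": len(collector_statuses),
        "running": counts["running"],
        "stopped": counts["stopped"],
        "failed": counts["failed"],
        "failed_collectors": sorted(n for n, s in collector_statuses.items() if s == "failed"),
        "uptime_s": now - started_at,
    }
```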

Health Status API

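# Assumes `collector` was created by ExchangeFactory and `manager` is the CollectorManager instance

# Get status of a single collector
status = collector.get_status()
health = collector.get_health_status()

# Get status of the entire system
system_status = manager.get_status()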

For detailed status schemas, refer to the Reference Documentation (../../reference/README.md).

API Reference

BaseDataCollector

  • async start()
  • async stop()
  • async restart()
  • get_status() -> dict
  • get_health_status() -> dict

CollectorManager

  • add_collector(collector)
  • async start_all()
  • async stop_all()
  • get_status() -> dict
  • list_collectors() -> list

ExchangeFactory

  • create_collector(config) -> BaseDataCollector
  • create_multiple_collectors(configs) -> list
  • get_supported_exchanges() -> list

Troubleshooting

Common Issues

  1. Collector fails to start

    • Cause: Invalid symbol, incorrect API keys, or network issues.
    • Solution: Check logs for error messages. Verify configuration and network connectivity.
  2. Collector stops receiving data

    • Cause: The WebSocket connection dropped, or the exchange is experiencing issues.
    • Solution: The health monitor should restart the collector automatically. If it does not, check the logs for reconnect errors.
  3. "Exchange not supported" error

    • Cause: Trying to create a collector for an exchange not registered in the factory.
    • Solution: Implement the collector and register it in data/exchanges/__init__.py.

Best Practices

  • Use the CollectorManager for lifecycle management.
  • Always validate configurations before creating collectors.
  • Monitor system status regularly using manager.get_status().
  • Refer to logs for detailed error analysis.

Back to Modules Documentation (../README.md)