
Data Collection Service

The Data Collection Service is a production-ready service for cryptocurrency market data collection with clean logging and robust error handling. It provides a service layer that manages multiple data collectors for different trading pairs and exchanges.

Overview

The service provides a high-level interface for managing the data collection system, handling configuration, lifecycle management, and monitoring. It acts as an orchestration layer on top of the core data collector components.

Features

  • Service Lifecycle Management: Start, stop, and monitor data collection operations
  • JSON Configuration: File-based configuration with automatic defaults
  • Clean Production Logging: Only essential operational information
  • Health Monitoring: Service-level health checks and auto-recovery
  • Graceful Shutdown: Proper signal handling and cleanup
  • Multi-Exchange Orchestration: Coordinate collectors across multiple exchanges
  • Production Ready: Designed for 24/7 operation with monitoring

Quick Start

Basic Usage

# Start with default configuration (indefinite run)
python scripts/start_data_collection.py

# Run for 8 hours
python scripts/start_data_collection.py --hours 8

# Use custom configuration
python scripts/start_data_collection.py --config config/my_config.json

Monitoring

# Check status once
python scripts/monitor_clean.py

# Monitor continuously every 60 seconds
python scripts/monitor_clean.py --interval 60

Configuration

The service uses JSON configuration files with automatic default creation if none exists.

Default Configuration Location

config/data_collection.json

Configuration Structure

{
  "exchanges": {
    "okx": {
      "enabled": true,
      "trading_pairs": [
        {
          "symbol": "BTC-USDT",
          "enabled": true,
          "data_types": ["trade"],
          "timeframes": ["1m", "5m", "15m", "1h"]
        },
        {
          "symbol": "ETH-USDT",
          "enabled": true,
          "data_types": ["trade"],
          "timeframes": ["1m", "5m", "15m", "1h"]
        }
      ]
    }
  },
  "collection_settings": {
    "health_check_interval": 120,
    "store_raw_data": true,
    "auto_restart": true,
    "max_restart_attempts": 3
  },
  "logging": {
    "level": "INFO",
    "log_errors_only": true,
    "verbose_data_logging": false
  }
}

Configuration Options

Exchange Settings

  • enabled: Whether to enable this exchange
  • trading_pairs: Array of trading pair configurations

Trading Pair Settings

  • symbol: Trading pair symbol (e.g., "BTC-USDT")
  • enabled: Whether to collect data for this pair
  • data_types: Types of data to collect (["trade"], ["ticker"], etc.)
  • timeframes: Candle timeframes to generate (["1m", "5m", "15m", "1h", "4h", "1d"])

Collection Settings

  • health_check_interval: Health check frequency in seconds
  • store_raw_data: Whether to store raw trade data
  • auto_restart: Enable automatic restart on failures
  • max_restart_attempts: Maximum restart attempts before giving up

Logging Settings

  • level: Log level ("DEBUG", "INFO", "WARNING", "ERROR")
  • log_errors_only: Only log errors and essential events
  • verbose_data_logging: Enable verbose logging of individual trades/candles
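
As a rough illustration of the load-with-defaults behavior described above, the following sketch shows how a missing configuration file can be replaced with defaults (the structure mirrors the example earlier; the service's actual implementation may differ):

import json
from pathlib import Path

DEFAULT_CONFIG = {
    "exchanges": {"okx": {"enabled": True, "trading_pairs": []}},
    "collection_settings": {
        "health_check_interval": 120,
        "store_raw_data": True,
        "auto_restart": True,
        "max_restart_attempts": 3
    },
    "logging": {"level": "INFO", "log_errors_only": True, "verbose_data_logging": False}
}

def load_config(path: str = "config/data_collection.json") -> dict:
    """Load the JSON config, writing the defaults first if the file is missing."""
    config_file = Path(path)
    if not config_file.exists():
        config_file.parent.mkdir(parents=True, exist_ok=True)
        config_file.write_text(json.dumps(DEFAULT_CONFIG, indent=2))
    return json.loads(config_file.read_text())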

Service Architecture

Service Layer Components

┌─────────────────────────────────────────────────┐
│                DataCollectionService            │
│  ┌─────────────────────────────────────────┐    │
│  │         Configuration Manager           │    │
│  │  • JSON config loading/validation       │    │
│  │  • Default config generation            │    │
│  │  • Runtime config updates               │    │
│  └─────────────────────────────────────────┘    │
│  ┌─────────────────────────────────────────┐    │
│  │           Service Monitor               │    │
│  │  • Service-level health checks          │    │
│  │  • Uptime tracking                      │    │
│  │  • Error aggregation                    │    │
│  └─────────────────────────────────────────┘    │
│                      │                          │
│  ┌─────────────────────────────────────────┐    │
│  │         CollectorManager                │    │
│  │  • Individual collector management      │    │
│  │  • Health monitoring                    │    │
│  │  • Auto-restart coordination            │    │
│  └─────────────────────────────────────────┘    │
└─────────────────────────────────────────────────┘
                      │
        ┌─────────────────────────────┐
        │     Core Data Collectors    │
        │  (See data_collectors.md)   │
        └─────────────────────────────┘

Data Flow

Configuration → Service → CollectorManager → Data Collectors → Database
                 ↓                                ↓
            Service Monitor                 Health Monitor

Storage Integration

  • Raw Data: PostgreSQL raw_trades table via repository pattern
  • Candles: PostgreSQL market_data table with multiple timeframes
  • Real-time: Redis pub/sub for live data distribution
  • Service Metrics: Service uptime, error counts, collector statistics
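
For example, real-time distribution over Redis pub/sub might look roughly like this (a sketch assuming the redis-py asyncio client; the channel naming scheme is illustrative, not necessarily the service's actual one):

import json
import redis.asyncio as redis  # redis-py >= 4.2

async def publish_trade(client: redis.Redis, symbol: str, trade: dict) -> None:
    # Publish each trade on a per-symbol channel so consumers can subscribe selectively
    await client.publish(f"trades:{symbol}", json.dumps(trade))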

Logging Philosophy

The service implements clean production logging focused on operational needs:

What Gets Logged

Service Lifecycle

  • Service start/stop events
  • Configuration loading
  • Service initialization

Collector Orchestration

  • Collector creation and destruction
  • Service-level health summaries
  • Recovery operations

Configuration Events

  • Config file changes
  • Runtime configuration updates
  • Validation errors

Service Statistics

  • Periodic uptime reports
  • Collection summary statistics
  • Performance metrics

What Doesn't Get Logged

Individual Data Points

  • Every trade received
  • Every candle generated
  • Raw market data

Internal Operations

  • Individual collector heartbeats
  • Routine database operations
  • Internal processing steps
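
A minimal sketch of how these settings could map onto the standard logging module (the logger names here are hypothetical; the real wiring lives in the service code):

import logging

def configure_logging(level: str = "INFO", log_errors_only: bool = True,
                      verbose_data_logging: bool = False) -> None:
    logging.basicConfig(
        level=getattr(logging, level),
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    if log_errors_only:
        # Keep service lifecycle logs, but raise the bar for collector chatter
        logging.getLogger("data.collectors").setLevel(logging.ERROR)
    if not verbose_data_logging:
        # Silence per-trade/per-candle messages entirely
        logging.getLogger("data.collectors.raw").setLevel(logging.CRITICAL)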

API Reference

DataCollectionService

The main service class for managing data collection operations.

Constructor

DataCollectionService(config_path: str = "config/data_collection.json")

Parameters:

  • config_path: Path to JSON configuration file

Methods

async run(duration_hours: Optional[float] = None) -> bool

Run the service for a specified duration or indefinitely.

Parameters:

  • duration_hours: Optional duration in hours (None = indefinite)

Returns:

  • bool: True if successful, False if error occurred

Example:

service = DataCollectionService()
await service.run(duration_hours=24)  # Run for 24 hours

async start() -> bool

Start the data collection service and all configured collectors.

Returns:

  • bool: True if started successfully

async stop() -> None

Stop the service gracefully, including all collectors and cleanup.

get_status() -> Dict[str, Any]

Get current service status including uptime, collector counts, and errors.

Returns:

{
    'service_running': True,
    'uptime_hours': 12.5,
    'collectors_total': 6,
    'collectors_running': 5,
    'collectors_failed': 1,
    'errors_count': 2,
    'last_error': 'Connection timeout for ETH-USDT',
    'configuration': {
        'config_file': 'config/data_collection.json',
        'exchanges_enabled': ['okx'],
        'total_trading_pairs': 6
    }
}

async initialize_collectors() -> bool

Initialize all collectors based on configuration.

Returns:

  • bool: True if all collectors initialized successfully

load_configuration() -> Dict[str, Any]

Load and validate configuration from file.

Returns:

  • dict: Loaded configuration

Standalone Function

run_data_collection_service(config_path, duration_hours)

async def run_data_collection_service(
    config_path: str = "config/data_collection.json",
    duration_hours: Optional[float] = None
) -> bool

Convenience function to run the service with minimal setup.

Parameters:

  • config_path: Path to configuration file
  • duration_hours: Optional duration in hours

Returns:

  • bool: True if successful
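
Example:

import asyncio
from data.collection_service import run_data_collection_service

# Run the service for 8 hours with the default configuration
asyncio.run(run_data_collection_service(duration_hours=8))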

Integration Examples

Basic Service Integration

import asyncio
from data.collection_service import DataCollectionService

async def main():
    service = DataCollectionService("config/my_config.json")
    
    # Run for 24 hours
    success = await service.run(duration_hours=24)
    
    if not success:
        print("Service encountered errors")

if __name__ == "__main__":
    asyncio.run(main())

Custom Status Monitoring

import asyncio
from data.collection_service import DataCollectionService

async def monitor_service():
    service = DataCollectionService()
    
    # Start service in background
    start_task = asyncio.create_task(service.run())
    await asyncio.sleep(5)  # give the service a moment to start before polling
    
    # Monitor status every 5 minutes
    while service.running:
        status = service.get_status()
        print(f"Service Uptime: {status['uptime_hours']:.1f}h")
        print(f"Collectors: {status['collectors_running']}/{status['collectors_total']}")
        print(f"Errors: {status['errors_count']}")
        
        await asyncio.sleep(300)  # 5 minutes
    
    await start_task

asyncio.run(monitor_service())

Programmatic Control

import asyncio
from data.collection_service import DataCollectionService

async def controlled_collection():
    service = DataCollectionService()
    
    try:
        # Initialize and start
        await service.initialize_collectors()
        await service.start()
        
        # Monitor and control
        while True:
            status = service.get_status()
            
            # Check if any collectors failed
            if status['collectors_failed'] > 0:
                print("Some collectors failed, checking health...")
                # Service auto-restart will handle this
            
            await asyncio.sleep(60)  # Check every minute
            
    except KeyboardInterrupt:
        print("Shutting down service...")
    finally:
        await service.stop()

asyncio.run(controlled_collection())

Configuration Management

import asyncio
import json
from data.collection_service import DataCollectionService

async def dynamic_configuration():
    service = DataCollectionService()
    
    # Load and modify configuration
    config = service.load_configuration()
    
    # Add new trading pair
    config['exchanges']['okx']['trading_pairs'].append({
        'symbol': 'SOL-USDT',
        'enabled': True,
        'data_types': ['trade'],
        'timeframes': ['1m', '5m']
    })
    
    # Save updated configuration
    with open('config/data_collection.json', 'w') as f:
        json.dump(config, f, indent=2)
    
    # Restart service with new config
    await service.stop()
    await service.start()

asyncio.run(dynamic_configuration())

Error Handling

The service implements robust error handling at the service orchestration level:

Service Level Errors

  • Configuration Errors: Invalid JSON, missing required fields
  • Initialization Errors: Failed collector creation, database connectivity
  • Runtime Errors: Service-level exceptions, resource exhaustion

Error Recovery Strategies

  1. Graceful Degradation: Continue with healthy collectors
  2. Configuration Validation: Validate before applying changes
  3. Service Restart: Full service restart on critical errors
  4. Error Aggregation: Collect and report errors across all collectors
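
As a rough sketch (not the service's actual code), the auto-restart policy driven by max_restart_attempts could look like this:

import asyncio

async def run_with_restarts(service, max_restart_attempts: int = 3) -> bool:
    """Restart the service on failure, up to the configured attempt cap."""
    for attempt in range(1, max_restart_attempts + 1):
        try:
            if await service.start():
                return True
        except Exception as exc:
            print(f"Start attempt {attempt}/{max_restart_attempts} failed: {exc}")
            await service.stop()
        await asyncio.sleep(10 * attempt)  # simple linear backoff between attempts
    return False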

Error Reporting

# Service status includes error information
status = service.get_status()

if status['errors_count'] > 0:
    print(f"Service has {status['errors_count']} errors")
    print(f"Last error: {status['last_error']}")
    
    # Get detailed error information from collectors
    for collector_name in service.manager.list_collectors():
        collector_status = service.manager.get_collector_status(collector_name)
        if collector_status['status'] == 'error':
            print(f"Collector {collector_name}: {collector_status['statistics']['last_error']}")

Testing

Running Service Tests

# Run all data collection service tests
uv run pytest tests/test_data_collection_service.py -v

# Run specific test categories
uv run pytest tests/test_data_collection_service.py::TestDataCollectionService -v

# Run with coverage
uv run pytest tests/test_data_collection_service.py --cov=data.collection_service

Test Coverage

The service test suite covers:

  • Service initialization and configuration loading
  • Collector orchestration and management
  • Service lifecycle (start/stop/restart)
  • Configuration validation and error handling
  • Signal handling and graceful shutdown
  • Status reporting and monitoring
  • Error aggregation and recovery

Mock Testing

import pytest
from unittest.mock import AsyncMock, patch
from data.collection_service import DataCollectionService

@pytest.mark.asyncio
async def test_service_with_mock_collectors():
    with patch('data.collection_service.CollectorManager') as mock_manager:
        # Mock successful initialization (AsyncMock so start() can be awaited)
        mock_manager.return_value.start = AsyncMock(return_value=True)
        
        service = DataCollectionService()
        result = await service.start()
        
        assert result is True
        mock_manager.return_value.start.assert_awaited_once()

Production Deployment

Docker Deployment

FROM python:3.11-slim

WORKDIR /app
COPY . .

# Install dependencies
RUN pip install uv
RUN uv pip install --system -r requirements.txt

# Create logs and config directories
RUN mkdir -p logs config

# Copy production configuration
COPY config/production.json config/data_collection.json

# Health check
HEALTHCHECK --interval=60s --timeout=10s --start-period=30s --retries=3 \
  CMD python scripts/health_check.py || exit 1

# Run service
CMD ["python", "scripts/start_data_collection.py", "--config", "config/data_collection.json"]

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: data-collection-service
spec:
  replicas: 1
  selector:
    matchLabels:
      app: data-collection-service
  template:
    metadata:
      labels:
        app: data-collection-service
    spec:
      containers:
      - name: data-collector
        image: crypto-dashboard/data-collector:latest
        ports:
        - containerPort: 8080
        env:
        - name: POSTGRES_HOST
          value: "postgres-service"
        - name: REDIS_HOST
          value: "redis-service"
        volumeMounts:
        - name: config-volume
          mountPath: /app/config
        - name: logs-volume
          mountPath: /app/logs
        livenessProbe:
          exec:
            command:
            - python
            - scripts/health_check.py
          initialDelaySeconds: 30
          periodSeconds: 60
      volumes:
      - name: config-volume
        configMap:
          name: data-collection-config
      - name: logs-volume
        emptyDir: {}

Systemd Service

[Unit]
Description=Cryptocurrency Data Collection Service
After=network.target postgres.service redis.service
Requires=postgres.service redis.service

[Service]
Type=simple
User=crypto-collector
Group=crypto-collector
WorkingDirectory=/opt/crypto-dashboard
ExecStart=/usr/bin/python scripts/start_data_collection.py --config config/production.json
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
RestartSec=10
KillMode=mixed
TimeoutStopSec=30

# Environment
Environment=PYTHONPATH=/opt/crypto-dashboard
Environment=LOG_LEVEL=INFO

# Security
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ReadWritePaths=/opt/crypto-dashboard/logs

[Install]
WantedBy=multi-user.target
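
After installing the unit file, reload systemd and enable the service (using whatever name the unit file was installed under; data-collection.service here is illustrative):

sudo systemctl daemon-reload
sudo systemctl enable --now data-collection.service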

Environment Configuration

# Production environment variables
export ENVIRONMENT=production
export POSTGRES_HOST=postgres.internal
export POSTGRES_PORT=5432
export POSTGRES_DB=crypto_dashboard
export POSTGRES_USER=dashboard_user
export POSTGRES_PASSWORD=secure_password
export REDIS_HOST=redis.internal
export REDIS_PORT=6379

# Service configuration
export DATA_COLLECTION_CONFIG=/etc/crypto-dashboard/data_collection.json
export LOG_LEVEL=INFO
export HEALTH_CHECK_INTERVAL=120
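
A sketch of how these variables might be resolved in code (the localhost fallbacks are illustrative development defaults; the service's actual handling may differ):

import os

# Resolve connection settings from the environment, falling back to dev defaults
POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")
POSTGRES_PORT = int(os.environ.get("POSTGRES_PORT", "5432"))
REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
CONFIG_PATH = os.environ.get("DATA_COLLECTION_CONFIG", "config/data_collection.json")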

Monitoring and Alerting

Metrics Collection

The service exposes metrics for monitoring systems:

# Service metrics
service_uptime_hours = 24.5
collectors_running = 5
collectors_total = 6
errors_per_hour = 0.2
data_points_processed = 15000
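
One way to expose these to a monitoring system, sketched with the prometheus_client package (an assumption; the service may export metrics differently):

from prometheus_client import Gauge, start_http_server

uptime_hours = Gauge("service_uptime_hours", "Service uptime in hours")
collectors_running = Gauge("collectors_running", "Collectors currently running")
collectors_total = Gauge("collectors_total", "Collectors configured")

start_http_server(8080)  # serve /metrics once at startup

def update_metrics(status: dict) -> None:
    # Push the latest get_status() snapshot into the gauges
    uptime_hours.set(status["uptime_hours"])
    collectors_running.set(status["collectors_running"])
    collectors_total.set(status["collectors_total"])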

Health Checks

# External health check endpoint
async def health_check():
    service = DataCollectionService()
    status = service.get_status()
    
    if not status['service_running']:
        return {'status': 'unhealthy', 'reason': 'service_stopped'}
    
    if status['collectors_failed'] > status['collectors_total'] * 0.5:
        return {'status': 'degraded', 'reason': 'too_many_failed_collectors'}
    
    return {'status': 'healthy'}

Alerting Rules

# Prometheus alerting rules
groups:
- name: data_collection_service
  rules:
  - alert: DataCollectionServiceDown
    expr: up{job="data-collection-service"} == 0
    for: 5m
    annotations:
      summary: "Data collection service is down"
      
  - alert: TooManyFailedCollectors
    expr: collectors_failed / collectors_total > 0.5
    for: 10m
    annotations:
      summary: "More than 50% of collectors have failed"
      
  - alert: HighErrorRate
    expr: rate(errors_total[5m]) > 0.1
    for: 15m
    annotations:
      summary: "High error rate in data collection service"

Performance Considerations

Resource Usage

  • Memory: ~150MB base + ~15MB per trading pair (including service overhead)
  • CPU: Low (async I/O bound, service orchestration)
  • Network: ~1KB/s per trading pair
  • Storage: Service logs ~10MB/day

Scaling Strategies

  1. Horizontal Scaling: Multiple service instances with different configurations
  2. Configuration Partitioning: Separate services by exchange or asset class
  3. Load Balancing: Distribute trading pairs across service instances
  4. Regional Deployment: Deploy closer to exchange data centers

Optimization Tips

  1. Configuration Tuning: Optimize health check intervals and timeframes
  2. Resource Limits: Set appropriate memory and CPU limits
  3. Batch Operations: Use efficient database operations
  4. Monitoring Overhead: Balance monitoring frequency with performance

Troubleshooting

Common Service Issues

Service Won't Start

❌ Failed to start data collection service

Solutions:

  1. Check configuration file validity
  2. Verify database connectivity
  3. Ensure no port conflicts
  4. Check file permissions
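
To verify database connectivity in isolation, a quick check such as the following can help (sketched with asyncpg; substitute your actual driver and credentials):

import asyncio
import asyncpg

async def check_db() -> None:
    conn = await asyncpg.connect(
        host="localhost", database="crypto_dashboard", user="dashboard_user"
    )
    await conn.close()
    print("PostgreSQL connection OK")

asyncio.run(check_db())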

Configuration Loading Failed

❌ Failed to load config from config/data_collection.json: Invalid JSON

Solutions:

  1. Validate JSON syntax
  2. Check required fields
  3. Verify file encoding (UTF-8)
  4. Recreate default configuration
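
To pinpoint a syntax problem, validate the file with python -m json.tool config/data_collection.json, or programmatically:

import json

with open("config/data_collection.json", encoding="utf-8") as f:
    json.load(f)  # raises json.JSONDecodeError with line and column on invalid JSON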

No Collectors Created

❌ No collectors were successfully initialized

Solutions:

  1. Check exchange configuration
  2. Verify trading pair symbols
  3. Check network connectivity
  4. Review collector creation logs

Debug Mode

Enable verbose service debugging:

{
  "logging": {
    "level": "DEBUG",
    "log_errors_only": false,
    "verbose_data_logging": true
  }
}

Service Diagnostics

# Run diagnostic check
from data.collection_service import DataCollectionService

service = DataCollectionService()
status = service.get_status()

print(f"Service Running: {status['service_running']}")
print(f"Configuration File: {status['configuration']['config_file']}")
print(f"Collectors: {status['collectors_running']}/{status['collectors_total']}")

# Check individual collector health
for collector_name in service.manager.list_collectors():
    collector_status = service.manager.get_collector_status(collector_name)
    print(f"{collector_name}: {collector_status['status']}")