Data Collection Service

Service for collecting and storing real-time market data from multiple exchanges.

Architecture Overview

The data collection service has been refactored into a modular, component-based architecture to collect data for multiple trading pairs concurrently with improved maintainability, scalability, and testability.

  • DataCollectionService: The primary orchestration layer, responsible for initializing and coordinating core service components. It delegates specific functionalities to dedicated managers and factories.
  • CollectorManager: Now acts as an orchestrator for individual data collectors, utilizing its own set of internal components (e.g., CollectorLifecycleManager, ManagerHealthMonitor, ManagerStatsTracker, ManagerLogger).
  • Dedicated Components: Specific concerns like configuration, collector creation, and asynchronous task management are handled by new, specialized classes (ServiceConfig, CollectorFactory, AsyncTaskManager).
  • OKXCollector: A dedicated worker responsible for collecting data for a single trading pair from the OKX exchange, now built upon a more robust BaseDataCollector and its internal components (ConnectionManager, CollectorStateAndTelemetry, CallbackDispatcher).

This modular architecture allows for high scalability, fault tolerance, and clear separation of concerns.
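As a minimal sketch, the service can be composed from these components using the constructor documented in the API Reference below; the zero-argument component constructors here are assumptions for illustration.

# Illustrative composition; see the DataCollectionService constructor
# in the API Reference for the documented parameters.
from config.service_config import ServiceConfig
from data.collector_factory import CollectorFactory
from data.collector_manager import CollectorManager
from utils.async_task_manager import AsyncTaskManager
from data.collection_service import DataCollectionService

service = DataCollectionService(
    service_config=ServiceConfig(config_path="config/data_collection.json"),
    collector_factory=CollectorFactory(),      # assumed default constructor
    collector_manager=CollectorManager(),      # assumed default constructor
    async_task_manager=AsyncTaskManager(),     # assumed default constructor
)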

Key Components

DataCollectionService

  • Location: data/collection_service.py
  • Responsibilities:
    • Orchestrates the overall data collection process.
    • Initializes and coordinates ServiceConfig, CollectorFactory, CollectorManager, and AsyncTaskManager.
    • Manages the main service loop and graceful shutdown.
    • Provides a high-level API for running and monitoring the service.

ServiceConfig

  • Location: config/service_config.py
  • Responsibilities:
    • Handles loading, creating, and validating service configurations.
    • Ensures configuration file integrity, including file permission validation.
    • Manages default configuration generation and runtime updates.
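For example, a runtime update might flow through ServiceConfig like this; get_config() and save_config() are the accessors used in the Configuration Management example later in this document, though exact signatures may differ:

from config.service_config import ServiceConfig

sc = ServiceConfig(config_path="config/data_collection.json")
config = sc.get_config()  # validated configuration as a dict
config["collection_settings"]["health_check_interval"] = 60
sc.save_config(config)    # persist the runtime update back to disk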

CollectorFactory

  • Location: data/collector_factory.py
  • Responsibilities:
    • Encapsulates the logic for creating individual data collector instances (e.g., OKXCollector).
    • Decouples collector instantiation from the DataCollectionService.
    • Ensures collectors are created with correct configurations and dependencies.
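A hedged sketch of the factory in use; the method name create_collector and its parameters are assumptions for illustration, only the factory's role is documented here:

from data.collector_factory import CollectorFactory

factory = CollectorFactory()
# Hypothetical method name, shown only to illustrate the factory's role
collector = factory.create_collector(
    exchange="okx",
    symbol="BTC-USDT",
    data_types=["trade", "orderbook"],
)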

AsyncTaskManager

  • Location: utils/async_task_manager.py
  • Responsibilities:
    • Manages and tracks asyncio.Task instances throughout the application.
    • Prevents potential memory leaks by ensuring proper task lifecycle management.
    • Facilitates robust asynchronous operations for both DataCollectionService and CollectorManager.
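The core pattern AsyncTaskManager is built on is standard asyncio task tracking: hold strong references so tasks are not garbage-collected mid-flight, and release them when they finish. A minimal sketch of that pattern:

import asyncio
from typing import Coroutine, Set

class TaskTracker:
    """Simplified illustration of the task-tracking pattern."""

    def __init__(self) -> None:
        self._tasks: Set[asyncio.Task] = set()

    def create_task(self, coro: Coroutine) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._tasks.add(task)                        # strong reference
        task.add_done_callback(self._tasks.discard)  # release when done
        return task

    async def cancel_all(self) -> None:
        for task in list(self._tasks):
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)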

CollectorManager

  • Location: data/collector_manager.py
  • Responsibilities:
    • Acts as an orchestrator for all active data collectors.
    • Delegates specific responsibilities to its new internal components:
      • CollectorLifecycleManager: Manages adding, removing, starting, and stopping collectors.
      • ManagerHealthMonitor: Encapsulates global health monitoring and auto-restart logic.
      • ManagerStatsTracker: Handles performance statistics collection and caching.
      • ManagerLogger: Centralizes logging operations for the manager and its collectors.
    • Provides a unified interface for controlling and monitoring managed collectors.
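The delegation can be pictured as follows; attribute and delegate method names are assumptions, while start_all matches the manager interface mocked in the Testing section:

class CollectorManagerSketch:
    """Illustration only: shows how CollectorManager delegates to components."""

    def __init__(self, lifecycle, health, stats, logger):
        self._lifecycle = lifecycle  # CollectorLifecycleManager
        self._health = health        # ManagerHealthMonitor
        self._stats = stats          # ManagerStatsTracker
        self._logger = logger        # ManagerLogger

    async def start_all(self) -> None:
        await self._lifecycle.start_all()  # assumed delegate method
        self._health.begin_monitoring()    # assumed delegate method
        self._logger.info("All collectors started")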

OKXCollector

  • Location: data/exchanges/okx/collector.py
  • Responsibilities:
    • Inherits from BaseDataCollector and implements exchange-specific data collection logic.
    • Utilizes ConnectionManager for robust WebSocket connection management.
    • Leverages CollectorStateAndTelemetry for internal status, health, and logging.
    • Uses CallbackDispatcher to notify registered consumers of processed data.
    • Subscribes to real-time data channels specific to OKX.
    • Processes and standardizes incoming OKX data before dispatching.
    • Stores processed data in the database.
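The CallbackDispatcher follows the familiar publish/subscribe callback pattern; this sketch shows the shape with assumed method names (register, dispatch):

from typing import Any, Awaitable, Callable, Dict, List

Callback = Callable[[Dict[str, Any]], Awaitable[None]]

class CallbackDispatcherSketch:
    """Illustration of the dispatch pattern used to notify consumers."""

    def __init__(self) -> None:
        self._callbacks: List[Callback] = []

    def register(self, callback: Callback) -> None:
        self._callbacks.append(callback)

    async def dispatch(self, data: Dict[str, Any]) -> None:
        for callback in self._callbacks:
            await callback(data)  # notify each registered consumer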

Configuration

The service is configured through config/bot_configs/data_collector_config.json:

{
  "service_name": "data_collection_service",
  "enabled": true,
  "manager_config": {
    "component_name": "collector_manager",
    "health_check_interval": 60,
    "log_level": "INFO",
    "verbose": true
  },
  "collectors": [
    {
      "exchange": "okx",
      "symbol": "BTC-USDT",
      "data_types": ["trade", "orderbook"],
      "enabled": true
    },
    {
      "exchange": "okx",
      "symbol": "ETH-USDT",
      "data_types": ["trade"],
      "enabled": true
    }
  ]
}

Usage

The DataCollectionService is the main entry point for running the data collection system.

Start the service from a script (e.g., scripts/start_data_collection.py):

# scripts/start_data_collection.py
import asyncio
from data.collection_service import DataCollectionService
from utils.logger import setup_logging # Assuming this exists or is created

async def main():
    setup_logging() # Initialize logging
    service = DataCollectionService(config_path="config/data_collection.json")
    await service.run() # Or run with a duration: await service.run(duration_hours=24)

if __name__ == "__main__":
    asyncio.run(main())

Health & Monitoring

The DataCollectionService and CollectorManager provide comprehensive health and monitoring capabilities through their dedicated components.

Features

  • Service Lifecycle Management: Start, stop, and monitor data collection operations
  • JSON Configuration: File-based configuration with automatic defaults
  • Clean Production Logging: Only essential operational information
  • Health Monitoring: Service-level health checks and auto-recovery
  • Graceful Shutdown: Proper signal handling and cleanup
  • Multi-Exchange Orchestration: Coordinate collectors across multiple exchanges
  • Production Ready: Designed for 24/7 operation with monitoring

Quick Start

Basic Usage

# Start with default configuration (indefinite run)
python scripts/start_data_collection.py

# Run for 8 hours
python scripts/start_data_collection.py --hours 8

# Use custom configuration
python scripts/start_data_collection.py --config config/my_config.json

Monitoring

# Check status once
python scripts/monitor_clean.py

# Monitor continuously every 60 seconds
python scripts/monitor_clean.py --interval 60

Configuration

The service uses JSON configuration files with automatic default creation if none exists.

Default Configuration Location

config/data_collection.json

Configuration Structure

{
  "exchanges": {
    "okx": {
      "enabled": true,
      "trading_pairs": [
        {
          "symbol": "BTC-USDT",
          "enabled": true,
          "data_types": ["trade"],
          "timeframes": ["1m", "5m", "15m", "1h"]
        },
        {
          "symbol": "ETH-USDT",
          "enabled": true,
          "data_types": ["trade"],
          "timeframes": ["1m", "5m", "15m", "1h"]
        }
      ]
    }
  },
  "collection_settings": {
    "health_check_interval": 120,
    "store_raw_data": true,
    "auto_restart": true,
    "max_restart_attempts": 3
  },
  "logging": {
    "level": "INFO",
    "log_errors_only": true,
    "verbose_data_logging": false
  }
}

Configuration Options

Exchange Settings

  • enabled: Whether to enable this exchange
  • trading_pairs: Array of trading pair configurations

Trading Pair Settings

  • symbol: Trading pair symbol (e.g., "BTC-USDT")
  • enabled: Whether to collect data for this pair
  • data_types: Types of data to collect (["trade"], ["ticker"], etc.)
  • timeframes: Candle timeframes to generate (["1m", "5m", "15m", "1h", "4h", "1d"])

Collection Settings

  • health_check_interval: Health check frequency in seconds
  • store_raw_data: Whether to store raw trade data
  • auto_restart: Enable automatic restart on failures
  • max_restart_attempts: Maximum restart attempts before giving up

Logging Settings

  • level: Log level ("DEBUG", "INFO", "WARNING", "ERROR")
  • log_errors_only: Only log errors and essential events
  • verbose_data_logging: Enable verbose logging of individual trades/candles

Service Architecture

Service Layer Components

graph TD
    subgraph DataCollectionService
        SC[ServiceConfig] -- Manages --> Conf(Configuration)
        SCF[CollectorFactory] -- Creates --> Collectors(Data Collectors)
        ATM[AsyncTaskManager] -- Manages --> Tasks(Async Tasks)
        DCS[DataCollectionService] -- Uses --> SC
        DCS -- Uses --> SCF
        DCS -- Uses --> ATM
        DCS -- Orchestrates --> CM(CollectorManager)
    end

    subgraph CollectorManager
        CM --> CLM(CollectorLifecycleManager)
        CM --> MHM(ManagerHealthMonitor)
        CM --> MST(ManagerStatsTracker)
        CM --> ML(ManagerLogger)
        CLM -- Manages --> BC[BaseDataCollector]
        MHM -- Monitors --> BC
        MST -- Tracks --> BC
        ML -- Logs For --> BC
    end

    subgraph BaseDataCollector["Core Data Collector"]
        BC --> ConM(ConnectionManager)
        BC --> CST(CollectorStateAndTelemetry)
        BC --> CD(CallbackDispatcher)
    end

    Conf -- Provides --> DCS
    Collectors -- Created By --> SCF
    Tasks -- Managed By --> ATM
    CM -- Manages --> BaseDataCollector
    BaseDataCollector -- Collects Data --> Database
    BaseDataCollector -- Publishes Data --> Redis(Redis Pub/Sub)

    style DCS fill:#f9f,stroke:#333,stroke-width:2px
    style CM fill:#bbf,stroke:#333,stroke-width:2px
    style BC fill:#cfc,stroke:#333,stroke-width:2px
    style SC fill:#FFD700,stroke:#333,stroke-width:1px
    style SCF fill:#90EE90,stroke:#333,stroke-width:1px
    style ATM fill:#ADD8E6,stroke:#333,stroke-width:1px
    style CLM fill:#FFC0CB,stroke:#333,stroke-width:1px
    style MHM fill:#C0C0C0,stroke:#333,stroke-width:1px
    style MST fill:#DA70D6,stroke:#333,stroke-width:1px
    style ML fill:#DDA0DD,stroke:#333,stroke-width:1px
    style ConM fill:#F0F8FF,stroke:#333,stroke-width:1px
    style CST fill:#FFE4E1,stroke:#333,stroke-width:1px
    style CD fill:#FAFAD2,stroke:#333,stroke-width:1px
    style Database fill:#A9A9A9,stroke:#333,stroke-width:1px
    style Redis fill:#FF6347,stroke:#333,stroke-width:1px

Data Flow

graph LR
    Config(Configuration) --> ServiceConfig
    ServiceConfig --> DataCollectionService
    DataCollectionService -- Initializes --> CollectorManager
    DataCollectionService -- Initializes --> CollectorFactory
    DataCollectionService -- Initializes --> AsyncTaskManager
    CollectorFactory -- Creates --> BaseDataCollector
    CollectorManager -- Manages --> BaseDataCollector
    BaseDataCollector -- Collects Data --> Database
    BaseDataCollector -- Publishes Data --> RedisPubSub(Redis Pub/Sub)
    HealthMonitor(Health Monitoring) --> DataCollectionService
    HealthMonitor --> CollectorManager
    HealthMonitor --> BaseDataCollector
    ErrorHandling(Error Handling) --> DataCollectionService
    ErrorHandling --> CollectorManager
    ErrorHandling --> BaseDataCollector

Storage Integration

  • Raw Data: PostgreSQL raw_trades table via repository pattern
  • Candles: PostgreSQL market_data table with multiple timeframes
  • Real-time: Redis pub/sub for live data distribution
  • Service Metrics: Service uptime, error counts, collector statistics
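For the real-time leg, a collector publishes each processed data point to Redis pub/sub. A sketch using the redis-py asyncio client; the channel naming convention here is an assumption:

import json
import redis.asyncio as redis

async def publish_trade(trade: dict) -> None:
    client = redis.Redis(host="localhost", port=6379)
    channel = f"trades.okx.{trade['symbol']}"  # assumed channel scheme
    await client.publish(channel, json.dumps(trade))
    await client.aclose()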

Logging Philosophy

The service implements clean production logging focused on operational needs:

What Gets Logged

Service Lifecycle

  • Service start/stop events
  • Configuration loading
  • Service initialization

Collector Orchestration

  • Collector creation and destruction
  • Service-level health summaries
  • Recovery operations

Configuration Events

  • Config file changes
  • Runtime configuration updates
  • Validation errors

Service Statistics

  • Periodic uptime reports
  • Collection summary statistics
  • Performance metrics

What Doesn't Get Logged

Individual Data Points

  • Every trade received
  • Every candle generated
  • Raw market data

Internal Operations

  • Individual collector heartbeats
  • Routine database operations
  • Internal processing steps

API Reference

DataCollectionService

The main service class for managing data collection operations, now orchestrating through specialized components.

Constructor

DataCollectionService(
    config_path: str = "config/data_collection.json",
    service_config: Optional[ServiceConfig] = None,
    collector_factory: Optional[CollectorFactory] = None,
    collector_manager: Optional[CollectorManager] = None,
    async_task_manager: Optional[AsyncTaskManager] = None
)

Parameters:

  • config_path: Path to JSON configuration file. Used if service_config is not provided.
  • service_config: An instance of ServiceConfig. If None, one will be created.
  • collector_factory: An instance of CollectorFactory. If None, one will be created.
  • collector_manager: An instance of CollectorManager. If None, one will be created.
  • async_task_manager: An instance of AsyncTaskManager. If None, one will be created.

Methods

async run(duration_hours: Optional[float] = None) -> None

Runs the service for a specified duration or indefinitely. This method now coordinates the main event loop and lifecycle of all internal components.

Parameters:

  • duration_hours: Optional duration in hours (None = indefinite).

Returns:

  • None

Example:

from data.collection_service import DataCollectionService
import asyncio

async def run_service():
    service = DataCollectionService()
    await service.run(duration_hours=24) # Run for 24 hours

if __name__ == "__main__":
    asyncio.run(run_service())

async start() -> None

Initializes and starts the data collection service and all configured collectors. This method delegates to internal components for their respective startup procedures.

Returns:

  • None

async stop() -> None

Stops the service gracefully, including all collectors and internal cleanup. Ensures all asynchronous tasks are properly cancelled and resources released.

Returns:

  • None

get_status() -> Dict[str, Any]

Gets current service status, including uptime, collector counts, and errors, aggregated from underlying components.

Returns:

{
    'service_running': True,
    'uptime_hours': 12.5,
    'collectors_total': 6,
    'collectors_running': 5,
    'collectors_failed': 1,
    'errors_count': 2,
    'last_error': 'Connection timeout for ETH-USDT',
    'configuration': {
        'config_file': 'config/data_collection.json',
        'exchanges_enabled': ['okx'],
        'total_trading_pairs': 6
    },
    'detailed_collector_statuses': { # New field for detailed statuses
        'okx_BTC-USDT': {'status': 'RUNNING', 'health_score': 95},
        'okx_ETH-USDT': {'status': 'ERROR', 'last_error': 'Connection refused'}
    }
}

_run_main_loop(duration_hours: Optional[float])

Internal method extracted from run() to manage the core asynchronous loop.

Parameters:

  • duration_hours: Optional duration in hours for the loop.

Returns:

  • None

Standalone Function

run_data_collection_service(config_path, duration_hours)

async def run_data_collection_service(
    config_path: str = "config/data_collection.json",
    duration_hours: Optional[float] = None
) -> None

Convenience function to run the service with minimal setup, internally creating a DataCollectionService instance.

Parameters:

  • config_path: Path to configuration file.
  • duration_hours: Optional duration in hours.

Returns:

  • None
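
For example:

import asyncio
from data.collection_service import run_data_collection_service

# Run with the default configuration for 8 hours, then shut down
asyncio.run(run_data_collection_service(duration_hours=8))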

Integration Examples

Basic Service Integration

import asyncio
from data.collection_service import DataCollectionService
from utils.logger import setup_logging # Assuming this exists or is created

async def main():
    setup_logging()
    service = DataCollectionService("config/my_config.json")

    # Run for 24 hours
    await service.run(duration_hours=24)

    print("Service run finished.")

if __name__ == "__main__":
    asyncio.run(main())

Custom Status Monitoring

import asyncio
from data.collection_service import DataCollectionService
from utils.logger import setup_logging

async def monitor_service():
    setup_logging()
    service = DataCollectionService()

    # Start service in background
    start_task = asyncio.create_task(service.run())

    # Monitor status every 60 seconds
    try:
        while True:
            status = service.get_status()
            print(f"Service Uptime: {status['uptime_hours']:.1f}h")
            print(f"Collectors: {status['collectors_running']}/{status['collectors_total']}")
            print(f"Errors: {status['errors_count']}")
            if status['errors_count'] > 0:
                print(f"Last error: {status['last_error']}")
            print("Detailed Collector Statuses:")
            for name, details in status.get('detailed_collector_statuses', {}).items():
                print(f"  - {name}: Status={details.get('status')}, Health Score={details.get('health_score')}")

            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Monitoring cancelled.")
    finally:
        await service.stop()
        await start_task # Ensure the main service task is awaited

if __name__ == "__main__":
    asyncio.run(monitor_service())

Programmatic Control

import asyncio
from data.collection_service import DataCollectionService
from utils.logger import setup_logging

async def controlled_collection():
    setup_logging()
    service = DataCollectionService()

    try:
        # Start the service
        await service.start()
        print("Data collection service started.")

        # Monitor once per minute; for demonstration, stop after 5 minutes
        for _ in range(5):
            status = service.get_status()
            print(f"Current Service Status: {status['service_running']}, Collectors Running: {status['collectors_running']}")

            if status['collectors_failed'] > 0:
                # The service's internal health monitor and task manager will handle restarts
                print("Some collectors failed, service is recovering...")

            await asyncio.sleep(60)

        print("Stopping service after 5 minutes of operation.")

    except KeyboardInterrupt:
        print("Manual shutdown requested.")
    finally:
        print("Shutting down service gracefully...")
        await service.stop()
        print("Service stopped.")

if __name__ == "__main__":
    asyncio.run(controlled_collection())

Configuration Management

import asyncio
import json
from data.collection_service import DataCollectionService
from utils.logger import setup_logging
from config.service_config import ServiceConfig # Import the new ServiceConfig

async def dynamic_configuration():
    setup_logging()
    # Instantiate ServiceConfig directly or let DataCollectionService create it
    service_config_instance = ServiceConfig(config_path="config/data_collection.json")
    service = DataCollectionService(service_config=service_config_instance)
    await service.start()

    print("Initial configuration loaded:")
    print(json.dumps(service_config_instance.get_config(), indent=2))

    # Load and modify configuration
    config = service_config_instance.get_config()

    # Add new trading pair if not already present
    new_pair = {
        'symbol': 'SOL-USDT',
        'enabled': True,
        'data_types': ['trade'],
        'timeframes': ['1m', '5m']
    }
    if new_pair not in config['exchanges']['okx']['trading_pairs']:
        config['exchanges']['okx']['trading_pairs'].append(new_pair)
        print("Added SOL-USDT to configuration.")
    else:
        print("SOL-USDT already in configuration.")

    # Save updated configuration
    service_config_instance.save_config(config) # Use ServiceConfig to save

    print("Updated configuration saved. Restarting service with new config...")
    await service.stop()
    await service.start()
    print("Service restarted with updated configuration.")

    # Verify new pair is active (logic would be in get_status or similar)
    status = service.get_status()
    print(f"Current active collectors count: {status['collectors_total']}")

if __name__ == "__main__":
    asyncio.run(dynamic_configuration())

Error Handling

The service implements robust error handling at multiple layers, leveraging the new component structure for more precise error management and recovery.

Service Level Errors

  • Configuration Errors: Invalid JSON, missing required fields, file permission issues (handled by ServiceConfig).
  • Initialization Errors: Failed collector creation (handled by CollectorFactory), database connectivity.
  • Runtime Errors: Service-level exceptions, resource exhaustion, unhandled exceptions in asynchronous tasks (managed by AsyncTaskManager).

Error Recovery Strategies

  1. Graceful Degradation: Continue with healthy collectors while attempting to recover failed ones.
  2. Configuration Validation: ServiceConfig validates configurations before application, preventing common startup issues.
  3. Automated Restarts: ManagerHealthMonitor and AsyncTaskManager coordinate automatic restarts for failed collectors/tasks.
  4. Error Aggregation: ManagerStatsTracker collects and reports errors across all collectors, providing a unified view.
  5. Sanitized Error Messages: ManagerLogger ensures sensitive internal details are not leaked in logs or public interfaces.
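
A sketch of the bounded-restart idea behind strategies 1 and 3; the real logic lives in ManagerHealthMonitor and AsyncTaskManager, and the collector's start() method here is assumed:

import asyncio

async def restart_with_limit(collector, max_restart_attempts: int = 3) -> bool:
    """Attempt to recover a failed collector, giving up after the limit."""
    for attempt in range(1, max_restart_attempts + 1):
        try:
            await collector.start()  # assumed collector API
            return True              # recovered
        except Exception as exc:
            print(f"Restart attempt {attempt} failed: {exc}")
            await asyncio.sleep(2 ** attempt)  # simple exponential backoff
    return False                     # degrade gracefully past the limit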

Error Reporting

# Service status includes aggregated error information
status = service.get_status()

if status['errors_count'] > 0:
    print(f"Service has {status['errors_count']} errors.")
    print(f"Last service error: {status['last_error']}")

    # Get detailed error information from individual collectors if available
    if 'detailed_collector_statuses' in status:
        for collector_name, details in status['detailed_collector_statuses'].items():
            if details.get('status') == 'ERROR' and 'last_error' in details:
                print(f"Collector {collector_name} error: {details['last_error']}")

Testing

The testing approach now emphasizes unit tests for individual components and integration tests for component interactions, ensuring thorough coverage of the modular architecture.

Running Service Tests

# Run all data collection service tests
uv run pytest tests/data/collection_service -v # Assuming tests are in a 'collection_service' subdir

# Run specific component tests, e.g., for ServiceConfig
uv run pytest tests/config/test_service_config.py -v

# Run with coverage for the entire data collection module
uv run pytest --cov=data --cov=config --cov=utils tests/

Test Coverage

The expanded test suite now covers:

  • Component Unit Tests: Individual tests for ServiceConfig, CollectorFactory, AsyncTaskManager, CollectorLifecycleManager, ManagerHealthMonitor, ManagerStatsTracker, ManagerLogger.
  • Service Integration Tests: Testing DataCollectionService's orchestration of its components.
  • Service initialization and configuration loading/validation.
  • Collector orchestration and management via CollectorManager and CollectorLifecycleManager.
  • Asynchronous task management and error recovery.
  • Service lifecycle (start/stop/restart) and signal handling.
  • Status reporting and monitoring, including detailed collector statuses.
  • Error aggregation and recovery strategies.

Mock Testing

import pytest
from unittest.mock import AsyncMock, patch
from data.collection_service import DataCollectionService

@pytest.mark.asyncio
async def test_service_with_mock_components():
    with patch('data.collection_service.ServiceConfig') as MockServiceConfig, \
         patch('data.collection_service.CollectorFactory') as MockCollectorFactory, \
         patch('data.collection_service.CollectorManager') as MockCollectorManager, \
         patch('data.collection_service.AsyncTaskManager') as MockAsyncTaskManager:

        # Configure mocks for successful operation; awaited methods must be
        # AsyncMock instances so `await` works on them
        MockServiceConfig.return_value.load_config.return_value = {"collectors": []}
        MockServiceConfig.return_value.get_config.return_value = {"collectors": []}
        MockCollectorManager.return_value.start_all = AsyncMock(return_value=None)
        MockCollectorManager.return_value.stop_all = AsyncMock(return_value=None)
        MockAsyncTaskManager.return_value.start = AsyncMock(return_value=None)
        MockAsyncTaskManager.return_value.stop = AsyncMock(return_value=None)

        service = DataCollectionService(
            service_config=MockServiceConfig.return_value,
            collector_factory=MockCollectorFactory.return_value,
            collector_manager=MockCollectorManager.return_value,
            async_task_manager=MockAsyncTaskManager.return_value
        )
        await service.start()

        # Assertions to ensure components were called correctly
        MockServiceConfig.return_value.load_config.assert_called_once()
        MockCollectorManager.return_value.start_all.assert_called_once()
        MockAsyncTaskManager.return_value.start.assert_called_once()

        await service.stop()
        MockCollectorManager.return_value.stop_all.assert_called_once()
        MockAsyncTaskManager.return_value.stop.assert_called_once()

Production Deployment

Docker Deployment

FROM python:3.11-slim

WORKDIR /app
COPY . .

# Install dependencies
RUN pip install uv
# No virtual environment in the container, so install into the system Python
RUN uv pip install --system -r requirements.txt

# Create logs and config directories
RUN mkdir -p logs config

# Copy production configuration
COPY config/production.json config/data_collection.json

# Health check
HEALTHCHECK --interval=60s --timeout=10s --start-period=30s --retries=3 \
  CMD python scripts/health_check.py || exit 1

# Run service
CMD ["python", "scripts/start_data_collection.py", "--config", "config/data_collection.json"]

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: data-collection-service
spec:
  replicas: 1
  selector:
    matchLabels:
      app: data-collection-service
  template:
    metadata:
      labels:
        app: data-collection-service
    spec:
      containers:
      - name: data-collector
        image: crypto-dashboard/data-collector:latest
        ports:
        - containerPort: 8080
        env:
        - name: POSTGRES_HOST
          value: "postgres-service"
        - name: REDIS_HOST
          value: "redis-service"
        volumeMounts:
        - name: config-volume
          mountPath: /app/config
        - name: logs-volume
          mountPath: /app/logs
        livenessProbe:
          exec:
            command:
            - python
            - scripts/health_check.py
          initialDelaySeconds: 30
          periodSeconds: 60
      volumes:
      - name: config-volume
        configMap:
          name: data-collection-config
      - name: logs-volume
        emptyDir: {}

Systemd Service

[Unit]
Description=Cryptocurrency Data Collection Service
After=network.target postgres.service redis.service
Requires=postgres.service redis.service

[Service]
Type=simple
User=crypto-collector
Group=crypto-collector
WorkingDirectory=/opt/crypto-dashboard
ExecStart=/usr/bin/python scripts/start_data_collection.py --config config/production.json
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
RestartSec=10
KillMode=mixed
TimeoutStopSec=30

# Environment
Environment=PYTHONPATH=/opt/crypto-dashboard
Environment=LOG_LEVEL=INFO

# Security
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ReadWritePaths=/opt/crypto-dashboard/logs

[Install]
WantedBy=multi-user.target

Environment Configuration

# Production environment variables
export ENVIRONMENT=production
export POSTGRES_HOST=postgres.internal
export POSTGRES_PORT=5432
export POSTGRES_DB=crypto_dashboard
export POSTGRES_USER=dashboard_user
export POSTGRES_PASSWORD=secure_password
export REDIS_HOST=redis.internal
export REDIS_PORT=6379

# Service configuration
export DATA_COLLECTION_CONFIG=/etc/crypto-dashboard/data_collection.json
export LOG_LEVEL=INFO
export HEALTH_CHECK_INTERVAL=120

Monitoring and Alerting

Metrics Collection

The service exposes metrics for monitoring systems:

# Service metrics
service_uptime_hours = 24.5
collectors_running = 5
collectors_total = 6
errors_per_hour = 0.2
data_points_processed = 15000
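
One way to expose these values is via the prometheus_client library; this is a sketch rather than the service's documented mechanism, and port 8080 is chosen to match the containerPort in the Kubernetes manifest above:

from prometheus_client import Gauge, start_http_server

service_uptime_hours = Gauge("service_uptime_hours", "Service uptime in hours")
collectors_running = Gauge("collectors_running", "Collectors currently running")
collectors_total = Gauge("collectors_total", "Collectors configured")

def export_status(status: dict) -> None:
    # Map get_status() fields onto Prometheus gauges
    service_uptime_hours.set(status["uptime_hours"])
    collectors_running.set(status["collectors_running"])
    collectors_total.set(status["collectors_total"])

start_http_server(8080)  # scrape endpoint at :8080/metrics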

Health Checks

# External health check endpoint
async def health_check():
    # Illustrative: a real probe would query the running service instance
    # (e.g. via a status endpoint) rather than construct a fresh one
    service = DataCollectionService()
    status = service.get_status()
    
    if not status['service_running']:
        return {'status': 'unhealthy', 'reason': 'service_stopped'}
    
    if status['collectors_failed'] > status['collectors_total'] * 0.5:
        return {'status': 'degraded', 'reason': 'too_many_failed_collectors'}
    
    return {'status': 'healthy'}

Alerting Rules

# Prometheus alerting rules
groups:
- name: data_collection_service
  rules:
  - alert: DataCollectionServiceDown
    expr: up{job="data-collection-service"} == 0
    for: 5m
    annotations:
      summary: "Data collection service is down"
      
  - alert: TooManyFailedCollectors
    expr: collectors_failed / collectors_total > 0.5
    for: 10m
    annotations:
      summary: "More than 50% of collectors have failed"
      
  - alert: HighErrorRate
    expr: rate(errors_total[5m]) > 0.1
    for: 15m
    annotations:
      summary: "High error rate in data collection service"

Performance Considerations

Resource Usage

  • Memory: ~150MB base + ~15MB per trading pair (including service overhead)
  • CPU: Low (async I/O bound, service orchestration)
  • Network: ~1KB/s per trading pair
  • Storage: Service logs ~10MB/day

Scaling Strategies

  1. Horizontal Scaling: Multiple service instances with different configurations
  2. Configuration Partitioning: Separate services by exchange or asset class
  3. Load Balancing: Distribute trading pairs across service instances
  4. Regional Deployment: Deploy closer to exchange data centers

Optimization Tips

  1. Configuration Tuning: Optimize health check intervals and timeframes
  2. Resource Limits: Set appropriate memory and CPU limits
  3. Batch Operations: Use efficient database operations
  4. Monitoring Overhead: Balance monitoring frequency with performance

Troubleshooting

Common Service Issues

Service Won't Start

❌ Failed to start data collection service

Solutions:

  1. Check configuration file validity
  2. Verify database connectivity
  3. Ensure no port conflicts
  4. Check file permissions

Configuration Loading Failed

❌ Failed to load config from config/data_collection.json: Invalid JSON

Solutions:

  1. Validate JSON syntax
  2. Check required fields
  3. Verify file encoding (UTF-8)
  4. Recreate default configuration
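
A quick way to perform check 1 from Python:

import json
import sys

try:
    with open("config/data_collection.json", encoding="utf-8") as f:
        json.load(f)
    print("Configuration JSON is valid.")
except (OSError, json.JSONDecodeError) as exc:
    sys.exit(f"Configuration error: {exc}")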

No Collectors Created

❌ No collectors were successfully initialized

Solutions:

  1. Check exchange configuration
  2. Verify trading pair symbols
  3. Check network connectivity
  4. Review collector creation logs

Debug Mode

Enable verbose service debugging:

{
  "logging": {
    "level": "DEBUG",
    "log_errors_only": false,
    "verbose_data_logging": true
  }
}

Service Diagnostics

# Run diagnostic check
from data.collection_service import DataCollectionService

service = DataCollectionService()
status = service.get_status()

print(f"Service Running: {status['service_running']}")
print(f"Configuration File: {status['configuration']['config_file']}")
print(f"Collectors: {status['collectors_running']}/{status['collectors_total']}")

# Check individual collector health
for collector_name in service.manager.list_collectors():
    collector_status = service.manager.get_collector_status(collector_name)
    print(f"{collector_name}: {collector_status['status']}")