# Data Collection Service

**Service for collecting and storing real-time market data from multiple exchanges.**

## Architecture Overview

The data collection service uses a **manager-worker architecture** to collect data for multiple trading pairs concurrently.

- **`CollectorManager`**: The central manager responsible for creating, starting, stopping, and monitoring individual data collectors.
- **`OKXCollector`**: A dedicated worker responsible for collecting data for a single trading pair from the OKX exchange.

This architecture allows for high scalability and fault tolerance.

## Key Components

### `CollectorManager`

- **Location**: `tasks/collector_manager.py`
- **Responsibilities**:
  - Manages the lifecycle of multiple collectors
  - Provides a unified API for controlling all collectors
  - Monitors the health of each collector
  - Distributes tasks and aggregates results

### `OKXCollector`

- **Location**: `data/exchanges/okx/collector.py`
- **Responsibilities**:
  - Inherits from `BaseDataCollector` and implements exchange-specific data collection logic.
  - Utilizes `ConnectionManager` for robust WebSocket connection management.
  - Leverages `CollectorStateAndTelemetry` for internal status, health, and logging.
  - Uses `CallbackDispatcher` to notify registered consumers of processed data.
  - Subscribes to real-time data channels specific to OKX.
  - Processes and standardizes incoming OKX data before dispatching.
  - Stores processed data in the database.

## Collector Manager Configuration

The manager-worker setup is configured through `config/bot_configs/data_collector_config.json`:

```json
{
  "service_name": "data_collection_service",
  "enabled": true,
  "manager_config": {
    "component_name": "collector_manager",
    "health_check_interval": 60,
    "log_level": "INFO",
    "verbose": true
  },
  "collectors": [
    {
      "exchange": "okx",
      "symbol": "BTC-USDT",
      "data_types": ["trade", "orderbook"],
      "enabled": true
    },
    {
      "exchange": "okx",
      "symbol": "ETH-USDT",
      "data_types": ["trade"],
      "enabled": true
    }
  ]
}
```

## Usage

Start the service from the main application entry point:

```python
# main.py
import asyncio

from tasks.collector_manager import CollectorManager

async def main():
    manager = CollectorManager()
    await manager.start_all_collectors()

if __name__ == "__main__":
    asyncio.run(main())
```

## Health & Monitoring

The `CollectorManager` provides a `get_status()` method to monitor the health of all collectors.

## Features

- **Service Lifecycle Management**: Start, stop, and monitor data collection operations
- **JSON Configuration**: File-based configuration with automatic defaults
- **Clean Production Logging**: Only essential operational information
- **Health Monitoring**: Service-level health checks and auto-recovery
- **Graceful Shutdown**: Proper signal handling and cleanup
- **Multi-Exchange Orchestration**: Coordinate collectors across multiple exchanges
- **Production Ready**: Designed for 24/7 operation with monitoring

## Quick Start

### Basic Usage

```bash
# Start with default configuration (indefinite run)
python scripts/start_data_collection.py

# Run for 8 hours
python scripts/start_data_collection.py --hours 8

# Use custom configuration
python scripts/start_data_collection.py --config config/my_config.json
```

### Monitoring

```bash
# Check status once
python scripts/monitor_clean.py

# Monitor continuously every 60 seconds
python scripts/monitor_clean.py --interval 60
```

## Configuration

The service uses JSON configuration files and automatically creates a default file if none exists.
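As a rough sketch of that load-or-create behaviour (the helper name and the trimmed `DEFAULT_CONFIG` below are illustrative assumptions, not the service's actual internals):

```python
# Illustrative sketch only: load the JSON config, writing a default file
# first if none exists. DEFAULT_CONFIG is trimmed for brevity; the real
# defaults mirror the structure documented below.
import json
from pathlib import Path

DEFAULT_CONFIG = {
    "exchanges": {"okx": {"enabled": True, "trading_pairs": []}},
    "collection_settings": {"health_check_interval": 120, "auto_restart": True},
    "logging": {"level": "INFO", "log_errors_only": True},
}

def load_or_create_config(config_path: str = "config/data_collection.json") -> dict:
    path = Path(config_path)
    if not path.exists():
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(DEFAULT_CONFIG, indent=2))
    return json.loads(path.read_text())
```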
### Default Configuration Location

`config/data_collection.json`

### Configuration Structure

```json
{
  "exchanges": {
    "okx": {
      "enabled": true,
      "trading_pairs": [
        {
          "symbol": "BTC-USDT",
          "enabled": true,
          "data_types": ["trade"],
          "timeframes": ["1m", "5m", "15m", "1h"]
        },
        {
          "symbol": "ETH-USDT",
          "enabled": true,
          "data_types": ["trade"],
          "timeframes": ["1m", "5m", "15m", "1h"]
        }
      ]
    }
  },
  "collection_settings": {
    "health_check_interval": 120,
    "store_raw_data": true,
    "auto_restart": true,
    "max_restart_attempts": 3
  },
  "logging": {
    "level": "INFO",
    "log_errors_only": true,
    "verbose_data_logging": false
  }
}
```

### Configuration Options

#### Exchange Settings

- **enabled**: Whether to enable this exchange
- **trading_pairs**: Array of trading pair configurations

#### Trading Pair Settings

- **symbol**: Trading pair symbol (e.g., "BTC-USDT")
- **enabled**: Whether to collect data for this pair
- **data_types**: Types of data to collect (`["trade"]`, `["ticker"]`, etc.)
- **timeframes**: Candle timeframes to generate (`["1m", "5m", "15m", "1h", "4h", "1d"]`)

#### Collection Settings

- **health_check_interval**: Health check frequency in seconds
- **store_raw_data**: Whether to store raw trade data
- **auto_restart**: Enable automatic restart on failures
- **max_restart_attempts**: Maximum restart attempts before giving up

#### Logging Settings

- **level**: Log level ("DEBUG", "INFO", "WARNING", "ERROR")
- **log_errors_only**: Only log errors and essential events
- **verbose_data_logging**: Enable verbose logging of individual trades/candles

## Service Architecture

### Service Layer Components

```
┌─────────────────────────────────────────────────┐
│              DataCollectionService              │
│  ┌─────────────────────────────────────────┐    │
│  │          Configuration Manager          │    │
│  │  • JSON config loading/validation       │    │
│  │  • Default config generation            │    │
│  │  • Runtime config updates               │    │
│  └─────────────────────────────────────────┘    │
│  ┌─────────────────────────────────────────┐    │
│  │             Service Monitor             │    │
│  │  • Service-level health checks          │    │
│  │  • Uptime tracking                      │    │
│  │  • Error aggregation                    │    │
│  └─────────────────────────────────────────┘    │
│                        │                        │
│  ┌─────────────────────────────────────────┐    │
│  │            CollectorManager             │    │
│  │  • Individual collector management      │    │
│  │  • Health monitoring                    │    │
│  │  • Auto-restart coordination            │    │
│  └─────────────────────────────────────────┘    │
└─────────────────────────────────────────────────┘
                         │
          ┌─────────────────────────────┐
          │    Core Data Collectors     │
          │  (See data_collectors.md)   │
          └─────────────────────────────┘
```

### Data Flow

```
Configuration → Service → CollectorManager → Data Collectors → Database
                   ↓              ↓
            Service Monitor  Health Monitor
```

### Storage Integration

- **Raw Data**: PostgreSQL `raw_trades` table via repository pattern
- **Candles**: PostgreSQL `market_data` table with multiple timeframes
- **Real-time**: Redis pub/sub for live data distribution
- **Service Metrics**: Service uptime, error counts, collector statistics

## Logging Philosophy

The service implements **clean production logging** focused on operational needs.

### What Gets Logged

✅ **Service Lifecycle**
- Service start/stop events
- Configuration loading
- Service initialization

✅ **Collector Orchestration**
- Collector creation and destruction
- Service-level health summaries
- Recovery operations

✅ **Configuration Events**
- Config file changes
- Runtime configuration updates
- Validation errors

✅ **Service Statistics**
- Periodic uptime reports
- Collection summary statistics
- Performance metrics
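A minimal sketch of how the `logging` block of the configuration could map onto Python's `logging` module; the logger names and exact gating here are assumptions, and the project's real wiring lives in the logging component:

```python
# Hedged sketch, not the project's actual logging setup: gate log output
# using the "logging" block from the JSON configuration above.
import logging

def configure_service_logging(log_config: dict) -> logging.Logger:
    logger = logging.getLogger("data_collection_service")
    logger.setLevel(getattr(logging, log_config.get("level", "INFO")))
    if log_config.get("log_errors_only", False):
        # Keep warnings/errors and essential events; drop routine INFO chatter.
        logger.setLevel(logging.WARNING)
    if not log_config.get("verbose_data_logging", False):
        # Silence the hypothetical per-trade/per-candle child logger.
        logging.getLogger("data_collection_service.data").setLevel(logging.CRITICAL)
    return logger
```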
### What Doesn't Get Logged

❌ **Individual Data Points**
- Every trade received
- Every candle generated
- Raw market data

❌ **Internal Operations**
- Individual collector heartbeats
- Routine database operations
- Internal processing steps

## API Reference

### DataCollectionService

The main service class for managing data collection operations.

#### Constructor

```python
DataCollectionService(config_path: str = "config/data_collection.json")
```

**Parameters:**
- `config_path`: Path to JSON configuration file

#### Methods

##### `async run(duration_hours: Optional[float] = None) -> bool`

Run the service for a specified duration or indefinitely.

**Parameters:**
- `duration_hours`: Optional duration in hours (None = indefinite)

**Returns:**
- `bool`: True if successful, False if an error occurred

**Example:**

```python
service = DataCollectionService()
await service.run(duration_hours=24)  # Run for 24 hours
```

##### `async start() -> bool`

Start the data collection service and all configured collectors.

**Returns:**
- `bool`: True if started successfully

##### `async stop() -> None`

Stop the service gracefully, including all collectors and cleanup.

##### `get_status() -> Dict[str, Any]`

Get current service status including uptime, collector counts, and errors.

**Returns:**

```python
{
    'service_running': True,
    'uptime_hours': 12.5,
    'collectors_total': 6,
    'collectors_running': 5,
    'collectors_failed': 1,
    'errors_count': 2,
    'last_error': 'Connection timeout for ETH-USDT',
    'configuration': {
        'config_file': 'config/data_collection.json',
        'exchanges_enabled': ['okx'],
        'total_trading_pairs': 6
    }
}
```

##### `async initialize_collectors() -> bool`

Initialize all collectors based on configuration.

**Returns:**
- `bool`: True if all collectors initialized successfully

##### `load_configuration() -> Dict[str, Any]`

Load and validate configuration from file.

**Returns:**
- `dict`: Loaded configuration

### Standalone Function

#### `run_data_collection_service(config_path, duration_hours)`

```python
async def run_data_collection_service(
    config_path: str = "config/data_collection.json",
    duration_hours: Optional[float] = None
) -> bool
```

Convenience function to run the service with minimal setup.

**Parameters:**
- `config_path`: Path to configuration file
- `duration_hours`: Optional duration in hours

**Returns:**
- `bool`: True if successful
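For example, a one-liner run using this helper (module path as used in the integration examples below):

```python
import asyncio

from data.collection_service import run_data_collection_service

# Collect with the default configuration for 8 hours;
# returns True on a clean run.
success = asyncio.run(run_data_collection_service(duration_hours=8))
```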
## Integration Examples

### Basic Service Integration

```python
import asyncio

from data.collection_service import DataCollectionService

async def main():
    service = DataCollectionService("config/my_config.json")

    # Run for 24 hours
    success = await service.run(duration_hours=24)
    if not success:
        print("Service encountered errors")

if __name__ == "__main__":
    asyncio.run(main())
```

### Custom Status Monitoring

```python
import asyncio

from data.collection_service import DataCollectionService

async def monitor_service():
    service = DataCollectionService()

    # Start service in background
    start_task = asyncio.create_task(service.run())

    # Monitor status every 5 minutes
    while service.running:
        status = service.get_status()
        print(f"Service Uptime: {status['uptime_hours']:.1f}h")
        print(f"Collectors: {status['collectors_running']}/{status['collectors_total']}")
        print(f"Errors: {status['errors_count']}")
        await asyncio.sleep(300)  # 5 minutes

    await start_task

asyncio.run(monitor_service())
```

### Programmatic Control

```python
import asyncio

from data.collection_service import DataCollectionService

async def controlled_collection():
    service = DataCollectionService()

    try:
        # Initialize and start
        await service.initialize_collectors()
        await service.start()

        # Monitor and control
        while True:
            status = service.get_status()

            # Check if any collectors failed
            if status['collectors_failed'] > 0:
                print("Some collectors failed, checking health...")
                # Service auto-restart will handle this

            await asyncio.sleep(60)  # Check every minute

    except KeyboardInterrupt:
        print("Shutting down service...")
    finally:
        await service.stop()

asyncio.run(controlled_collection())
```

### Configuration Management

```python
import asyncio
import json

from data.collection_service import DataCollectionService

async def dynamic_configuration():
    service = DataCollectionService()

    # Load and modify configuration
    config = service.load_configuration()

    # Add new trading pair
    config['exchanges']['okx']['trading_pairs'].append({
        'symbol': 'SOL-USDT',
        'enabled': True,
        'data_types': ['trade'],
        'timeframes': ['1m', '5m']
    })

    # Save updated configuration
    with open('config/data_collection.json', 'w') as f:
        json.dump(config, f, indent=2)

    # Restart the service so it picks up the new config
    await service.stop()
    await service.start()

asyncio.run(dynamic_configuration())
```

## Error Handling

The service implements robust error handling at the service orchestration level.

### Service Level Errors

- **Configuration Errors**: Invalid JSON, missing required fields
- **Initialization Errors**: Failed collector creation, database connectivity
- **Runtime Errors**: Service-level exceptions, resource exhaustion

### Error Recovery Strategies

1. **Graceful Degradation**: Continue with healthy collectors
2. **Configuration Validation**: Validate before applying changes
3. **Service Restart**: Full service restart on critical errors
4. **Error Aggregation**: Collect and report errors across all collectors (see the sketch below)
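A hedged sketch of strategy 4: folding per-collector errors into one service-level summary. It reuses the `manager.list_collectors()` / `get_collector_status()` calls from the error-reporting example below; the summary dict's shape is an assumption:

```python
# Sketch only: aggregate each failed collector's last error into a single
# report. The returned dict shape is illustrative, not a documented API.
def aggregate_errors(service) -> dict:
    errors = {}
    for name in service.manager.list_collectors():
        status = service.manager.get_collector_status(name)
        last_error = status.get('statistics', {}).get('last_error')
        if status.get('status') == 'error' and last_error:
            errors[name] = last_error
    return {'errors_count': len(errors), 'by_collector': errors}
```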
### Error Reporting

```python
# Service status includes error information
status = service.get_status()

if status['errors_count'] > 0:
    print(f"Service has {status['errors_count']} errors")
    print(f"Last error: {status['last_error']}")

# Get detailed error information from collectors
for collector_name in service.manager.list_collectors():
    collector_status = service.manager.get_collector_status(collector_name)
    if collector_status['status'] == 'error':
        print(f"Collector {collector_name}: {collector_status['statistics']['last_error']}")
```

## Testing

### Running Service Tests

```bash
# Run all data collection service tests
uv run pytest tests/test_data_collection_service.py -v

# Run specific test categories
uv run pytest tests/test_data_collection_service.py::TestDataCollectionService -v

# Run with coverage
uv run pytest tests/test_data_collection_service.py --cov=data.collection_service
```

### Test Coverage

The service test suite covers:

- Service initialization and configuration loading
- Collector orchestration and management
- Service lifecycle (start/stop/restart)
- Configuration validation and error handling
- Signal handling and graceful shutdown
- Status reporting and monitoring
- Error aggregation and recovery

### Mock Testing

```python
import pytest
from unittest.mock import AsyncMock, patch

from data.collection_service import DataCollectionService

@pytest.mark.asyncio
async def test_service_with_mock_collectors():
    with patch('data.collection_service.CollectorManager') as mock_manager:
        # Mock successful initialization; start() is awaited by the
        # service, so it must be an AsyncMock rather than a plain mock.
        mock_manager.return_value.start = AsyncMock(return_value=True)

        service = DataCollectionService()
        result = await service.start()

        assert result is True
        mock_manager.return_value.start.assert_called_once()
```
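In the same spirit, a hedged status-reporting test; it assumes a freshly constructed service reports itself as not running, which follows the `get_status()` fields documented above:

```python
from data.collection_service import DataCollectionService

# Assumption: a never-started service reports service_running=False and no
# running collectors; adjust the assertions to the real default behaviour.
def test_status_before_start():
    service = DataCollectionService()
    status = service.get_status()

    assert status['service_running'] is False
    assert status['collectors_running'] == 0
```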
## Production Deployment

### Docker Deployment

```dockerfile
FROM python:3.11-slim

WORKDIR /app
COPY . .

# Install dependencies
RUN pip install uv
RUN uv pip install --system -r requirements.txt

# Create logs and config directories
RUN mkdir -p logs config

# Copy production configuration
COPY config/production.json config/data_collection.json

# Health check
HEALTHCHECK --interval=60s --timeout=10s --start-period=30s --retries=3 \
  CMD python scripts/health_check.py || exit 1

# Run service
CMD ["python", "scripts/start_data_collection.py", "--config", "config/data_collection.json"]
```

### Kubernetes Deployment

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: data-collection-service
spec:
  replicas: 1
  selector:
    matchLabels:
      app: data-collection-service
  template:
    metadata:
      labels:
        app: data-collection-service
    spec:
      containers:
      - name: data-collector
        image: crypto-dashboard/data-collector:latest
        ports:
        - containerPort: 8080
        env:
        - name: POSTGRES_HOST
          value: "postgres-service"
        - name: REDIS_HOST
          value: "redis-service"
        volumeMounts:
        - name: config-volume
          mountPath: /app/config
        - name: logs-volume
          mountPath: /app/logs
        livenessProbe:
          exec:
            command:
            - python
            - scripts/health_check.py
          initialDelaySeconds: 30
          periodSeconds: 60
      volumes:
      - name: config-volume
        configMap:
          name: data-collection-config
      - name: logs-volume
        emptyDir: {}
```

### Systemd Service

```ini
[Unit]
Description=Cryptocurrency Data Collection Service
After=network.target postgres.service redis.service
Requires=postgres.service redis.service

[Service]
Type=simple
User=crypto-collector
Group=crypto-collector
WorkingDirectory=/opt/crypto-dashboard
ExecStart=/usr/bin/python scripts/start_data_collection.py --config config/production.json
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
RestartSec=10
KillMode=mixed
TimeoutStopSec=30

# Environment
Environment=PYTHONPATH=/opt/crypto-dashboard
Environment=LOG_LEVEL=INFO

# Security
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ReadWritePaths=/opt/crypto-dashboard/logs

[Install]
WantedBy=multi-user.target
```

### Environment Configuration

```bash
# Production environment variables
export ENVIRONMENT=production
export POSTGRES_HOST=postgres.internal
export POSTGRES_PORT=5432
export POSTGRES_DB=crypto_dashboard
export POSTGRES_USER=dashboard_user
export POSTGRES_PASSWORD=secure_password
export REDIS_HOST=redis.internal
export REDIS_PORT=6379

# Service configuration
export DATA_COLLECTION_CONFIG=/etc/crypto-dashboard/data_collection.json
export LOG_LEVEL=INFO
export HEALTH_CHECK_INTERVAL=120
```

## Monitoring and Alerting

### Metrics Collection

The service exposes metrics for monitoring systems:

```python
# Service metrics
service_uptime_hours = 24.5
collectors_running = 5
collectors_total = 6
errors_per_hour = 0.2
data_points_processed = 15000
```

### Health Checks

```python
# External health check endpoint
async def health_check():
    service = DataCollectionService()
    status = service.get_status()

    if not status['service_running']:
        return {'status': 'unhealthy', 'reason': 'service_stopped'}

    if status['collectors_failed'] > status['collectors_total'] * 0.5:
        return {'status': 'degraded', 'reason': 'too_many_failed_collectors'}

    return {'status': 'healthy'}
```

### Alerting Rules

```yaml
# Prometheus alerting rules
groups:
  - name: data_collection_service
    rules:
      - alert: DataCollectionServiceDown
        expr: up{job="data-collection-service"} == 0
        for: 5m
        annotations:
          summary: "Data collection service is down"

      - alert: TooManyFailedCollectors
        expr: collectors_failed / collectors_total > 0.5
        for: 10m
        annotations:
          summary: "More than 50% of collectors have failed"

      - alert: HighErrorRate
        expr: rate(errors_total[5m]) > 0.1
        for: 15m
        annotations:
          summary: "High error rate in data collection service"
```
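For these rules to fire, the service metrics must be scraped by Prometheus. Below is a sketch using the `prometheus_client` package; the library choice, port, and refresh wiring are assumptions, and only the metric names mirror the alerting rules above:

```python
# Hedged sketch: exposing service metrics for Prometheus to scrape.
# prometheus_client is an assumed dependency, not mandated by the service.
from prometheus_client import Counter, Gauge, start_http_server

collectors_running = Gauge("collectors_running", "Collectors currently running")
collectors_total = Gauge("collectors_total", "Collectors configured")
collectors_failed = Gauge("collectors_failed", "Collectors in a failed state")
errors_total = Counter("errors_total", "Cumulative service errors")  # rate()-friendly

def start_metrics_endpoint(port: int = 8080) -> None:
    # Call once at startup; serves /metrics for Prometheus to scrape.
    start_http_server(port)

def refresh_metrics(service) -> None:
    # Copy the documented get_status() fields into the gauges; call periodically.
    status = service.get_status()
    collectors_running.set(status["collectors_running"])
    collectors_total.set(status["collectors_total"])
    collectors_failed.set(status["collectors_failed"])
    # errors_total.inc() would be called wherever the service records an error.
```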
## Performance Considerations

### Resource Usage

- **Memory**: ~150MB base + ~15MB per trading pair (including service overhead)
- **CPU**: Low (async I/O bound, service orchestration)
- **Network**: ~1KB/s per trading pair
- **Storage**: Service logs ~10MB/day

### Scaling Strategies

1. **Horizontal Scaling**: Multiple service instances with different configurations
2. **Configuration Partitioning**: Separate services by exchange or asset class
3. **Load Balancing**: Distribute trading pairs across service instances
4. **Regional Deployment**: Deploy closer to exchange data centers

### Optimization Tips

1. **Configuration Tuning**: Optimize health check intervals and timeframes
2. **Resource Limits**: Set appropriate memory and CPU limits
3. **Batch Operations**: Use efficient database operations
4. **Monitoring Overhead**: Balance monitoring frequency with performance

## Troubleshooting

### Common Service Issues

#### Service Won't Start

```
❌ Failed to start data collection service
```

**Solutions:**
1. Check configuration file validity
2. Verify database connectivity
3. Ensure no port conflicts
4. Check file permissions

#### Configuration Loading Failed

```
❌ Failed to load config from config/data_collection.json: Invalid JSON
```

**Solutions:**
1. Validate JSON syntax
2. Check required fields
3. Verify file encoding (UTF-8)
4. Recreate the default configuration

#### No Collectors Created

```
❌ No collectors were successfully initialized
```

**Solutions:**
1. Check exchange configuration
2. Verify trading pair symbols
3. Check network connectivity
4. Review collector creation logs

### Debug Mode

Enable verbose service debugging:

```json
{
  "logging": {
    "level": "DEBUG",
    "log_errors_only": false,
    "verbose_data_logging": true
  }
}
```

### Service Diagnostics

```python
# Run diagnostic check
from data.collection_service import DataCollectionService

service = DataCollectionService()
status = service.get_status()

print(f"Service Running: {status['service_running']}")
print(f"Configuration File: {status['configuration']['config_file']}")
print(f"Collectors: {status['collectors_running']}/{status['collectors_total']}")

# Check individual collector health
for collector_name in service.manager.list_collectors():
    collector_status = service.manager.get_collector_status(collector_name)
    print(f"{collector_name}: {collector_status['status']}")
```

## Related Documentation

- [Data Collectors System](../components/data_collectors.md) - Core collector components
- [Logging System](../components/logging.md) - Logging configuration
- [Database Operations](../database/operations.md) - Database integration
- [Monitoring Guide](../monitoring/README.md) - System monitoring setup