Data Collection Service
Service for collecting and storing real-time market data from multiple exchanges.
Architecture Overview
The data collection service uses a manager-worker architecture to collect data for multiple trading pairs concurrently.
- CollectorManager: The central manager responsible for creating, starting, stopping, and monitoring individual data collectors.
- OKXCollector: A dedicated worker responsible for collecting data for a single trading pair from the OKX exchange.
This separation keeps the service scalable and fault tolerant: collectors run independently, so a failure in one trading pair does not bring down the others.
Key Components
CollectorManager
- Location: tasks/collector_manager.py
- Responsibilities:
- Manages the lifecycle of multiple collectors
- Provides a unified API for controlling all collectors
- Monitors the health of each collector
- Distributes tasks and aggregates results
OKXCollector
- Location: data/exchanges/okx/collector.py
- Responsibilities:
- Connects to the OKX WebSocket API
- Subscribes to real-time data channels
- Processes and standardizes incoming data
- Stores data in the database
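For illustration, here is a minimal sketch of such a collector loop using the websockets library and OKX's public WebSocket endpoint. The endpoint, channel name, and function are assumptions for this example; it does not reproduce the actual OKXCollector implementation.

# Illustrative only: endpoint and channel are assumptions based on OKX's public
# WebSocket API, not a copy of the project's OKXCollector.
import asyncio
import json

import websockets  # third-party dependency, assumed available


async def collect_trades(symbol: str = "BTC-USDT") -> None:
    url = "wss://ws.okx.com:8443/ws/v5/public"  # assumed public endpoint
    async with websockets.connect(url) as ws:
        # Subscribe to the real-time trades channel for one trading pair
        await ws.send(json.dumps({
            "op": "subscribe",
            "args": [{"channel": "trades", "instId": symbol}],
        }))
        async for raw in ws:
            message = json.loads(raw)
            # A real collector would standardize the payload and hand it to the
            # storage layer; here we just print the incoming data.
            print(message.get("data", message))


if __name__ == "__main__":
    asyncio.run(collect_trades())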
Configuration
The service is configured through config/bot_configs/data_collector_config.json:
{
  "service_name": "data_collection_service",
  "enabled": true,
  "manager_config": {
    "component_name": "collector_manager",
    "health_check_interval": 60,
    "log_level": "INFO",
    "verbose": true
  },
  "collectors": [
    {
      "exchange": "okx",
      "symbol": "BTC-USDT",
      "data_types": ["trade", "orderbook"],
      "enabled": true
    },
    {
      "exchange": "okx",
      "symbol": "ETH-USDT",
      "data_types": ["trade"],
      "enabled": true
    }
  ]
}
Usage
Start the service from the main application entry point:
# main.py
import asyncio

from tasks.collector_manager import CollectorManager


async def main():
    manager = CollectorManager()
    await manager.start_all_collectors()


if __name__ == "__main__":
    asyncio.run(main())
Health & Monitoring
The CollectorManager provides a get_status() method to monitor the health of all collectors.
Features
- Service Lifecycle Management: Start, stop, and monitor data collection operations
- JSON Configuration: File-based configuration with automatic defaults
- Clean Production Logging: Only essential operational information
- Health Monitoring: Service-level health checks and auto-recovery
- Graceful Shutdown: Proper signal handling and cleanup
- Multi-Exchange Orchestration: Coordinate collectors across multiple exchanges
- Production Ready: Designed for 24/7 operation with monitoring
Quick Start
Basic Usage
# Start with default configuration (indefinite run)
python scripts/start_data_collection.py
# Run for 8 hours
python scripts/start_data_collection.py --hours 8
# Use custom configuration
python scripts/start_data_collection.py --config config/my_config.json
Monitoring
# Check status once
python scripts/monitor_clean.py
# Monitor continuously every 60 seconds
python scripts/monitor_clean.py --interval 60
Configuration
The service uses a JSON configuration file and automatically creates a default one if none exists.
Default Configuration Location
config/data_collection.json
Configuration Structure
{
  "exchanges": {
    "okx": {
      "enabled": true,
      "trading_pairs": [
        {
          "symbol": "BTC-USDT",
          "enabled": true,
          "data_types": ["trade"],
          "timeframes": ["1m", "5m", "15m", "1h"]
        },
        {
          "symbol": "ETH-USDT",
          "enabled": true,
          "data_types": ["trade"],
          "timeframes": ["1m", "5m", "15m", "1h"]
        }
      ]
    }
  },
  "collection_settings": {
    "health_check_interval": 120,
    "store_raw_data": true,
    "auto_restart": true,
    "max_restart_attempts": 3
  },
  "logging": {
    "level": "INFO",
    "log_errors_only": true,
    "verbose_data_logging": false
  }
}
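As a rough illustration of the automatic default creation described above, a load-or-create helper might look like the sketch below. Only the file path matches the documented default; the helper name and the trimmed default contents are hypothetical, not the service's actual load_configuration() implementation.

# Hypothetical helper illustrating load-or-create-default behavior.
import json
from pathlib import Path

DEFAULT_CONFIG = {
    "exchanges": {"okx": {"enabled": True, "trading_pairs": []}},
    "collection_settings": {
        "health_check_interval": 120,
        "store_raw_data": True,
        "auto_restart": True,
        "max_restart_attempts": 3,
    },
    "logging": {"level": "INFO", "log_errors_only": True, "verbose_data_logging": False},
}


def load_or_create_config(path: str = "config/data_collection.json") -> dict:
    config_file = Path(path)
    if not config_file.exists():
        # Write the defaults so the service can start without manual setup
        config_file.parent.mkdir(parents=True, exist_ok=True)
        config_file.write_text(json.dumps(DEFAULT_CONFIG, indent=2))
        return DEFAULT_CONFIG
    return json.loads(config_file.read_text())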
Configuration Options
Exchange Settings
- enabled: Whether to enable this exchange
- trading_pairs: Array of trading pair configurations
Trading Pair Settings
- symbol: Trading pair symbol (e.g., "BTC-USDT")
- enabled: Whether to collect data for this pair
- data_types: Types of data to collect (["trade"], ["ticker"], etc.)
- timeframes: Candle timeframes to generate (["1m", "5m", "15m", "1h", "4h", "1d"])
Collection Settings
- health_check_interval: Health check frequency in seconds
- store_raw_data: Whether to store raw trade data
- auto_restart: Enable automatic restart on failures
- max_restart_attempts: Maximum restart attempts before giving up
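To make auto_restart and max_restart_attempts concrete, the sketch below shows one way a supervision loop could apply them. The supervise() function and the collector's run() method are hypothetical stand-ins, not the CollectorManager's actual recovery code.

# Illustrative restart policy only; the manager's real recovery logic may differ.
import asyncio


async def supervise(collector, max_restart_attempts: int = 3) -> None:
    attempts = 0
    while attempts <= max_restart_attempts:
        try:
            await collector.run()  # hypothetical collector entry point
            return                 # clean exit, nothing to restart
        except Exception as exc:
            attempts += 1
            print(f"Collector failed ({exc}); restart {attempts}/{max_restart_attempts}")
            await asyncio.sleep(10)  # brief back-off before retrying
    print("Giving up after exceeding max_restart_attempts")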
Logging Settings
- level: Log level ("DEBUG", "INFO", "WARNING", "ERROR")
- log_errors_only: Only log errors and essential events
- verbose_data_logging: Enable verbose logging of individual trades/candles
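One plausible mapping from this logging block to Python's standard logging module is sketched below; the configure_logging helper and the "data" logger name are assumptions for illustration, not the project's logging system.

# Hypothetical mapping from the "logging" config block to stdlib logging.
import logging


def configure_logging(cfg: dict) -> None:
    level = getattr(logging, cfg.get("level", "INFO"))
    if cfg.get("log_errors_only"):
        level = max(level, logging.ERROR)  # suppress everything below ERROR
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    if not cfg.get("verbose_data_logging", False):
        # Keep per-trade/per-candle chatter out of production logs
        logging.getLogger("data").setLevel(max(level, logging.WARNING))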
Service Architecture
Service Layer Components
┌─────────────────────────────────────────────┐
│            DataCollectionService            │
│  ┌───────────────────────────────────────┐  │
│  │         Configuration Manager         │  │
│  │ • JSON config loading/validation      │  │
│  │ • Default config generation           │  │
│  │ • Runtime config updates              │  │
│  └───────────────────────────────────────┘  │
│  ┌───────────────────────────────────────┐  │
│  │            Service Monitor            │  │
│  │ • Service-level health checks         │  │
│  │ • Uptime tracking                     │  │
│  │ • Error aggregation                   │  │
│  └───────────────────────────────────────┘  │
│                      │                      │
│  ┌───────────────────────────────────────┐  │
│  │           CollectorManager            │  │
│  │ • Individual collector management     │  │
│  │ • Health monitoring                   │  │
│  │ • Auto-restart coordination           │  │
│  └───────────────────────────────────────┘  │
└─────────────────────────────────────────────┘
                       │
        ┌─────────────────────────────┐
        │    Core Data Collectors     │
        │   (See data_collectors.md)  │
        └─────────────────────────────┘
Data Flow
Configuration → Service → CollectorManager → Data Collectors → Database
                   ↓               ↓
           Service Monitor    Health Monitor
Storage Integration
- Raw Data: PostgreSQL raw_trades table via repository pattern
- Candles: PostgreSQL market_data table with multiple timeframes
- Real-time: Redis pub/sub for live data distribution
- Service Metrics: Service uptime, error counts, collector statistics
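The Redis pub/sub leg of this pipeline can be pictured with redis-py's asyncio client as below. The channel naming convention is an assumption for illustration, not taken from the codebase.

# Illustrative publisher only; channel naming is assumed.
import asyncio
import json

import redis.asyncio as redis  # redis-py >= 4.2


async def publish_trade(trade: dict, symbol: str = "BTC-USDT") -> None:
    client = redis.Redis(host="localhost", port=6379)
    try:
        # Downstream consumers (dashboards, strategies) subscribe to this channel
        await client.publish(f"trades:{symbol}", json.dumps(trade))
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(publish_trade({"price": 65000.0, "size": 0.01}))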
Logging Philosophy
The service implements clean production logging focused on operational needs:
What Gets Logged
✅ Service Lifecycle
- Service start/stop events
- Configuration loading
- Service initialization
✅ Collector Orchestration
- Collector creation and destruction
- Service-level health summaries
- Recovery operations
✅ Configuration Events
- Config file changes
- Runtime configuration updates
- Validation errors
✅ Service Statistics
- Periodic uptime reports
- Collection summary statistics
- Performance metrics
What Doesn't Get Logged
❌ Individual Data Points
- Every trade received
- Every candle generated
- Raw market data
❌ Internal Operations
- Individual collector heartbeats
- Routine database operations
- Internal processing steps
API Reference
DataCollectionService
The main service class for managing data collection operations.
Constructor
DataCollectionService(config_path: str = "config/data_collection.json")
Parameters:
config_path: Path to JSON configuration file
Methods
async run(duration_hours: Optional[float] = None) -> bool
Run the service for a specified duration or indefinitely.
Parameters:
duration_hours: Optional duration in hours (None = indefinite)
Returns:
bool: True if successful, False if error occurred
Example:
service = DataCollectionService()
await service.run(duration_hours=24) # Run for 24 hours
async start() -> bool
Start the data collection service and all configured collectors.
Returns:
bool: True if started successfully
async stop() -> None
Stop the service gracefully, including all collectors and cleanup.
get_status() -> Dict[str, Any]
Get current service status including uptime, collector counts, and errors.
Returns:
{
    'service_running': True,
    'uptime_hours': 12.5,
    'collectors_total': 6,
    'collectors_running': 5,
    'collectors_failed': 1,
    'errors_count': 2,
    'last_error': 'Connection timeout for ETH-USDT',
    'configuration': {
        'config_file': 'config/data_collection.json',
        'exchanges_enabled': ['okx'],
        'total_trading_pairs': 6
    }
}
async initialize_collectors() -> bool
Initialize all collectors based on configuration.
Returns:
bool: True if all collectors initialized successfully
load_configuration() -> Dict[str, Any]
Load and validate configuration from file.
Returns:
dict: Loaded configuration
Standalone Function
run_data_collection_service(config_path, duration_hours)
async def run_data_collection_service(
    config_path: str = "config/data_collection.json",
    duration_hours: Optional[float] = None
) -> bool
Convenience function to run the service with minimal setup.
Parameters:
config_path: Path to configuration file
duration_hours: Optional duration in hours
Returns:
bool: True if successful
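For example, using only the signature documented above (and assuming the function lives alongside DataCollectionService in data.collection_service):

import asyncio

from data.collection_service import run_data_collection_service

# Run the service for 8 hours with the default configuration file
asyncio.run(run_data_collection_service(duration_hours=8))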
Integration Examples
Basic Service Integration
import asyncio

from data.collection_service import DataCollectionService


async def main():
    service = DataCollectionService("config/my_config.json")

    # Run for 24 hours
    success = await service.run(duration_hours=24)
    if not success:
        print("Service encountered errors")


if __name__ == "__main__":
    asyncio.run(main())
Custom Status Monitoring
import asyncio
from data.collection_service import DataCollectionService
async def monitor_service():
    service = DataCollectionService()

    # Start service in background
    start_task = asyncio.create_task(service.run())
    await asyncio.sleep(5)  # give the service a moment to start up

    # Monitor status every 5 minutes
    while service.running:
        status = service.get_status()
        print(f"Service Uptime: {status['uptime_hours']:.1f}h")
        print(f"Collectors: {status['collectors_running']}/{status['collectors_total']}")
        print(f"Errors: {status['errors_count']}")
        await asyncio.sleep(300)  # 5 minutes

    await start_task


asyncio.run(monitor_service())
Programmatic Control
import asyncio
from data.collection_service import DataCollectionService
async def controlled_collection():
    service = DataCollectionService()

    try:
        # Initialize and start
        await service.initialize_collectors()
        await service.start()

        # Monitor and control
        while True:
            status = service.get_status()

            # Check if any collectors failed
            if status['collectors_failed'] > 0:
                print("Some collectors failed, checking health...")
                # Service auto-restart will handle this

            await asyncio.sleep(60)  # Check every minute
    except KeyboardInterrupt:
        print("Shutting down service...")
    finally:
        await service.stop()


asyncio.run(controlled_collection())
Configuration Management
import asyncio
import json
from data.collection_service import DataCollectionService
async def dynamic_configuration():
    service = DataCollectionService()

    # Load and modify configuration
    config = service.load_configuration()

    # Add new trading pair
    config['exchanges']['okx']['trading_pairs'].append({
        'symbol': 'SOL-USDT',
        'enabled': True,
        'data_types': ['trade'],
        'timeframes': ['1m', '5m']
    })

    # Save updated configuration
    with open('config/data_collection.json', 'w') as f:
        json.dump(config, f, indent=2)

    # Restart service with new config
    await service.stop()
    await service.start()


asyncio.run(dynamic_configuration())
Error Handling
The service implements robust error handling at the service orchestration level:
Service Level Errors
- Configuration Errors: Invalid JSON, missing required fields
- Initialization Errors: Failed collector creation, database connectivity
- Runtime Errors: Service-level exceptions, resource exhaustion
Error Recovery Strategies
- Graceful Degradation: Continue with healthy collectors
- Configuration Validation: Validate before applying changes
- Service Restart: Full service restart on critical errors
- Error Aggregation: Collect and report errors across all collectors
Error Reporting
# Service status includes error information
status = service.get_status()
if status['errors_count'] > 0:
    print(f"Service has {status['errors_count']} errors")
    print(f"Last error: {status['last_error']}")

# Get detailed error information from collectors
for collector_name in service.manager.list_collectors():
    collector_status = service.manager.get_collector_status(collector_name)
    if collector_status['status'] == 'error':
        print(f"Collector {collector_name}: {collector_status['statistics']['last_error']}")
Testing
Running Service Tests
# Run all data collection service tests
uv run pytest tests/test_data_collection_service.py -v
# Run specific test categories
uv run pytest tests/test_data_collection_service.py::TestDataCollectionService -v
# Run with coverage
uv run pytest tests/test_data_collection_service.py --cov=data.collection_service
Test Coverage
The service test suite covers:
- Service initialization and configuration loading
- Collector orchestration and management
- Service lifecycle (start/stop/restart)
- Configuration validation and error handling
- Signal handling and graceful shutdown
- Status reporting and monitoring
- Error aggregation and recovery
Mock Testing
import pytest
from unittest.mock import AsyncMock, patch

from data.collection_service import DataCollectionService


@pytest.mark.asyncio
async def test_service_with_mock_collectors():
    with patch('data.collection_service.CollectorManager') as mock_manager:
        # Mock successful initialization
        mock_manager.return_value.start.return_value = True

        service = DataCollectionService()
        result = await service.start()

        assert result is True
        mock_manager.return_value.start.assert_called_once()
Production Deployment
Docker Deployment
FROM python:3.11-slim
WORKDIR /app
COPY . .
# Install dependencies
RUN pip install uv
RUN uv pip install -r requirements.txt
# Create logs and config directories
RUN mkdir -p logs config
# Copy production configuration
COPY config/production.json config/data_collection.json
# Health check
HEALTHCHECK --interval=60s --timeout=10s --start-period=30s --retries=3 \
CMD python scripts/health_check.py || exit 1
# Run service
CMD ["python", "scripts/start_data_collection.py", "--config", "config/data_collection.json"]
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: data-collection-service
spec:
  replicas: 1
  selector:
    matchLabels:
      app: data-collection-service
  template:
    metadata:
      labels:
        app: data-collection-service
    spec:
      containers:
        - name: data-collector
          image: crypto-dashboard/data-collector:latest
          ports:
            - containerPort: 8080
          env:
            - name: POSTGRES_HOST
              value: "postgres-service"
            - name: REDIS_HOST
              value: "redis-service"
          volumeMounts:
            - name: config-volume
              mountPath: /app/config
            - name: logs-volume
              mountPath: /app/logs
          livenessProbe:
            exec:
              command:
                - python
                - scripts/health_check.py
            initialDelaySeconds: 30
            periodSeconds: 60
      volumes:
        - name: config-volume
          configMap:
            name: data-collection-config
        - name: logs-volume
          emptyDir: {}
Systemd Service
[Unit]
Description=Cryptocurrency Data Collection Service
After=network.target postgres.service redis.service
Requires=postgres.service redis.service
[Service]
Type=simple
User=crypto-collector
Group=crypto-collector
WorkingDirectory=/opt/crypto-dashboard
ExecStart=/usr/bin/python scripts/start_data_collection.py --config config/production.json
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
RestartSec=10
KillMode=mixed
TimeoutStopSec=30
# Environment
Environment=PYTHONPATH=/opt/crypto-dashboard
Environment=LOG_LEVEL=INFO
# Security
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ReadWritePaths=/opt/crypto-dashboard/logs
[Install]
WantedBy=multi-user.target
Environment Configuration
# Production environment variables
export ENVIRONMENT=production
export POSTGRES_HOST=postgres.internal
export POSTGRES_PORT=5432
export POSTGRES_DB=crypto_dashboard
export POSTGRES_USER=dashboard_user
export POSTGRES_PASSWORD=secure_password
export REDIS_HOST=redis.internal
export REDIS_PORT=6379
# Service configuration
export DATA_COLLECTION_CONFIG=/etc/crypto-dashboard/data_collection.json
export LOG_LEVEL=INFO
export HEALTH_CHECK_INTERVAL=120
Monitoring and Alerting
Metrics Collection
The service exposes metrics for monitoring systems:
# Service metrics
service_uptime_hours = 24.5
collectors_running = 5
collectors_total = 6
errors_per_hour = 0.2
data_points_processed = 15000
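If these metrics are scraped by Prometheus (as the alerting rules further below assume), one way to expose them is with the prometheus_client library. The gauge names mirror the get_status() fields documented earlier, but the exporter itself is an illustrative sketch, not part of the service.

# Illustrative exporter; gauge names follow get_status() fields, the rest is assumed.
import asyncio

from prometheus_client import Gauge, start_http_server

from data.collection_service import DataCollectionService

uptime_hours = Gauge('service_uptime_hours', 'Service uptime in hours')
collectors_running = Gauge('collectors_running', 'Number of running collectors')
collectors_total = Gauge('collectors_total', 'Number of configured collectors')
errors_count = Gauge('errors_count', 'Total errors reported by the service')


async def export_metrics(service: DataCollectionService, port: int = 8080) -> None:
    start_http_server(port)  # serves /metrics for Prometheus to scrape
    while True:
        status = service.get_status()
        uptime_hours.set(status['uptime_hours'])
        collectors_running.set(status['collectors_running'])
        collectors_total.set(status['collectors_total'])
        errors_count.set(status['errors_count'])
        await asyncio.sleep(30)  # refresh gauges every 30 seconds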
Health Checks
# External health check endpoint
async def health_check():
service = DataCollectionService()
status = service.get_status()
if not status['service_running']:
return {'status': 'unhealthy', 'reason': 'service_stopped'}
if status['collectors_failed'] > status['collectors_total'] * 0.5:
return {'status': 'degraded', 'reason': 'too_many_failed_collectors'}
return {'status': 'healthy'}
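The deployment examples below invoke scripts/health_check.py; a minimal sketch of such a script, built on the same status fields, could look like this. Only the path comes from the deployment examples; the script contents are assumed.

# Hypothetical scripts/health_check.py: exit 0 when healthy, 1 otherwise, so
# Docker HEALTHCHECK and Kubernetes exec probes can rely on the exit code.
import sys

from data.collection_service import DataCollectionService


def main() -> int:
    service = DataCollectionService()
    status = service.get_status()

    if not status['service_running']:
        print("unhealthy: service stopped")
        return 1
    if status['collectors_failed'] > status['collectors_total'] * 0.5:
        print("degraded: too many failed collectors")
        return 1

    print("healthy")
    return 0


if __name__ == "__main__":
    sys.exit(main())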
Alerting Rules
# Prometheus alerting rules
groups:
  - name: data_collection_service
    rules:
      - alert: DataCollectionServiceDown
        expr: up{job="data-collection-service"} == 0
        for: 5m
        annotations:
          summary: "Data collection service is down"

      - alert: TooManyFailedCollectors
        expr: collectors_failed / collectors_total > 0.5
        for: 10m
        annotations:
          summary: "More than 50% of collectors have failed"

      - alert: HighErrorRate
        expr: rate(errors_total[5m]) > 0.1
        for: 15m
        annotations:
          summary: "High error rate in data collection service"
Performance Considerations
Resource Usage
- Memory: ~150MB base + ~15MB per trading pair (including service overhead)
- CPU: Low (async I/O bound, service orchestration)
- Network: ~1KB/s per trading pair
- Storage: Service logs ~10MB/day
Scaling Strategies
- Horizontal Scaling: Multiple service instances with different configurations
- Configuration Partitioning: Separate services by exchange or asset class
- Load Balancing: Distribute trading pairs across service instances
- Regional Deployment: Deploy closer to exchange data centers
Optimization Tips
- Configuration Tuning: Optimize health check intervals and timeframes
- Resource Limits: Set appropriate memory and CPU limits
- Batch Operations: Use efficient database operations
- Monitoring Overhead: Balance monitoring frequency with performance
Troubleshooting
Common Service Issues
Service Won't Start
❌ Failed to start data collection service
Solutions:
- Check configuration file validity
- Verify database connectivity
- Ensure no port conflicts
- Check file permissions
Configuration Loading Failed
❌ Failed to load config from config/data_collection.json: Invalid JSON
Solutions:
- Validate JSON syntax
- Check required fields
- Verify file encoding (UTF-8)
- Recreate default configuration
No Collectors Created
❌ No collectors were successfully initialized
Solutions:
- Check exchange configuration
- Verify trading pair symbols
- Check network connectivity
- Review collector creation logs
Debug Mode
Enable verbose service debugging:
{
  "logging": {
    "level": "DEBUG",
    "log_errors_only": false,
    "verbose_data_logging": true
  }
}
Service Diagnostics
# Run diagnostic check
from data.collection_service import DataCollectionService

service = DataCollectionService()
status = service.get_status()

print(f"Service Running: {status['service_running']}")
print(f"Configuration File: {status['configuration']['config_file']}")
print(f"Collectors: {status['collectors_running']}/{status['collectors_total']}")

# Check individual collector health
for collector_name in service.manager.list_collectors():
    collector_status = service.manager.get_collector_status(collector_name)
    print(f"{collector_name}: {collector_status['status']}")
Related Documentation
- Data Collectors System - Core collector components
- Logging System - Logging configuration
- Database Operations - Database integration
- Monitoring Guide - System monitoring setup