# Data Collection Service
**Service for collecting and storing real-time market data from multiple exchanges.**
## Architecture Overview
The data collection service uses a **manager-worker architecture** to collect data for multiple trading pairs concurrently.
- **`CollectorManager`**: The central manager responsible for creating, starting, stopping, and monitoring individual data collectors.
- **`OKXCollector`**: A dedicated worker responsible for collecting data for a single trading pair from the OKX exchange.
This design scales to many trading pairs and isolates faults: a failure in one collector does not affect the others.
## Key Components
### `CollectorManager`
- **Location**: `tasks/collector_manager.py`
- **Responsibilities**:
- Manages the lifecycle of multiple collectors
- Provides a unified API for controlling all collectors
- Monitors the health of each collector
- Distributes tasks and aggregates results
### `OKXCollector`
- **Location**: `data/exchanges/okx/collector.py`
- **Responsibilities**:
- Connects to the OKX WebSocket API
- Subscribes to real-time data channels
- Processes and standardizes incoming data
- Stores data in the database
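
The sketch below illustrates this manager-worker relationship. It is a minimal outline under stated assumptions, not the actual implementation; every method other than the documented `start_all_collectors()` is assumed for illustration.

```python
# Minimal sketch of the manager-worker relationship (not the actual
# implementation; method names besides start_all_collectors() are assumed).
import asyncio


class OKXCollector:
    """Worker: collects data for a single trading pair."""

    def __init__(self, symbol: str):
        self.symbol = symbol
        self.running = False

    async def start(self) -> None:
        # Real worker: connect to the OKX WebSocket API, subscribe to
        # channels, then process and store incoming data.
        self.running = True

    async def stop(self) -> None:
        self.running = False


class CollectorManager:
    """Manager: owns the lifecycle of all collectors."""

    def __init__(self):
        self.collectors: dict[str, OKXCollector] = {}

    def add_collector(self, symbol: str) -> None:
        self.collectors[symbol] = OKXCollector(symbol)

    async def start_all_collectors(self) -> None:
        await asyncio.gather(*(c.start() for c in self.collectors.values()))

    async def stop_all_collectors(self) -> None:
        await asyncio.gather(*(c.stop() for c in self.collectors.values()))
```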
## Configuration
The service is configured through `config/bot_configs/data_collector_config.json`:
```json
{
  "service_name": "data_collection_service",
  "enabled": true,
  "manager_config": {
    "component_name": "collector_manager",
    "health_check_interval": 60,
    "log_level": "INFO",
    "verbose": true
  },
  "collectors": [
    {
      "exchange": "okx",
      "symbol": "BTC-USDT",
      "data_types": ["trade", "orderbook"],
      "enabled": true
    },
    {
      "exchange": "okx",
      "symbol": "ETH-USDT",
      "data_types": ["trade"],
      "enabled": true
    }
  ]
}
```
## Usage
Start the service from the main application entry point:
```python
# main.py
import asyncio

from tasks.collector_manager import CollectorManager


async def main():
    manager = CollectorManager()
    await manager.start_all_collectors()


if __name__ == "__main__":
    asyncio.run(main())
```
## Health & Monitoring
The `CollectorManager` provides a `get_status()` method to monitor the health of all collectors.
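For example, a quick polling sketch using only the methods documented above (the exact shape of the returned status is implementation-defined):

```python
import asyncio

from tasks.collector_manager import CollectorManager


async def check_health():
    manager = CollectorManager()
    await manager.start_all_collectors()

    # Aggregate health of all collectors; structure depends on the implementation.
    print(manager.get_status())

asyncio.run(check_health())
```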
## Features
- **Service Lifecycle Management**: Start, stop, and monitor data collection operations
- **JSON Configuration**: File-based configuration with automatic defaults
- **Clean Production Logging**: Only essential operational information
- **Health Monitoring**: Service-level health checks and auto-recovery
- **Graceful Shutdown**: Proper signal handling and cleanup
- **Multi-Exchange Orchestration**: Coordinate collectors across multiple exchanges
- **Production Ready**: Designed for 24/7 operation with monitoring
## Quick Start
### Basic Usage
```bash
# Start with default configuration (indefinite run)
python scripts/start_data_collection.py
# Run for 8 hours
python scripts/start_data_collection.py --hours 8
# Use custom configuration
python scripts/start_data_collection.py --config config/my_config.json
```
### Monitoring
```bash
# Check status once
python scripts/monitor_clean.py
# Monitor continuously every 60 seconds
python scripts/monitor_clean.py --interval 60
```
## Configuration
The service is configured through a JSON file; if the file does not exist, a default configuration is created automatically.
### Default Configuration Location
`config/data_collection.json`
### Configuration Structure
```json
{
  "exchanges": {
    "okx": {
      "enabled": true,
      "trading_pairs": [
        {
          "symbol": "BTC-USDT",
          "enabled": true,
          "data_types": ["trade"],
          "timeframes": ["1m", "5m", "15m", "1h"]
        },
        {
          "symbol": "ETH-USDT",
          "enabled": true,
          "data_types": ["trade"],
          "timeframes": ["1m", "5m", "15m", "1h"]
        }
      ]
    }
  },
  "collection_settings": {
    "health_check_interval": 120,
    "store_raw_data": true,
    "auto_restart": true,
    "max_restart_attempts": 3
  },
  "logging": {
    "level": "INFO",
    "log_errors_only": true,
    "verbose_data_logging": false
  }
}
```
### Configuration Options
#### Exchange Settings
- **enabled**: Whether to enable this exchange
- **trading_pairs**: Array of trading pair configurations
#### Trading Pair Settings
- **symbol**: Trading pair symbol (e.g., "BTC-USDT")
- **enabled**: Whether to collect data for this pair
- **data_types**: Types of data to collect (e.g., "trade", "ticker")
- **timeframes**: Candle timeframes to generate (["1m", "5m", "15m", "1h", "4h", "1d"])
#### Collection Settings
- **health_check_interval**: Health check frequency in seconds
- **store_raw_data**: Whether to store raw trade data
- **auto_restart**: Enable automatic restart on failures
- **max_restart_attempts**: Maximum restart attempts before giving up
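
Taken together, `auto_restart`, `max_restart_attempts`, and `health_check_interval` imply supervision logic along these lines. This is a sketch of the policy, not the service's actual code; `collector.run()` is a hypothetical stand-in.

```python
import asyncio


async def supervise(collector, settings: dict) -> None:
    """Sketch of the restart policy implied by collection_settings."""
    attempts = 0
    while True:
        try:
            await collector.run()  # hypothetical: blocks until the collector exits
        except Exception as exc:
            print(f"Collector error: {exc}")

        if not settings["auto_restart"]:
            break
        attempts += 1
        if attempts > settings["max_restart_attempts"]:
            print("Max restart attempts reached, giving up")
            break
        await asyncio.sleep(settings["health_check_interval"])
```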
#### Logging Settings
- **level**: Log level ("DEBUG", "INFO", "WARNING", "ERROR")
- **log_errors_only**: Only log errors and essential events
- **verbose_data_logging**: Enable verbose logging of individual trades/candles
## Service Architecture
### Service Layer Components
```
┌─────────────────────────────────────────────────┐
│            DataCollectionService                │
│  ┌─────────────────────────────────────────┐    │
│  │        Configuration Manager            │    │
│  │  • JSON config loading/validation       │    │
│  │  • Default config generation            │    │
│  │  • Runtime config updates               │    │
│  └─────────────────────────────────────────┘    │
│  ┌─────────────────────────────────────────┐    │
│  │           Service Monitor               │    │
│  │  • Service-level health checks          │    │
│  │  • Uptime tracking                      │    │
│  │  • Error aggregation                    │    │
│  └─────────────────────────────────────────┘    │
│                     │                           │
│  ┌─────────────────────────────────────────┐    │
│  │          CollectorManager               │    │
│  │  • Individual collector management      │    │
│  │  • Health monitoring                    │    │
│  │  • Auto-restart coordination            │    │
│  └─────────────────────────────────────────┘    │
└─────────────────────────────────────────────────┘
                      │
        ┌─────────────────────────────┐
        │    Core Data Collectors     │
        │  (See data_collectors.md)   │
        └─────────────────────────────┘
```
### Data Flow
```
Configuration → Service → CollectorManager → Data Collectors → Database
                   ↓              ↓
            Service Monitor  Health Monitor
```
### Storage Integration
- **Raw Data**: PostgreSQL `raw_trades` table via repository pattern
- **Candles**: PostgreSQL `market_data` table with multiple timeframes
- **Real-time**: Redis pub/sub for live data distribution (see the consumer sketch below)
- **Service Metrics**: Service uptime, error counts, collector statistics
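
For example, a downstream consumer could subscribe to live data through Redis pub/sub. This is a sketch using redis-py's asyncio client; the channel name is an assumption, so check the publisher for the actual naming scheme.

```python
import asyncio

import redis.asyncio as redis


async def consume_live_trades():
    client = redis.Redis(host="localhost", port=6379)
    pubsub = client.pubsub()
    # "market:trades:BTC-USDT" is an assumed channel name.
    await pubsub.subscribe("market:trades:BTC-USDT")
    async for message in pubsub.listen():
        if message["type"] == "message":
            print(message["data"])

asyncio.run(consume_live_trades())
```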
## Logging Philosophy
The service implements **clean production logging** focused on operational needs:
### What Gets Logged
**Service Lifecycle**
- Service start/stop events
- Configuration loading
- Service initialization
**Collector Orchestration**
- Collector creation and destruction
- Service-level health summaries
- Recovery operations
**Configuration Events**
- Config file changes
- Runtime configuration updates
- Validation errors
**Service Statistics**
- Periodic uptime reports
- Collection summary statistics
- Performance metrics
### What Doesn't Get Logged
**Individual Data Points**
- Every trade received
- Every candle generated
- Raw market data
**Internal Operations**
- Individual collector heartbeats
- Routine database operations
- Internal processing steps
## API Reference
### DataCollectionService
The main service class for managing data collection operations.
#### Constructor
```python
DataCollectionService(config_path: str = "config/data_collection.json")
```
**Parameters:**
- `config_path`: Path to JSON configuration file
#### Methods
##### `async run(duration_hours: Optional[float] = None) -> bool`
Run the service for a specified duration or indefinitely.
**Parameters:**
- `duration_hours`: Optional duration in hours (None = indefinite)
**Returns:**
- `bool`: True if successful, False if error occurred
**Example:**
```python
service = DataCollectionService()
await service.run(duration_hours=24) # Run for 24 hours
```
##### `async start() -> bool`
Start the data collection service and all configured collectors.
**Returns:**
- `bool`: True if started successfully
##### `async stop() -> None`
Stop the service gracefully, including all collectors and cleanup.
##### `get_status() -> Dict[str, Any]`
Get current service status including uptime, collector counts, and errors.
**Returns:**
```python
{
    'service_running': True,
    'uptime_hours': 12.5,
    'collectors_total': 6,
    'collectors_running': 5,
    'collectors_failed': 1,
    'errors_count': 2,
    'last_error': 'Connection timeout for ETH-USDT',
    'configuration': {
        'config_file': 'config/data_collection.json',
        'exchanges_enabled': ['okx'],
        'total_trading_pairs': 6
    }
}
```
##### `async initialize_collectors() -> bool`
Initialize all collectors based on configuration.
**Returns:**
- `bool`: True if all collectors initialized successfully
##### `load_configuration() -> Dict[str, Any]`
Load and validate configuration from file.
**Returns:**
- `dict`: Loaded configuration
### Standalone Function
#### `run_data_collection_service(config_path, duration_hours)`
```python
async def run_data_collection_service(
    config_path: str = "config/data_collection.json",
    duration_hours: Optional[float] = None
) -> bool
```
Convenience function to run the service with minimal setup.
**Parameters:**
- `config_path`: Path to configuration file
- `duration_hours`: Optional duration in hours
**Returns:**
- `bool`: True if successful
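**Example:**
```python
import asyncio

from data.collection_service import run_data_collection_service

# Run with the default configuration for 8 hours
asyncio.run(run_data_collection_service(duration_hours=8))
```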
## Integration Examples
### Basic Service Integration
```python
import asyncio

from data.collection_service import DataCollectionService


async def main():
    service = DataCollectionService("config/my_config.json")

    # Run for 24 hours
    success = await service.run(duration_hours=24)
    if not success:
        print("Service encountered errors")


if __name__ == "__main__":
    asyncio.run(main())
```
### Custom Status Monitoring
```python
import asyncio

from data.collection_service import DataCollectionService


async def monitor_service():
    service = DataCollectionService()

    # Start service in background
    start_task = asyncio.create_task(service.run())

    # Monitor status every 5 minutes
    while service.running:
        status = service.get_status()
        print(f"Service Uptime: {status['uptime_hours']:.1f}h")
        print(f"Collectors: {status['collectors_running']}/{status['collectors_total']}")
        print(f"Errors: {status['errors_count']}")
        await asyncio.sleep(300)  # 5 minutes

    await start_task

asyncio.run(monitor_service())
```
### Programmatic Control
```python
import asyncio

from data.collection_service import DataCollectionService


async def controlled_collection():
    service = DataCollectionService()
    try:
        # Initialize and start
        await service.initialize_collectors()
        await service.start()

        # Monitor and control
        while True:
            status = service.get_status()

            # Check if any collectors failed
            if status['collectors_failed'] > 0:
                print("Some collectors failed, checking health...")
                # Service auto-restart will handle this

            await asyncio.sleep(60)  # Check every minute
    except KeyboardInterrupt:
        print("Shutting down service...")
    finally:
        await service.stop()

asyncio.run(controlled_collection())
```
### Configuration Management
```python
import asyncio
import json

from data.collection_service import DataCollectionService


async def dynamic_configuration():
    service = DataCollectionService()

    # Load and modify configuration
    config = service.load_configuration()

    # Add new trading pair
    config['exchanges']['okx']['trading_pairs'].append({
        'symbol': 'SOL-USDT',
        'enabled': True,
        'data_types': ['trade'],
        'timeframes': ['1m', '5m']
    })

    # Save updated configuration
    with open('config/data_collection.json', 'w') as f:
        json.dump(config, f, indent=2)

    # Restart service with new config
    await service.stop()
    await service.start()

asyncio.run(dynamic_configuration())
```
## Error Handling
The service implements robust error handling at the service orchestration level:
### Service Level Errors
- **Configuration Errors**: Invalid JSON, missing required fields
- **Initialization Errors**: Failed collector creation, database connectivity
- **Runtime Errors**: Service-level exceptions, resource exhaustion
### Error Recovery Strategies
1. **Graceful Degradation**: Continue with healthy collectors
2. **Configuration Validation**: Validate before applying changes
3. **Service Restart**: Full service restart on critical errors
4. **Error Aggregation**: Collect and report errors across all collectors
### Error Reporting
```python
# Service status includes error information
status = service.get_status()
if status['errors_count'] > 0:
    print(f"Service has {status['errors_count']} errors")
    print(f"Last error: {status['last_error']}")

# Get detailed error information from collectors
for collector_name in service.manager.list_collectors():
    collector_status = service.manager.get_collector_status(collector_name)
    if collector_status['status'] == 'error':
        print(f"Collector {collector_name}: {collector_status['statistics']['last_error']}")
```
## Testing
### Running Service Tests
```bash
# Run all data collection service tests
uv run pytest tests/test_data_collection_service.py -v
# Run specific test categories
uv run pytest tests/test_data_collection_service.py::TestDataCollectionService -v
# Run with coverage
uv run pytest tests/test_data_collection_service.py --cov=data.collection_service
```
### Test Coverage
The service test suite covers:
- Service initialization and configuration loading
- Collector orchestration and management
- Service lifecycle (start/stop/restart)
- Configuration validation and error handling
- Signal handling and graceful shutdown
- Status reporting and monitoring
- Error aggregation and recovery
### Mock Testing
```python
import pytest
from unittest.mock import AsyncMock, patch

from data.collection_service import DataCollectionService


@pytest.mark.asyncio
async def test_service_with_mock_collectors():
    with patch('data.collection_service.CollectorManager') as mock_manager:
        # Mock successful initialization (AsyncMock so the call can be awaited)
        mock_manager.return_value.start = AsyncMock(return_value=True)

        service = DataCollectionService()
        result = await service.start()

        assert result is True
        mock_manager.return_value.start.assert_called_once()
```
## Production Deployment
### Docker Deployment
```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY . .
# Install dependencies
RUN pip install uv
RUN uv pip install -r requirements.txt
# Create logs and config directories
RUN mkdir -p logs config
# Copy production configuration
COPY config/production.json config/data_collection.json
# Health check
HEALTHCHECK --interval=60s --timeout=10s --start-period=30s --retries=3 \
CMD python scripts/health_check.py || exit 1
# Run service
CMD ["python", "scripts/start_data_collection.py", "--config", "config/data_collection.json"]
```
### Kubernetes Deployment
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: data-collection-service
spec:
  replicas: 1
  selector:
    matchLabels:
      app: data-collection-service
  template:
    metadata:
      labels:
        app: data-collection-service
    spec:
      containers:
        - name: data-collector
          image: crypto-dashboard/data-collector:latest
          ports:
            - containerPort: 8080
          env:
            - name: POSTGRES_HOST
              value: "postgres-service"
            - name: REDIS_HOST
              value: "redis-service"
          volumeMounts:
            - name: config-volume
              mountPath: /app/config
            - name: logs-volume
              mountPath: /app/logs
          livenessProbe:
            exec:
              command:
                - python
                - scripts/health_check.py
            initialDelaySeconds: 30
            periodSeconds: 60
      volumes:
        - name: config-volume
          configMap:
            name: data-collection-config
        - name: logs-volume
          emptyDir: {}
```
### Systemd Service
```ini
[Unit]
Description=Cryptocurrency Data Collection Service
After=network.target postgres.service redis.service
Requires=postgres.service redis.service
[Service]
Type=simple
User=crypto-collector
Group=crypto-collector
WorkingDirectory=/opt/crypto-dashboard
ExecStart=/usr/bin/python scripts/start_data_collection.py --config config/production.json
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
RestartSec=10
KillMode=mixed
TimeoutStopSec=30
# Environment
Environment=PYTHONPATH=/opt/crypto-dashboard
Environment=LOG_LEVEL=INFO
# Security
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ReadWritePaths=/opt/crypto-dashboard/logs
[Install]
WantedBy=multi-user.target
```
### Environment Configuration
```bash
# Production environment variables
export ENVIRONMENT=production
export POSTGRES_HOST=postgres.internal
export POSTGRES_PORT=5432
export POSTGRES_DB=crypto_dashboard
export POSTGRES_USER=dashboard_user
export POSTGRES_PASSWORD=secure_password
export REDIS_HOST=redis.internal
export REDIS_PORT=6379
# Service configuration
export DATA_COLLECTION_CONFIG=/etc/crypto-dashboard/data_collection.json
export LOG_LEVEL=INFO
export HEALTH_CHECK_INTERVAL=120
```
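
How these variables are consumed is up to the settings loader; a sketch of one plausible approach (the DSN construction below is an assumption, not the service's actual loader):

```python
import os

# Sketch only: the service's actual settings loader may differ.
POSTGRES_DSN = (
    f"postgresql://{os.environ['POSTGRES_USER']}:{os.environ['POSTGRES_PASSWORD']}"
    f"@{os.environ.get('POSTGRES_HOST', 'localhost')}:{os.environ.get('POSTGRES_PORT', '5432')}"
    f"/{os.environ.get('POSTGRES_DB', 'crypto_dashboard')}"
)
CONFIG_PATH = os.environ.get("DATA_COLLECTION_CONFIG", "config/data_collection.json")
```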
## Monitoring and Alerting
### Metrics Collection
The service exposes metrics for monitoring systems:
```python
# Service metrics
service_uptime_hours = 24.5
collectors_running = 5
collectors_total = 6
errors_per_hour = 0.2
data_points_processed = 15000
```
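
To make these visible to a scraper such as Prometheus, one option is the `prometheus_client` library. This is a sketch; the exporter below is an assumption, not an existing module in this codebase.

```python
from prometheus_client import Gauge, start_http_server

# Metric names mirror the variables above; they are assumptions.
uptime_gauge = Gauge("service_uptime_hours", "Service uptime in hours")
running_gauge = Gauge("collectors_running", "Collectors currently running")
total_gauge = Gauge("collectors_total", "Collectors configured")


def export_metrics(status: dict) -> None:
    uptime_gauge.set(status["uptime_hours"])
    running_gauge.set(status["collectors_running"])
    total_gauge.set(status["collectors_total"])


start_http_server(8000)  # expose /metrics on port 8000
```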
### Health Checks
```python
# External health check endpoint
from data.collection_service import DataCollectionService


async def health_check():
    service = DataCollectionService()
    status = service.get_status()

    if not status['service_running']:
        return {'status': 'unhealthy', 'reason': 'service_stopped'}

    if status['collectors_failed'] > status['collectors_total'] * 0.5:
        return {'status': 'degraded', 'reason': 'too_many_failed_collectors'}

    return {'status': 'healthy'}
```
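
The Docker `HEALTHCHECK` and Kubernetes liveness probe above both invoke `scripts/health_check.py`. A minimal sketch of such a script, built on the `health_check()` function above (the import path is an assumption):

```python
# scripts/health_check.py (sketch): exit 0 when healthy, non-zero otherwise,
# so container probes can consume the result.
import asyncio
import sys

from scripts.health import health_check  # assumed location of health_check()


async def main() -> int:
    result = await health_check()
    return 0 if result["status"] == "healthy" else 1

if __name__ == "__main__":
    sys.exit(asyncio.run(main()))
```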
### Alerting Rules
```yaml
# Prometheus alerting rules
groups:
  - name: data_collection_service
    rules:
      - alert: DataCollectionServiceDown
        expr: up{job="data-collection-service"} == 0
        for: 5m
        annotations:
          summary: "Data collection service is down"

      - alert: TooManyFailedCollectors
        expr: collectors_failed / collectors_total > 0.5
        for: 10m
        annotations:
          summary: "More than 50% of collectors have failed"

      - alert: HighErrorRate
        expr: rate(errors_total[5m]) > 0.1
        for: 15m
        annotations:
          summary: "High error rate in data collection service"
```
## Performance Considerations
### Resource Usage
- **Memory**: ~150MB base + ~15MB per trading pair (including service overhead)
- **CPU**: Low (async I/O bound, service orchestration)
- **Network**: ~1KB/s per trading pair
- **Storage**: Service logs ~10MB/day
### Scaling Strategies
1. **Horizontal Scaling**: Multiple service instances with different configurations
2. **Configuration Partitioning**: Separate services by exchange or asset class (example below)
3. **Load Balancing**: Distribute trading pairs across service instances
4. **Regional Deployment**: Deploy closer to exchange data centers
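
For instance, partitioning by asset class (strategy 2) can be as simple as giving each instance its own config file; the file names below are illustrative:

```bash
# Instance 1: majors
python scripts/start_data_collection.py --config config/data_collection_majors.json

# Instance 2: altcoins
python scripts/start_data_collection.py --config config/data_collection_alts.json
```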
### Optimization Tips
1. **Configuration Tuning**: Optimize health check intervals and timeframes
2. **Resource Limits**: Set appropriate memory and CPU limits
3. **Batch Operations**: Use efficient database operations
4. **Monitoring Overhead**: Balance monitoring frequency with performance
## Troubleshooting
### Common Service Issues
#### Service Won't Start
```
❌ Failed to start data collection service
```
**Solutions:**
1. Check configuration file validity
2. Verify database connectivity
3. Ensure no port conflicts
4. Check file permissions
#### Configuration Loading Failed
```
❌ Failed to load config from config/data_collection.json: Invalid JSON
```
**Solutions:**
1. Validate JSON syntax (see the one-liner below)
2. Check required fields
3. Verify file encoding (UTF-8)
4. Recreate default configuration
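
Python's built-in `json.tool` is a quick way to validate the file:

```bash
python -m json.tool config/data_collection.json > /dev/null && echo "Valid JSON"
```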
#### No Collectors Created
```
❌ No collectors were successfully initialized
```
**Solutions:**
1. Check exchange configuration
2. Verify trading pair symbols
3. Check network connectivity
4. Review collector creation logs
### Debug Mode
Enable verbose service debugging:
```json
{
  "logging": {
    "level": "DEBUG",
    "log_errors_only": false,
    "verbose_data_logging": true
  }
}
```
### Service Diagnostics
```python
# Run diagnostic check
from data.collection_service import DataCollectionService

service = DataCollectionService()
status = service.get_status()

print(f"Service Running: {status['service_running']}")
print(f"Configuration File: {status['configuration']['config_file']}")
print(f"Collectors: {status['collectors_running']}/{status['collectors_total']}")

# Check individual collector health
for collector_name in service.manager.list_collectors():
    collector_status = service.manager.get_collector_status(collector_name)
    print(f"{collector_name}: {collector_status['status']}")
```
## Related Documentation
- [Data Collectors System](../components/data_collectors.md) - Core collector components
- [Logging System](../components/logging.md) - Logging configuration
- [Database Operations](../database/operations.md) - Database integration
- [Monitoring Guide](../monitoring/README.md) - System monitoring setup