Data Collection Service
Service for collecting and storing real-time market data from multiple exchanges.
Architecture Overview
The data collection service has been refactored into a modular, component-based architecture to collect data for multiple trading pairs concurrently with improved maintainability, scalability, and testability.
- `DataCollectionService`: The primary orchestration layer, responsible for initializing and coordinating core service components. It delegates specific functionality to dedicated managers and factories.
- `CollectorManager`: Acts as an orchestrator for individual data collectors, using its own set of internal components (`CollectorLifecycleManager`, `ManagerHealthMonitor`, `ManagerStatsTracker`, `ManagerLogger`).
- Dedicated Components: Specific concerns such as configuration, collector creation, and asynchronous task management are handled by specialized classes (`ServiceConfig`, `CollectorFactory`, `AsyncTaskManager`).
- `OKXCollector`: A dedicated worker responsible for collecting data for a single trading pair from the OKX exchange, built on a more robust `BaseDataCollector` and its internal components (`ConnectionManager`, `CollectorStateAndTelemetry`, `CallbackDispatcher`).
This modular architecture allows for high scalability, fault tolerance, and clear separation of concerns.
Key Components
DataCollectionService
- Location: `data/collection_service.py`
- Responsibilities:
  - Orchestrates the overall data collection process.
  - Initializes and coordinates `ServiceConfig`, `CollectorFactory`, `CollectorManager`, and `AsyncTaskManager`.
  - Manages the main service loop and graceful shutdown.
  - Provides a high-level API for running and monitoring the service.
ServiceConfig
- Location: `config/service_config.py`
- Responsibilities:
  - Handles loading, creating, and validating service configurations.
  - Ensures configuration file integrity, including file permission validation.
  - Manages default configuration generation and runtime updates.
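A minimal usage sketch of `ServiceConfig`, assuming the `load_config`/`get_config`/`save_config` methods that appear in later examples in this document; treat the exact signatures as assumptions.

```python
# Sketch only: method names are assumed from later examples in this doc.
from config.service_config import ServiceConfig

cfg = ServiceConfig(config_path="config/data_collection.json")
config = cfg.load_config()  # creates and returns defaults if the file is missing
print(config["logging"]["level"])

config["logging"]["level"] = "DEBUG"
cfg.save_config(config)  # validated before being written to disk
```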
CollectorFactory
- Location: `data/collector_factory.py`
- Responsibilities:
  - Encapsulates the logic for creating individual data collector instances (e.g., `OKXCollector`).
  - Decouples collector instantiation from the `DataCollectionService`.
  - Ensures collectors are created with correct configurations and dependencies.
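A hypothetical sketch of how the factory decouples instantiation; `create_collector` and its parameters are assumptions modeled on the configuration schema, not a confirmed API.

```python
# Sketch only: create_collector and its keyword arguments are assumed.
from data.collector_factory import CollectorFactory

factory = CollectorFactory()
collector = factory.create_collector(
    exchange="okx",
    symbol="BTC-USDT",
    data_types=["trade", "orderbook"],
)
```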
AsyncTaskManager
- Location: `utils/async_task_manager.py`
- Responsibilities:
  - Manages and tracks `asyncio.Task` instances throughout the application.
  - Prevents potential memory leaks by ensuring proper task lifecycle management.
  - Facilitates robust asynchronous operations for both `DataCollectionService` and `CollectorManager`.
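The sketch below illustrates the leak-safe task-tracking pattern such a manager typically wraps: keep a strong reference to every task (so it cannot be garbage-collected mid-flight) and discard it on completion. The actual `AsyncTaskManager` API is not documented here, so this is conceptual.

```python
# Conceptual pattern only; not the actual AsyncTaskManager implementation.
import asyncio
from typing import Coroutine

class TaskTracker:
    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()

    def create(self, coro: Coroutine) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._tasks.add(task)  # strong reference prevents premature GC
        task.add_done_callback(self._tasks.discard)  # auto-cleanup when done
        return task

    async def cancel_all(self) -> None:
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)
```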
CollectorManager
- Location: `data/collector_manager.py`
- Responsibilities:
  - Acts as an orchestrator for all active data collectors.
  - Delegates specific responsibilities to its internal components:
    - `CollectorLifecycleManager`: Manages adding, removing, starting, and stopping collectors.
    - `ManagerHealthMonitor`: Encapsulates global health monitoring and auto-restart logic.
    - `ManagerStatsTracker`: Handles performance statistics collection and caching.
    - `ManagerLogger`: Centralizes logging operations for the manager and its collectors.
  - Provides a unified interface for controlling and monitoring managed collectors.
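A hypothetical view of that unified interface: `start_all`/`stop_all` appear in the Mock Testing section below, while `add_collector` and `get_stats` are assumed names chosen to match the responsibilities above.

```python
# Sketch only: add_collector and get_stats are assumed method names.
from data.collector_manager import CollectorManager

async def demo(collector) -> None:
    manager = CollectorManager()
    manager.add_collector(collector)  # delegated to CollectorLifecycleManager
    await manager.start_all()         # lifecycle handled per collector
    print(manager.get_stats())        # served from ManagerStatsTracker's cache
    await manager.stop_all()
```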
OKXCollector
- Location: `data/exchanges/okx/collector.py`
- Responsibilities:
  - Inherits from `BaseDataCollector` and implements exchange-specific data collection logic.
  - Utilizes `ConnectionManager` for robust WebSocket connection management.
  - Leverages `CollectorStateAndTelemetry` for internal status, health, and logging.
  - Uses `CallbackDispatcher` to notify registered consumers of processed data.
  - Subscribes to real-time data channels specific to OKX.
  - Processes and standardizes incoming OKX data before dispatching.
  - Stores processed data in the database.
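A hypothetical consumer registration through the collector's `CallbackDispatcher`; the `create_collector` call carries over from the factory sketch above, and `register_callback` plus the trade payload shape are assumptions.

```python
# Sketch only: register_callback and the trade payload shape are assumed.
from data.collector_factory import CollectorFactory

async def on_trade(trade: dict) -> None:
    print(trade.get("symbol"), trade.get("price"))

collector = CollectorFactory().create_collector(
    exchange="okx", symbol="BTC-USDT", data_types=["trade"]
)
collector.register_callback("trade", on_trade)
```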
Configuration
The service is configured through `config/bot_configs/data_collector_config.json`:
{
"service_name": "data_collection_service",
"enabled": true,
"manager_config": {
"component_name": "collector_manager",
"health_check_interval": 60,
"log_level": "INFO",
"verbose": true
},
"collectors": [
{
"exchange": "okx",
"symbol": "BTC-USDT",
"data_types": ["trade", "orderbook"],
"enabled": true
},
{
"exchange": "okx",
"symbol": "ETH-USDT",
"data_types": ["trade"],
"enabled": true
}
]
}
Usage
The DataCollectionService is the main entry point for running the data collection system.
Start the service from a script (e.g., scripts/start_data_collection.py):
# scripts/start_data_collection.py
import asyncio
from data.collection_service import DataCollectionService
from utils.logger import setup_logging # Assuming this exists or is created
async def main():
setup_logging() # Initialize logging
service = DataCollectionService(config_path="config/data_collection.json")
await service.run() # Or run with a duration: await service.run(duration_hours=24)
if __name__ == "__main__":
asyncio.run(main())
Health & Monitoring
The DataCollectionService and CollectorManager provide comprehensive health and monitoring capabilities through their dedicated components.
Features
- Service Lifecycle Management: Start, stop, and monitor data collection operations
- JSON Configuration: File-based configuration with automatic defaults
- Clean Production Logging: Only essential operational information
- Health Monitoring: Service-level health checks and auto-recovery
- Graceful Shutdown: Proper signal handling and cleanup
- Multi-Exchange Orchestration: Coordinate collectors across multiple exchanges
- Production Ready: Designed for 24/7 operation with monitoring
Quick Start
Basic Usage
# Start with default configuration (indefinite run)
python scripts/start_data_collection.py
# Run for 8 hours
python scripts/start_data_collection.py --hours 8
# Use custom configuration
python scripts/start_data_collection.py --config config/my_config.json
Monitoring
# Check status once
python scripts/monitor_clean.py
# Monitor continuously every 60 seconds
python scripts/monitor_clean.py --interval 60
Configuration
The service uses JSON configuration files with automatic default creation if none exists.
Default Configuration Location
config/data_collection.json
Configuration Structure
{
"exchanges": {
"okx": {
"enabled": true,
"trading_pairs": [
{
"symbol": "BTC-USDT",
"enabled": true,
"data_types": ["trade"],
"timeframes": ["1m", "5m", "15m", "1h"]
},
{
"symbol": "ETH-USDT",
"enabled": true,
"data_types": ["trade"],
"timeframes": ["1m", "5m", "15m", "1h"]
}
]
}
},
"collection_settings": {
"health_check_interval": 120,
"store_raw_data": true,
"auto_restart": true,
"max_restart_attempts": 3
},
"logging": {
"level": "INFO",
"log_errors_only": true,
"verbose_data_logging": false
}
}
Configuration Options
Exchange Settings
- enabled: Whether to enable this exchange
- trading_pairs: Array of trading pair configurations
Trading Pair Settings
- symbol: Trading pair symbol (e.g., "BTC-USDT")
- enabled: Whether to collect data for this pair
- data_types: Types of data to collect (["trade"], ["ticker"], etc.)
- timeframes: Candle timeframes to generate (["1m", "5m", "15m", "1h", "4h", "1d"])
Collection Settings
- health_check_interval: Health check frequency in seconds
- store_raw_data: Whether to store raw trade data
- auto_restart: Enable automatic restart on failures (see the sketch after this list)
- max_restart_attempts: Maximum restart attempts before giving up
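The sketch below shows how `auto_restart` and `max_restart_attempts` could interact conceptually; the real logic lives inside `ManagerHealthMonitor` and is not reproduced here.

```python
# Conceptual supervision loop; not the actual ManagerHealthMonitor code.
import asyncio

async def supervise(start_collector, max_restart_attempts: int = 3) -> None:
    attempts = 0
    while attempts <= max_restart_attempts:
        try:
            await start_collector()  # runs until the collector exits or fails
            return                   # clean exit: no restart needed
        except Exception as exc:
            attempts += 1
            print(f"Collector failed ({exc}); restart {attempts}/{max_restart_attempts}")
            await asyncio.sleep(2 ** attempts)  # exponential backoff
    print("Giving up after max restart attempts")
```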
Logging Settings
- level: Log level ("DEBUG", "INFO", "WARNING", "ERROR")
- log_errors_only: Only log errors and essential events
- verbose_data_logging: Enable verbose logging of individual trades/candles
Service Architecture
Service Layer Components
graph TD
subgraph DataCollectionService
SC[ServiceConfig] -- Manages --> Conf(Configuration)
SCF[CollectorFactory] -- Creates --> Collectors(Data Collectors)
ATM[AsyncTaskManager] -- Manages --> Tasks(Async Tasks)
DCS[DataCollectionService] -- Uses --> SC
DCS -- Uses --> SCF
DCS -- Uses --> ATM
DCS -- Orchestrates --> CM(CollectorManager)
end
subgraph CollectorManager
CM --> CLM(CollectorLifecycleManager)
CM --> MHM(ManagerHealthMonitor)
CM --> MST(ManagerStatsTracker)
CM --> ML(ManagerLogger)
CLM -- Manages --> BC[BaseDataCollector]
MHM -- Monitors --> BC
MST -- Tracks --> BC
ML -- Logs For --> BC
end
subgraph BaseDataCollector [Core Data Collector]
BC --> ConM(ConnectionManager)
BC --> CST(CollectorStateAndTelemetry)
BC --> CD(CallbackDispatcher)
end
Conf -- Provides --> DCS
Collectors -- Created By --> SCF
Tasks -- Managed By --> ATM
CM -- Manages --> BaseDataCollector
BaseDataCollector -- Collects Data --> Database
BaseDataCollector -- Publishes Data --> Redis(Redis Pub/Sub)
style DCS fill:#f9f,stroke:#333,stroke-width:2px
style CM fill:#bbf,stroke:#333,stroke-width:2px
style BC fill:#cfc,stroke:#333,stroke-width:2px
style SC fill:#FFD700,stroke:#333,stroke-width:1px
style SCF fill:#90EE90,stroke:#333,stroke-width:1px
style ATM fill:#ADD8E6,stroke:#333,stroke-width:1px
style CLM fill:#FFC0CB,stroke:#333,stroke-width:1px
style MHM fill:#C0C0C0,stroke:#333,stroke-width:1px
style MST fill:#DA70D6,stroke:#333,stroke-width:1px
style ML fill:#DDA0DD,stroke:#333,stroke-width:1px
style ConM fill:#F0F8FF,stroke:#333,stroke-width:1px
style CST fill:#FFE4E1,stroke:#333,stroke-width:1px
style CD fill:#FAFAD2,stroke:#333,stroke-width:1px
style Database fill:#A9A9A9,stroke:#333,stroke-width:1px
style Redis fill:#FF6347,stroke:#333,stroke-width:1px
Data Flow
graph LR
Config(Configuration) --> ServiceConfig
ServiceConfig --> DataCollectionService
DataCollectionService -- Initializes --> CollectorManager
DataCollectionService -- Initializes --> CollectorFactory
DataCollectionService -- Initializes --> AsyncTaskManager
CollectorFactory -- Creates --> BaseDataCollector
CollectorManager -- Manages --> BaseDataCollector
BaseDataCollector -- Collects Data --> Database
BaseDataCollector -- Publishes Data --> RedisPubSub(Redis Pub/Sub)
HealthMonitor(Health Monitoring) --> DataCollectionService
HealthMonitor --> CollectorManager
HealthMonitor --> BaseDataCollector
ErrorHandling(Error Handling) --> DataCollectionService
ErrorHandling --> CollectorManager
ErrorHandling --> BaseDataCollector
Storage Integration
- Raw Data: PostgreSQL `raw_trades` table via the repository pattern
- Candles: PostgreSQL `market_data` table with multiple timeframes
- Real-time: Redis pub/sub for live data distribution (see the subscriber sketch below)
- Service Metrics: Service uptime, error counts, collector statistics
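For the real-time channel, a consumer can subscribe with redis-py as sketched below; the channel naming scheme is an assumption for illustration.

```python
# Sketch: the channel name "market_data:okx:BTC-USDT" is assumed.
import redis

r = redis.Redis(host="localhost", port=6379)
pubsub = r.pubsub()
pubsub.subscribe("market_data:okx:BTC-USDT")
for message in pubsub.listen():
    if message["type"] == "message":
        print(message["data"])
```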
Logging Philosophy
The service implements clean production logging focused on operational needs:
What Gets Logged
✅ Service Lifecycle
- Service start/stop events
- Configuration loading
- Service initialization
✅ Collector Orchestration
- Collector creation and destruction
- Service-level health summaries
- Recovery operations
✅ Configuration Events
- Config file changes
- Runtime configuration updates
- Validation errors
✅ Service Statistics
- Periodic uptime reports
- Collection summary statistics
- Performance metrics
What Doesn't Get Logged
❌ Individual Data Points
- Every trade received
- Every candle generated
- Raw market data
❌ Internal Operations
- Individual collector heartbeats
- Routine database operations
- Internal processing steps
API Reference
DataCollectionService
The main service class for managing data collection operations, now orchestrating through specialized components.
Constructor
DataCollectionService(
config_path: str = "config/data_collection.json",
service_config: Optional[ServiceConfig] = None,
collector_factory: Optional[CollectorFactory] = None,
collector_manager: Optional[CollectorManager] = None,
async_task_manager: Optional[AsyncTaskManager] = None
)
Parameters:
- `config_path`: Path to JSON configuration file. Used if `service_config` is not provided.
- `service_config`: An instance of `ServiceConfig`. If None, one will be created.
- `collector_factory`: An instance of `CollectorFactory`. If None, one will be created.
- `collector_manager`: An instance of `CollectorManager`. If None, one will be created.
- `async_task_manager`: An instance of `AsyncTaskManager`. If None, one will be created.
Methods
async run(duration_hours: Optional[float] = None) -> None
Runs the service for a specified duration or indefinitely. This method now coordinates the main event loop and lifecycle of all internal components.
Parameters:
duration_hours: Optional duration in hours (None = indefinite).
Returns:
None
Example:
from data.collection_service import DataCollectionService
import asyncio
async def run_service():
service = DataCollectionService()
await service.run(duration_hours=24) # Run for 24 hours
if __name__ == "__main__":
asyncio.run(run_service())
async start() -> None
Initializes and starts the data collection service and all configured collectors. This method delegates to internal components for their respective startup procedures.
Returns:
None
async stop() -> None
Stops the service gracefully, including all collectors and internal cleanup. Ensures all asynchronous tasks are properly cancelled and resources released.
Returns:
None
get_status() -> Dict[str, Any]
Gets current service status, including uptime, collector counts, and errors, aggregated from underlying components.
Returns:
{
'service_running': True,
'uptime_hours': 12.5,
'collectors_total': 6,
'collectors_running': 5,
'collectors_failed': 1,
'errors_count': 2,
'last_error': 'Connection timeout for ETH-USDT',
'configuration': {
'config_file': 'config/data_collection.json',
'exchanges_enabled': ['okx'],
'total_trading_pairs': 6
},
'detailed_collector_statuses': { # New field for detailed statuses
'okx_BTC-USDT': {'status': 'RUNNING', 'health_score': 95},
'okx_ETH-USDT': {'status': 'ERROR', 'last_error': 'Connection refused'}
}
}
_run_main_loop(duration_hours: Optional[float])
Internal method extracted from run() to manage the core asynchronous loop.
Parameters:
duration_hours: Optional duration in hours for the loop.
Returns:
None
Standalone Function
run_data_collection_service(config_path, duration_hours)
async def run_data_collection_service(
config_path: str = "config/data_collection.json",
duration_hours: Optional[float] = None
) -> None
Convenience function to run the service with minimal setup, internally creating a DataCollectionService instance.
Parameters:
- `config_path`: Path to configuration file.
- `duration_hours`: Optional duration in hours.
Returns:
None
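Example (a minimal entry point using the documented defaults):

```python
# Run the service for one hour via the convenience function.
import asyncio
from data.collection_service import run_data_collection_service

if __name__ == "__main__":
    asyncio.run(run_data_collection_service(duration_hours=1))
```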
Integration Examples
Basic Service Integration
import asyncio
from data.collection_service import DataCollectionService
from utils.logger import setup_logging # Assuming this exists or is created
async def main():
setup_logging()
service = DataCollectionService("config/my_config.json")
# Run for 24 hours
await service.run(duration_hours=24)
print("Service run finished.")
if __name__ == "__main__":
asyncio.run(main())
Custom Status Monitoring
import asyncio
from data.collection_service import DataCollectionService
from utils.logger import setup_logging
async def monitor_service():
setup_logging()
service = DataCollectionService()
# Start service in background
start_task = asyncio.create_task(service.run())
# Monitor status every 60 seconds
try:
while True:
status = service.get_status()
print(f"Service Uptime: {status['uptime_hours']:.1f}h")
print(f"Collectors: {status['collectors_running']}/{status['collectors_total']}")
print(f"Errors: {status['errors_count']}")
if status['errors_count'] > 0:
print(f"Last error: {status['last_error']}")
print("Detailed Collector Statuses:")
for name, details in status.get('detailed_collector_statuses', {}).items():
print(f" - {name}: Status={details.get('status')}, Health Score={details.get('health_score')}")
await asyncio.sleep(60)
except asyncio.CancelledError:
print("Monitoring cancelled.")
finally:
await service.stop()
await start_task # Ensure the main service task is awaited
if __name__ == "__main__":
    asyncio.run(monitor_service())
Programmatic Control
import asyncio
from data.collection_service import DataCollectionService
from utils.logger import setup_logging
async def controlled_collection():
setup_logging()
service = DataCollectionService()
try:
# Start the service
await service.start()
print("Data collection service started.")
# Monitor and control
while True:
status = service.get_status()
print(f"Current Service Status: {status['service_running']}, Collectors Running: {status['collectors_running']}")
# Example: Stop if certain condition met (e.g., specific error, or after a duration)
if status['collectors_failed'] > 0:
print("Some collectors failed, service is recovering...")
# The service's internal health monitor and task manager will handle restarts
# For demonstration, stop after 5 minutes
await asyncio.sleep(300)
print("Stopping service after 5 minutes of operation.")
break
except KeyboardInterrupt:
print("Manual shutdown requested.")
finally:
print("Shutting down service gracefully...")
await service.stop()
print("Service stopped.")
if __name__ == "__main__":
asyncio.run(controlled_collection())
Configuration Management
import asyncio
import json
from data.collection_service import DataCollectionService
from utils.logger import setup_logging
from config.service_config import ServiceConfig # Import the new ServiceConfig
async def dynamic_configuration():
setup_logging()
# Instantiate ServiceConfig directly or let DataCollectionService create it
service_config_instance = ServiceConfig(config_path="config/data_collection.json")
service = DataCollectionService(service_config=service_config_instance)
print("Initial configuration loaded:")
print(json.dumps(service_config_instance.get_config(), indent=2))
# Load and modify configuration
config = service_config_instance.get_config()
# Add new trading pair if not already present
new_pair = {
'symbol': 'SOL-USDT',
'enabled': True,
'data_types': ['trade'],
'timeframes': ['1m', '5m']
}
if new_pair not in config['exchanges']['okx']['trading_pairs']:
config['exchanges']['okx']['trading_pairs'].append(new_pair)
print("Added SOL-USDT to configuration.")
else:
print("SOL-USDT already in configuration.")
# Save updated configuration
service_config_instance.save_config(config) # Use ServiceConfig to save
print("Updated configuration saved. Restarting service with new config...")
await service.stop()
await service.start()
print("Service restarted with updated configuration.")
# Verify new pair is active (logic would be in get_status or similar)
status = service.get_status()
print(f"Current active collectors count: {status['collectors_total']}")
if __name__ == "__main__":
asyncio.run(dynamic_configuration())
Error Handling
The service implements robust error handling at multiple layers, leveraging the new component structure for more precise error management and recovery.
Service Level Errors
- Configuration Errors: Invalid JSON, missing required fields, file permission issues (handled by `ServiceConfig`).
- Initialization Errors: Failed collector creation (handled by `CollectorFactory`), database connectivity.
- Runtime Errors: Service-level exceptions, resource exhaustion, unhandled exceptions in asynchronous tasks (managed by `AsyncTaskManager`).
Error Recovery Strategies
- Graceful Degradation: Continue with healthy collectors while attempting to recover failed ones.
- Configuration Validation: `ServiceConfig` validates configurations before application, preventing common startup issues.
- Automated Restarts: `ManagerHealthMonitor` and `AsyncTaskManager` coordinate automatic restarts for failed collectors/tasks.
- Error Aggregation: `ManagerStatsTracker` collects and reports errors across all collectors, providing a unified view.
- Sanitized Error Messages: `ManagerLogger` ensures sensitive internal details are not leaked in logs or public interfaces.
Error Reporting
# Service status includes aggregated error information
status = service.get_status()
if status['errors_count'] > 0:
print(f"Service has {status['errors_count']} errors.")
print(f"Last service error: {status['last_error']}")
# Get detailed error information from individual collectors if available
if 'detailed_collector_statuses' in status:
for collector_name, details in status['detailed_collector_statuses'].items():
if details.get('status') == 'ERROR' and 'last_error' in details:
print(f"Collector {collector_name} error: {details['last_error']}")
Testing
The testing approach now emphasizes unit tests for individual components and integration tests for component interactions, ensuring thorough coverage of the modular architecture.
Running Service Tests
# Run all data collection service tests
uv run pytest tests/data/collection_service -v # Assuming tests are in a 'collection_service' subdir
# Run specific component tests, e.g., for ServiceConfig
uv run pytest tests/config/test_service_config.py -v
# Run with coverage for the entire data collection module
uv run pytest --cov=data --cov=config --cov=utils tests/
Test Coverage
The expanded test suite now covers:
- Component Unit Tests: Individual tests for `ServiceConfig`, `CollectorFactory`, `AsyncTaskManager`, `CollectorLifecycleManager`, `ManagerHealthMonitor`, `ManagerStatsTracker`, and `ManagerLogger`.
- Service Integration Tests: Testing `DataCollectionService`'s orchestration of its components.
- Service initialization and configuration loading/validation.
- Collector orchestration and management via `CollectorManager` and `CollectorLifecycleManager`.
- Asynchronous task management and error recovery.
- Service lifecycle (start/stop/restart) and signal handling.
- Status reporting and monitoring, including detailed collector statuses.
- Error aggregation and recovery strategies.
Mock Testing
import pytest
from unittest.mock import AsyncMock, patch
from data.collection_service import DataCollectionService
from config.service_config import ServiceConfig # Ensure new components are imported for mocking
@pytest.mark.asyncio
async def test_service_with_mock_components():
with patch('data.collection_service.ServiceConfig') as MockServiceConfig, \
patch('data.collection_service.CollectorFactory') as MockCollectorFactory, \
patch('data.collection_service.CollectorManager') as MockCollectorManager, \
patch('data.collection_service.AsyncTaskManager') as MockAsyncTaskManager:
# Configure mocks for successful operation; awaited methods need AsyncMock
MockServiceConfig.return_value.load_config.return_value = {"collectors": []}
MockServiceConfig.return_value.get_config.return_value = {"collectors": []}
MockCollectorManager.return_value.start_all = AsyncMock(return_value=None)
MockCollectorManager.return_value.stop_all = AsyncMock(return_value=None)
MockAsyncTaskManager.return_value.start = AsyncMock(return_value=None)
MockAsyncTaskManager.return_value.stop = AsyncMock(return_value=None)
service = DataCollectionService(
service_config=MockServiceConfig.return_value,
collector_factory=MockCollectorFactory.return_value,
collector_manager=MockCollectorManager.return_value,
async_task_manager=MockAsyncTaskManager.return_value
)
await service.start()
# Assertions to ensure components were called correctly
MockServiceConfig.return_value.load_config.assert_called_once()
MockCollectorManager.return_value.start_all.assert_called_once()
MockAsyncTaskManager.return_value.start.assert_called_once()
await service.stop()
MockCollectorManager.return_value.stop_all.assert_called_once()
MockAsyncTaskManager.return_value.stop.assert_called_once()
Production Deployment
Docker Deployment
FROM python:3.11-slim
WORKDIR /app
COPY . .
# Install dependencies
RUN pip install uv
# --system installs into the container's interpreter (no virtualenv)
RUN uv pip install --system -r requirements.txt
# Create logs and config directories
RUN mkdir -p logs config
# Copy production configuration
COPY config/production.json config/data_collection.json
# Health check
HEALTHCHECK --interval=60s --timeout=10s --start-period=30s --retries=3 \
CMD python scripts/health_check.py || exit 1
# Run service
CMD ["python", "scripts/start_data_collection.py", "--config", "config/data_collection.json"]
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: data-collection-service
spec:
replicas: 1
selector:
matchLabels:
app: data-collection-service
template:
metadata:
labels:
app: data-collection-service
spec:
containers:
- name: data-collector
image: crypto-dashboard/data-collector:latest
ports:
- containerPort: 8080
env:
- name: POSTGRES_HOST
value: "postgres-service"
- name: REDIS_HOST
value: "redis-service"
volumeMounts:
- name: config-volume
mountPath: /app/config
- name: logs-volume
mountPath: /app/logs
livenessProbe:
exec:
command:
- python
- scripts/health_check.py
initialDelaySeconds: 30
periodSeconds: 60
volumes:
- name: config-volume
configMap:
name: data-collection-config
- name: logs-volume
emptyDir: {}
Systemd Service
[Unit]
Description=Cryptocurrency Data Collection Service
After=network.target postgres.service redis.service
Requires=postgres.service redis.service
[Service]
Type=simple
User=crypto-collector
Group=crypto-collector
WorkingDirectory=/opt/crypto-dashboard
ExecStart=/usr/bin/python scripts/start_data_collection.py --config config/production.json
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
RestartSec=10
KillMode=mixed
TimeoutStopSec=30
# Environment
Environment=PYTHONPATH=/opt/crypto-dashboard
Environment=LOG_LEVEL=INFO
# Security
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ReadWritePaths=/opt/crypto-dashboard/logs
[Install]
WantedBy=multi-user.target
Environment Configuration
# Production environment variables
export ENVIRONMENT=production
export POSTGRES_HOST=postgres.internal
export POSTGRES_PORT=5432
export POSTGRES_DB=crypto_dashboard
export POSTGRES_USER=dashboard_user
export POSTGRES_PASSWORD=secure_password
export REDIS_HOST=redis.internal
export REDIS_PORT=6379
# Service configuration
export DATA_COLLECTION_CONFIG=/etc/crypto-dashboard/data_collection.json
export LOG_LEVEL=INFO
export HEALTH_CHECK_INTERVAL=120
Monitoring and Alerting
Metrics Collection
The service exposes metrics for monitoring systems:
# Service metrics
service_uptime_hours = 24.5
collectors_running = 5
collectors_total = 6
errors_per_hour = 0.2
data_points_processed = 15000
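One way to export these values is with `prometheus_client`, sketched below; the service does not document a built-in exporter, so this wiring is an assumption (the port matches the `containerPort` in the Kubernetes spec above).

```python
# Sketch: exporter wiring is assumed; gauge names mirror the metrics above.
from prometheus_client import Gauge, start_http_server

service_uptime_hours = Gauge("service_uptime_hours", "Service uptime in hours")
collectors_running = Gauge("collectors_running", "Collectors currently running")
collectors_total = Gauge("collectors_total", "Collectors configured")

start_http_server(8080)  # scrape endpoint served at :8080/metrics

def update_metrics(status: dict) -> None:
    service_uptime_hours.set(status["uptime_hours"])
    collectors_running.set(status["collectors_running"])
    collectors_total.set(status["collectors_total"])
```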
Health Checks
# External health check endpoint
async def health_check():
service = DataCollectionService()
status = service.get_status()
if not status['service_running']:
return {'status': 'unhealthy', 'reason': 'service_stopped'}
if status['collectors_failed'] > status['collectors_total'] * 0.5:
return {'status': 'degraded', 'reason': 'too_many_failed_collectors'}
return {'status': 'healthy'}
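The Dockerfile and Kubernetes probes both invoke `scripts/health_check.py`, which is not shown elsewhere; a possible sketch, reusing the logic above and signalling health via the exit code:

```python
# Assumed sketch of scripts/health_check.py; exit code 0 means healthy.
import sys
from data.collection_service import DataCollectionService

def main() -> int:
    status = DataCollectionService().get_status()
    if not status["service_running"]:
        print("unhealthy: service_stopped")
        return 1
    if status["collectors_failed"] > status["collectors_total"] * 0.5:
        print("degraded: too_many_failed_collectors")
        return 1
    print("healthy")
    return 0

if __name__ == "__main__":
    sys.exit(main())
```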
Alerting Rules
# Prometheus alerting rules
groups:
- name: data_collection_service
rules:
- alert: DataCollectionServiceDown
expr: up{job="data-collection-service"} == 0
for: 5m
annotations:
summary: "Data collection service is down"
- alert: TooManyFailedCollectors
expr: collectors_failed / collectors_total > 0.5
for: 10m
annotations:
summary: "More than 50% of collectors have failed"
- alert: HighErrorRate
expr: rate(errors_total[5m]) > 0.1
for: 15m
annotations:
summary: "High error rate in data collection service"
Performance Considerations
Resource Usage
- Memory: ~150MB base + ~15MB per trading pair (including service overhead)
- CPU: Low (async I/O bound, service orchestration)
- Network: ~1KB/s per trading pair
- Storage: Service logs ~10MB/day
Scaling Strategies
- Horizontal Scaling: Multiple service instances with different configurations
- Configuration Partitioning: Separate services by exchange or asset class
- Load Balancing: Distribute trading pairs across service instances
- Regional Deployment: Deploy closer to exchange data centers
Optimization Tips
- Configuration Tuning: Optimize health check intervals and timeframes
- Resource Limits: Set appropriate memory and CPU limits
- Batch Operations: Use efficient database operations
- Monitoring Overhead: Balance monitoring frequency with performance
Troubleshooting
Common Service Issues
Service Won't Start
❌ Failed to start data collection service
Solutions:
- Check configuration file validity
- Verify database connectivity
- Ensure no port conflicts
- Check file permissions
Configuration Loading Failed
❌ Failed to load config from config/data_collection.json: Invalid JSON
Solutions:
- Validate JSON syntax
- Check required fields
- Verify file encoding (UTF-8)
- Recreate default configuration
No Collectors Created
❌ No collectors were successfully initialized
Solutions:
- Check exchange configuration
- Verify trading pair symbols
- Check network connectivity
- Review collector creation logs
Debug Mode
Enable verbose service debugging:
{
"logging": {
"level": "DEBUG",
"log_errors_only": false,
"verbose_data_logging": true
}
}
Service Diagnostics
# Run diagnostic check
from data.collection_service import DataCollectionService
service = DataCollectionService()
status = service.get_status()
print(f"Service Running: {status['service_running']}")
print(f"Configuration File: {status['configuration']['config_file']}")
print(f"Collectors: {status['collectors_running']}/{status['collectors_total']}")
# Check individual collector health
for collector_name in service.manager.list_collectors():
collector_status = service.manager.get_collector_status(collector_name)
print(f"{collector_name}: {collector_status['status']}")
Related Documentation
- Data Collectors System - Comprehensive documentation on core collector components and their modular internal structure.
- Logging System - Details on logging configuration and philosophy.
- Database Operations - Information on database integration and persistence.
- Monitoring Guide - Setup for system monitoring and alerting.
- ADR-004: Modular Data Collector System Refactoring - Rationale and implications of the modular architecture.