# Data Collection Service

**Service for collecting and storing real-time market data from multiple exchanges.**

## Architecture Overview

The data collection service uses a **modular, component-based architecture** to collect data for multiple trading pairs concurrently. The design aims to keep each part maintainable, scalable, and testable.

- **`DataCollectionService`**: The top-level orchestration layer. It initializes and coordinates the core service components and delegates specific work to dedicated managers and factories.
- **`CollectorManager`**: Orchestrates the individual data collectors through its own internal components (`CollectorLifecycleManager`, `ManagerHealthMonitor`, `ManagerStatsTracker`, `ManagerLogger`).
- **Dedicated components**: Specialized classes (`ServiceConfig`, `CollectorFactory`, `AsyncTaskManager`) handle configuration, collector creation, and asynchronous task management.
- **`OKXCollector`**: A worker that collects data for a single trading pair from the OKX exchange. It builds on `BaseDataCollector` and that class's internal components (`ConnectionManager`, `CollectorStateAndTelemetry`, `CallbackDispatcher`).

Each concern lives in its own component. As a result, collectors can be added, restarted, or tested in isolation, and a failure in one collector does not stop the others.
## Key Components

### `DataCollectionService`

- **Location**: `data/collection_service.py`
- **Responsibilities**:
  - Orchestrates the overall data collection process.
  - Initializes and coordinates `ServiceConfig`, `CollectorFactory`, `CollectorManager`, and `AsyncTaskManager`.
  - Manages the main service loop and graceful shutdown.
  - Provides a high-level API for running and monitoring the service.
### `ServiceConfig`

- **Location**: `config/service_config.py`
- **Responsibilities**:
  - Loads, creates, and validates service configurations.
  - Checks configuration file integrity, including file permissions.
  - Generates a default configuration when none exists and applies runtime updates.
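
The loading behaviour described above can be sketched as follows. This is a minimal illustration, not the actual `ServiceConfig` code. `load_or_create_config` is a hypothetical name, and `DEFAULT_CONFIG` is an abbreviated version of the configuration structure shown later in this document. Rejecting files that are writable by group or others is an assumed permission policy.

```python
import json
import os
import stat
from pathlib import Path
from typing import Any, Dict

# Hypothetical defaults, abbreviated from the documented configuration structure.
DEFAULT_CONFIG: Dict[str, Any] = {
    "exchanges": {"okx": {"enabled": True, "trading_pairs": []}},
    "collection_settings": {
        "health_check_interval": 120,
        "store_raw_data": True,
        "auto_restart": True,
        "max_restart_attempts": 3,
    },
    "logging": {"level": "INFO", "log_errors_only": True, "verbose_data_logging": False},
}


def load_or_create_config(path: str) -> Dict[str, Any]:
    """Load a JSON config, writing the defaults first if the file is missing."""
    config_file = Path(path)
    if not config_file.exists():
        config_file.parent.mkdir(parents=True, exist_ok=True)
        config_file.write_text(json.dumps(DEFAULT_CONFIG, indent=2), encoding="utf-8")
        os.chmod(config_file, 0o600)  # owner read/write only

    # Assumed policy: refuse config files that other users could modify.
    mode = config_file.stat().st_mode
    if mode & (stat.S_IWGRP | stat.S_IWOTH):
        raise PermissionError(f"{path} is writable by group or others")

    with config_file.open(encoding="utf-8") as fh:
        return json.load(fh)
```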
### `CollectorFactory`

- **Location**: `data/collector_factory.py`
- **Responsibilities**:
  - Encapsulates the logic for creating individual data collector instances (e.g., `OKXCollector`).
  - Decouples collector instantiation from `DataCollectionService`.
  - Ensures collectors are created with the correct configurations and dependencies.
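
One common way to implement this decoupling is a registry that maps exchange names to collector classes. The sketch below is hypothetical. `register_collector`, `create_collector`, and the stand-in `DemoCollector`/`DemoOKXCollector` classes are illustrative names, and the real factory's signatures may differ.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Type


@dataclass
class DemoCollector:
    """Stand-in for a BaseDataCollector subclass (hypothetical)."""
    symbol: str
    data_types: List[str] = field(default_factory=lambda: ["trade"])


# Registry: lower-case exchange name -> collector class.
_REGISTRY: Dict[str, Type[DemoCollector]] = {}


def register_collector(exchange: str) -> Callable[[Type[DemoCollector]], Type[DemoCollector]]:
    """Class decorator that registers a collector class for an exchange."""
    def decorator(cls: Type[DemoCollector]) -> Type[DemoCollector]:
        _REGISTRY[exchange.lower()] = cls
        return cls
    return decorator


@register_collector("okx")
class DemoOKXCollector(DemoCollector):
    """Placeholder for the real OKXCollector."""


def create_collector(pair_config: dict, exchange: str) -> DemoCollector:
    """Build a collector from one trading-pair entry of the configuration."""
    try:
        collector_cls = _REGISTRY[exchange.lower()]
    except KeyError:
        raise ValueError(f"No collector registered for exchange {exchange!r}") from None
    return collector_cls(
        symbol=pair_config["symbol"],
        data_types=list(pair_config.get("data_types", ["trade"])),
    )
```

With this design, adding a new exchange only requires registering another class. The service code that calls `create_collector()` does not change.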
### `AsyncTaskManager`

- **Location**: `utils/async_task_manager.py`
- **Responsibilities**:
  - Manages and tracks `asyncio.Task` instances throughout the application.
  - Prevents memory leaks and orphaned tasks by tracking each task until it completes and cancelling outstanding tasks on shutdown.
  - Supports reliable asynchronous operation in both `DataCollectionService` and `CollectorManager`.
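
The asyncio event loop keeps only weak references to tasks. A task created with `asyncio.create_task()` and not stored anywhere can therefore disappear before it finishes, and the asyncio documentation warns about this. Below is a minimal sketch of a tracking manager. `TaskTracker` and its method names are hypothetical and are not the documented `AsyncTaskManager` API.

```python
import asyncio
from typing import Any, Coroutine, Optional, Set


class TaskTracker:
    """Hypothetical minimal stand-in for AsyncTaskManager."""

    def __init__(self) -> None:
        self._tasks: Set[asyncio.Task] = set()

    def create_task(self, coro: Coroutine[Any, Any, Any], name: Optional[str] = None) -> asyncio.Task:
        task = asyncio.create_task(coro, name=name)
        self._tasks.add(task)                        # strong reference keeps the task alive
        task.add_done_callback(self._tasks.discard)  # forget it once it finishes
        return task

    @property
    def active_count(self) -> int:
        return len(self._tasks)

    async def stop(self) -> None:
        """Cancel every tracked task and wait until all of them have finished."""
        tasks = list(self._tasks)
        for task in tasks:
            task.cancel()
        # return_exceptions=True collects CancelledError instead of raising it
        await asyncio.gather(*tasks, return_exceptions=True)
```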
### `CollectorManager`

- **Location**: `data/collector_manager.py`
- **Responsibilities**:
  - Orchestrates all active data collectors.
  - Delegates specific responsibilities to its internal components:
    - `CollectorLifecycleManager`: Adds, removes, starts, and stops collectors.
    - `ManagerHealthMonitor`: Encapsulates global health monitoring and auto-restart logic.
    - `ManagerStatsTracker`: Collects and caches performance statistics.
    - `ManagerLogger`: Centralizes logging for the manager and its collectors.
  - Provides a unified interface for controlling and monitoring managed collectors.
### `OKXCollector`

- **Location**: `data/exchanges/okx/collector.py`
- **Responsibilities**:
  - Inherits from `BaseDataCollector` and implements the OKX-specific data collection logic.
  - Uses `ConnectionManager` for WebSocket connection management.
  - Uses `CollectorStateAndTelemetry` for internal status, health, and logging.
  - Uses `CallbackDispatcher` to notify registered consumers of processed data.
  - Subscribes to OKX real-time data channels.
  - Processes and standardizes incoming OKX data before dispatching it.
  - Stores processed data in the database.
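
The dispatch step can be pictured as a registry of callbacks keyed by data type. This sketch is hypothetical: `SimpleCallbackDispatcher`, `add_callback`, `dispatch`, and the isolation of failing callbacks are assumptions, not the documented `CallbackDispatcher` API. It shows one way to support both sync and async consumers, and how to stop one faulty consumer from blocking the rest.

```python
import inspect
import logging
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, List, Union

logger = logging.getLogger(__name__)
Callback = Callable[[Any], Union[None, Awaitable[None]]]


class SimpleCallbackDispatcher:
    def __init__(self) -> None:
        self._callbacks: DefaultDict[str, List[Callback]] = defaultdict(list)

    def add_callback(self, data_type: str, callback: Callback) -> None:
        self._callbacks[data_type].append(callback)

    async def dispatch(self, data_type: str, payload: Any) -> int:
        """Call every callback registered for data_type; return how many succeeded."""
        delivered = 0
        for callback in self._callbacks.get(data_type, []):
            try:
                result = callback(payload)
                if inspect.isawaitable(result):  # support async consumers too
                    await result
                delivered += 1
            except Exception:
                # One failing consumer must not stop delivery to the others.
                logger.exception("Callback %r failed for %s", callback, data_type)
        return delivered
```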
## Configuration

The service is configured through `config/bot_configs/data_collector_config.json`:

```json
{
  "service_name": "data_collection_service",
  "enabled": true,
  "manager_config": {
    "component_name": "collector_manager",
    "health_check_interval": 60,
    "log_level": "INFO",
    "verbose": true
  },
  "collectors": [
    {
      "exchange": "okx",
      "symbol": "BTC-USDT",
      "data_types": ["trade", "orderbook"],
      "enabled": true
    },
    {
      "exchange": "okx",
      "symbol": "ETH-USDT",
      "data_types": ["trade"],
      "enabled": true
    }
  ]
}
```
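## Usage

`DataCollectionService` is the main entry point for running the data collection system.

Start the service from a script such as `scripts/start_data_collection.py`. The `--config` and `--hours` options below match the commands in the Quick Start section:

```python
# scripts/start_data_collection.py
import argparse
import asyncio

from data.collection_service import DataCollectionService
from utils.logger import setup_logging  # assumed helper; adapt to your logging setup


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Run the data collection service.")
    parser.add_argument("--config", default="config/data_collection.json",
                        help="Path to the JSON configuration file.")
    parser.add_argument("--hours", type=float, default=None,
                        help="Run for this many hours (default: run indefinitely).")
    return parser.parse_args()


async def main() -> None:
    args = parse_args()
    setup_logging()  # initialize logging
    service = DataCollectionService(config_path=args.config)
    await service.run(duration_hours=args.hours)  # None = run indefinitely


if __name__ == "__main__":
    asyncio.run(main())
```

## Health & Monitoring

`DataCollectionService` and `CollectorManager` handle health and monitoring through dedicated components:

- `ManagerHealthMonitor` runs periodic health checks and restarts failed collectors.
- `ManagerStatsTracker` collects per-collector performance statistics.
- `DataCollectionService.get_status()` aggregates this information into a single status report.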
## Features

- **Service Lifecycle Management**: Start, stop, and monitor data collection operations
- **JSON Configuration**: File-based configuration with automatic defaults
- **Clean Production Logging**: Only essential operational information
- **Health Monitoring**: Service-level health checks and auto-recovery
- **Graceful Shutdown**: Proper signal handling and cleanup
- **Multi-Exchange Orchestration**: Coordinate collectors across multiple exchanges
- **Production Ready**: Designed for 24/7 operation with monitoring
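
Graceful shutdown on `SIGINT`/`SIGTERM` is commonly wired up with `loop.add_signal_handler()`. The handler sets an event, and the main coroutine waits on that event. Below is a minimal sketch. It is Unix only, because `add_signal_handler` is not available on Windows event loops. `run_until_signal` is a hypothetical name, and the cleanup step is a placeholder for a call such as `service.stop()`.

```python
import asyncio
import signal


async def run_until_signal(stop_event: asyncio.Event) -> str:
    """Wait until SIGINT/SIGTERM arrives (or stop_event is set externally)."""
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop_event.set)
    try:
        # Placeholder for the service's main work: here we only wait.
        await stop_event.wait()
        return "stopped"
    finally:
        # Restore default signal handling; real cleanup (e.g. service.stop()) goes here.
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.remove_signal_handler(sig)
```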
## Quick Start

### Basic Usage

```bash
# Start with default configuration (indefinite run)
python scripts/start_data_collection.py

# Run for 8 hours
python scripts/start_data_collection.py --hours 8

# Use custom configuration
python scripts/start_data_collection.py --config config/my_config.json
```

### Monitoring

```bash
# Check status once
python scripts/monitor_clean.py

# Monitor continuously every 60 seconds
python scripts/monitor_clean.py --interval 60
```

## Configuration

The service reads a JSON configuration file and creates one with default values if none exists.

### Default Configuration Location

`config/data_collection.json`

### Configuration Structure

```json
{
  "exchanges": {
    "okx": {
      "enabled": true,
      "trading_pairs": [
        {
          "symbol": "BTC-USDT",
          "enabled": true,
          "data_types": ["trade"],
          "timeframes": ["1m", "5m", "15m", "1h"]
        },
        {
          "symbol": "ETH-USDT",
          "enabled": true,
          "data_types": ["trade"],
          "timeframes": ["1m", "5m", "15m", "1h"]
        }
      ]
    }
  },
  "collection_settings": {
    "health_check_interval": 120,
    "store_raw_data": true,
    "auto_restart": true,
    "max_restart_attempts": 3
  },
  "logging": {
    "level": "INFO",
    "log_errors_only": true,
    "verbose_data_logging": false
  }
}
```
### Configuration Options

#### Exchange Settings

- **enabled**: Whether to enable this exchange
- **trading_pairs**: Array of trading pair configurations

#### Trading Pair Settings

- **symbol**: Trading pair symbol (e.g., "BTC-USDT")
- **enabled**: Whether to collect data for this pair
- **data_types**: Types of data to collect (["trade"], ["ticker"], etc.)
- **timeframes**: Candle timeframes to generate (["1m", "5m", "15m", "1h", "4h", "1d"])

#### Collection Settings

- **health_check_interval**: Health check frequency in seconds
- **store_raw_data**: Whether to store raw trade data
- **auto_restart**: Enable automatic restart on failures
- **max_restart_attempts**: Maximum restart attempts before giving up

#### Logging Settings

- **level**: Log level ("DEBUG", "INFO", "WARNING", "ERROR")
- **log_errors_only**: Only log errors and essential events
- **verbose_data_logging**: Enable verbose logging of individual trades/candles
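
A validation pass over these options might look like the sketch below. The function names and the list-of-errors return shape are assumptions for illustration, not `ServiceConfig`'s real interface. The sketch also assumes that the timeframe and log-level lists above are the complete sets of allowed values.

```python
from typing import Any, Dict, List, Tuple

# Assumed to be the complete allowed sets, taken from the option lists above.
ALLOWED_TIMEFRAMES = {"1m", "5m", "15m", "1h", "4h", "1d"}
ALLOWED_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR"}


def validate_config(config: Dict[str, Any]) -> List[str]:
    """Return human-readable problems; an empty list means the config is valid."""
    errors: List[str] = []
    for exchange, ex_cfg in config.get("exchanges", {}).items():
        for i, pair in enumerate(ex_cfg.get("trading_pairs", [])):
            where = f"exchanges.{exchange}.trading_pairs[{i}]"
            if not pair.get("symbol"):
                errors.append(f"{where}: missing 'symbol'")
            bad = set(pair.get("timeframes", [])) - ALLOWED_TIMEFRAMES
            if bad:
                errors.append(f"{where}: unsupported timeframes {sorted(bad)}")

    settings = config.get("collection_settings", {})
    interval = settings.get("health_check_interval", 120)
    if not isinstance(interval, (int, float)) or interval <= 0:
        errors.append("collection_settings.health_check_interval must be a positive number")
    if settings.get("max_restart_attempts", 0) < 0:
        errors.append("collection_settings.max_restart_attempts must be >= 0")

    level = config.get("logging", {}).get("level", "INFO")
    if level not in ALLOWED_LEVELS:
        errors.append(f"logging.level {level!r} is not one of {sorted(ALLOWED_LEVELS)}")
    return errors


def enabled_pairs(config: Dict[str, Any]) -> List[Tuple[str, str]]:
    """(exchange, symbol) for every enabled pair on an enabled exchange."""
    return [
        (exchange, pair["symbol"])
        for exchange, ex_cfg in config.get("exchanges", {}).items()
        if ex_cfg.get("enabled", False)
        for pair in ex_cfg.get("trading_pairs", [])
        if pair.get("enabled", False)
    ]
```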
## Service Architecture

### Service Layer Components

```mermaid
graph TD
    subgraph ServiceLayer [DataCollectionService]
        SC[ServiceConfig] -- Manages --> Conf(Configuration)
        SCF[CollectorFactory] -- Creates --> Collectors(Data Collectors)
        ATM[AsyncTaskManager] -- Manages --> Tasks(Async Tasks)
        DCS[DataCollectionService] -- Uses --> SC
        DCS -- Uses --> SCF
        DCS -- Uses --> ATM
        DCS -- Orchestrates --> CM(CollectorManager)
    end

    subgraph ManagerLayer [CollectorManager]
        CM --> CLM(CollectorLifecycleManager)
        CM --> MHM(ManagerHealthMonitor)
        CM --> MST(ManagerStatsTracker)
        CM --> ML(ManagerLogger)
        CLM -- Manages --> BC[BaseDataCollector]
        MHM -- Monitors --> BC
        MST -- Tracks --> BC
        ML -- Logs For --> BC
    end

    subgraph CollectorLayer [BaseDataCollector - Core Data Collector]
        BC --> ConM(ConnectionManager)
        BC --> CST(CollectorStateAndTelemetry)
        BC --> CD(CallbackDispatcher)
    end

    Conf -- Provides --> DCS
    Collectors -- Created By --> SCF
    Tasks -- Managed By --> ATM
    CM -- Manages --> BC
    BC -- Collects Data --> DB[(Database)]
    BC -- Publishes Data --> Redis(Redis Pub/Sub)

    style DCS fill:#f9f,stroke:#333,stroke-width:2px
    style CM fill:#bbf,stroke:#333,stroke-width:2px
    style BC fill:#cfc,stroke:#333,stroke-width:2px
    style SC fill:#FFD700,stroke:#333,stroke-width:1px
    style SCF fill:#90EE90,stroke:#333,stroke-width:1px
    style ATM fill:#ADD8E6,stroke:#333,stroke-width:1px
    style CLM fill:#FFC0CB,stroke:#333,stroke-width:1px
    style MHM fill:#C0C0C0,stroke:#333,stroke-width:1px
    style MST fill:#DA70D6,stroke:#333,stroke-width:1px
    style ML fill:#DDA0DD,stroke:#333,stroke-width:1px
    style ConM fill:#F0F8FF,stroke:#333,stroke-width:1px
    style CST fill:#FFE4E1,stroke:#333,stroke-width:1px
    style CD fill:#FAFAD2,stroke:#333,stroke-width:1px
    style DB fill:#A9A9A9,stroke:#333,stroke-width:1px
    style Redis fill:#FF6347,stroke:#333,stroke-width:1px
```

### Data Flow

```mermaid
graph LR
    Config(Configuration) --> ServiceConfig
    ServiceConfig --> DataCollectionService
    DataCollectionService -- Initializes --> CollectorManager
    DataCollectionService -- Initializes --> CollectorFactory
    DataCollectionService -- Initializes --> AsyncTaskManager
    CollectorFactory -- Creates --> BaseDataCollector
    CollectorManager -- Manages --> BaseDataCollector
    BaseDataCollector -- Collects Data --> Database
    BaseDataCollector -- Publishes Data --> RedisPubSub(Redis Pub/Sub)
    HealthMonitor(Health Monitoring) --> DataCollectionService
    HealthMonitor --> CollectorManager
    HealthMonitor --> BaseDataCollector
    ErrorHandling(Error Handling) --> DataCollectionService
    ErrorHandling --> CollectorManager
    ErrorHandling --> BaseDataCollector
```
### Storage Integration

- **Raw Data**: PostgreSQL `raw_trades` table via the repository pattern
- **Candles**: PostgreSQL `market_data` table with multiple timeframes
- **Real-time**: Redis pub/sub for live data distribution
- **Service Metrics**: Service uptime, error counts, collector statistics
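
Candles for each configured timeframe are derived from raw trades. The bucketing step can be sketched as shown below. This is not the project's actual aggregation code or `market_data` schema. The `Trade`/`Candle` shapes, the timeframe-to-seconds table, and the alignment of buckets to the Unix epoch are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List

TIMEFRAME_SECONDS = {"1m": 60, "5m": 300, "15m": 900, "1h": 3600, "4h": 14400, "1d": 86400}


@dataclass
class Trade:
    timestamp: float  # Unix seconds
    price: float
    size: float


@dataclass
class Candle:
    start: int  # bucket start, Unix seconds
    open: float
    high: float
    low: float
    close: float
    volume: float


def build_candles(trades: Iterable[Trade], timeframe: str) -> List[Candle]:
    """Aggregate trades (assumed sorted by time) into OHLCV candles."""
    width = TIMEFRAME_SECONDS[timeframe]
    candles: Dict[int, Candle] = {}
    for t in trades:
        start = int(t.timestamp // width) * width  # epoch-aligned bucket start
        c = candles.get(start)
        if c is None:
            candles[start] = Candle(start, t.price, t.price, t.price, t.price, t.size)
        else:
            c.high = max(c.high, t.price)
            c.low = min(c.low, t.price)
            c.close = t.price  # relies on time-ordered input
            c.volume += t.size
    return [candles[k] for k in sorted(candles)]
```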
## Logging Philosophy

The service implements **clean production logging** focused on operational needs:

### What Gets Logged

✅ **Service Lifecycle**
- Service start/stop events
- Configuration loading
- Service initialization

✅ **Collector Orchestration**
- Collector creation and destruction
- Service-level health summaries
- Recovery operations

✅ **Configuration Events**
- Config file changes
- Runtime configuration updates
- Validation errors

✅ **Service Statistics**
- Periodic uptime reports
- Collection summary statistics
- Performance metrics

### What Doesn't Get Logged

❌ **Individual Data Points**
- Every trade received
- Every candle generated
- Raw market data

❌ **Internal Operations**
- Individual collector heartbeats
- Routine database operations
- Internal processing steps
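
One way to enforce this split with the standard `logging` module is a filter that drops per-data-point records unless verbose data logging is enabled. In this sketch, the `data_point` attribute (attached via `extra=`) and the logger name are a hypothetical tagging convention. Nothing documents that the project's `ManagerLogger` works this way.

```python
import logging


class DataPointFilter(logging.Filter):
    """Drop records tagged as individual data points unless verbose logging is on."""

    def __init__(self, verbose_data_logging: bool = False) -> None:
        super().__init__()
        self.verbose_data_logging = verbose_data_logging

    def filter(self, record: logging.LogRecord) -> bool:
        if getattr(record, "data_point", False):
            return self.verbose_data_logging
        return True  # lifecycle, orchestration, and error records always pass


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("data_collection_demo")  # hypothetical logger name
logger.addFilter(DataPointFilter(verbose_data_logging=False))

logger.info("Service started with %d collectors", 6)  # kept
logger.info("Trade received: %s", "BTC-USDT", extra={"data_point": True})  # dropped
```

A filter attached to a logger only sees records logged directly on that logger, not records from its child loggers. To cover a whole logger hierarchy, attach the filter to the handlers instead.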
## API Reference

### DataCollectionService

The main service class for managing data collection operations. It orchestrates the work through the specialized components described above.

#### Constructor

```python
DataCollectionService(
    config_path: str = "config/data_collection.json",
    service_config: Optional[ServiceConfig] = None,
    collector_factory: Optional[CollectorFactory] = None,
    collector_manager: Optional[CollectorManager] = None,
    async_task_manager: Optional[AsyncTaskManager] = None
)
```

**Parameters:**
- `config_path`: Path to the JSON configuration file. Used only if `service_config` is not provided.
- `service_config`: An instance of `ServiceConfig`. If `None`, one is created.
- `collector_factory`: An instance of `CollectorFactory`. If `None`, one is created.
- `collector_manager`: An instance of `CollectorManager`. If `None`, one is created.
- `async_task_manager`: An instance of `AsyncTaskManager`. If `None`, one is created.
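
The "if `None`, one is created" behaviour is constructor injection with defaults. The sketch below shows the pattern with minimal stand-in classes so that it runs on its own. `SketchService` and the stand-ins are hypothetical. The wiring between components, such as the factory receiving the config, is also an assumption and is not documented here.

```python
from typing import Optional


# Minimal stand-ins for the real components (hypothetical).
class ServiceConfig:
    def __init__(self, config_path: str) -> None:
        self.config_path = config_path


class CollectorFactory:
    def __init__(self, config: ServiceConfig) -> None:
        self.config = config


class CollectorManager:
    pass


class AsyncTaskManager:
    pass


class SketchService:
    """Stand-in for DataCollectionService showing default construction."""

    def __init__(
        self,
        config_path: str = "config/data_collection.json",
        service_config: Optional[ServiceConfig] = None,
        collector_factory: Optional[CollectorFactory] = None,
        collector_manager: Optional[CollectorManager] = None,
        async_task_manager: Optional[AsyncTaskManager] = None,
    ) -> None:
        # Explicit `is None` checks, so injected objects are never replaced,
        # even ones that happen to evaluate as falsy.
        self.service_config = service_config if service_config is not None else ServiceConfig(config_path)
        self.collector_factory = (collector_factory if collector_factory is not None
                                  else CollectorFactory(self.service_config))
        self.collector_manager = collector_manager if collector_manager is not None else CollectorManager()
        self.async_task_manager = async_task_manager if async_task_manager is not None else AsyncTaskManager()
```

This pattern makes it easy to pass mocks in tests, as the Mock Testing example does, while normal callers rely on the defaults.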
#### Methods

##### `async run(duration_hours: Optional[float] = None) -> None`

Runs the service for a specified duration or indefinitely. It coordinates the main event loop and the lifecycle of all internal components.

**Parameters:**
- `duration_hours`: Optional duration in hours (`None` = run indefinitely).

**Returns:**
- `None`

**Example:**
```python
import asyncio

from data.collection_service import DataCollectionService


async def run_service():
    service = DataCollectionService()
    await service.run(duration_hours=24)  # Run for 24 hours


if __name__ == "__main__":
    asyncio.run(run_service())
```

##### `async start() -> None`

Initializes and starts the data collection service and all configured collectors. It delegates to each internal component's startup procedure.

**Returns:**
- `None`

##### `async stop() -> None`

Stops the service gracefully, including all collectors and internal cleanup. All asynchronous tasks are cancelled and resources are released.

**Returns:**
- `None`

##### `get_status() -> Dict[str, Any]`

Returns the current service status, including uptime, collector counts, and errors, aggregated from the underlying components.

**Returns:**
```python
{
    'service_running': True,
    'uptime_hours': 12.5,
    'collectors_total': 6,
    'collectors_running': 5,
    'collectors_failed': 1,
    'errors_count': 2,
    'last_error': 'Connection timeout for ETH-USDT',
    'configuration': {
        'config_file': 'config/data_collection.json',
        'exchanges_enabled': ['okx'],
        'total_trading_pairs': 6
    },
    'detailed_collector_statuses': {  # per-collector details
        'okx_BTC-USDT': {'status': 'RUNNING', 'health_score': 95},
        'okx_ETH-USDT': {'status': 'ERROR', 'last_error': 'Connection refused'}
    }
}
```
##### `_run_main_loop(duration_hours: Optional[float])`

Internal method, extracted from `run()`, that manages the core asynchronous loop.

**Parameters:**
- `duration_hours`: Optional duration in hours for the loop.

**Returns:**
- `None`
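
A loop that runs either indefinitely or for `duration_hours` can be built from an `asyncio.Event` and `asyncio.wait_for`. The sketch below assumes what `_run_main_loop` might look like; it is not the method's actual body. `run_main_loop` and `stop_event` are hypothetical names for the loop and for whatever signals shutdown.

```python
import asyncio
from typing import Optional


async def run_main_loop(stop_event: asyncio.Event, duration_hours: Optional[float] = None) -> str:
    """Block until stop_event is set or the duration elapses; report why."""
    timeout = None if duration_hours is None else duration_hours * 3600
    try:
        # wait_for with timeout=None waits indefinitely
        await asyncio.wait_for(stop_event.wait(), timeout=timeout)
        return "stop requested"
    except asyncio.TimeoutError:
        return "duration elapsed"
```

For example, `duration_hours=24` becomes a timeout of 86,400 seconds. The caller would then run its shutdown sequence (for example, `stop()`) in either case.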
### Standalone Function

#### `run_data_collection_service(config_path, duration_hours)`

```python
async def run_data_collection_service(
    config_path: str = "config/data_collection.json",
    duration_hours: Optional[float] = None
) -> None
```

Convenience function that runs the service with minimal setup. It creates a `DataCollectionService` instance internally.

**Parameters:**
- `config_path`: Path to the configuration file.
- `duration_hours`: Optional duration in hours.

**Returns:**
- `None`

## Integration Examples

### Basic Service Integration

```python
import asyncio

from data.collection_service import DataCollectionService
from utils.logger import setup_logging  # assumed helper; adapt to your logging setup


async def main():
    setup_logging()
    service = DataCollectionService("config/my_config.json")

    # Run for 24 hours
    await service.run(duration_hours=24)

    print("Service run finished.")


if __name__ == "__main__":
    asyncio.run(main())
```
### Custom Status Monitoring

```python
import asyncio

from data.collection_service import DataCollectionService
from utils.logger import setup_logging  # assumed helper


async def monitor_service(interval_seconds: float = 60):
    setup_logging()
    service = DataCollectionService()

    # Run the service in the background
    run_task = asyncio.create_task(service.run())

    try:
        # Print a status summary every interval_seconds while the service runs
        while not run_task.done():
            status = service.get_status()
            print(f"Service Uptime: {status['uptime_hours']:.1f}h")
            print(f"Collectors: {status['collectors_running']}/{status['collectors_total']}")
            print(f"Errors: {status['errors_count']}")
            if status['errors_count'] > 0:
                print(f"Last error: {status.get('last_error')}")

            print("Detailed Collector Statuses:")
            for name, details in status.get('detailed_collector_statuses', {}).items():
                print(f"  - {name}: Status={details.get('status')}, Health Score={details.get('health_score')}")

            await asyncio.sleep(interval_seconds)
    except asyncio.CancelledError:
        print("Monitoring cancelled.")
        raise
    finally:
        await service.stop()
        await run_task  # wait for run() to return after stop()


if __name__ == "__main__":
    asyncio.run(monitor_service())
```
### Programmatic Control

```python
import asyncio

from data.collection_service import DataCollectionService
from utils.logger import setup_logging  # assumed helper


async def controlled_collection(run_seconds: float = 300, poll_seconds: float = 30):
    setup_logging()
    service = DataCollectionService()
    loop = asyncio.get_running_loop()

    try:
        # Start the service and all configured collectors
        await service.start()
        print("Data collection service started.")
        deadline = loop.time() + run_seconds

        # Monitor until the deadline (5 minutes by default)
        while loop.time() < deadline:
            status = service.get_status()
            print(f"Service running: {status['service_running']}, "
                  f"collectors running: {status['collectors_running']}")
            if status['collectors_failed'] > 0:
                # ManagerHealthMonitor and AsyncTaskManager handle restarts internally
                print("Some collectors failed; the service is recovering...")
            await asyncio.sleep(poll_seconds)

        print(f"Stopping service after {run_seconds / 60:.0f} minutes of operation.")
    except asyncio.CancelledError:
        # On Ctrl+C, asyncio.run() cancels this task (Python 3.11+)
        print("Manual shutdown requested.")
        raise
    finally:
        print("Shutting down service gracefully...")
        await service.stop()
        print("Service stopped.")


if __name__ == "__main__":
    asyncio.run(controlled_collection())
```
### Configuration Management

```python
import asyncio
import json

from config.service_config import ServiceConfig
from data.collection_service import DataCollectionService
from utils.logger import setup_logging  # assumed helper


async def dynamic_configuration():
    setup_logging()
    # Create ServiceConfig explicitly so the same instance can be edited and saved
    service_config = ServiceConfig(config_path="config/data_collection.json")
    service = DataCollectionService(service_config=service_config)
    await service.start()

    try:
        config = service_config.get_config()
        print("Initial configuration loaded:")
        print(json.dumps(config, indent=2))

        # Add a new trading pair unless that symbol is already configured
        new_pair = {
            'symbol': 'SOL-USDT',
            'enabled': True,
            'data_types': ['trade'],
            'timeframes': ['1m', '5m'],
        }
        pairs = config['exchanges']['okx']['trading_pairs']
        if any(p['symbol'] == new_pair['symbol'] for p in pairs):
            print("SOL-USDT already in configuration.")
        else:
            pairs.append(new_pair)
            service_config.save_config(config)  # persist via ServiceConfig
            print("Added SOL-USDT. Restarting service with the new configuration...")
            # Assumes start() re-reads the saved configuration
            await service.stop()
            await service.start()
            print("Service restarted with updated configuration.")

        # Verify the new pair is active
        status = service.get_status()
        print(f"Current collectors count: {status['collectors_total']}")
    finally:
        await service.stop()


if __name__ == "__main__":
    asyncio.run(dynamic_configuration())
```
## Error Handling

The service handles errors at multiple layers. Each component manages the errors within its own area of responsibility, which gives more precise error management and recovery.

### Service Level Errors

- **Configuration Errors**: Invalid JSON, missing required fields, file permission issues (handled by `ServiceConfig`).
- **Initialization Errors**: Failed collector creation (handled by `CollectorFactory`), database connectivity.
- **Runtime Errors**: Service-level exceptions, resource exhaustion, unhandled exceptions in asynchronous tasks (managed by `AsyncTaskManager`).

### Error Recovery Strategies

1. **Graceful Degradation**: Continue with healthy collectors while attempting to recover failed ones.
2. **Configuration Validation**: `ServiceConfig` validates configurations before applying them, preventing common startup issues.
3. **Automated Restarts**: `ManagerHealthMonitor` and `AsyncTaskManager` coordinate automatic restarts of failed collectors and tasks.
4. **Error Aggregation**: `ManagerStatsTracker` collects and reports errors across all collectors, providing a unified view.
5. **Sanitized Error Messages**: `ManagerLogger` ensures sensitive internal details are not leaked in logs or public interfaces.
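
The automated-restart strategy, bounded by `max_restart_attempts`, can be sketched as a retry loop with exponential backoff. The backoff schedule, the `start_collector` callable, and `restart_with_backoff` itself are assumptions for illustration. This document does not specify how `ManagerHealthMonitor` spaces its restart attempts. Only the exception type is logged, in keeping with the sanitized-messages strategy.

```python
import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)


async def restart_with_backoff(
    start_collector: Callable[[], Awaitable[None]],
    max_restart_attempts: int = 3,
    base_delay: float = 1.0,
) -> bool:
    """Try to (re)start a collector; return True on success, False after giving up."""
    for attempt in range(1, max_restart_attempts + 1):
        try:
            await start_collector()
            return True
        except Exception as exc:
            # Log only the exception type to avoid leaking internal details
            logger.warning("Restart attempt %d/%d failed: %s",
                           attempt, max_restart_attempts, type(exc).__name__)
            if attempt < max_restart_attempts:
                # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
                await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    return False
```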
### Error Reporting

```python
# `service` is a running DataCollectionService instance.
# The service status includes aggregated error information.
status = service.get_status()

if status['errors_count'] > 0:
    print(f"Service has {status['errors_count']} errors.")
    print(f"Last service error: {status['last_error']}")

# Get detailed error information from individual collectors, if available
if 'detailed_collector_statuses' in status:
    for collector_name, details in status['detailed_collector_statuses'].items():
        if details.get('status') == 'ERROR' and 'last_error' in details:
            print(f"Collector {collector_name} error: {details['last_error']}")
```

## Testing

The test suite emphasizes unit tests for individual components and integration tests for their interactions, so that each part of the modular architecture is covered.

### Running Service Tests

```bash
# Run all data collection service tests (assumes they live in tests/data/collection_service)
uv run pytest tests/data/collection_service -v

# Run tests for a specific component, e.g. ServiceConfig
uv run pytest tests/config/test_service_config.py -v

# Run with coverage for the data, config, and utils packages (requires pytest-cov)
uv run pytest --cov=data --cov=config --cov=utils tests/
```

### Test Coverage

The test suite covers:
- **Component Unit Tests**: Individual tests for `ServiceConfig`, `CollectorFactory`, `AsyncTaskManager`, `CollectorLifecycleManager`, `ManagerHealthMonitor`, `ManagerStatsTracker`, and `ManagerLogger`.
- **Service Integration Tests**: Tests of how `DataCollectionService` orchestrates its components:
  - Service initialization and configuration loading/validation.
  - Collector orchestration and management via `CollectorManager` and `CollectorLifecycleManager`.
  - Asynchronous task management and error recovery.
  - Service lifecycle (start/stop/restart) and signal handling.
  - Status reporting and monitoring, including detailed collector statuses.
  - Error aggregation and recovery strategies.
### Mock Testing

```python
import pytest
from unittest.mock import AsyncMock, MagicMock

from data.collection_service import DataCollectionService


@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_service_with_mock_components():
    # Component mocks; coroutine methods use AsyncMock so they can be awaited
    # (assumes start_all/stop_all/start/stop are coroutines)
    service_config = MagicMock()
    service_config.load_config.return_value = {"collectors": []}
    service_config.get_config.return_value = {"collectors": []}

    collector_factory = MagicMock()

    collector_manager = MagicMock()
    collector_manager.start_all = AsyncMock(return_value=None)
    collector_manager.stop_all = AsyncMock(return_value=None)

    async_task_manager = MagicMock()
    async_task_manager.start = AsyncMock(return_value=None)
    async_task_manager.stop = AsyncMock(return_value=None)

    service = DataCollectionService(
        service_config=service_config,
        collector_factory=collector_factory,
        collector_manager=collector_manager,
        async_task_manager=async_task_manager,
    )
    await service.start()

    # Verify that startup was delegated to the components
    service_config.load_config.assert_called_once()
    collector_manager.start_all.assert_awaited_once()
    async_task_manager.start.assert_awaited_once()

    await service.stop()
    collector_manager.stop_all.assert_awaited_once()
    async_task_manager.stop.assert_awaited_once()
```
## Production Deployment

### Docker Deployment

```dockerfile
FROM python:3.11-slim

WORKDIR /app
COPY . .

# Install dependencies (--system installs into the image's Python; no virtualenv)
RUN pip install uv
RUN uv pip install --system -r requirements.txt

# Create logs and config directories
RUN mkdir -p logs config

# Copy production configuration
COPY config/production.json config/data_collection.json

# Health check
HEALTHCHECK --interval=60s --timeout=10s --start-period=30s --retries=3 \
    CMD python scripts/health_check.py || exit 1

# Run service
CMD ["python", "scripts/start_data_collection.py", "--config", "config/data_collection.json"]
```
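
Both the Docker `HEALTHCHECK` and the Kubernetes `livenessProbe` run `scripts/health_check.py` as a separate process. That script therefore cannot call `get_status()` on the running service's in-memory state. The sketch below shows one possible approach. It assumes the service periodically writes its status dictionary to a JSON file. The `logs/service_status.json` path, the 300-second staleness limit, and `check_health` are hypothetical. The script checks that file and returns a non-zero exit code when the status is stale or unhealthy.

```python
import json
import time
from pathlib import Path

STATUS_FILE = Path("logs/service_status.json")  # hypothetical location
MAX_AGE_SECONDS = 300                           # hypothetical staleness limit


def check_health(status_file: Path = STATUS_FILE, max_age: float = MAX_AGE_SECONDS) -> int:
    """Return a process exit code: 0 = healthy, 1 = unhealthy."""
    try:
        age = time.time() - status_file.stat().st_mtime
        status = json.loads(status_file.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError) as exc:
        print(f"unhealthy: cannot read status ({exc})")
        return 1
    if age > max_age:
        print(f"unhealthy: status is {age:.0f}s old")
        return 1
    if not status.get("service_running", False):
        print("unhealthy: service_stopped")
        return 1
    print("healthy")
    return 0
```

In the script itself, end with `sys.exit(check_health())` so that Docker and Kubernetes see the exit code.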
### Kubernetes Deployment

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: data-collection-service
spec:
  replicas: 1
  selector:
    matchLabels:
      app: data-collection-service
  template:
    metadata:
      labels:
        app: data-collection-service
    spec:
      containers:
        - name: data-collector
          image: crypto-dashboard/data-collector:latest
          ports:
            - containerPort: 8080
          env:
            - name: POSTGRES_HOST
              value: "postgres-service"
            - name: REDIS_HOST
              value: "redis-service"
          volumeMounts:
            - name: config-volume
              mountPath: /app/config
            - name: logs-volume
              mountPath: /app/logs
          livenessProbe:
            exec:
              command:
                - python
                - scripts/health_check.py
            initialDelaySeconds: 30
            periodSeconds: 60
      volumes:
        - name: config-volume
          configMap:
            name: data-collection-config
        - name: logs-volume
          emptyDir: {}
```

### Systemd Service

```ini
[Unit]
Description=Cryptocurrency Data Collection Service
After=network.target postgres.service redis.service
Requires=postgres.service redis.service

[Service]
Type=simple
User=crypto-collector
Group=crypto-collector
WorkingDirectory=/opt/crypto-dashboard
ExecStart=/usr/bin/python scripts/start_data_collection.py --config config/production.json
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
RestartSec=10
KillMode=mixed
TimeoutStopSec=30

# Environment
Environment=PYTHONPATH=/opt/crypto-dashboard
Environment=LOG_LEVEL=INFO

# Security
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ReadWritePaths=/opt/crypto-dashboard/logs

[Install]
WantedBy=multi-user.target
```

### Environment Configuration

```bash
# Production environment variables
export ENVIRONMENT=production
export POSTGRES_HOST=postgres.internal
export POSTGRES_PORT=5432
export POSTGRES_DB=crypto_dashboard
export POSTGRES_USER=dashboard_user
export POSTGRES_PASSWORD=secure_password

export REDIS_HOST=redis.internal
export REDIS_PORT=6379

# Service configuration
export DATA_COLLECTION_CONFIG=/etc/crypto-dashboard/data_collection.json
export LOG_LEVEL=INFO
export HEALTH_CHECK_INTERVAL=120
```

## Monitoring and Alerting

### Metrics Collection

The service tracks metrics such as the following for monitoring systems:

```python
# Service metrics (example values)
service_uptime_hours = 24.5
collectors_running = 5
collectors_total = 6
errors_per_hour = 0.2
data_points_processed = 15000
```
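
The alerting rules later in this section query `collectors_failed`, `collectors_total`, and `errors_total`. The sketch below renders those metrics from `get_status()` in the Prometheus text exposition format, for cases where the service does not already publish them. The metric names are chosen to match the rules. The sketch assumes `errors_count` only increases while the service runs, which is why it is exposed as a counter. Serving the text, for example over HTTP, is left open, and `render_metrics` is a hypothetical helper.

```python
from typing import Any, Dict


def render_metrics(status: Dict[str, Any]) -> str:
    """Render selected get_status() fields in the Prometheus text format."""
    metrics = [
        ("collectors_total", "gauge", "Configured collectors", status["collectors_total"]),
        ("collectors_running", "gauge", "Running collectors", status["collectors_running"]),
        ("collectors_failed", "gauge", "Failed collectors", status["collectors_failed"]),
        ("errors_total", "counter", "Errors since service start", status["errors_count"]),
        ("service_uptime_hours", "gauge", "Service uptime in hours", status["uptime_hours"]),
    ]
    lines = []
    for name, kind, help_text, value in metrics:
        lines.append(f"# HELP {name} {help_text}")
        lines.append(f"# TYPE {name} {kind}")
        lines.append(f"{name} {value}")
    return "\n".join(lines) + "\n"
```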
### Health Checks

```python
from typing import Any, Dict

from data.collection_service import DataCollectionService


def health_check(service: DataCollectionService) -> Dict[str, Any]:
    """Classify the health of an already running service instance."""
    # Pass in the running instance: a newly constructed service would
    # report the service as not running.
    status = service.get_status()

    if not status['service_running']:
        return {'status': 'unhealthy', 'reason': 'service_stopped'}

    if status['collectors_failed'] > status['collectors_total'] * 0.5:
        return {'status': 'degraded', 'reason': 'too_many_failed_collectors'}

    return {'status': 'healthy'}
```
### Alerting Rules

```yaml
# Prometheus alerting rules
groups:
  - name: data_collection_service
    rules:
      - alert: DataCollectionServiceDown
        expr: up{job="data-collection-service"} == 0
        for: 5m
        annotations:
          summary: "Data collection service is down"

      - alert: TooManyFailedCollectors
        expr: collectors_failed / collectors_total > 0.5
        for: 10m
        annotations:
          summary: "More than 50% of collectors have failed"

      - alert: HighErrorRate
        expr: rate(errors_total[5m]) > 0.1
        for: 15m
        annotations:
          summary: "High error rate in data collection service"
```

## Performance Considerations

### Resource Usage

- **Memory**: ~150MB base + ~15MB per trading pair, including service overhead (about 240MB for six pairs)
- **CPU**: Low (async I/O bound, service orchestration)
- **Network**: ~1KB/s per trading pair
- **Storage**: Service logs ~10MB/day

### Scaling Strategies

1. **Horizontal Scaling**: Multiple service instances with different configurations
2. **Configuration Partitioning**: Separate services by exchange or asset class
3. **Load Balancing**: Distribute trading pairs across service instances
4. **Regional Deployment**: Deploy closer to exchange data centers
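
Distributing trading pairs across instances (strategy 3) can be as simple as splitting the configuration's `trading_pairs` lists into per-instance configurations. The round-robin split below is one illustrative choice and works on the configuration structure shown earlier. `shard_config` is a hypothetical deployment-time helper, not part of the service.

```python
import copy
from typing import Any, Dict, List


def shard_config(config: Dict[str, Any], num_instances: int) -> List[Dict[str, Any]]:
    """Split each exchange's trading pairs round-robin across num_instances configs."""
    if num_instances < 1:
        raise ValueError("num_instances must be >= 1")
    shards = []
    for index in range(num_instances):
        shard = copy.deepcopy(config)  # keep all other settings identical
        for ex_cfg in shard.get("exchanges", {}).values():
            pairs = ex_cfg.get("trading_pairs", [])
            ex_cfg["trading_pairs"] = pairs[index::num_instances]
        shards.append(shard)
    return shards
```

Each shard can then be saved to its own file and passed to a separate instance with `--config`.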
### Optimization Tips

1. **Configuration Tuning**: Optimize health check intervals and timeframes
2. **Resource Limits**: Set appropriate memory and CPU limits
3. **Batch Operations**: Use efficient database operations
4. **Monitoring Overhead**: Balance monitoring frequency with performance

## Troubleshooting

### Common Service Issues

#### Service Won't Start

```
❌ Failed to start data collection service
```

**Solutions:**
1. Check configuration file validity
2. Verify database connectivity
3. Ensure no port conflicts
4. Check file permissions

#### Configuration Loading Failed

```
❌ Failed to load config from config/data_collection.json: Invalid JSON
```

**Solutions:**
1. Validate JSON syntax
2. Check required fields
3. Verify file encoding (UTF-8)
4. Recreate default configuration
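
To find exactly where a configuration file's JSON is invalid, use the standard library's `json.JSONDecodeError`, which reports the line and column of the first syntax error. Below is a small helper; `find_json_error` is an illustrative name.

```python
import json
from typing import Optional


def find_json_error(path: str) -> Optional[str]:
    """Return 'path:line:column: message' for the first JSON syntax error, or None."""
    with open(path, encoding="utf-8") as fh:
        text = fh.read()
    try:
        json.loads(text)
    except json.JSONDecodeError as exc:
        return f"{path}:{exc.lineno}:{exc.colno}: {exc.msg}"
    return None
```

For example, `find_json_error("config/data_collection.json")` returns `None` for a valid file. A trailing comma or missing quote produces a location to jump to.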
#### No Collectors Created

```
❌ No collectors were successfully initialized
```

**Solutions:**
1. Check exchange configuration
2. Verify trading pair symbols
3. Check network connectivity
4. Review collector creation logs

### Debug Mode

Enable verbose service debugging:

```json
{
  "logging": {
    "level": "DEBUG",
    "log_errors_only": false,
    "verbose_data_logging": true
  }
}
```

### Service Diagnostics

```python
# Run a diagnostic check against a started service
import asyncio

from data.collection_service import DataCollectionService


async def diagnose():
    service = DataCollectionService()
    await service.start()  # a service that was never started has no running collectors
    try:
        status = service.get_status()
        print(f"Service Running: {status['service_running']}")
        print(f"Configuration File: {status['configuration']['config_file']}")
        print(f"Collectors: {status['collectors_running']}/{status['collectors_total']}")

        # Check individual collector health
        # (assumes the CollectorManager is exposed as `service.manager`)
        for collector_name in service.manager.list_collectors():
            collector_status = service.manager.get_collector_status(collector_name)
            print(f"{collector_name}: {collector_status['status']}")
    finally:
        await service.stop()


if __name__ == "__main__":
    asyncio.run(diagnose())
```

## Related Documentation

- [Data Collectors System](../data_collectors.md) - Comprehensive documentation on core collector components and their modular internal structure.
- [Logging System](../logging.md) - Details on logging configuration and philosophy.
- [Database Operations](../../database/operations.md) - Information on database integration and persistence.
- [Monitoring Guide](../../monitoring/README.md) - Setup for system monitoring and alerting.
- [ADR-004: Modular Data Collector System Refactoring](../../decisions/ADR-004-modular-data-collector-system.md) - Rationale and implications of the modular architecture.