- Introduced a comprehensive data collection framework, including `CollectorServiceConfig`, `BaseDataCollector`, and `CollectorManager`, enhancing modularity and maintainability.
- Developed `CollectorFactory` for streamlined collector creation, promoting separation of concerns and improved configuration handling.
- Enhanced `DataCollectionService` to utilize the new architecture, ensuring robust error handling and logging practices.
- Added `TaskManager` for efficient management of asynchronous tasks, improving performance and resource management.
- Implemented health monitoring and auto-recovery features in `CollectorManager`, ensuring reliable operation of data collectors.
- Updated imports across the codebase to reflect the new structure, ensuring consistent access to components.

These changes significantly improve the architecture and maintainability of the data collection service, aligning with project standards for modularity, performance, and error handling.
474 lines
18 KiB
Python
474 lines
18 KiB
Python
"""
|
|
Service Configuration Manager for data collection service.
|
|
|
|
This module handles configuration loading, validation, schema management,
|
|
and default configuration creation with security enhancements.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import stat
|
|
from pathlib import Path
|
|
from typing import Dict, Any, Optional, List
|
|
from dataclasses import dataclass
|
|
|
|
|
|
@dataclass
class CollectorServiceConfigSchema:
    """Schema definition for service configuration.

    The field names mirror the top-level keys of the configuration file
    written by ``CollectorServiceConfig._create_default_config``.  Every
    section except ``exchange`` defaults to ``None`` (meaning "not
    provided"), so the annotations are ``Optional``.
    """

    # Exchange identifier; the default configuration uses "okx".
    exchange: str = "okx"
    # Connection settings section, or None when not provided.
    connection: Optional[Dict[str, Any]] = None
    # Data collection settings section, or None when not provided.
    data_collection: Optional[Dict[str, Any]] = None
    # List of per-pair configuration dictionaries, or None when not provided.
    trading_pairs: Optional[List[Dict[str, Any]]] = None
    # Logging settings section, or None when not provided.
    logging: Optional[Dict[str, Any]] = None
    # Database settings section, or None when not provided.
    database: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
class CollectorServiceConfig:
    """Manages service configuration with validation and security.

    The configuration lives in a JSON file.  ``load_config`` creates a
    default file when none exists, checks the file's permission bits,
    parses it, validates it and caches the result on the instance.  All
    logging is optional: every log call is skipped when ``logger`` is falsy.
    """

    # (permission mask, violation label, warning phrase, is critical)
    # Order matters: warnings are emitted and violations reported in this order.
    _PERMISSION_CHECKS = (
        (stat.S_IROTH, "readable by others", "is readable by others", True),
        (stat.S_IWOTH, "writable by others", "is writable by others", True),
        (stat.S_IWGRP, "writable by group", "is writable by group", True),
        (stat.S_IRGRP, "readable by group", "is readable by group", False),
        (
            stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH,
            "executable",
            "has execute permissions",
            False,
        ),
    )

    def __init__(self, config_path: str = "config/data_collection.json", logger=None):
        """
        Initialize the service configuration manager.

        Args:
            config_path: Path to the configuration file
            logger: Logger instance for logging operations
        """
        self.config_path = config_path
        self.logger = logger
        # Populated by load_config(); None means "not loaded yet".
        self._config: Optional[Dict[str, Any]] = None

    def load_config(self) -> Dict[str, Any]:
        """
        Load and validate service configuration from JSON file.

        A default configuration file is created first when the path does
        not exist.  The validated configuration is cached on the instance.

        Returns:
            Dictionary containing the configuration

        Raises:
            FileNotFoundError, IsADirectoryError, PermissionError: On file
                system problems, including insecure file permissions.
            ValueError: If the file is not valid JSON/UTF-8 (chained to the
                original parsing error) or fails validation.
            Exception: Any other unexpected error is logged and re-raised.
        """
        try:
            config_file = Path(self.config_path)

            # Create default config if it doesn't exist
            if not config_file.exists():
                self._create_default_config(config_file)

            # Validate file permissions for security
            self._validate_file_permissions(config_file)

            # JSON text is UTF-8; do not depend on the platform locale.
            with open(config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)

            # Validate configuration schema
            validated_config = self._validate_config_schema(config)
            self._config = validated_config

            if self.logger:
                self.logger.info(f"✅ Configuration loaded from {self.config_path}")

            return validated_config

        except (FileNotFoundError, IsADirectoryError, PermissionError) as e:
            # Handle file system related errors
            if self.logger:
                self.logger.error(f"❌ File system error loading configuration: {e}", exc_info=True)
            raise
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # Handle file parsing errors; keep the original error as the cause
            if self.logger:
                self.logger.error(f"❌ Configuration file parsing error: {e}", exc_info=True)
            raise ValueError(f"Invalid configuration file format: {e}") from e
        except (KeyError, ValueError, TypeError) as e:
            # Handle configuration validation errors
            if self.logger:
                self.logger.error(f"❌ Configuration validation error: {e}", exc_info=True)
            raise
        except Exception as e:
            # Catch any other unexpected errors
            if self.logger:
                self.logger.error(f"❌ Unexpected error loading configuration: {e}", exc_info=True)
            raise

    def _validate_file_permissions(self, config_file: Path) -> None:
        """
        Validate configuration file permissions for security.

        Every violation is logged as a warning.  Critical violations (any
        "writable by ..." bit, or "readable by others") raise on non-Windows
        systems; on Windows they are only logged.

        Args:
            config_file: Path to the configuration file

        Raises:
            PermissionError: If file permissions are too permissive
            ValueError: If file validation fails (for example the file
                cannot be stat'ed); chained to the underlying error
        """
        try:
            file_stat = config_file.stat()
            file_mode = file_stat.st_mode

            violations = []
            critical_violations = []
            for mask, label, phrase, is_critical in self._PERMISSION_CHECKS:
                if not file_mode & mask:
                    continue
                violations.append(label)
                if is_critical:
                    critical_violations.append(label)
                if self.logger:
                    self.logger.warning(f"⚠️ Configuration file {config_file} {phrase}")

            # Enforce security policy - raise exception if critical violations found
            if critical_violations:
                error_msg = (
                    f"Configuration file {config_file} has insecure permissions: {', '.join(critical_violations)}. "
                    f"File should only be readable/writable by owner (mode 600 recommended)."
                )

                # No exception is active here, so no exc_info is attached.
                if self.logger:
                    self.logger.error(f"🔒 Security violation: {error_msg}")

                # On Windows, permission checks might not work as expected, so we log but don't fail
                if os.name == 'nt':  # Windows
                    if self.logger:
                        self.logger.warning("⚠️ On Windows, file permission validation is limited. Please ensure config files are properly secured.")
                else:
                    # On Unix-like systems, enforce strict permissions
                    raise PermissionError(error_msg)

            # Validate file ownership (only on Unix-like systems)
            if os.name != 'nt':
                current_uid = os.getuid() if hasattr(os, 'getuid') else None
                file_uid = file_stat.st_uid

                if current_uid is not None and file_uid != current_uid:
                    warning_msg = f"Configuration file {config_file} is owned by different user (UID: {file_uid}, current: {current_uid})"
                    if self.logger:
                        self.logger.warning(f"⚠️ {warning_msg}")

            # Log successful validation
            if not violations and self.logger:
                self.logger.debug(f"✅ Configuration file {config_file} has secure permissions")

        except PermissionError:
            # Re-raise permission errors
            raise
        except Exception as e:
            error_msg = f"Could not validate file permissions for {config_file}: {e}"
            if self.logger:
                self.logger.error(f"❌ {error_msg}", exc_info=True)
            raise ValueError(error_msg) from e

    def _validate_config_schema(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate configuration against schema and apply defaults.

        The input is shallow-copied; missing optional sections
        (``connection``, ``data_collection``, ``logging``, ``database``) are
        filled with their defaults.  Sections that are present are kept
        as-is, without merging in default keys.

        Args:
            config: Raw configuration dictionary

        Returns:
            Validated configuration with defaults applied

        Raises:
            ValueError: If the root is not a dictionary, a required field is
                missing, or the trading pairs are invalid
        """
        if not isinstance(config, dict):
            raise ValueError(
                f"Configuration root must be a JSON object, got {type(config).__name__}"
            )

        validated = config.copy()

        # Validate required fields
        for field in ('exchange', 'trading_pairs'):
            if field not in validated:
                raise ValueError(f"Missing required configuration field: {field}")

        # Apply defaults for optional sections
        optional_sections = (
            ('connection', self._get_default_connection_config),
            ('data_collection', self._get_default_data_collection_config),
            ('logging', self._get_default_logging_config),
            ('database', self._get_default_database_config),
        )
        for section, default_factory in optional_sections:
            if section not in validated:
                validated[section] = default_factory()

        # Validate trading pairs
        self._validate_trading_pairs(validated['trading_pairs'])

        return validated

    def _validate_trading_pairs(self, trading_pairs: List[Dict[str, Any]]) -> None:
        """
        Validate trading pairs configuration.

        Args:
            trading_pairs: List of trading pair configurations

        Raises:
            ValueError: If trading pairs configuration is invalid: empty,
                not a list, an entry that is not a dictionary, a missing or
                malformed ``symbol``, or an unknown/malformed ``data_types``
        """
        if not trading_pairs:
            raise ValueError("At least one trading pair must be configured")
        if not isinstance(trading_pairs, (list, tuple)):
            raise ValueError(
                f"'trading_pairs' must be a list, got {type(trading_pairs).__name__}"
            )

        valid_data_types = ['trade', 'orderbook', 'ticker', 'candle']

        for i, pair in enumerate(trading_pairs):
            if not isinstance(pair, dict):
                raise ValueError(
                    f"Trading pair {i} must be an object, got {type(pair).__name__}"
                )
            if 'symbol' not in pair:
                raise ValueError(f"Trading pair {i} missing required 'symbol' field")

            symbol = pair['symbol']
            # Require a non-empty part on both sides of the first '-'.
            # Anything after the first '-' counts as the quote part, so
            # symbols with more than one '-' remain valid.
            base, separator, quote = (
                symbol.partition('-') if isinstance(symbol, str) else ('', '', '')
            )
            if not (separator and base and quote):
                raise ValueError(f"Invalid symbol format: {symbol}. Expected format: 'BASE-QUOTE'")

            # Validate data types
            data_types = pair.get('data_types', ['trade'])
            if not isinstance(data_types, (list, tuple)):
                # A bare string would otherwise be checked character by character.
                raise ValueError(
                    f"'data_types' for {symbol} must be a list. Valid types: {valid_data_types}"
                )
            for dt in data_types:
                if dt not in valid_data_types:
                    raise ValueError(f"Invalid data type '{dt}' for {symbol}. Valid types: {valid_data_types}")

    @staticmethod
    def _write_json_securely(config_file: Path, data: Dict[str, Any]) -> None:
        """
        Write ``data`` as indented JSON to ``config_file``.

        The payload is serialized before the file is opened, so a
        serialization error cannot leave a truncated file behind.  A newly
        created file is requested with mode 600 so it is never created with
        broader permissions; the mode of an existing file is not changed here.

        Args:
            config_file: Destination path
            data: JSON-serializable configuration dictionary
        """
        payload = json.dumps(data, indent=2)

        fd = os.open(config_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        try:
            handle = os.fdopen(fd, 'w', encoding='utf-8')
        except Exception:
            # fdopen did not take ownership of the descriptor; close it ourselves.
            os.close(fd)
            raise
        with handle:
            handle.write(payload)

    def _create_default_config(self, config_file: Path) -> None:
        """
        Create a default configuration file.

        Parent directories are created as needed and the file is written
        with owner-only permissions.

        Args:
            config_file: Path where the configuration file should be created
        """
        default_config = {
            "exchange": "okx",
            "connection": self._get_default_connection_config(),
            "data_collection": self._get_default_data_collection_config(),
            "trading_pairs": self._get_default_trading_pairs(),
            "logging": self._get_default_logging_config(),
            "database": self._get_default_database_config()
        }

        # Ensure directory exists
        config_file.parent.mkdir(parents=True, exist_ok=True)

        # Write configuration file (created with mode 600)
        self._write_json_securely(config_file, default_config)

        # Also tighten permissions in case the file already existed
        self._set_secure_permissions(config_file)

        if self.logger:
            self.logger.info(f"📄 Created default configuration: {config_file}")

    def _set_secure_permissions(self, config_file: Path) -> None:
        """
        Set secure file permissions for configuration file.

        This is best effort: failures are logged as warnings and never raised.

        Args:
            config_file: Path to the configuration file
        """
        try:
            # Set permissions to owner read/write only (mode 600)
            secure_mode = stat.S_IRUSR | stat.S_IWUSR
            os.chmod(config_file, secure_mode)

            if self.logger:
                self.logger.debug(f"Set secure permissions (600) for {config_file}")

        except OSError as e:
            # Handle file system permission errors (PermissionError is an OSError)
            if self.logger:
                self.logger.warning(f"Could not set secure file permissions for {config_file}: {e}")
        except Exception as e:
            # Handle any other unexpected errors
            if self.logger:
                self.logger.warning(f"Unexpected error setting file permissions for {config_file}: {e}")

    def fix_file_permissions(self, config_file_path: Optional[str] = None) -> bool:
        """
        Fix file permissions for configuration file to be secure.

        Args:
            config_file_path: Optional specific file path, defaults to instance config_path

        Returns:
            bool: True if permissions were fixed successfully, False otherwise
        """
        try:
            file_path = Path(config_file_path or self.config_path)

            if not file_path.exists():
                # No exception is active here, so no exc_info is attached.
                if self.logger:
                    self.logger.error(f"Configuration file {file_path} does not exist")
                return False

            # Set secure permissions
            self._set_secure_permissions(file_path)

            # Verify the fix worked
            try:
                self._validate_file_permissions(file_path)
            except (PermissionError, ValueError):
                if self.logger:
                    self.logger.warning(f"⚠️ Permissions may still be insecure for {file_path}")
                return False

            if self.logger:
                self.logger.info(f"✅ Successfully fixed permissions for {file_path}")
            return True

        except (FileNotFoundError, PermissionError, OSError) as e:
            # Handle file system related errors
            if self.logger:
                self.logger.error(f"❌ File system error fixing permissions: {e}", exc_info=True)
            return False
        except Exception as e:
            # Handle any other unexpected errors
            if self.logger:
                self.logger.error(f"❌ Unexpected error fixing file permissions: {e}", exc_info=True)
            return False

    def _get_default_connection_config(self) -> Dict[str, Any]:
        """Get default connection configuration."""
        return {
            "public_ws_url": "wss://ws.okx.com:8443/ws/v5/public",
            "private_ws_url": "wss://ws.okx.com:8443/ws/v5/private",
            "ping_interval": 25.0,
            "pong_timeout": 10.0,
            "max_reconnect_attempts": 5,
            "reconnect_delay": 5.0
        }

    def _get_default_data_collection_config(self) -> Dict[str, Any]:
        """Get default data collection configuration."""
        return {
            "store_raw_data": True,
            "health_check_interval": 120.0,
            "auto_restart": True,
            "buffer_size": 1000
        }

    def _get_default_trading_pairs(self) -> List[Dict[str, Any]]:
        """Get default trading pairs configuration."""
        return [
            {
                "symbol": "BTC-USDT",
                "enabled": True,
                "data_types": ["trade", "orderbook"],
                "timeframes": ["1m", "5m", "15m", "1h"],
                "channels": {
                    "trades": "trades",
                    "orderbook": "books5",
                    "ticker": "tickers"
                }
            },
            {
                "symbol": "ETH-USDT",
                "enabled": True,
                "data_types": ["trade", "orderbook"],
                "timeframes": ["1m", "5m", "15m", "1h"],
                "channels": {
                    "trades": "trades",
                    "orderbook": "books5",
                    "ticker": "tickers"
                }
            }
        ]

    def _get_default_logging_config(self) -> Dict[str, Any]:
        """Get default logging configuration."""
        return {
            "component_name_template": "okx_collector_{symbol}",
            "log_level": "INFO",
            "verbose": False
        }

    def _get_default_database_config(self) -> Dict[str, Any]:
        """Get default database configuration."""
        return {
            "store_processed_data": True,
            "store_raw_data": True,
            "force_update_candles": False,
            "batch_size": 100,
            "flush_interval": 5.0
        }

    def get_config(self) -> Dict[str, Any]:
        """
        Get the current configuration.

        Returns:
            Shallow copy of the current configuration dictionary; nested
            sections are shared with the instance's internal state

        Raises:
            RuntimeError: If configuration has not been loaded
        """
        if self._config is None:
            raise RuntimeError("Configuration has not been loaded. Call load_config() first.")
        return self._config.copy()

    def get_exchange_config(self) -> Dict[str, Any]:
        """Get exchange-specific configuration (``exchange`` and ``connection``)."""
        config = self.get_config()
        return {
            'exchange': config['exchange'],
            'connection': config['connection']
        }

    def get_enabled_trading_pairs(self) -> List[Dict[str, Any]]:
        """Get list of enabled trading pairs (``enabled`` defaults to True)."""
        config = self.get_config()
        trading_pairs = config.get('trading_pairs', [])
        return [pair for pair in trading_pairs if pair.get('enabled', True)]

    def get_data_collection_config(self) -> Dict[str, Any]:
        """Get data collection configuration (empty dict when absent)."""
        config = self.get_config()
        return config.get('data_collection', {})

    def update_config(self, updates: Dict[str, Any]) -> None:
        """
        Update configuration with new values.

        Top-level keys are replaced in memory only and are not re-validated;
        call ``save_config`` to persist them.

        Args:
            updates: Dictionary of configuration updates

        Raises:
            RuntimeError: If configuration has not been loaded
        """
        if self._config is None:
            raise RuntimeError("Configuration has not been loaded. Call load_config() first.")

        self._config.update(updates)

        # Nothing is written to disk here; persistence is save_config()'s job.
        if self.logger:
            self.logger.info("Configuration updated in memory")

    def save_config(self) -> None:
        """
        Save current configuration to file with secure permissions.

        Raises:
            RuntimeError: If configuration has not been loaded
        """
        if self._config is None:
            raise RuntimeError("Configuration has not been loaded. Call load_config() first.")

        config_file = Path(self.config_path)

        # Save configuration (a new file is created with mode 600)
        self._write_json_securely(config_file, self._config)

        # Ensure secure permissions after saving (covers pre-existing files)
        self._set_secure_permissions(config_file)

        if self.logger:
            self.logger.info(f"Configuration saved to {self.config_path} with secure permissions")