TCPDashboard/config/collector_service_config.py
Vasily.onl f6cb1485b1 Implement data collection architecture with modular components
- Introduced a comprehensive data collection framework, including `CollectorServiceConfig`, `BaseDataCollector`, and `CollectorManager`, enhancing modularity and maintainability.
- Developed `CollectorFactory` for streamlined collector creation, promoting separation of concerns and improved configuration handling.
- Enhanced `DataCollectionService` to utilize the new architecture, ensuring robust error handling and logging practices.
- Added `TaskManager` for efficient management of asynchronous tasks, improving performance and resource management.
- Implemented health monitoring and auto-recovery features in `CollectorManager`, ensuring reliable operation of data collectors.
- Updated imports across the codebase to reflect the new structure, ensuring consistent access to components.

These changes significantly improve the architecture and maintainability of the data collection service, aligning with project standards for modularity, performance, and error handling.
2025-06-10 13:40:28 +08:00

474 lines
18 KiB
Python

"""
Service Configuration Manager for data collection service.
This module handles configuration loading, validation, schema management,
and default configuration creation with security enhancements.
"""
import copy
import json
import os
import stat
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional
@dataclass
class CollectorServiceConfigSchema:
    """Schema definition for service configuration.

    Every section except ``exchange`` defaults to ``None`` (meaning "not
    provided"), so the annotations are ``Optional``. ``None`` is kept as the
    default rather than an empty container so that existing ``is None``
    checks keep working.
    """

    # Exchange identifier.
    exchange: str = "okx"
    # Connection settings section, or None when not provided.
    connection: Optional[Dict[str, Any]] = None
    # Data collection settings section, or None when not provided.
    data_collection: Optional[Dict[str, Any]] = None
    # Trading pair entries, or None when not provided.
    trading_pairs: Optional[List[Dict[str, Any]]] = None
    # Logging settings section, or None when not provided.
    logging: Optional[Dict[str, Any]] = None
    # Database settings section, or None when not provided.
    database: Optional[Dict[str, Any]] = None
class CollectorServiceConfig:
    """Manages service configuration with validation and security.

    The configuration lives in a JSON file. Loading creates a default file
    when none exists, checks the file's permission bits, parses the JSON,
    validates it and fills in defaults for missing optional sections.
    """

    # Permission checks applied to the configuration file, in reporting order.
    # Each entry: (mode mask, violation label, log-message suffix, critical?).
    # Critical violations make validation fail on non-Windows systems.
    _PERMISSION_CHECKS = (
        (stat.S_IROTH, "readable by others", "is readable by others", True),
        (stat.S_IWOTH, "writable by others", "is writable by others", True),
        (stat.S_IWGRP, "writable by group", "is writable by group", True),
        (stat.S_IRGRP, "readable by group", "is readable by group", False),
        (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH, "executable", "has execute permissions", False),
    )

    def __init__(self, config_path: str = "config/data_collection.json", logger=None):
        """
        Initialize the service configuration manager.

        Args:
            config_path: Path to the configuration file
            logger: Logger instance for logging operations; when falsy,
                nothing is logged
        """
        self.config_path = config_path
        self.logger = logger
        # Populated by load_config(); None means "not loaded yet".
        self._config: Optional[Dict[str, Any]] = None

    def load_config(self) -> Dict[str, Any]:
        """
        Load and validate service configuration from JSON file.

        A default configuration file is created first when the file does not
        exist. The validated configuration is cached on the instance.

        Returns:
            Dictionary containing the configuration

        Raises:
            PermissionError: If the file permissions are too permissive
            ValueError: If the file is not valid JSON or fails validation
            Exception: Any other error is logged and re-raised unchanged
        """
        try:
            config_file = Path(self.config_path)

            # Create default config if it doesn't exist
            if not config_file.exists():
                self._create_default_config(config_file)

            # Validate file permissions for security
            self._validate_file_permissions(config_file)

            # Load configuration (JSON text is read as UTF-8, independent of locale)
            with open(config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)

            # Validate configuration schema
            validated_config = self._validate_config_schema(config)
            self._config = validated_config

            if self.logger:
                self.logger.info(f"✅ Configuration loaded from {self.config_path}")
            return validated_config
        except (FileNotFoundError, IsADirectoryError, PermissionError) as e:
            # Handle file system related errors
            if self.logger:
                self.logger.error(f"❌ File system error loading configuration: {e}", exc_info=True)
            raise
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # Handle file parsing errors. Both are ValueError subclasses, so
            # this handler must stay ahead of the generic ValueError one.
            if self.logger:
                self.logger.error(f"❌ Configuration file parsing error: {e}", exc_info=True)
            raise ValueError(f"Invalid configuration file format: {e}") from e
        except (KeyError, ValueError, TypeError) as e:
            # Handle configuration validation errors
            if self.logger:
                self.logger.error(f"❌ Configuration validation error: {e}", exc_info=True)
            raise
        except Exception as e:
            # Catch any other unexpected errors
            if self.logger:
                self.logger.error(f"❌ Unexpected error loading configuration: {e}", exc_info=True)
            raise

    def _validate_file_permissions(self, config_file: Path) -> None:
        """
        Validate configuration file permissions for security.

        Every violation is logged as a warning. Critical violations (readable
        by others, writable by group or others) raise on non-Windows systems
        and are only logged on Windows. A file owned by a different user is
        logged as a warning but never fails validation.

        Args:
            config_file: Path to the configuration file

        Raises:
            PermissionError: If file permissions are too permissive
            ValueError: If file validation fails for any other reason
        """
        try:
            file_stat = config_file.stat()
            file_mode = file_stat.st_mode

            violations = []
            critical_violations = []
            for mask, label, description, is_critical in self._PERMISSION_CHECKS:
                if not file_mode & mask:
                    continue
                violations.append(label)
                if is_critical:
                    critical_violations.append(label)
                if self.logger:
                    self.logger.warning(f"⚠️ Configuration file {config_file} {description}")

            # Enforce security policy - raise exception if critical violations found
            if critical_violations:
                error_msg = f"Configuration file {config_file} has insecure permissions: {', '.join(critical_violations)}. " \
                            f"File should only be readable/writable by owner (mode 600 recommended)."
                if self.logger:
                    # No exception is active here, so no exc_info.
                    self.logger.error(f"🔒 Security violation: {error_msg}")

                # On Windows, permission checks might not work as expected, so we log but don't fail
                if os.name == 'nt':  # Windows
                    if self.logger:
                        self.logger.warning("⚠️ On Windows, file permission validation is limited. Please ensure config files are properly secured.")
                else:
                    # On Unix-like systems, enforce strict permissions
                    raise PermissionError(error_msg)

            # Validate file ownership (only on Unix-like systems)
            if os.name != 'nt':
                current_uid = os.getuid() if hasattr(os, 'getuid') else None
                file_uid = file_stat.st_uid
                if current_uid is not None and file_uid != current_uid:
                    warning_msg = f"Configuration file {config_file} is owned by different user (UID: {file_uid}, current: {current_uid})"
                    if self.logger:
                        self.logger.warning(f"⚠️ {warning_msg}")

            # Log successful validation
            if not violations and self.logger:
                self.logger.debug(f"✅ Configuration file {config_file} has secure permissions")
        except PermissionError:
            # Re-raise permission errors
            raise
        except Exception as e:
            error_msg = f"Could not validate file permissions for {config_file}: {e}"
            if self.logger:
                self.logger.error(f"{error_msg}", exc_info=True)
            raise ValueError(error_msg) from e

    def _validate_config_schema(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate configuration against schema and apply defaults.

        Args:
            config: Raw configuration dictionary

        Returns:
            Validated configuration with defaults applied (a shallow copy of
            ``config``; the input mapping itself is not modified)

        Raises:
            ValueError: If the configuration is not a JSON object, a required
                field is missing, or the trading pairs are invalid
        """
        # A JSON document may legally be a list or scalar; reject it clearly
        # instead of failing later with an AttributeError on .copy().
        if not isinstance(config, dict):
            raise ValueError(
                f"Configuration root must be a JSON object, got {type(config).__name__}"
            )

        validated = config.copy()

        # Validate required fields
        for field in ('exchange', 'trading_pairs'):
            if field not in validated:
                raise ValueError(f"Missing required configuration field: {field}")

        # Apply defaults for optional sections (factories are only called
        # for sections that are actually missing)
        default_factories = {
            'connection': self._get_default_connection_config,
            'data_collection': self._get_default_data_collection_config,
            'logging': self._get_default_logging_config,
            'database': self._get_default_database_config,
        }
        for section, factory in default_factories.items():
            if section not in validated:
                validated[section] = factory()

        # Validate trading pairs
        self._validate_trading_pairs(validated['trading_pairs'])
        return validated

    def _validate_trading_pairs(self, trading_pairs: List[Dict[str, Any]]) -> None:
        """
        Validate trading pairs configuration.

        Args:
            trading_pairs: List of trading pair configurations

        Raises:
            ValueError: If trading pairs configuration is invalid
        """
        if not trading_pairs:
            raise ValueError("At least one trading pair must be configured")
        if not isinstance(trading_pairs, (list, tuple)):
            raise ValueError(
                f"'trading_pairs' must be a list, got {type(trading_pairs).__name__}"
            )

        valid_data_types = ['trade', 'orderbook', 'ticker', 'candle']
        for i, pair in enumerate(trading_pairs):
            if not isinstance(pair, dict):
                raise ValueError(
                    f"Trading pair {i} must be an object, got {type(pair).__name__}"
                )
            if 'symbol' not in pair:
                raise ValueError(f"Trading pair {i} missing required 'symbol' field")

            symbol = pair['symbol']
            if not isinstance(symbol, str) or '-' not in symbol:
                raise ValueError(f"Invalid symbol format: {symbol}. Expected format: 'BASE-QUOTE'")

            # Validate data types
            data_types = pair.get('data_types', ['trade'])
            # A bare string would otherwise be checked character by character.
            if not isinstance(data_types, (list, tuple)):
                raise ValueError(
                    f"'data_types' for {symbol} must be a list, got {type(data_types).__name__}"
                )
            for dt in data_types:
                if dt not in valid_data_types:
                    raise ValueError(f"Invalid data type '{dt}' for {symbol}. Valid types: {valid_data_types}")

    def _write_config_file(self, config_file: Path, config: Dict[str, Any]) -> None:
        """
        Write ``config`` as JSON to ``config_file`` with owner-only permissions.

        Args:
            config_file: Destination path; parent directories are created
            config: Configuration dictionary to serialize
        """
        # Ensure directory exists
        config_file.parent.mkdir(parents=True, exist_ok=True)

        # Create the file as 0o600 from the start so a newly created file is
        # never readable by other users, even briefly.
        fd = os.open(config_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            json.dump(config, f, indent=2)

        # The mode passed to os.open only applies to newly created files, so
        # tighten permissions on pre-existing files as well.
        self._set_secure_permissions(config_file)

    def _create_default_config(self, config_file: Path) -> None:
        """
        Create a default configuration file.

        Args:
            config_file: Path where the configuration file should be created
        """
        default_config = {
            "exchange": "okx",
            "connection": self._get_default_connection_config(),
            "data_collection": self._get_default_data_collection_config(),
            "trading_pairs": self._get_default_trading_pairs(),
            "logging": self._get_default_logging_config(),
            "database": self._get_default_database_config(),
        }
        self._write_config_file(config_file, default_config)
        if self.logger:
            self.logger.info(f"📄 Created default configuration: {config_file}")

    def _set_secure_permissions(self, config_file: Path) -> None:
        """
        Set secure file permissions for configuration file.

        This is best-effort: failures are logged as warnings and never raised.

        Args:
            config_file: Path to the configuration file
        """
        try:
            # Set permissions to owner read/write only (mode 600)
            secure_mode = stat.S_IRUSR | stat.S_IWUSR
            os.chmod(config_file, secure_mode)
            if self.logger:
                self.logger.debug(f"Set secure permissions (600) for {config_file}")
        except OSError as e:
            # Handle file system permission errors (PermissionError is an OSError)
            if self.logger:
                self.logger.warning(f"Could not set secure file permissions for {config_file}: {e}")
        except Exception as e:
            # Handle any other unexpected errors
            if self.logger:
                self.logger.warning(f"Unexpected error setting file permissions for {config_file}: {e}")

    def fix_file_permissions(self, config_file_path: Optional[str] = None) -> bool:
        """
        Fix file permissions for configuration file to be secure.

        Args:
            config_file_path: Optional specific file path, defaults to instance config_path

        Returns:
            bool: True if permissions were fixed successfully, False otherwise
        """
        try:
            file_path = Path(config_file_path or self.config_path)
            if not file_path.exists():
                if self.logger:
                    # No exception is active here, so no exc_info.
                    self.logger.error(f"Configuration file {file_path} does not exist")
                return False

            # Set secure permissions
            self._set_secure_permissions(file_path)

            # Verify the fix worked
            try:
                self._validate_file_permissions(file_path)
            except (PermissionError, ValueError):
                if self.logger:
                    self.logger.warning(f"⚠️ Permissions may still be insecure for {file_path}")
                return False

            if self.logger:
                self.logger.info(f"✅ Successfully fixed permissions for {file_path}")
            return True
        except OSError as e:
            # Handle file system related errors
            if self.logger:
                self.logger.error(f"❌ File system error fixing permissions: {e}", exc_info=True)
            return False
        except Exception as e:
            # Handle any other unexpected errors
            if self.logger:
                self.logger.error(f"❌ Unexpected error fixing file permissions: {e}", exc_info=True)
            return False

    def _get_default_connection_config(self) -> Dict[str, Any]:
        """Get default connection configuration."""
        return {
            "public_ws_url": "wss://ws.okx.com:8443/ws/v5/public",
            "private_ws_url": "wss://ws.okx.com:8443/ws/v5/private",
            "ping_interval": 25.0,
            "pong_timeout": 10.0,
            "max_reconnect_attempts": 5,
            "reconnect_delay": 5.0,
        }

    def _get_default_data_collection_config(self) -> Dict[str, Any]:
        """Get default data collection configuration."""
        return {
            "store_raw_data": True,
            "health_check_interval": 120.0,
            "auto_restart": True,
            "buffer_size": 1000,
        }

    def _get_default_trading_pairs(self) -> List[Dict[str, Any]]:
        """Get default trading pairs configuration."""
        # Each pair gets its own freshly built nested lists/dicts, so the
        # entries never share mutable state.
        return [
            {
                "symbol": symbol,
                "enabled": True,
                "data_types": ["trade", "orderbook"],
                "timeframes": ["1m", "5m", "15m", "1h"],
                "channels": {
                    "trades": "trades",
                    "orderbook": "books5",
                    "ticker": "tickers",
                },
            }
            for symbol in ("BTC-USDT", "ETH-USDT")
        ]

    def _get_default_logging_config(self) -> Dict[str, Any]:
        """Get default logging configuration."""
        return {
            "component_name_template": "okx_collector_{symbol}",
            "log_level": "INFO",
            "verbose": False,
        }

    def _get_default_database_config(self) -> Dict[str, Any]:
        """Get default database configuration."""
        return {
            "store_processed_data": True,
            "store_raw_data": True,
            "force_update_candles": False,
            "batch_size": 100,
            "flush_interval": 5.0,
        }

    def get_config(self) -> Dict[str, Any]:
        """
        Get the current configuration.

        Returns:
            A deep copy of the current configuration dictionary, so changes
            made by the caller (including to nested sections) do not alter
            the manager's internal state

        Raises:
            RuntimeError: If configuration has not been loaded
        """
        if self._config is None:
            raise RuntimeError("Configuration has not been loaded. Call load_config() first.")
        return copy.deepcopy(self._config)

    def get_exchange_config(self) -> Dict[str, Any]:
        """Get exchange-specific configuration (``exchange`` and ``connection``)."""
        config = self.get_config()
        return {
            'exchange': config['exchange'],
            'connection': config['connection'],
        }

    def get_enabled_trading_pairs(self) -> List[Dict[str, Any]]:
        """Get list of enabled trading pairs.

        A pair without an ``enabled`` key counts as enabled.
        """
        config = self.get_config()
        trading_pairs = config.get('trading_pairs', [])
        return [pair for pair in trading_pairs if pair.get('enabled', True)]

    def get_data_collection_config(self) -> Dict[str, Any]:
        """Get data collection configuration (empty dict when the section is absent)."""
        config = self.get_config()
        return config.get('data_collection', {})

    def update_config(self, updates: Dict[str, Any]) -> None:
        """
        Update configuration with new values.

        Top-level keys in ``updates`` replace the existing ones. The change
        is in-memory only and is not re-validated; call save_config() to
        persist it.

        Args:
            updates: Dictionary of configuration updates

        Raises:
            RuntimeError: If configuration has not been loaded
        """
        if self._config is None:
            raise RuntimeError("Configuration has not been loaded. Call load_config() first.")
        self._config.update(updates)
        if self.logger:
            self.logger.info("Configuration updated in memory")

    def save_config(self) -> None:
        """Save current configuration to file with secure permissions.

        Raises:
            RuntimeError: If configuration has not been loaded
        """
        if self._config is None:
            raise RuntimeError("Configuration has not been loaded. Call load_config() first.")
        self._write_config_file(Path(self.config_path), self._config)
        if self.logger:
            self.logger.info(f"Configuration saved to {self.config_path} with secure permissions")