Implement data collection architecture with modular components

- Introduced a comprehensive data collection framework, including `CollectorServiceConfig`, `BaseDataCollector`, and `CollectorManager`, enhancing modularity and maintainability.
- Developed `CollectorFactory` for streamlined collector creation, promoting separation of concerns and improved configuration handling.
- Enhanced `DataCollectionService` to utilize the new architecture, ensuring robust error handling and logging practices.
- Added `TaskManager` for efficient management of asynchronous tasks, improving performance and resource management.
- Implemented health monitoring and auto-recovery features in `CollectorManager`, ensuring reliable operation of data collectors.
- Updated imports across the codebase to reflect the new structure, ensuring consistent access to components.

These changes significantly improve the architecture and maintainability of the data collection service, aligning with project standards for modularity, performance, and error handling.
This commit is contained in:
Vasily.onl
2025-06-10 13:40:28 +08:00
parent c28e4a9aaf
commit f6cb1485b1
18 changed files with 384 additions and 45 deletions

View File

@@ -0,0 +1,529 @@
"""
Abstract base class for data collectors.
This module provides a common interface for all data collection implementations,
ensuring consistency across different exchange connectors and data sources.
"""
import asyncio
from abc import ABC, abstractmethod
from datetime import datetime, timezone, timedelta
from decimal import Decimal
from typing import Dict, List, Optional, Any, Callable, Set
from dataclasses import dataclass
from enum import Enum
from utils.logger import get_logger
from .collector_state_telemetry import CollectorStatus, CollectorStateAndTelemetry
from .collector_connection_manager import ConnectionManager
from .collector_callback_dispatcher import CallbackDispatcher
from ..common.data_types import DataType, MarketDataPoint
from ..common.ohlcv_data import OHLCVData, DataValidationError, validate_ohlcv_data
class DataCollectorError(Exception):
    """Root of the data-collector exception hierarchy; catch this to handle any collector failure."""
# NOTE(review): this name shadows the builtin ConnectionError within this module.
# An `except ConnectionError` in this file catches THIS class, not the builtin
# OSError subclass raised by sockets — confirm that is intended, or rename
# (e.g. CollectorConnectionError) in a coordinated change with all callers.
class ConnectionError(DataCollectorError):
    """Exception raised when connection to data source fails."""
    pass
class BaseDataCollector(ABC):
    """
    Abstract base class for all data collectors.

    This class defines the interface that all data collection implementations
    must follow, providing consistency across different exchanges and data sources.

    Responsibilities are delegated to three collaborators:
      * CollectorStateAndTelemetry -- status, statistics, heartbeat, logging
      * ConnectionManager          -- connect/disconnect/reconnect handling
      * CallbackDispatcher         -- fan-out of data points to subscribers
    """
    def __init__(self,
                 exchange_name: str,
                 symbols: List[str],
                 data_types: Optional[List[DataType]] = None,
                 timeframes: Optional[List[str]] = None,
                 component_name: Optional[str] = None,
                 auto_restart: bool = True,
                 health_check_interval: float = 30.0,
                 logger = None,
                 log_errors_only: bool = False):
        """
        Initialize the base data collector.
        Args:
            exchange_name: Name of the exchange (e.g., 'okx', 'binance')
            symbols: List of trading symbols to collect data for
            data_types: Types of data to collect (default: [DataType.CANDLE])
            timeframes: List of timeframes to collect (e.g., ['1s', '1m', '5m'])
            component_name: Name for logging (default: based on exchange_name)
            auto_restart: Enable automatic restart on failures (default: True)
            health_check_interval: Seconds between health checks (default: 30.0)
            logger: Logger instance. If None, a default logger named after the
                exchange is created via get_logger().
            log_errors_only: If True and logger is provided, only log error-level messages
        """
        self.exchange_name = exchange_name.lower()
        # Stored as a set for O(1) membership checks in add_symbol/remove_symbol.
        self.symbols = set(symbols)
        self.data_types = data_types or [DataType.CANDLE]
        self.timeframes = timeframes or ['1m', '5m'] # Default timeframes if none provided
        self.auto_restart = auto_restart
        # Initialize logger based on parameters
        if logger is not None:
            self.logger = logger
        else:
            self.logger = get_logger(self.exchange_name) # Ensure a logger is always available
        # Initialize state and telemetry manager
        component = component_name or f"{self.exchange_name}_collector"
        self._state_telemetry = CollectorStateAndTelemetry(
            exchange_name=self.exchange_name,
            component_name=component,
            health_check_interval=health_check_interval,
            logger=self.logger, # Pass the actual logger instance
            log_errors_only=log_errors_only
        )
        self.component_name = component # Keep for external access
        # Initialize connection manager
        self._connection_manager = ConnectionManager(
            exchange_name=self.exchange_name,
            component_name=component,
            max_reconnect_attempts=5, # Default, can be made configurable later
            reconnect_delay=5.0, # Default, can be made configurable later
            logger=self.logger,
            state_telemetry=self._state_telemetry
        )
        # Initialize callback dispatcher
        self._callback_dispatcher = CallbackDispatcher(
            component_name=component,
            logger=self.logger
        )
        # Collector state (now managed by _state_telemetry)
        # Registry of background tasks; each task removes itself on completion
        # via the done-callback added in start().
        self._tasks: Set[asyncio.Task] = set()
        # Log initialization if logger is available
        if self._state_telemetry.logger:
            if not self._state_telemetry.log_errors_only:
                self._state_telemetry._log_info(f"{self.component_name}: Initialized {self.exchange_name} data collector for symbols: {', '.join(symbols)}")
                self._state_telemetry._log_info(f"{self.component_name}: Using timeframes: {', '.join(self.timeframes)}")
    @property
    def status(self) -> CollectorStatus:
        """Current lifecycle status, as tracked by the telemetry manager."""
        return self._state_telemetry.status
    # Logging helpers -- thin delegates to the telemetry manager so subclasses
    # emit consistent, component-prefixed log output.
    def _log_debug(self, message: str) -> None:
        self._state_telemetry._log_debug(message)
    def _log_info(self, message: str) -> None:
        self._state_telemetry._log_info(message)
    def _log_warning(self, message: str) -> None:
        self._state_telemetry._log_warning(message)
    def _log_error(self, message: str, exc_info: bool = False) -> None:
        self._state_telemetry._log_error(message, exc_info=exc_info)
    def _log_critical(self, message: str, exc_info: bool = False) -> None:
        self._state_telemetry._log_critical(message, exc_info=exc_info)
    @abstractmethod
    async def connect(self) -> bool:
        """
        Establish connection to the data source.
        Returns:
            True if connection successful, False otherwise
        """
        pass
    @abstractmethod
    async def disconnect(self) -> None:
        """Disconnect from the data source."""
        pass
    @abstractmethod
    async def subscribe_to_data(self, symbols: List[str], data_types: List[DataType]) -> bool:
        """
        Subscribe to data streams for specified symbols and data types.
        Args:
            symbols: Trading symbols to subscribe to
            data_types: Types of data to subscribe to
        Returns:
            True if subscription successful, False otherwise
        """
        pass
    @abstractmethod
    async def unsubscribe_from_data(self, symbols: List[str], data_types: List[DataType]) -> bool:
        """
        Unsubscribe from data streams.
        Args:
            symbols: Trading symbols to unsubscribe from
            data_types: Types of data to unsubscribe from
        Returns:
            True if unsubscription successful, False otherwise
        """
        pass
    @abstractmethod
    async def _process_message(self, message: Any) -> Optional[MarketDataPoint]:
        """
        Process incoming message from the data source.
        Args:
            message: Raw message from the data source
        Returns:
            Processed MarketDataPoint or None if message should be ignored
        """
        pass
    async def start(self) -> bool:
        """
        Start the data collector.

        Sequence: connect -> subscribe -> mark RUNNING -> spawn message loop
        and (optionally) the health monitor. Returns early with True if the
        collector is already RUNNING or STARTING (idempotent start).
        Returns:
            True if started successfully, False otherwise
        """
        # Check if already running or starting
        if self._state_telemetry.status in [CollectorStatus.RUNNING, CollectorStatus.STARTING]:
            self._log_warning("Data collector is already running or starting")
            return True
        self._log_info(f"Starting {self.exchange_name} data collector")
        self._state_telemetry.update_status(CollectorStatus.STARTING)
        self._state_telemetry.set_should_be_running(True)
        try:
            # Connect to data source
            if not await self._connection_manager.connect(self._actual_connect):
                self._log_error("Failed to connect to data source")
                self._state_telemetry.update_status(CollectorStatus.ERROR)
                return False
            # Subscribe to data streams
            if not await self.subscribe_to_data(list(self.symbols), self.data_types):
                self._log_error("Failed to subscribe to data streams")
                self._state_telemetry.update_status(CollectorStatus.ERROR)
                # Roll back the connection so we don't leak it on a failed start.
                await self._connection_manager.disconnect(self._actual_disconnect)
                return False
            # Start background tasks
            self._state_telemetry.set_running_state(True)
            self._state_telemetry.update_status(CollectorStatus.RUNNING)
            # Start message processing task; the done-callback keeps _tasks tidy.
            message_task = asyncio.create_task(self._message_loop())
            self._tasks.add(message_task)
            message_task.add_done_callback(self._tasks.discard)
            # Start health monitoring task (interval <= 0 disables monitoring).
            if self._state_telemetry.health_check_interval > 0:
                health_task = asyncio.create_task(self._health_monitor())
                self._tasks.add(health_task)
                health_task.add_done_callback(self._tasks.discard)
            self._log_info(f"{self.exchange_name} data collector started successfully")
            return True
        except Exception as e:
            self._log_error(f"Failed to start data collector: {e}")
            self._state_telemetry.update_status(CollectorStatus.ERROR)
            self._state_telemetry.set_should_be_running(False)
            return False
    async def stop(self, force: bool = False) -> None:
        """
        Stop the data collector and cleanup resources.
        Args:
            force: Force stop even if not graceful. When False, each cancelled
                task is awaited so it can run its cleanup before we proceed.
        """
        if self._state_telemetry.status == CollectorStatus.STOPPED:
            self._log_warning("Data collector is already stopped")
            return
        self._log_info(f"Stopping {self.exchange_name} data collector")
        self._state_telemetry.update_status(CollectorStatus.STOPPING)
        self._state_telemetry.set_should_be_running(False)
        try:
            # Stop background tasks
            self._state_telemetry.set_running_state(False)
            # Cancel all tasks (iterate a copy: done-callbacks mutate _tasks)
            for task in list(self._tasks):
                if not task.done():
                    task.cancel()
                    if not force:
                        try:
                            await task
                        except asyncio.CancelledError:
                            pass
            self._tasks.clear()
            # Unsubscribe and disconnect
            await self.unsubscribe_from_data(list(self.symbols), self.data_types)
            await self._connection_manager.disconnect(self._actual_disconnect)
            self._state_telemetry.update_status(CollectorStatus.STOPPED)
            self._log_info(f"{self.exchange_name} data collector stopped")
        except Exception as e:
            self._log_error(f"Error stopping data collector: {e}")
            self._state_telemetry.update_status(CollectorStatus.ERROR)
    async def restart(self) -> bool:
        """
        Restart the data collector.
        Returns:
            True if restarted successfully, False otherwise
        """
        self._log_info(f"Restarting {self.exchange_name} data collector")
        self._state_telemetry.increment_restarts()
        # Stop first
        await self.stop()
        # Wait a bit before restarting
        # NOTE(review): reaches into ConnectionManager's private _reconnect_delay;
        # consider exposing a public accessor on ConnectionManager instead.
        await asyncio.sleep(self._connection_manager._reconnect_delay)
        # Start again
        return await self.start()
    async def _message_loop(self) -> None:
        """Main message processing loop.

        Repeatedly delegates to the subclass's _handle_messages() while the
        collector is running; per-iteration errors are counted and logged
        without killing the loop.
        """
        try:
            self._log_debug("Starting message processing loop")
            while self._state_telemetry._running:
                try:
                    await self._handle_messages()
                except asyncio.CancelledError:
                    break
                except Exception as e:
                    self._state_telemetry.increment_errors(str(e))
                    self._log_error(f"Error processing messages: {e}")
                    # Small delay to prevent tight error loops
                    await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            self._log_debug("Message loop cancelled")
            raise
        except Exception as e:
            self._log_error(f"Error in message loop: {e}")
            self._state_telemetry.update_status(CollectorStatus.ERROR)
    async def _health_monitor(self) -> None:
        """Monitor collector health and restart if needed.

        Every health_check_interval seconds, checks (in order): desired-vs-actual
        running state, heartbeat age, data-reception age, and ERROR status.
        When auto_restart is enabled, a failing check schedules restart().

        NOTE(review): restart() is launched via a fire-and-forget
        asyncio.create_task with no reference kept; the event loop only holds
        a weak reference to tasks, so the restart could in principle be
        garbage-collected before it runs — consider keeping a reference
        (TODO confirm and fix in a coordinated change; stop() cancels
        everything in self._tasks, so the restart task must NOT simply be
        added there or it would cancel itself).
        """
        try:
            self._log_debug("Starting health monitor")
            while self._state_telemetry._running:
                try:
                    await asyncio.sleep(self._state_telemetry.health_check_interval)
                    # Check if collector should be running but isn't
                    if self._state_telemetry._should_be_running and self._state_telemetry.status != CollectorStatus.RUNNING:
                        self._log_warning("Collector should be running but isn't - restarting")
                        if self.auto_restart:
                            asyncio.create_task(self.restart())
                        continue
                    # Check heartbeat (stale beyond 2x the check interval => restart)
                    time_since_heartbeat = datetime.now(timezone.utc) - self._state_telemetry._last_heartbeat
                    if time_since_heartbeat > timedelta(seconds=self._state_telemetry.health_check_interval * 2):
                        self._log_warning(f"No heartbeat for {time_since_heartbeat.total_seconds():.1f}s - restarting")
                        if self.auto_restart:
                            asyncio.create_task(self.restart())
                        continue
                    # Check data reception (only once data has arrived at least once)
                    if self._state_telemetry._last_data_received:
                        time_since_data = datetime.now(timezone.utc) - self._state_telemetry._last_data_received
                        if time_since_data > self._state_telemetry._max_silence_duration:
                            self._log_warning(f"No data received for {time_since_data.total_seconds():.1f}s - restarting")
                            if self.auto_restart:
                                asyncio.create_task(self.restart())
                            continue
                    # Check for error status
                    if self._state_telemetry.status == CollectorStatus.ERROR:
                        self._log_warning(f"Collector in {self._state_telemetry.status.value} status - restarting")
                        if self.auto_restart:
                            asyncio.create_task(self.restart())
                except asyncio.CancelledError:
                    break
        except asyncio.CancelledError:
            self._log_debug("Health monitor cancelled")
            raise
        except Exception as e:
            self._log_error(f"Error in health monitor: {e}")
    @abstractmethod
    async def _handle_messages(self) -> None:
        """
        Handle incoming messages from the data source.
        This method should be implemented by subclasses to handle their specific message format.
        """
        pass
    async def _handle_connection_error(self) -> bool:
        """
        Handle connection errors and attempt reconnection.

        Delegates to ConnectionManager, which reconnects and then re-subscribes
        to the current symbol/data-type set.
        Returns:
            True if reconnection successful, False if max attempts exceeded
        """
        return await self._connection_manager.handle_connection_error(
            connect_logic=self._actual_connect,
            subscribe_logic=self.subscribe_to_data,
            symbols=list(self.symbols),
            data_types=self.data_types
        )
    @abstractmethod
    async def _actual_connect(self) -> bool:
        """
        Abstract method for subclasses to implement actual connection logic.
        """
        pass
    @abstractmethod
    async def _actual_disconnect(self) -> None:
        """
        Abstract method for subclasses to implement actual disconnection logic.
        """
        pass
    def add_data_callback(self, data_type: DataType, callback: Callable[[MarketDataPoint], None]) -> None:
        """
        Add a callback function for specific data type.
        Args:
            data_type: Type of data to monitor
            callback: Function to call when data is received
        """
        self._callback_dispatcher.add_data_callback(data_type, callback)
    def remove_data_callback(self, data_type: DataType, callback: Callable[[MarketDataPoint], None]) -> None:
        """
        Remove a callback function for specific data type.
        Args:
            data_type: Type of data to stop monitoring
            callback: Function to remove
        """
        self._callback_dispatcher.remove_data_callback(data_type, callback)
    async def _notify_callbacks(self, data_point: MarketDataPoint) -> None:
        """
        Notify all registered callbacks for a data point.

        Also refreshes the telemetry counters and heartbeat, so regular data
        flow keeps the health monitor satisfied.
        Args:
            data_point: Market data to distribute
        """
        await self._callback_dispatcher.notify_callbacks(data_point)
        # Update statistics
        self._state_telemetry.increment_messages_processed()
        self._state_telemetry._stats['last_message_time'] = data_point.timestamp # Direct update for now, will refactor
        self._state_telemetry.update_data_received_timestamp()
        self._state_telemetry.update_heartbeat()
    def get_status(self) -> Dict[str, Any]:
        """
        Get current collector status and statistics.
        Returns:
            Dictionary containing status information
        """
        status = self._state_telemetry.get_status()
        # Add BaseDataCollector specific information
        status.update({
            'symbols': list(self.symbols),
            'data_types': [dt.value for dt in self.data_types],
            'timeframes': self.timeframes,
            'auto_restart': self.auto_restart
        })
        return status
    def get_health_status(self) -> Dict[str, Any]:
        """
        Get detailed health status for monitoring.
        Returns:
            Dictionary containing health information
        """
        return self._state_telemetry.get_health_status()
    def add_symbol(self, symbol: str) -> None:
        """
        Add a new symbol to collect data for.

        NOTE: this only updates the in-memory symbol set. If the collector is
        already RUNNING, no subscription is made here -- the caller must invoke
        subscribe_to_data() from an async context.
        Args:
            symbol: Trading symbol to add
        """
        if symbol not in self.symbols:
            self.symbols.add(symbol)
            self._log_info(f"Added symbol: {symbol}")
            # If collector is running, subscribe to new symbol
            if self._state_telemetry.status == CollectorStatus.RUNNING:
                # Note: This needs to be called from an async context
                # Users should handle this appropriately
                pass
    def remove_symbol(self, symbol: str) -> None:
        """
        Remove a symbol from data collection.

        NOTE: this only updates the in-memory symbol set. If the collector is
        already RUNNING, no unsubscription happens here -- the caller must
        invoke unsubscribe_from_data() from an async context.
        Args:
            symbol: Trading symbol to remove
        """
        if symbol in self.symbols:
            self.symbols.remove(symbol)
            self._log_info(f"Removed symbol: {symbol}")
            # If collector is running, unsubscribe from symbol
            if self._state_telemetry.status == CollectorStatus.RUNNING:
                # Note: This needs to be called from an async context
                # Users should handle this appropriately
                pass
    def validate_ohlcv_data(self, data: Dict[str, Any], symbol: str, timeframe: str) -> OHLCVData:
        """
        Validate and convert raw OHLCV data to standardized format.

        Delegates to the module-level validate_ohlcv_data helper (same name;
        inside this method the global function is resolved, not the method).
        Args:
            data: Raw OHLCV data dictionary
            symbol: Trading symbol
            timeframe: Timeframe (e.g., '1m', '5m', '1h')
        Returns:
            Validated OHLCVData object
        Raises:
            DataValidationError: If data validation fails
        """
        return validate_ohlcv_data(data, symbol, timeframe)
    def __repr__(self) -> str:
        """String representation of the collector."""
        return f"<{self.__class__.__name__}({self.exchange_name}, {len(self.symbols)} symbols, {self.status.value})>"

View File

@@ -0,0 +1,365 @@
#!/usr/bin/env python3
"""
Data Collection Service
Production-ready service for cryptocurrency market data collection
with clean logging and robust error handling.
"""
import asyncio
import signal
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Dict, Any
import logging
import json
# Add project root to path so absolute imports (config.*, utils.*, ...) resolve
# when this module is executed directly as a script.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
# Set environment for clean production logging
import os
os.environ['DEBUG'] = 'false'
# Suppress verbose SQLAlchemy logging: the engine/pool/dialect/orm loggers are
# extremely chatty at INFO level and would drown out the service's own logs.
logging.getLogger('sqlalchemy').setLevel(logging.WARNING)
logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING)
logging.getLogger('sqlalchemy.pool').setLevel(logging.WARNING)
logging.getLogger('sqlalchemy.dialects').setLevel(logging.WARNING)
logging.getLogger('sqlalchemy.orm').setLevel(logging.WARNING)
from .collector_manager import CollectorManager
from config.collector_service_config import CollectorServiceConfig
from .collector_factory import CollectorFactory
from database.connection import init_database
from utils.logger import get_logger
from utils.async_task_manager import TaskManager
class DataCollectionService:
    """Production data collection service with modular architecture.

    Wires together configuration loading (CollectorServiceConfig), collector
    creation (CollectorFactory), lifecycle management (CollectorManager) and
    background-task cleanup (TaskManager), exposing start/stop/run entry
    points with sanitized error logging.
    """

    # Keys whose values must never appear in log output.
    _SENSITIVE_KEYS = ('password', 'token', 'key', 'secret', 'auth',
                       'api_key', 'api_secret', 'access_token', 'refresh_token')

    def __init__(self, config_path: str = "config/data_collection.json"):
        """Initialize the data collection service.

        Args:
            config_path: Path to the JSON service configuration file.
        """
        self.config_path = config_path
        self.logger = get_logger("data_collection_service", log_level="INFO", verbose=False)
        # Initialize configuration and factory
        self.service_config = CollectorServiceConfig(config_path, logger=self.logger)
        self.config = self.service_config.load_config()
        self.collector_factory = CollectorFactory(logger=self.logger)
        # Core components
        self.task_manager = TaskManager("data_collection_service", logger=self.logger)
        self.collector_manager = CollectorManager(logger=self.logger, log_errors_only=True)
        self.collectors: List = []
        # Service state
        self.running = False
        self.start_time = None  # epoch seconds, set in start()
        self.shutdown_event = asyncio.Event()
        # Statistics for monitoring
        self.stats = {
            'collectors_created': 0,
            'collectors_running': 0,
            'total_uptime_seconds': 0,
            'last_activity': None,
            'errors_count': 0
        }
        self.logger.info("🚀 Data Collection Service initialized")
        self.logger.info(f"📁 Configuration: {config_path}")

    def _sanitize_error(self, message: str) -> str:
        """
        Sanitize error message to prevent leaking internal details.

        Redacts the value following any ``<sensitive-key>=`` occurrence.
        Matching is case-insensitive and applies to EVERY occurrence in the
        message. (The previous implementation matched case-insensitively but
        replaced case-sensitively — so ``Password=x`` leaked — and dropped all
        text after a second occurrence of the same pattern.)

        Args:
            message: Original error message
        Returns:
            Sanitized error message
        """
        import re
        # A value ends at whitespace or a closing delimiter, matching the
        # original end-character convention: ' ', ',', ')', ']', '}', '\n', '\t'.
        pattern = re.compile(
            r'((?:' + '|'.join(self._SENSITIVE_KEYS) + r')=)[^ ,\)\]\}\n\t]*',
            re.IGNORECASE
        )
        # \1 preserves the key text exactly as it appeared in the message.
        return pattern.sub(r'\1[REDACTED]', message)

    async def initialize_collectors(self) -> bool:
        """Initialize all data collectors based on configuration.

        Returns:
            True when at least one collector was created and registered.
        """
        try:
            collectors = await self.collector_factory.create_collectors_from_config(self.config)
            if not collectors:
                # No exception is active here, so exc_info would only log
                # "NoneType: None" — a plain error message is correct.
                self.logger.error("❌ No collectors were successfully created")
                return False
            for collector in collectors:
                self.collector_manager.add_collector(collector)
                self.collectors.append(collector)
            self.stats['collectors_created'] = len(collectors)
            self.logger.info(f"✅ Successfully initialized {len(collectors)} data collectors")
            return True
        except (KeyError, AttributeError, TypeError) as e:
            # Handle configuration and data structure errors
            sanitized_message = self._sanitize_error(f"❌ Configuration error initializing collectors: {e}")
            self.logger.error(sanitized_message, exc_info=True)
            self.stats['errors_count'] += 1
            return False
        except (ConnectionError, OSError, IOError) as e:
            # Handle connection and I/O related errors
            sanitized_message = self._sanitize_error(f"❌ Connection/IO error initializing collectors: {e}")
            self.logger.error(sanitized_message, exc_info=True)
            self.stats['errors_count'] += 1
            return False
        except Exception as e:
            # Catch any other unexpected errors
            sanitized_message = self._sanitize_error(f"❌ Unexpected error initializing collectors: {e}")
            self.logger.error(sanitized_message, exc_info=True)
            self.stats['errors_count'] += 1
            return False

    async def start(self) -> bool:
        """Start the data collection service.

        Initializes the database connection, then starts all registered
        collectors via the collector manager.

        Returns:
            True if the service (and its collectors) started successfully.
        """
        try:
            self.start_time = time.time()
            self.running = True
            self.logger.info("🚀 Starting Data Collection Service...")
            self.logger.info("📊 Initializing database connection...")
            init_database()
            self.logger.info("✅ Database connection established")
            # Start collector manager
            self.logger.info("🔌 Starting data collectors...")
            success = await self.collector_manager.start()
            if success:
                self.stats['collectors_running'] = len(self.collectors)
                self.stats['last_activity'] = datetime.now()
                self.logger.info("✅ Data Collection Service started successfully")
                self.logger.info(f"📈 Active collectors: {self.stats['collectors_running']}")
                return True
            else:
                # No active exception here either — log without exc_info.
                self.logger.error("Failed to start data collectors")
                self.stats['errors_count'] += 1
                return False
        except (ConnectionError, OSError, IOError) as e:
            # Handle database and connection errors
            sanitized_message = self._sanitize_error(f"Database/Connection error starting service: {e}")
            self.logger.error(sanitized_message, exc_info=True)
            self.stats['errors_count'] += 1
            return False
        except (AttributeError, TypeError, ValueError) as e:
            # Handle configuration and data validation errors
            sanitized_message = self._sanitize_error(f"❌ Configuration error starting service: {e}")
            self.logger.error(sanitized_message, exc_info=True)
            self.stats['errors_count'] += 1
            return False
        except Exception as e:
            # Catch any other unexpected errors
            sanitized_message = self._sanitize_error(f"❌ Unexpected error starting service: {e}")
            self.logger.error(sanitized_message, exc_info=True)
            self.stats['errors_count'] += 1
            return False

    async def stop(self) -> None:
        """Stop the data collection service gracefully.

        Stops all collectors, records total uptime, and always shuts down the
        task manager (even if an error occurred during collector shutdown).
        """
        try:
            self.logger.info("🛑 Stopping Data Collection Service...")
            self.running = False
            # Stop all collectors
            await self.collector_manager.stop()
            # Update statistics
            if self.start_time:
                self.stats['total_uptime_seconds'] = time.time() - self.start_time
            self.stats['collectors_running'] = 0
            self.logger.info("✅ Data Collection Service stopped gracefully")
            self.logger.info(f"📊 Total uptime: {self.stats['total_uptime_seconds']:.1f} seconds")
        except (asyncio.CancelledError, KeyboardInterrupt):
            # Handle graceful shutdown scenarios
            self.logger.warning("Service shutdown was interrupted")
            self.stats['errors_count'] += 1
        except (ConnectionError, OSError, IOError) as e:
            # Handle connection and I/O related errors during shutdown
            sanitized_message = self._sanitize_error(f"Connection/IO error during service shutdown: {e}")
            self.logger.error(sanitized_message, exc_info=True)
            self.stats['errors_count'] += 1
        except Exception as e:
            # Catch any other unexpected errors during shutdown
            sanitized_message = self._sanitize_error(f"Unexpected error during service shutdown: {e}")
            self.logger.error(sanitized_message, exc_info=True)
            self.stats['errors_count'] += 1
        finally:
            # Always cleanup task manager
            await self.task_manager.shutdown(graceful=True)

    def get_status(self) -> Dict[str, Any]:
        """Get current service status.

        Returns:
            Dictionary with running flag, uptime, collector counts, error
            count, last-activity timestamp and start time.
        """
        current_time = time.time()
        uptime = current_time - self.start_time if self.start_time else 0
        return {
            'running': self.running,
            'uptime_seconds': uptime,
            'uptime_hours': uptime / 3600,
            'collectors_total': len(self.collectors),
            'collectors_running': self.stats['collectors_running'],
            'errors_count': self.stats['errors_count'],
            'last_activity': self.stats['last_activity'],
            'start_time': datetime.fromtimestamp(self.start_time) if self.start_time else None
        }

    def setup_signal_handlers(self) -> None:
        """Setup signal handlers for graceful shutdown.

        SIGINT/SIGTERM set the shutdown event, which the run() loop polls.
        """
        def signal_handler(signum, frame):
            self.logger.info(f"📡 Received shutdown signal ({signum}), stopping gracefully...")
            self.shutdown_event.set()
        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

    async def run(self, duration_hours: Optional[float] = None) -> bool:
        """
        Run the data collection service.
        Args:
            duration_hours: Optional duration to run (None = indefinite)
        Returns:
            bool: True if successful, False if error occurred
        """
        self.setup_signal_handlers()
        try:
            # Initialize collectors
            if not await self.initialize_collectors():
                return False
            # Start service
            if not await self.start():
                return False
            # Service running notification
            status = self.get_status()
            if duration_hours:
                self.logger.info(f"⏱️ Service will run for {duration_hours} hours")
            else:
                self.logger.info("⏱️ Service running indefinitely (until stopped)")
            self.logger.info(f"📊 Active collectors: {status['collectors_running']}")
            self.logger.info("🔍 Monitor with: python scripts/monitor_clean.py")
            # Main service loop
            update_interval = 600  # Status update every 10 minutes
            last_update = time.time()
            while not self.shutdown_event.is_set():
                # Wait for shutdown signal or timeout; the 1s tick lets the
                # duration and status checks below run between polls.
                try:
                    await asyncio.wait_for(self.shutdown_event.wait(), timeout=1.0)
                    break
                except asyncio.TimeoutError:
                    pass
                current_time = time.time()
                # Check duration limit
                if duration_hours and self.start_time:
                    elapsed_hours = (current_time - self.start_time) / 3600
                    if elapsed_hours >= duration_hours:
                        self.logger.info(f"⏰ Completed {duration_hours} hour run")
                        break
                # Periodic status update
                if current_time - last_update >= update_interval:
                    if self.start_time:
                        elapsed_hours = (current_time - self.start_time) / 3600
                        self.logger.info(f"⏱️ Service uptime: {elapsed_hours:.1f} hours")
                    last_update = current_time
            return True
        except (asyncio.CancelledError, KeyboardInterrupt):
            # Handle graceful shutdown scenarios
            self.logger.info("Service run was cancelled gracefully")
            return True
        except (asyncio.TimeoutError, ConnectionError, OSError, IOError) as e:
            # Handle timeout, connection and I/O related errors
            sanitized_message = self._sanitize_error(f"Connection/Timeout error during service run: {e}")
            self.logger.error(sanitized_message, exc_info=True)
            self.stats['errors_count'] += 1
            return False
        except Exception as e:
            # Catch any other unexpected errors
            sanitized_message = self._sanitize_error(f"Unexpected service error: {e}")
            self.logger.error(sanitized_message, exc_info=True)
            self.stats['errors_count'] += 1
            return False
        finally:
            await self.stop()
# Service entry point function
async def run_data_collection_service(
    config_path: str = "config/data_collection.json",
    duration_hours: Optional[float] = None
) -> bool:
    """Create a DataCollectionService and run it to completion.

    Args:
        config_path: Path to the service configuration file.
        duration_hours: How long to run, in hours; None runs indefinitely.

    Returns:
        True on a clean run, False if the service reported an error.
    """
    return await DataCollectionService(config_path).run(duration_hours)
# CLI entry point: optional config path and run duration in hours.
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="Data Collection Service")
    parser.add_argument("--config", default="config/data_collection.json", help="Configuration file path")
    # No --duration means the service runs until a shutdown signal arrives.
    parser.add_argument("--duration", type=float, help="Duration to run in hours (default: indefinite)")
    args = parser.parse_args()
    # Run service
    asyncio.run(run_data_collection_service(args.config, args.duration))

View File

@@ -0,0 +1,322 @@
"""
Collector Factory for data collection service.
This module handles the creation of data collectors with proper configuration
and error handling, separating collector creation logic from the main service.
"""
from typing import Dict, Any, List, Optional
from ..exchanges.factory import ExchangeFactory, ExchangeCollectorConfig
from .base_collector import DataType
class CollectorFactory:
"""Factory for creating and configuring data collectors."""
def __init__(self, logger=None):
"""
Initialize the collector factory.
Args:
logger: Logger instance for logging operations
"""
self.logger = logger
async def create_collector(
self,
exchange_name: str,
pair_config: Dict[str, Any],
data_collection_config: Dict[str, Any],
database_config: Dict[str, Any]
) -> Optional[Any]:
"""
Create a single data collector for a trading pair.
Args:
exchange_name: Name of the exchange (e.g., 'okx')
pair_config: Configuration for the trading pair
data_collection_config: Data collection settings
database_config: Database configuration settings
Returns:
Created collector instance or None if creation failed
"""
try:
symbol = pair_config['symbol']
# Validate and parse configuration
validated_config = self._validate_pair_config(pair_config)
if not validated_config:
return None
data_types = [DataType(dt) for dt in validated_config['data_types']]
timeframes = validated_config['timeframes']
# Create collector configuration
collector_config = self._create_collector_config(
exchange_name=exchange_name,
symbol=symbol,
data_types=data_types,
timeframes=timeframes,
data_collection_config=data_collection_config,
database_config=database_config
)
# Create collector using exchange factory
collector = ExchangeFactory.create_collector(collector_config)
if collector:
if self.logger:
self.logger.info(f"Created collector: {symbol} [{'/'.join(timeframes)}]")
return collector
else:
if self.logger:
self.logger.error(f"Failed to create collector for {symbol}", exc_info=True)
return None
except Exception as e:
symbol = pair_config.get('symbol', 'unknown')
if self.logger:
self.logger.error(f"Error creating collector for {symbol}: {e}", exc_info=True)
return None
def _validate_pair_config(self, pair_config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
Validate trading pair configuration.
Args:
pair_config: Raw trading pair configuration
Returns:
Validated configuration or None if invalid
"""
try:
# Validate required fields
if 'symbol' not in pair_config:
if self.logger:
self.logger.error("Trading pair missing required 'symbol' field", exc_info=True)
return None
symbol = pair_config['symbol']
if not isinstance(symbol, str) or '-' not in symbol:
if self.logger:
self.logger.error(f"Invalid symbol format: {symbol}. Expected format: 'BASE-QUOTE'", exc_info=True)
return None
# Apply defaults and validate data types
data_types = pair_config.get('data_types', ['trade'])
valid_data_types = ['trade', 'orderbook', 'ticker', 'candle']
validated_data_types = []
for dt in data_types:
if dt in valid_data_types:
validated_data_types.append(dt)
else:
if self.logger:
self.logger.warning(f"Invalid data type '{dt}' for {symbol}, skipping")
if not validated_data_types:
validated_data_types = ['trade'] # Default fallback
# Validate timeframes
timeframes = pair_config.get('timeframes', ['1m', '5m'])
# OKX supports second-level timeframes for real-time candle aggregation
valid_timeframes = ['1s', '5s', '1m', '3m', '5m', '15m', '30m', '1h', '2h', '4h', '6h', '8h', '12h', '1d']
validated_timeframes = []
for tf in timeframes:
if tf in valid_timeframes:
validated_timeframes.append(tf)
else:
if self.logger:
self.logger.warning(f"Invalid timeframe '{tf}' for {symbol}, skipping")
if not validated_timeframes:
validated_timeframes = ['1m', '5m'] # Default fallback
return {
'symbol': symbol,
'data_types': validated_data_types,
'timeframes': validated_timeframes,
'enabled': pair_config.get('enabled', True),
'channels': pair_config.get('channels', {})
}
except Exception as e:
if self.logger:
self.logger.error(f"Error validating pair config: {e}", exc_info=True)
return None
def _create_collector_config(
    self,
    exchange_name: str,
    symbol: str,
    data_types: List[DataType],
    timeframes: List[str],
    data_collection_config: Dict[str, Any],
    database_config: Dict[str, Any]
) -> ExchangeCollectorConfig:
    """
    Build the ExchangeCollectorConfig for a single trading pair.

    Args:
        exchange_name: Name of the exchange.
        symbol: Trading pair symbol (e.g. 'BTC-USDT').
        data_types: Data types the collector should subscribe to.
        timeframes: Timeframes for candle aggregation.
        data_collection_config: Service-level data collection settings.
        database_config: Database configuration section.

    Returns:
        A fully populated ExchangeCollectorConfig instance.
    """
    # Derive a unique, log-friendly component name, e.g. 'okx_collector_btc_usdt'.
    normalized_symbol = symbol.replace('-', '_').lower()
    return ExchangeCollectorConfig(
        exchange=exchange_name,
        symbol=symbol,
        data_types=data_types,
        timeframes=timeframes,
        auto_restart=data_collection_config.get('auto_restart', True),
        health_check_interval=data_collection_config.get('health_check_interval', 120.0),
        store_raw_data=data_collection_config.get('store_raw_data', True),
        # Only include parameters the collector constructor accepts.
        custom_params={
            'component_name': f"{exchange_name}_collector_{normalized_symbol}",
            'logger': self.logger,
            'log_errors_only': True,  # Clean logging - only errors and essential events
            'force_update_candles': database_config.get('force_update_candles', False),
        },
    )
async def create_collectors_from_config(
    self,
    config: Dict[str, Any]
) -> List[Any]:
    """
    Create collectors for every enabled trading pair in a service config.

    Args:
        config: Complete service configuration dictionary.

    Returns:
        List of successfully created collector instances (possibly empty).
    """
    created: List[Any] = []
    try:
        exchange_name = config.get('exchange', 'okx')
        data_collection_config = config.get('data_collection', {})
        database_config = config.get('database', {})
        # Only pairs explicitly enabled (default: enabled) are built.
        enabled_pairs = [
            pair for pair in config.get('trading_pairs', [])
            if pair.get('enabled', True)
        ]
        if not enabled_pairs:
            if self.logger:
                self.logger.warning(f"No enabled trading pairs for {exchange_name}")
            return created
        if self.logger:
            self.logger.info(f"Creating {len(enabled_pairs)} collectors for {exchange_name.upper()}")
        for pair_config in enabled_pairs:
            collector = await self.create_collector(
                exchange_name=exchange_name,
                pair_config=pair_config,
                data_collection_config=data_collection_config,
                database_config=database_config
            )
            if collector:
                created.append(collector)
            elif self.logger:
                # Log and continue: one bad pair must not block the rest.
                self.logger.error(
                    f"Failed to create collector for {pair_config.get('symbol', 'unknown')}",
                    exc_info=True
                )
        if self.logger:
            self.logger.info(f"Successfully created {len(created)} collectors")
        return created
    except Exception as e:
        # Never propagate: return whatever was built before the failure.
        if self.logger:
            self.logger.error(f"Error creating collectors from config: {e}", exc_info=True)
        return created
def get_supported_exchanges(self) -> List[str]:
    """Return the exchange names this factory can build collectors for.

    Returns:
        List of supported exchange names.
    """
    # Static for now; could be discovered dynamically from available
    # exchange connector modules as they are added.
    return ['okx', 'binance']
def get_supported_data_types(self) -> List[str]:
    """Return the names of all data types collectors can subscribe to.

    Returns:
        List of supported data type names.
    """
    return ['trade', 'orderbook', 'ticker', 'candle']
def get_supported_timeframes(self) -> List[str]:
    """Return all candle timeframes supported for collection.

    Includes second-level timeframes used for real-time aggregation.

    Returns:
        List of supported timeframe strings.
    """
    return ['1s', '5s', '1m', '3m', '5m', '15m', '30m',
            '1h', '2h', '4h', '6h', '8h', '12h', '1d']
def validate_collector_requirements(
    self,
    exchange: str,
    data_types: List[str],
    timeframes: List[str]
) -> Dict[str, Any]:
    """
    Validate collector requirements against supported features.

    Args:
        exchange: Exchange name.
        data_types: Required data types.
        timeframes: Required timeframes.

    Returns:
        Dict with 'valid' (bool), 'errors' and 'warnings' (lists of str).
        An unsupported exchange is an error; unsupported data types or
        timeframes only produce warnings.
    """
    errors: List[str] = []
    warnings: List[str] = []
    # Exchange support is mandatory.
    if exchange not in self.get_supported_exchanges():
        errors.append(f"Unsupported exchange: {exchange}")
    # Data types and timeframes degrade gracefully, so only warn.
    known_types = self.get_supported_data_types()
    warnings.extend(
        f"Unsupported data type: {dt}" for dt in data_types if dt not in known_types
    )
    known_frames = self.get_supported_timeframes()
    warnings.extend(
        f"Unsupported timeframe: {tf}" for tf in timeframes if tf not in known_frames
    )
    return {'valid': not errors, 'errors': errors, 'warnings': warnings}

View File

@@ -0,0 +1,226 @@
"""
Data Collector Manager for supervising and managing multiple data collectors.
This module provides centralized management of data collectors with health monitoring,
auto-recovery, and coordinated lifecycle management.
"""
import asyncio
from typing import Dict, List, Optional, Any, Set
from utils.logger import get_logger
from utils.async_task_manager import TaskManager
from .base_collector import BaseDataCollector, CollectorStatus
from .collector_types import ManagerStatus, CollectorConfig
from ..manager_components import (
CollectorLifecycleManager,
ManagerHealthMonitor,
ManagerStatsTracker,
ManagerLogger
)
class CollectorManager:
    """
    Manages multiple data collectors with health monitoring and auto-recovery.

    The manager is responsible for:
    - Starting and stopping collectors
    - Health monitoring and auto-restart
    - Coordinated lifecycle management
    - Status reporting and metrics

    Lifecycle, health monitoring, statistics and logging are delegated to
    dedicated components; this class coordinates them and tracks the
    overall ManagerStatus.
    """

    def __init__(self,
                 manager_name: str = "collector_manager",
                 global_health_check_interval: float = 60.0,
                 restart_delay: float = 5.0,
                 logger = None,
                 log_errors_only: bool = False):
        """Initialize the collector manager with component-based architecture.

        Args:
            manager_name: Name used in logs and the internal task manager.
            global_health_check_interval: Seconds between health sweeps.
            restart_delay: Delay in seconds before restarting a failed collector.
            logger: Optional logger instance shared with components.
            log_errors_only: If True, suppress non-error log output.
        """
        self.manager_name = manager_name
        self.restart_delay = restart_delay
        # Initialize components
        self.logger_manager = ManagerLogger(logger, log_errors_only)
        self.task_manager = TaskManager(f"{manager_name}_tasks", logger=logger)
        self.lifecycle_manager = CollectorLifecycleManager(self.logger_manager)
        self.health_monitor = ManagerHealthMonitor(
            global_health_check_interval, self.logger_manager, self.lifecycle_manager)
        # Statistics cache refreshes every 30 seconds.
        self.stats_tracker = ManagerStatsTracker(
            30.0, self.logger_manager, self.lifecycle_manager, self.health_monitor)
        # Manager state
        self.status = ManagerStatus.STOPPED
        self._running = False
        if self.logger_manager.is_debug_enabled():
            self.logger_manager.log_info(f"Initialized collector manager: {manager_name}")

    def _sanitize_error(self, message: str) -> str:
        """
        Sanitize error message to prevent leaking internal details.

        Args:
            message: Original error message

        Returns:
            Sanitized error message
        """
        # Delegate to the logger manager's sanitization method
        return self.logger_manager._sanitize_error(message)

    def _mark_start_failure(self) -> None:
        """Record a failed startup: set ERROR status and clear the running flag.

        Keeping _running consistent with the ERROR status avoids a
        half-started state; a subsequent stop() call performs the
        actual component teardown.
        """
        self.status = ManagerStatus.ERROR
        self._running = False

    def add_collector(self, collector: BaseDataCollector, config: Optional[CollectorConfig] = None) -> None:
        """Add a collector to be managed."""
        self.lifecycle_manager.add_collector(collector, config)

    def remove_collector(self, collector_name: str) -> bool:
        """Remove a collector from management."""
        return self.lifecycle_manager.remove_collector(collector_name)

    def enable_collector(self, collector_name: str) -> bool:
        """Enable a collector (will be started if manager is running)."""
        return self.lifecycle_manager.enable_collector(collector_name)

    def disable_collector(self, collector_name: str) -> bool:
        """Disable a collector (will be stopped if running)."""
        return self.lifecycle_manager.disable_collector(collector_name)

    async def start(self) -> bool:
        """Start the collector manager and all enabled collectors.

        Returns:
            True if already running or startup succeeded, False on failure.
        """
        if self.status in [ManagerStatus.RUNNING, ManagerStatus.STARTING]:
            self.logger_manager.log_warning("Collector manager is already running or starting")
            return True
        self.logger_manager.log_info("Starting collector manager")
        self.status = ManagerStatus.STARTING
        try:
            self._running = True
            # Flag all components as running before starting work
            self.lifecycle_manager.set_running_state(True)
            self.health_monitor.set_running_state(True)
            self.stats_tracker.set_running_state(True)
            # Start collectors and monitoring
            await self.lifecycle_manager.start_all_enabled_collectors()
            await self.health_monitor.start_monitoring()
            # Transfer the health monitoring task to the task manager for
            # better tracking.
            # NOTE(review): this reaches into TaskManager private members
            # (_tasks/_task_names/_task_done_callback); consider adding a
            # public "adopt task" API to TaskManager instead.
            health_task = self.health_monitor.get_health_task()
            if health_task:
                self.task_manager._tasks.add(health_task)
                self.task_manager._task_names[health_task] = "health_monitor"
                health_task.add_done_callback(self.task_manager._task_done_callback)
            # Start statistics cache updates
            await self.stats_tracker.start_cache_updates()
            self.status = ManagerStatus.RUNNING
            enabled_count = len(self.lifecycle_manager.get_enabled_collectors())
            self.logger_manager.log_info(f"Collector manager started - Managing {enabled_count} collectors")
            return True
        except (asyncio.CancelledError, KeyboardInterrupt):
            # Graceful-shutdown scenarios: swallowed deliberately so
            # callers get a simple boolean result.
            self._mark_start_failure()
            self.logger_manager.log_warning("Collector manager startup was cancelled")
            return False
        except OSError as e:
            # OSError already covers ConnectionError and the IOError alias.
            self._mark_start_failure()
            sanitized_message = self._sanitize_error(f"Connection/IO error starting collector manager: {e}")
            self.logger_manager.log_error(sanitized_message, exc_info=True)
            return False
        except (AttributeError, TypeError, ValueError) as e:
            # Configuration and data validation errors.
            self._mark_start_failure()
            sanitized_message = self._sanitize_error(f"Configuration error starting collector manager: {e}")
            self.logger_manager.log_error(sanitized_message, exc_info=True)
            return False
        except Exception as e:
            # Catch any other unexpected errors.
            self._mark_start_failure()
            sanitized_message = self._sanitize_error(f"Unexpected error starting collector manager: {e}")
            self.logger_manager.log_error(sanitized_message, exc_info=True)
            return False

    async def stop(self) -> None:
        """Stop the collector manager and all collectors."""
        if self.status == ManagerStatus.STOPPED:
            self.logger_manager.log_warning("Collector manager is already stopped")
            return
        self.logger_manager.log_info("Stopping collector manager")
        self.status = ManagerStatus.STOPPING
        self._running = False
        try:
            # Flag every component as stopping first so background loops
            # exit at their next check.
            self.lifecycle_manager.set_running_state(False)
            self.health_monitor.set_running_state(False)
            self.stats_tracker.set_running_state(False)
            # Teardown order: monitoring/statistics, then task manager,
            # then the collectors themselves.
            await self.health_monitor.stop_monitoring()
            await self.stats_tracker.stop_cache_updates()
            await self.task_manager.shutdown(graceful=True)
            await self.lifecycle_manager.stop_all_collectors()
            self.status = ManagerStatus.STOPPED
            self.logger_manager.log_info("Collector manager stopped")
        except (asyncio.CancelledError, KeyboardInterrupt):
            # Interrupted shutdown leaves the manager in ERROR state.
            self.status = ManagerStatus.ERROR
            self.logger_manager.log_warning("Collector manager shutdown was interrupted")
        except OSError as e:
            # OSError already covers ConnectionError and the IOError alias.
            self.status = ManagerStatus.ERROR
            sanitized_message = self._sanitize_error(f"Connection/IO error stopping collector manager: {e}")
            self.logger_manager.log_error(sanitized_message, exc_info=True)
        except Exception as e:
            # Catch any other unexpected errors during shutdown.
            self.status = ManagerStatus.ERROR
            sanitized_message = self._sanitize_error(f"Unexpected error stopping collector manager: {e}")
            self.logger_manager.log_error(sanitized_message, exc_info=True)

    async def restart_collector(self, collector_name: str) -> bool:
        """Restart a specific collector."""
        return await self.lifecycle_manager.restart_collector(collector_name)

    async def restart_all_collectors(self) -> Dict[str, bool]:
        """Restart all enabled collectors."""
        return await self.lifecycle_manager.restart_all_collectors()

    def get_status(self, force_refresh: bool = False) -> Dict[str, Any]:
        """Get manager status and statistics.

        Args:
            force_refresh: Bypass the statistics cache when True.
        """
        status_dict = self.stats_tracker.get_status(force_refresh)
        status_dict['manager_status'] = self.status.value
        return status_dict

    def get_collector_status(self, collector_name: str) -> Optional[Dict[str, Any]]:
        """Get status for a specific collector."""
        return self.stats_tracker.get_collector_status(collector_name)

    def list_collectors(self) -> List[str]:
        """List all managed collector names."""
        return self.stats_tracker.list_collectors()

    def get_running_collectors(self) -> List[str]:
        """Get names of currently running collectors."""
        return self.stats_tracker.get_running_collectors()

    def get_failed_collectors(self) -> List[str]:
        """Get names of failed or unhealthy collectors."""
        return self.stats_tracker.get_failed_collectors()

    def __repr__(self) -> str:
        """String representation of the manager."""
        return f"CollectorManager(name={self.manager_name}, status={self.status.value})"

View File

@@ -0,0 +1,30 @@
"""
Data types and structures for collector management.
This module contains shared data structures used across the collector management system.
"""
from dataclasses import dataclass
from enum import Enum
from typing import List
class ManagerStatus(Enum):
    """Status of the collector manager.

    Lifecycle: STOPPED -> STARTING -> RUNNING -> STOPPING -> STOPPED,
    with ERROR entered when a start or stop operation fails.
    """
    STOPPED = "stopped"    # Not running; initial and final state
    STARTING = "starting"  # start() in progress
    RUNNING = "running"    # Collectors are managed and monitored
    STOPPING = "stopping"  # stop() in progress
    ERROR = "error"        # A start/stop operation failed
@dataclass
class CollectorConfig:
    """Configuration for a data collector.

    Describes how a managed collector should be run and supervised.
    """
    name: str  # Unique collector name used for management lookups
    exchange: str  # Exchange identifier (e.g. 'okx')
    symbols: List[str]  # Trading pair symbols to collect
    data_types: List[str]  # Data types to collect (e.g. 'trade', 'candle')
    auto_restart: bool = True  # Restart the collector automatically on failure
    health_check_interval: float = 30.0  # Seconds between health checks
    enabled: bool = True  # Whether the collector should be started at all