TCPDashboard/data/manager_components/collector_lifecycle_manager.py
Vasily.onl f6cb1485b1 Implement data collection architecture with modular components
- Introduced a comprehensive data collection framework, including `CollectorServiceConfig`, `BaseDataCollector`, and `CollectorManager`, enhancing modularity and maintainability.
- Developed `CollectorFactory` for streamlined collector creation, promoting separation of concerns and improved configuration handling.
- Enhanced `DataCollectionService` to utilize the new architecture, ensuring robust error handling and logging practices.
- Added `TaskManager` for efficient management of asynchronous tasks, improving performance and resource management.
- Implemented health monitoring and auto-recovery features in `CollectorManager`, ensuring reliable operation of data collectors.
- Updated imports across the codebase to reflect the new structure, ensuring consistent access to components.

These changes significantly improve the architecture and maintainability of the data collection service, aligning with project standards for modularity, performance, and error handling.
2025-06-10 13:40:28 +08:00

342 lines
13 KiB
Python

"""
Collector Lifecycle Manager for handling collector lifecycle operations.
This module handles the lifecycle of data collectors including adding, removing,
enabling, disabling, starting, and restarting collectors.
"""
import asyncio
import time
from typing import Dict, Set, Optional
from ..collector.base_collector import BaseDataCollector, CollectorStatus
from ..collector.collector_types import CollectorConfig
class CollectorLifecycleManager:
    """Manages the lifecycle of data collectors.

    Collectors are stored under a generated name together with their
    configuration. The manager tracks which collectors are enabled and offers
    synchronous add/remove/enable/disable operations plus asynchronous
    start/stop/restart operations.

    The synchronous operations may be called with or without a running
    asyncio event loop. When a loop is running, the required start/stop work
    is scheduled as a background task; when none is running, the work is
    skipped and a message is logged instead.
    """

    def __init__(self, logger_manager=None):
        """
        Initialize the lifecycle manager.

        Args:
            logger_manager: Logger manager instance for logging operations.
                When falsy, all logging is skipped.
        """
        self.logger_manager = logger_manager

        # Collector storage, all keyed by the generated collector name.
        self._collectors: Dict[str, BaseDataCollector] = {}
        self._collector_configs: Dict[str, CollectorConfig] = {}
        self._enabled_collectors: Set[str] = set()

        # Manager state
        self._running = False
        self._stats = {'total_collectors': 0, 'restarts_performed': 0}

        # Strong references to fire-and-forget tasks. The event loop only
        # keeps weak references to tasks, so an unreferenced task could be
        # garbage-collected before it finishes.
        self._background_tasks: Set[asyncio.Task] = set()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _log(self, level: str, message: str, **kwargs) -> None:
        """Forward ``message`` to ``logger_manager.log_<level>`` if a logger is set."""
        if self.logger_manager:
            getattr(self.logger_manager, f"log_{level}")(message, **kwargs)

    def _schedule(self, coro_factory) -> bool:
        """
        Schedule a coroutine on the running event loop, if there is one.

        The coroutine object is created only after a running loop has been
        found, so that no coroutine is left un-awaited when there is no loop.

        Args:
            coro_factory: Zero-argument callable returning the coroutine to run.

        Returns:
            True if a task was scheduled, False if no event loop is running.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return False
        task = loop.create_task(coro_factory())
        self._background_tasks.add(task)
        task.add_done_callback(self._on_background_task_done)
        return True

    def _on_background_task_done(self, task) -> None:
        """Release a finished background task and log its failure, if any."""
        self._background_tasks.discard(task)
        # Without a logger the exception is left unretrieved on purpose, so
        # asyncio's own "exception was never retrieved" report still fires.
        if task.cancelled() or not self.logger_manager:
            return
        error = task.exception()
        if error is not None:
            self._log('error', f"Background collector task failed: {error}")

    # ------------------------------------------------------------------
    # Manager state
    # ------------------------------------------------------------------
    def set_running_state(self, running: bool) -> None:
        """Set the running state of the manager."""
        self._running = running

    def get_stats(self) -> Dict:
        """Get lifecycle statistics (a shallow copy of the internal counters)."""
        return self._stats.copy()

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------
    def add_collector(self,
                      collector: BaseDataCollector,
                      config: Optional[CollectorConfig] = None) -> None:
        """
        Add a collector to be managed.

        The collector is registered under a generated name of the form
        ``<exchange_name>_<number>``; a numeric suffix is appended if that
        name is already taken.

        Args:
            collector: Data collector instance
            config: Optional configuration (will create default if not provided)
        """
        # Use a more unique name to avoid duplicates
        collector_name = f"{collector.exchange_name}_{int(time.time() * 1000000) % 1000000}"

        # Ensure unique name
        counter = 1
        base_name = collector_name
        while collector_name in self._collectors:
            collector_name = f"{base_name}_{counter}"
            counter += 1

        if config is None:
            config = CollectorConfig(
                name=collector_name,
                exchange=collector.exchange_name,
                symbols=list(collector.symbols),
                data_types=[dt.value for dt in collector.data_types],
                auto_restart=collector.auto_restart,
                health_check_interval=collector._state_telemetry.health_check_interval
            )

        self._collectors[collector_name] = collector
        self._collector_configs[collector_name] = config
        if config.enabled:
            self._enabled_collectors.add(collector_name)
        self._stats['total_collectors'] = len(self._collectors)

        self._log(
            'info',
            f"Added collector: {collector_name} ({collector.exchange_name}) - "
            f"Symbols: {', '.join(collector.symbols)} - Enabled: {config.enabled}"
        )

    def remove_collector(self, collector_name: str) -> bool:
        """
        Remove a collector from management.

        If the collector is not already stopped and an event loop is running,
        a forced stop is scheduled in the background before it is dropped.

        Args:
            collector_name: Name of the collector to remove

        Returns:
            True if removed successfully, False if not found
        """
        if collector_name not in self._collectors:
            self._log('warning', f"Collector not found: {collector_name}")
            return False

        # Stop the collector first (only if event loop is running)
        collector = self._collectors[collector_name]
        if collector.status != CollectorStatus.STOPPED:
            if not self._schedule(lambda: collector.stop(force=True)):
                self._log(
                    'info',
                    f"Collector {collector_name} will be removed without stopping (no event loop)"
                )

        # Remove from management
        del self._collectors[collector_name]
        del self._collector_configs[collector_name]
        self._enabled_collectors.discard(collector_name)
        self._stats['total_collectors'] = len(self._collectors)

        self._log('info', f"Removed collector: {collector_name}")
        return True

    # ------------------------------------------------------------------
    # Enable / disable
    # ------------------------------------------------------------------
    def enable_collector(self, collector_name: str) -> bool:
        """
        Enable a collector (will be started if manager is running).

        Args:
            collector_name: Name of the collector to enable

        Returns:
            True if enabled successfully, False if not found
        """
        if collector_name not in self._collectors:
            self._log('warning', f"Collector not found: {collector_name}")
            return False

        self._enabled_collectors.add(collector_name)
        self._collector_configs[collector_name].enabled = True

        # Start the collector if manager is running (only if event loop is running)
        if self._running:
            if not self._schedule(lambda: self._start_collector(collector_name)):
                # No event loop running, will be started when manager starts
                self._log(
                    'debug',
                    f"Collector {collector_name} enabled but will start when manager starts"
                )

        self._log('info', f"Enabled collector: {collector_name}")
        return True

    def disable_collector(self, collector_name: str) -> bool:
        """
        Disable a collector and schedule a forced stop if a loop is running.

        Args:
            collector_name: Name of the collector to disable

        Returns:
            True if disabled successfully, False if not found
        """
        if collector_name not in self._collectors:
            self._log('warning', f"Collector not found: {collector_name}")
            return False

        self._enabled_collectors.discard(collector_name)
        self._collector_configs[collector_name].enabled = False

        # Stop the collector (only if event loop is running)
        collector = self._collectors[collector_name]
        if not self._schedule(lambda: collector.stop(force=True)):
            self._log(
                'debug',
                f"Collector {collector_name} disabled but cannot stop (no event loop)"
            )

        self._log('info', f"Disabled collector: {collector_name}")
        return True

    # ------------------------------------------------------------------
    # Start / restart / stop
    # ------------------------------------------------------------------
    async def _start_collector(self, collector_name: str) -> bool:
        """
        Start a specific collector.

        Args:
            collector_name: Name of the collector to start

        Returns:
            True if started successfully, False otherwise (including when the
            collector is unknown or its ``start()`` raised)
        """
        if collector_name not in self._collectors:
            self._log('warning', f"Collector not found: {collector_name}")
            return False

        collector = self._collectors[collector_name]
        try:
            success = await collector.start()
        except Exception as e:
            self._log('error', f"Error starting collector {collector_name}: {e}", exc_info=True)
            return False

        if success:
            self._log('info', f"Started collector: {collector_name}")
        else:
            self._log('error', f"Failed to start collector: {collector_name}")
        return success

    async def restart_collector(self, collector_name: str) -> bool:
        """
        Restart a specific collector.

        A successful restart increments the ``restarts_performed`` statistic.

        Args:
            collector_name: Name of the collector to restart

        Returns:
            True if restarted successfully, False otherwise (including when
            the collector is unknown or its ``restart()`` raised)
        """
        if collector_name not in self._collectors:
            self._log('warning', f"Collector not found: {collector_name}")
            return False

        collector = self._collectors[collector_name]
        self._log('info', f"Restarting collector: {collector_name}")
        try:
            success = await collector.restart()
        except Exception as e:
            self._log('error', f"Error restarting collector {collector_name}: {e}", exc_info=True)
            return False

        if success:
            self._stats['restarts_performed'] += 1
            self._log('info', f"Successfully restarted collector: {collector_name}")
        else:
            self._log('error', f"Failed to restart collector: {collector_name}")
        return success

    async def restart_all_collectors(self) -> Dict[str, bool]:
        """
        Restart all enabled collectors.

        The restarts are launched together as tasks and then awaited one by one.

        Returns:
            Dictionary mapping collector names to restart success status
        """
        self._log('info', "Restarting all enabled collectors")

        # Snapshot the names so enabling/disabling while the restarts are
        # in flight cannot affect which collectors are reported on.
        restart_tasks = [
            (collector_name, asyncio.create_task(self.restart_collector(collector_name)))
            for collector_name in list(self._enabled_collectors)
        ]

        # Wait for all restarts to complete
        results = {}
        for collector_name, task in restart_tasks:
            try:
                results[collector_name] = await task
            except Exception as e:
                self._log('error', f"Error restarting {collector_name}: {e}", exc_info=True)
                results[collector_name] = False

        successful_restarts = sum(1 for success in results.values() if success)
        self._log(
            'info',
            f"Restart complete - {successful_restarts}/{len(results)} collectors restarted successfully"
        )
        return results

    async def start_all_enabled_collectors(self, timeout: float = 30.0) -> None:
        """
        Start all enabled collectors and wait for them to finish starting.

        Args:
            timeout: Maximum number of seconds to wait for all starts
                together. On timeout a warning is logged and the method
                returns normally.
        """
        start_tasks = [
            asyncio.create_task(self._start_collector(collector_name))
            for collector_name in list(self._enabled_collectors)
        ]
        if not start_tasks:
            return

        # Wait for all collectors to start (with timeout)
        try:
            await asyncio.wait_for(
                asyncio.gather(*start_tasks, return_exceptions=True),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            self._log('warning', "Some collectors took too long to start")

    async def stop_all_collectors(self, timeout: float = 30.0) -> None:
        """
        Force-stop every managed collector, enabled or not.

        Args:
            timeout: Maximum number of seconds to wait for all stops
                together. On timeout a warning is logged and the method
                returns normally.
        """
        stop_tasks = [
            asyncio.create_task(collector.stop(force=True))
            for collector in list(self._collectors.values())
        ]
        if not stop_tasks:
            return

        # Wait for all collectors to stop (with timeout)
        try:
            await asyncio.wait_for(
                asyncio.gather(*stop_tasks, return_exceptions=True),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            self._log('warning', "Some collectors took too long to stop")

    # ------------------------------------------------------------------
    # Getters for data access
    # ------------------------------------------------------------------
    def get_collectors(self) -> Dict[str, BaseDataCollector]:
        """Get all collectors (the internal mapping itself, not a copy)."""
        return self._collectors

    def get_collector_configs(self) -> Dict[str, CollectorConfig]:
        """Get all collector configurations (the internal mapping itself, not a copy)."""
        return self._collector_configs

    def get_enabled_collectors(self) -> Set[str]:
        """Get enabled collector names (the internal set itself, not a copy)."""
        return self._enabled_collectors

    def get_collector(self, name: str) -> Optional[BaseDataCollector]:
        """Get a specific collector by name, or None if it is not managed."""
        return self._collectors.get(name)

    def get_collector_config(self, name: str) -> Optional[CollectorConfig]:
        """Get a specific collector config by name, or None if it is not managed."""
        return self._collector_configs.get(name)