From 74d7e1ab2c21a44e2124b53621884612c4a938d4 Mon Sep 17 00:00:00 2001 From: "Vasily.onl" Date: Tue, 3 Jun 2025 12:08:43 +0800 Subject: [PATCH] docs --- docs/components/data_collectors.md | 795 ++++++++++++----------- docs/components/logging.md | 535 ++++++++------- docs/data-collection-service.md | 481 -------------- docs/logging_system.md | 292 --------- docs/services/data_collection_service.md | 782 ++++++++++++++++++++++ 5 files changed, 1476 insertions(+), 1409 deletions(-) delete mode 100644 docs/data-collection-service.md delete mode 100644 docs/logging_system.md create mode 100644 docs/services/data_collection_service.md diff --git a/docs/components/data_collectors.md b/docs/components/data_collectors.md index c28931e..0dae42a 100644 --- a/docs/components/data_collectors.md +++ b/docs/components/data_collectors.md @@ -4,6 +4,8 @@ The Data Collector System provides a robust, scalable framework for collecting real-time market data from cryptocurrency exchanges. It features comprehensive health monitoring, automatic recovery, centralized management, and a modular exchange-based architecture designed for production trading environments. +This documentation covers the **core collector components**. For the high-level service layer that orchestrates these collectors, see [Data Collection Service](../services/data_collection_service.md). + ## Key Features ### 🏗️ **Modular Exchange Architecture** @@ -28,7 +30,7 @@ The Data Collector System provides a robust, scalable framework for collecting r - **Real-time Status**: Detailed status reporting for all collectors - **Performance Metrics**: Message counts, uptime, error rates, restart counts - **Health Analytics**: Connection state, data freshness, error tracking -- **Logging Integration**: Enhanced logging with configurable verbosity +- **Conditional Logging**: Enhanced logging with configurable verbosity (see [Logging System](logging.md)) - **Multi-Timeframe Support**: Sub-second to daily candle aggregation (1s, 5s, 10s, 15s, 30s, 1m, 5m, 15m, 1h, 4h, 1d) ### 🛢️ **Database Integration** @@ -74,7 +76,7 @@ The Data Collector System provides a robust, scalable framework for collecting r ### Exchange Module Structure -The new modular architecture organizes exchange implementations: +The modular architecture organizes exchange implementations: ``` data/ @@ -103,8 +105,12 @@ data/ import asyncio from data.exchanges import ExchangeFactory, ExchangeCollectorConfig, create_okx_collector from data.base_collector import DataType +from utils.logger import get_logger async def main(): + # Create logger for the collector + logger = get_logger('okx_collector', verbose=True) + # Method 1: Using factory with configuration config = ExchangeCollectorConfig( exchange='okx', @@ -115,12 +121,13 @@ async def main(): store_raw_data=True ) - collector = ExchangeFactory.create_collector(config) + collector = ExchangeFactory.create_collector(config, logger=logger) # Method 2: Using convenience function okx_collector = create_okx_collector( symbol='ETH-USDT', - data_types=[DataType.TRADE, DataType.ORDERBOOK] + data_types=[DataType.TRADE, DataType.ORDERBOOK], + logger=logger ) # Add data callback @@ -141,14 +148,20 @@ async def main(): asyncio.run(main()) ``` -### 2. Creating Multiple Collectors +### 2. 
Creating Multiple Collectors with Manager ```python import asyncio from data.exchanges import ExchangeFactory, ExchangeCollectorConfig from data.base_collector import DataType +from data.collector_manager import CollectorManager +from utils.logger import get_logger async def main(): + # Create manager with logging + manager_logger = get_logger('collector_manager', verbose=True) + manager = CollectorManager(logger=manager_logger) + # Create multiple collectors using factory configs = [ ExchangeCollectorConfig('okx', 'BTC-USDT', [DataType.TRADE, DataType.ORDERBOOK]), @@ -156,20 +169,22 @@ async def main(): ExchangeCollectorConfig('okx', 'SOL-USDT', [DataType.ORDERBOOK]) ] - collectors = ExchangeFactory.create_multiple_collectors(configs) + # Create collectors with individual loggers + for config in configs: + collector_logger = get_logger(f'okx_{config.symbol.lower().replace("-", "_")}') + collector = ExchangeFactory.create_collector(config, logger=collector_logger) + manager.add_collector(collector) - print(f"Created {len(collectors)} collectors") + print(f"Created {len(manager.list_collectors())} collectors") # Start all collectors - for collector in collectors: - await collector.start() + await manager.start() # Monitor await asyncio.sleep(60) # Stop all - for collector in collectors: - await collector.stop() + await manager.stop() asyncio.run(main()) ``` @@ -189,7 +204,9 @@ def __init__(self, data_types: Optional[List[DataType]] = None, component_name: Optional[str] = None, auto_restart: bool = True, - health_check_interval: float = 30.0) + health_check_interval: float = 30.0, + logger: Optional[logging.Logger] = None, + log_errors_only: bool = False) ``` **Parameters:** @@ -199,6 +216,8 @@ def __init__(self, - `component_name`: Name for logging (default: based on exchange_name) - `auto_restart`: Enable automatic restart on failures (default: True) - `health_check_interval`: Seconds between health checks (default: 30.0) +- `logger`: Logger instance for conditional logging (default: None) +- `log_errors_only`: Only log error-level messages (default: False) #### Abstract Methods @@ -236,6 +255,18 @@ def get_health_status(self) -> Dict[str, Any] def validate_ohlcv_data(self, data: Dict[str, Any], symbol: str, timeframe: str) -> OHLCVData ``` +#### Conditional Logging Methods + +All collectors support conditional logging (see [Logging System](logging.md) for details): + +```python +def _log_debug(self, message: str) -> None # Debug messages (if not errors-only) +def _log_info(self, message: str) -> None # Info messages (if not errors-only) +def _log_warning(self, message: str) -> None # Warning messages (if not errors-only) +def _log_error(self, message: str, exc_info: bool = False) -> None # Always logged +def _log_critical(self, message: str, exc_info: bool = False) -> None # Always logged +``` + #### Status Information The `get_status()` method returns comprehensive status information: @@ -294,9 +325,18 @@ Manages multiple data collectors with coordinated lifecycle and health monitorin def __init__(self, manager_name: str = "collector_manager", global_health_check_interval: float = 60.0, - restart_delay: float = 5.0) + restart_delay: float = 5.0, + logger: Optional[logging.Logger] = None, + log_errors_only: bool = False) ``` +**Parameters:** +- `manager_name`: Name for the manager (used in logging) +- `global_health_check_interval`: Seconds between global health checks +- `restart_delay`: Delay between restart attempts +- `logger`: Logger instance for conditional logging (default: None) 
+- `log_errors_only`: Only log error-level messages (default: False) + #### Public Methods ```python @@ -419,18 +459,24 @@ Collectors are automatically restarted when: ### Failure Handling ```python -# Configure failure handling +# Configure failure handling with conditional logging +from utils.logger import get_logger + +logger = get_logger('my_collector', verbose=True) + collector = MyCollector( symbols=["BTC-USDT"], auto_restart=True, # Enable auto-restart - health_check_interval=30.0 # Check every 30 seconds + health_check_interval=30.0, # Check every 30 seconds + logger=logger, # Enable logging + log_errors_only=False # Log all levels ) # The collector will automatically: # 1. Detect failures within 30 seconds # 2. Attempt reconnection with exponential backoff # 3. Restart up to 5 times (configurable) -# 4. Log all recovery attempts +# 4. Log all recovery attempts (if logger provided) # 5. Report status to manager ``` @@ -441,10 +487,9 @@ collector = MyCollector( The system respects these environment variables: ```bash -# Logging configuration -LOG_LEVEL=INFO # Logging level (DEBUG, INFO, WARN, ERROR) -LOG_CLEANUP=true # Enable automatic log cleanup -LOG_MAX_FILES=30 # Maximum log files to retain +# Logging configuration (see logging.md for details) +VERBOSE_LOGGING=true # Enable console logging +LOG_TO_CONSOLE=true # Alternative verbose setting # Health monitoring DEFAULT_HEALTH_CHECK_INTERVAL=30 # Default health check interval (seconds) @@ -456,20 +501,29 @@ RECONNECT_DELAY=5 # Delay between reconnect attempts (seconds) ### Programmatic Configuration ```python -# Configure individual collector +from utils.logger import get_logger + +# Configure individual collector with conditional logging +logger = get_logger('custom_collector', verbose=True) + collector = MyCollector( exchange_name="custom_exchange", symbols=["BTC-USDT", "ETH-USDT"], data_types=[DataType.TICKER, DataType.TRADE], auto_restart=True, - health_check_interval=15.0 # Check every 15 seconds + health_check_interval=15.0, # Check every 15 seconds + logger=logger, # Enable logging + log_errors_only=False # Log all message types ) -# Configure manager +# Configure manager with conditional logging +manager_logger = get_logger('production_manager', verbose=False) manager = CollectorManager( manager_name="production_manager", global_health_check_interval=30.0, # Global checks every 30s - restart_delay=10.0 # 10s delay between restarts + restart_delay=10.0, # 10s delay between restarts + logger=manager_logger, # Manager logging + log_errors_only=True # Only log errors for manager ) # Configure specific collector in manager @@ -488,17 +542,22 @@ manager.add_collector(collector, config) ## Best Practices -### 1. Collector Implementation +### 1. 
Collector Implementation with Conditional Logging

```python
+from utils.logger import get_logger
+from data.base_collector import BaseDataCollector, DataType
+
class ProductionCollector(BaseDataCollector):
-    def __init__(self, exchange_name: str, symbols: list):
+    def __init__(self, exchange_name: str, symbols: list, logger=None):
        super().__init__(
            exchange_name=exchange_name,
            symbols=symbols,
            data_types=[DataType.TICKER, DataType.TRADE],
            auto_restart=True,           # Always enable auto-restart
-            health_check_interval=30.0  # Reasonable interval
+            health_check_interval=30.0,  # Reasonable interval
+            logger=logger,               # Pass logger for conditional logging
+            log_errors_only=False        # Log all levels
        )

        # Connection management
+        # (MessageBuffer, DataValidator and MetricsCollector below are
+        # illustrative helpers, not part of the collector framework)
        self.connection_pool = None
        self.message_buffer = MessageBuffer(maxsize=10000)

        # Data validation
        self.data_validator = DataValidator()

        # Performance monitoring
        self.metrics = MetricsCollector()

    async def connect(self) -> bool:
        """Implement robust connection logic."""
        try:
+            self._log_info("Establishing connection to exchange")
+
            # Use connection pooling for reliability
            self.connection_pool = await create_connection_pool(
                self.exchange_name,
                pool_size=3
            )

            # Test connection
            await self.connection_pool.ping()

+            self._log_info("Connection established successfully")
            return True

        except Exception as e:
-            self.logger.error(f"Connection failed: {e}")
+            self._log_error(f"Connection failed: {e}", exc_info=True)
            return False

    async def _process_message(self, message) -> Optional[MarketDataPoint]:
        """Process messages with validation and error handling."""
        try:
            # Data validation
            if not self.data_validator.validate(message):
-                self.logger.warning(f"Invalid message: {message}")
+                self._log_warning("Invalid message format received")
                return None

            # Metrics collection
            self.metrics.increment('messages_processed')

+            # Log detailed processing (only if not errors-only)
+            self._log_debug(f"Processing message for {message.get('symbol', 'unknown')}")
+
            # Create standardized data point
-            return MarketDataPoint(
+            data_point = MarketDataPoint(
                exchange=self.exchange_name,
                symbol=message['symbol'],
                timestamp=self._parse_timestamp(message['timestamp']),
                data_type=self._determine_data_type(message),
                data=self._normalize_data(message)
            )

+            self._log_debug(f"Successfully processed data point for {data_point.symbol}")
+            return data_point
+
        except Exception as e:
            self.metrics.increment('processing_errors')
-            self.logger.error(f"Message processing failed: {e}")
+            self._log_error(f"Message processing failed: {e}", exc_info=True)
            raise  # Let health monitor handle it
```
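
Because each `_log_*` helper checks its own preconditions, the class above can run silent or fully logged purely through construction. A small sketch of both modes (constructor exactly as defined above):

```python
from utils.logger import get_logger

# Silent mode: with no logger, every _log_* helper is a no-op
silent = ProductionCollector("okx", ["BTC-USDT"], logger=None)

# Full logging: debug/info/warning/error/critical are all emitted
full = ProductionCollector("okx", ["BTC-USDT"],
                           logger=get_logger('okx_full', verbose=True))
```

For an errors-only collector, forward `log_errors_only=True` to `BaseDataCollector` instead of the hard-coded `False` above; `_log_error` and `_log_critical` then remain the only active helpers.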
### 2. Error Handling

```python
-# Implement proper error handling
+# Implement proper error handling with conditional logging
+# (WebSocketError and ValidationError are placeholders for the exception
+# types raised by your WebSocket client and validator)
class RobustCollector(BaseDataCollector):
    async def _handle_messages(self) -> None:
        """Handle messages with proper error management."""

@@ -583,43 +651,56 @@

        except asyncio.TimeoutError:
            # No data received - let health monitor handle
+            self._log_warning("Message receive timeout")
            raise ConnectionError("Message receive timeout")

        except WebSocketError as e:
            # WebSocket specific errors
-            self.logger.error(f"WebSocket error: {e}")
+            self._log_error(f"WebSocket error: {e}")
            raise ConnectionError(f"WebSocket failed: {e}")

        except ValidationError as e:
            # Data validation errors - don't restart for these
-            self.logger.warning(f"Data validation failed: {e}")
+            self._log_warning(f"Data validation failed: {e}")
            # Continue without raising - these are data issues, not connection issues

        except Exception as e:
            # Unexpected errors - trigger restart
-            self.logger.error(f"Unexpected error: {e}")
+            self._log_error(f"Unexpected error in message handling: {e}", exc_info=True)
            raise
```

### 3. Manager Setup with Hierarchical Logging

```python
+from utils.logger import get_logger
+
async def setup_production_system():
-    """Setup production collector system."""
+    """Setup production collector system with conditional logging."""

-    # Create manager with appropriate settings
+    # Create manager with its own logger
+    manager_logger = get_logger('crypto_trading_system', verbose=True)
    manager = CollectorManager(
        manager_name="crypto_trading_system",
        global_health_check_interval=60.0,  # Check every minute
-        restart_delay=30.0                  # 30s between restarts
+        restart_delay=30.0,                 # 30s between restarts
+        logger=manager_logger,              # Manager logging
+        log_errors_only=False               # Log all levels for manager
    )

-    # Add primary data sources
+    # Add primary data sources with individual loggers
    exchanges = ['okx', 'binance', 'coinbase']
    symbols = ['BTC-USDT', 'ETH-USDT', 'SOL-USDT', 'AVAX-USDT']

    for exchange in exchanges:
-        collector = create_collector(exchange, symbols)
+        # Create individual logger for each exchange
+        exchange_logger = get_logger(f'{exchange}_collector', verbose=True)
+
+        collector = create_collector(
+            exchange,
+            symbols,
+            logger=exchange_logger  # Individual collector logging
+        )

        # Configure for production
        config = CollectorConfig(
@@ -657,19 +738,21 @@ async def main():

            # Alert on failures
            await send_alert(f"Collectors failed: {manager.get_failed_collectors()}")

-            # Log status every 5 minutes
+            # Log status every 5 minutes (if manager has logging enabled)
            await asyncio.sleep(300)
```

### 4. 
Monitoring Integration ```python -# Integrate with monitoring systems +# Integrate with monitoring systems and conditional logging import prometheus_client from utils.logger import get_logger class MonitoredCollector(BaseDataCollector): def __init__(self, *args, **kwargs): + # Extract logger before passing to parent + logger = kwargs.get('logger') super().__init__(*args, **kwargs) # Prometheus metrics @@ -707,6 +790,9 @@ class MonitoredCollector(BaseDataCollector): exchange=self.exchange_name ).set(status['statistics']['uptime_seconds']) + # Log metrics update (only if debug logging enabled) + self._log_debug(f"Updated metrics for {data_point.symbol}") + # Call parent await super()._notify_callbacks(data_point) @@ -717,6 +803,9 @@ class MonitoredCollector(BaseDataCollector): error_type='connection' ).inc() + # Always log connection errors + self._log_error("Connection error occurred") + return await super()._handle_connection_error() ``` @@ -730,8 +819,12 @@ class MonitoredCollector(BaseDataCollector): **Solutions**: ```python -# Check connection details -collector = MyCollector(symbols=["BTC-USDT"]) +# Check connection details with debugging +from utils.logger import get_logger + +debug_logger = get_logger('debug_collector', verbose=True) +collector = MyCollector(symbols=["BTC-USDT"], logger=debug_logger) + success = await collector.start() if not success: status = collector.get_status() @@ -750,11 +843,15 @@ if not success: **Solutions**: ```python -# Adjust health check intervals +# Adjust health check intervals and enable detailed logging +logger = get_logger('troubleshoot_collector', verbose=True) + collector = MyCollector( symbols=["BTC-USDT"], health_check_interval=60.0, # Increase interval - auto_restart=True + auto_restart=True, + logger=logger, # Enable detailed logging + log_errors_only=False # Log all message types ) # Check for: @@ -770,8 +867,9 @@ collector = MyCollector( **Solutions**: ```python -# Check data flow -collector = MyCollector(symbols=["BTC-USDT"]) +# Check data flow with debug logging +logger = get_logger('data_debug', verbose=True) +collector = MyCollector(symbols=["BTC-USDT"], logger=logger) def debug_callback(data_point): print(f"Received: {data_point}") @@ -791,353 +889,32 @@ collector.add_data_callback(DataType.TICKER, debug_callback) **Solutions**: ```python -# Implement proper cleanup +# Implement proper cleanup with logging class CleanCollector(BaseDataCollector): async def disconnect(self): """Ensure proper cleanup.""" + self._log_info("Starting cleanup process") + # Clear buffers - self.message_buffer.clear() + if hasattr(self, 'message_buffer'): + self.message_buffer.clear() + self._log_debug("Cleared message buffer") # Close connections if self.websocket: await self.websocket.close() self.websocket = None + self._log_debug("Closed WebSocket connection") # Clear callbacks for callback_list in self._data_callbacks.values(): callback_list.clear() + self._log_debug("Cleared callbacks") await super().disconnect() + self._log_info("Cleanup completed") ``` -### Performance Optimization - -#### 1. 
Batch Processing - -```python -class BatchingCollector(BaseDataCollector): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.message_batch = [] - self.batch_size = 100 - self.batch_timeout = 1.0 - - async def _handle_messages(self): - """Batch process messages for efficiency.""" - message = await self.websocket.receive() - self.message_batch.append(message) - - # Process batch when full or timeout - if (len(self.message_batch) >= self.batch_size or - time.time() - self.last_batch_time > self.batch_timeout): - await self._process_batch() - - async def _process_batch(self): - """Process messages in batch.""" - batch = self.message_batch.copy() - self.message_batch.clear() - self.last_batch_time = time.time() - - for message in batch: - data_point = await self._process_message(message) - if data_point: - await self._notify_callbacks(data_point) -``` - -#### 2. Connection Pooling - -```python -class PooledCollector(BaseDataCollector): - async def connect(self) -> bool: - """Use connection pooling for better performance.""" - try: - # Create connection pool - self.connection_pool = await aiohttp.ClientSession( - connector=aiohttp.TCPConnector( - limit=10, # Pool size - limit_per_host=5, # Per-host limit - keepalive_timeout=300, # Keep connections alive - enable_cleanup_closed=True - ) - ) - return True - except Exception: - return False -``` - -### Logging and Debugging - -#### Enable Debug Logging - -```python -import os -os.environ['LOG_LEVEL'] = 'DEBUG' - -# Collector will now log detailed information -collector = MyCollector(symbols=["BTC-USDT"]) -await collector.start() - -# Check logs in ./logs/ directory -# - collector_debug.log: Debug information -# - collector_info.log: General information -# - collector_error.log: Error messages -``` - -#### Custom Logging - -```python -from utils.logger import get_logger - -class CustomCollector(BaseDataCollector): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - # Add custom logger - self.performance_logger = get_logger( - f"{self.exchange_name}_performance", - verbose=False - ) - - async def _process_message(self, message): - start_time = time.time() - - try: - result = await super()._process_message(message) - - # Log performance - processing_time = time.time() - start_time - self.performance_logger.info( - f"Message processed in {processing_time:.3f}s" - ) - - return result - except Exception as e: - self.performance_logger.error( - f"Processing failed after {time.time() - start_time:.3f}s: {e}" - ) - raise -``` - -## Integration Examples - -### Django Integration - -```python -# Django management command -from django.core.management.base import BaseCommand -from data import CollectorManager -import asyncio - -class Command(BaseCommand): - help = 'Start crypto data collectors' - - def handle(self, *args, **options): - async def run_collectors(): - manager = CollectorManager("django_collectors") - - # Add collectors - from myapp.collectors import OKXCollector, BinanceCollector - manager.add_collector(OKXCollector(['BTC-USDT'])) - manager.add_collector(BinanceCollector(['ETH-USDT'])) - - # Start system - await manager.start() - - # Keep running - try: - while True: - await asyncio.sleep(60) - status = manager.get_status() - self.stdout.write(f"Status: {status['statistics']}") - except KeyboardInterrupt: - await manager.stop() - - asyncio.run(run_collectors()) -``` - -### FastAPI Integration - -```python -# FastAPI application -from fastapi import FastAPI -from data import 
CollectorManager -import asyncio - -app = FastAPI() -manager = None - -@app.on_event("startup") -async def startup_event(): - global manager - manager = CollectorManager("fastapi_collectors") - - # Add collectors - from collectors import OKXCollector - collector = OKXCollector(['BTC-USDT', 'ETH-USDT']) - manager.add_collector(collector) - - # Start in background - await manager.start() - -@app.on_event("shutdown") -async def shutdown_event(): - global manager - if manager: - await manager.stop() - -@app.get("/collector/status") -async def get_collector_status(): - return manager.get_status() - -@app.post("/collector/{name}/restart") -async def restart_collector(name: str): - success = await manager.restart_collector(name) - return {"success": success} -``` - -### Celery Integration - -```python -# Celery task -from celery import Celery -from data import CollectorManager -import asyncio - -app = Celery('crypto_collectors') - -@app.task -def start_data_collection(): - """Start data collection as Celery task.""" - - async def run(): - manager = CollectorManager("celery_collectors") - - # Setup collectors - from collectors import OKXCollector, BinanceCollector - manager.add_collector(OKXCollector(['BTC-USDT'])) - manager.add_collector(BinanceCollector(['ETH-USDT'])) - - # Start and monitor - await manager.start() - - # Run until stopped - try: - while True: - await asyncio.sleep(300) # 5 minute intervals - - # Check health and restart if needed - failed = manager.get_failed_collectors() - if failed: - print(f"Restarting failed collectors: {failed}") - await manager.restart_all_collectors() - - except Exception as e: - print(f"Collection error: {e}") - finally: - await manager.stop() - - # Run async task - asyncio.run(run()) -``` - -## Migration Guide - -### From Manual Connection Management - -**Before** (manual management): -```python -class OldCollector: - def __init__(self): - self.websocket = None - self.running = False - - async def start(self): - while self.running: - try: - self.websocket = await connect() - await self.listen() - except Exception as e: - print(f"Error: {e}") - await asyncio.sleep(5) # Manual retry -``` - -**After** (with BaseDataCollector): -```python -class NewCollector(BaseDataCollector): - def __init__(self): - super().__init__("exchange", ["BTC-USDT"]) - # Auto-restart and health monitoring included - - async def connect(self) -> bool: - self.websocket = await connect() - return True - - async def _handle_messages(self): - message = await self.websocket.receive() - # Error handling and restart logic automatic -``` - -### From Basic Monitoring - -**Before** (basic monitoring): -```python -# Manual status tracking -status = { - 'connected': False, - 'last_message': None, - 'error_count': 0 -} - -# Manual health checks -async def health_check(): - if time.time() - status['last_message'] > 300: - print("No data for 5 minutes!") -``` - -**After** (comprehensive monitoring): -```python -# Automatic health monitoring -collector = MyCollector(["BTC-USDT"]) - -# Rich status information -status = collector.get_status() -health = collector.get_health_status() - -# Automatic alerts and recovery -if not health['is_healthy']: - print(f"Issues: {health['issues']}") - # Auto-restart already triggered -``` - ---- - -## Support and Contributing - -### Getting Help - -1. **Check Logs**: Review logs in `./logs/` directory -2. **Status Information**: Use `get_status()` and `get_health_status()` methods -3. **Debug Mode**: Set `LOG_LEVEL=DEBUG` for detailed logging -4. 
**Test with Demo**: Run `examples/collector_demo.py` to verify setup - -### Contributing - -The data collector system is designed to be extensible. Contributions are welcome for: - -- New exchange implementations -- Enhanced monitoring features -- Performance optimizations -- Additional data types -- Integration examples - -### License - -This documentation and the associated code are part of the Crypto Trading Bot Platform project. - ---- - -*For more information, see the main project documentation in `/docs/`.* - ## Exchange Factory System ### Overview @@ -1166,8 +943,11 @@ print(f"OKX data types: {okx_info['supported_data_types']}") ```python from data.exchanges import ExchangeCollectorConfig, ExchangeFactory from data.base_collector import DataType +from utils.logger import get_logger + +# Create configuration with conditional logging +logger = get_logger('factory_collector', verbose=True) -# Create configuration config = ExchangeCollectorConfig( exchange='okx', # Exchange name symbol='BTC-USDT', # Trading pair @@ -1184,7 +964,7 @@ config = ExchangeCollectorConfig( # Validate configuration is_valid = ExchangeFactory.validate_config(config) if is_valid: - collector = ExchangeFactory.create_collector(config) + collector = ExchangeFactory.create_collector(config, logger=logger) ``` ### Exchange Capabilities @@ -1209,12 +989,16 @@ Each exchange provides convenience functions for easy collector creation: ```python from data.exchanges import create_okx_collector +from utils.logger import get_logger + +# Quick OKX collector creation with logging +logger = get_logger('okx_btc_usdt', verbose=True) -# Quick OKX collector creation collector = create_okx_collector( symbol='BTC-USDT', data_types=[DataType.TRADE, DataType.ORDERBOOK], - auto_restart=True + auto_restart=True, + logger=logger ) ``` @@ -1229,39 +1013,52 @@ The OKX collector provides: - **Ping/Pong Management**: OKX-specific keepalive mechanism with proper format - **Raw Data Storage**: Optional storage of raw OKX messages for debugging - **Connection Resilience**: Robust reconnection logic for OKX WebSocket +- **Conditional Logging**: Full integration with the logging system ### OKX Usage Examples ```python -# Direct OKX collector usage -from data.exchanges.okx import OKXCollector -from data.base_collector import DataType +from utils.logger import get_logger + +# Direct OKX collector usage with conditional logging +logger = get_logger('okx_collector', verbose=True) collector = OKXCollector( symbol='BTC-USDT', data_types=[DataType.TRADE, DataType.ORDERBOOK], auto_restart=True, health_check_interval=30.0, - store_raw_data=True + store_raw_data=True, + logger=logger, # Enable logging + log_errors_only=False # Log all levels ) -# Factory pattern usage -from data.exchanges import create_okx_collector +# Factory pattern usage with error-only logging +error_logger = get_logger('okx_critical', verbose=False) collector = create_okx_collector( symbol='BTC-USDT', - data_types=[DataType.TRADE, DataType.ORDERBOOK] + data_types=[DataType.TRADE, DataType.ORDERBOOK], + logger=error_logger, + log_errors_only=True # Only log errors ) -# Multiple collectors -from data.exchanges import ExchangeFactory, ExchangeCollectorConfig - +# Multiple collectors with different logging strategies configs = [ ExchangeCollectorConfig('okx', 'BTC-USDT', [DataType.TRADE]), ExchangeCollectorConfig('okx', 'ETH-USDT', [DataType.ORDERBOOK]) ] -collectors = ExchangeFactory.create_multiple_collectors(configs) +collectors = [] +for config in configs: + # Different logging for 
each collector + if config.symbol == 'BTC-USDT': + logger = get_logger('okx_btc', verbose=True) # Full logging + else: + logger = get_logger('okx_eth', verbose=False, log_errors_only=True) # Errors only + + collector = ExchangeFactory.create_collector(config, logger=logger) + collectors.append(collector) ``` ### OKX Data Processing @@ -1314,4 +1111,212 @@ The OKX collector processes three main data types: } ``` -For comprehensive OKX documentation, see [OKX Collector Documentation](okx_collector.md). \ No newline at end of file +For comprehensive OKX documentation, see [OKX Collector Documentation](okx_collector.md). + +## Integration Examples + +### Django Integration + +```python +# Django management command with conditional logging +from django.core.management.base import BaseCommand +from data import CollectorManager +from utils.logger import get_logger +import asyncio + +class Command(BaseCommand): + help = 'Start crypto data collectors' + + def handle(self, *args, **options): + async def run_collectors(): + # Create manager with logging + manager_logger = get_logger('django_collectors', verbose=True) + manager = CollectorManager("django_collectors", logger=manager_logger) + + # Add collectors with individual loggers + from myapp.collectors import OKXCollector, BinanceCollector + + okx_logger = get_logger('django_okx', verbose=True) + binance_logger = get_logger('django_binance', verbose=True, log_errors_only=True) + + manager.add_collector(OKXCollector(['BTC-USDT'], logger=okx_logger)) + manager.add_collector(BinanceCollector(['ETH-USDT'], logger=binance_logger)) + + # Start system + await manager.start() + + # Keep running + try: + while True: + await asyncio.sleep(60) + status = manager.get_status() + self.stdout.write(f"Status: {status['statistics']}") + except KeyboardInterrupt: + await manager.stop() + + asyncio.run(run_collectors()) +``` + +### FastAPI Integration + +```python +# FastAPI application with conditional logging +from fastapi import FastAPI +from data import CollectorManager +from utils.logger import get_logger +import asyncio + +app = FastAPI() +manager = None + +@app.on_event("startup") +async def startup_event(): + global manager + + # Create manager with logging + manager_logger = get_logger('fastapi_collectors', verbose=True) + manager = CollectorManager("fastapi_collectors", logger=manager_logger) + + # Add collectors with error-only logging for production + from collectors import OKXCollector + + collector_logger = get_logger('fastapi_okx', verbose=False, log_errors_only=True) + collector = OKXCollector(['BTC-USDT', 'ETH-USDT'], logger=collector_logger) + manager.add_collector(collector) + + # Start in background + await manager.start() + +@app.on_event("shutdown") +async def shutdown_event(): + global manager + if manager: + await manager.stop() + +@app.get("/collector/status") +async def get_collector_status(): + return manager.get_status() + +@app.post("/collector/{name}/restart") +async def restart_collector(name: str): + success = await manager.restart_collector(name) + return {"success": success} +``` + +## Migration Guide + +### From Manual Connection Management + +**Before** (manual management): +```python +class OldCollector: + def __init__(self): + self.websocket = None + self.running = False + + async def start(self): + while self.running: + try: + self.websocket = await connect() + await self.listen() + except Exception as e: + print(f"Error: {e}") + await asyncio.sleep(5) # Manual retry +``` + +**After** (with BaseDataCollector and conditional 
logging): +```python +from utils.logger import get_logger + +class NewCollector(BaseDataCollector): + def __init__(self): + logger = get_logger('new_collector', verbose=True) + super().__init__( + "exchange", + ["BTC-USDT"], + logger=logger, + log_errors_only=False + ) + # Auto-restart and health monitoring included + + async def connect(self) -> bool: + self._log_info("Connecting to exchange") + self.websocket = await connect() + self._log_info("Connection established") + return True + + async def _handle_messages(self): + message = await self.websocket.receive() + self._log_debug(f"Received message: {message}") + # Error handling and restart logic automatic +``` + +### From Basic Monitoring + +**Before** (basic monitoring): +```python +# Manual status tracking +status = { + 'connected': False, + 'last_message': None, + 'error_count': 0 +} + +# Manual health checks +async def health_check(): + if time.time() - status['last_message'] > 300: + print("No data for 5 minutes!") +``` + +**After** (comprehensive monitoring with conditional logging): +```python +# Automatic health monitoring with logging +logger = get_logger('monitored_collector', verbose=True) +collector = MyCollector(["BTC-USDT"], logger=logger) + +# Rich status information +status = collector.get_status() +health = collector.get_health_status() + +# Automatic alerts and recovery with logging +if not health['is_healthy']: + print(f"Issues: {health['issues']}") + # Auto-restart already triggered and logged +``` + +## Related Documentation + +- [Data Collection Service](../services/data_collection_service.md) - High-level service orchestration +- [Logging System](logging.md) - Conditional logging implementation +- [Database Operations](../database/operations.md) - Database integration patterns +- [Monitoring Guide](../monitoring/README.md) - System monitoring and alerting + +--- + +## Support and Contributing + +### Getting Help + +1. **Check Logs**: Review logs in `./logs/` directory (see [Logging System](logging.md)) +2. **Status Information**: Use `get_status()` and `get_health_status()` methods +3. **Debug Mode**: Enable debug logging with conditional logging system +4. **Test with Demo**: Run `examples/collector_demo.py` to verify setup + +### Contributing + +The data collector system is designed to be extensible. Contributions are welcome for: + +- New exchange implementations +- Enhanced monitoring features +- Performance optimizations +- Additional data types +- Integration examples +- Logging system improvements + +### License + +This documentation and the associated code are part of the Crypto Trading Bot Platform project. + +--- + +*For more information, see the main project documentation in `/docs/`.* \ No newline at end of file diff --git a/docs/components/logging.md b/docs/components/logging.md index 28f5c16..b283e44 100644 --- a/docs/components/logging.md +++ b/docs/components/logging.md @@ -27,6 +27,22 @@ The TCP Dashboard implements a sophisticated conditional logging system that all 3. **Logger Inheritance**: Parent components pass their logger to child components 4. 
**Hierarchical Structure**: Log files are organized by component hierarchy +### Component Hierarchy + +``` +Top-level Application (individual logger) +├── ProductionManager (individual logger) +│ ├── DataSaver (receives logger from ProductionManager) +│ ├── DataValidator (receives logger from ProductionManager) +│ ├── DatabaseConnection (receives logger from ProductionManager) +│ └── CollectorManager (individual logger) +│ ├── OKX collector BTC-USD (individual logger) +│ │ ├── DataAggregator (receives logger from OKX collector) +│ │ ├── DataTransformer (receives logger from OKX collector) +│ │ └── DataProcessor (receives logger from OKX collector) +│ └── Another collector... +``` + ### Usage Patterns #### 1. No Logging @@ -134,24 +150,48 @@ class ComponentExample: self.logger = logger self.log_errors_only = log_errors_only - # Conditional logging helpers - self._log_debug = self._create_conditional_logger('debug') - self._log_info = self._create_conditional_logger('info') - self._log_warning = self._create_conditional_logger('warning') - self._log_error = self._create_conditional_logger('error') - self._log_critical = self._create_conditional_logger('critical') + def _log_debug(self, message: str) -> None: + """Log debug message if logger is available and not in errors-only mode.""" + if self.logger and not self.log_errors_only: + self.logger.debug(message) - def _create_conditional_logger(self, level): - """Create conditional logging function based on configuration.""" - if not self.logger: - return lambda msg: None # No-op if no logger + def _log_info(self, message: str) -> None: + """Log info message if logger is available and not in errors-only mode.""" + if self.logger and not self.log_errors_only: + self.logger.info(message) + + def _log_warning(self, message: str) -> None: + """Log warning message if logger is available and not in errors-only mode.""" + if self.logger and not self.log_errors_only: + self.logger.warning(message) + + def _log_error(self, message: str, exc_info: bool = False) -> None: + """Log error message if logger is available (always logs errors).""" + if self.logger: + self.logger.error(message, exc_info=exc_info) + + def _log_critical(self, message: str, exc_info: bool = False) -> None: + """Log critical message if logger is available (always logs critical).""" + if self.logger: + self.logger.critical(message, exc_info=exc_info) +``` + +#### Child Component Pattern + +Child components receive logger from parent: + +```python +class OKXCollector(BaseDataCollector): + def __init__(self, symbol: str, logger=None, log_errors_only=False): + super().__init__(..., logger=logger, log_errors_only=log_errors_only) - log_func = getattr(self.logger, level) - - if level in ['debug', 'info', 'warning'] and self.log_errors_only: - return lambda msg: None # Suppress non-error messages - - return log_func # Normal logging + # Pass logger to child components + self._data_processor = OKXDataProcessor( + symbol, + logger=self.logger # Pass parent's logger + ) + self._data_validator = DataValidator(logger=self.logger) + self._data_transformer = DataTransformer(logger=self.logger) ``` #### Supported Components @@ -178,179 +218,6 @@ The following components support conditional logging: - Parameters: `logger=None` - Data processing with conditional logging -### Best Practices for Conditional Logging - -#### 1. 
Logger Inheritance -```python -# Parent component creates logger -parent_logger = get_logger('parent_system') -parent = ParentComponent(logger=parent_logger) - -# Pass logger to children for consistent hierarchy -child1 = ChildComponent(logger=parent_logger) -child2 = ChildComponent(logger=parent_logger, log_errors_only=True) -child3 = ChildComponent(logger=None) # No logging -``` - -#### 2. Environment-Based Configuration -```python -import os -from utils.logger import get_logger - -def create_system_logger(): - """Create logger based on environment.""" - env = os.getenv('ENVIRONMENT', 'development') - - if env == 'production': - return get_logger('production_system', log_level='INFO', verbose=False) - elif env == 'testing': - return None # No logging during tests - else: - return get_logger('dev_system', log_level='DEBUG', verbose=True) - -# Use in components -system_logger = create_system_logger() -manager = CollectorManager(logger=system_logger) -``` - -#### 3. Conditional Error-Only Mode -```python -def create_collector_with_logging_strategy(symbol, strategy='normal'): - """Create collector with different logging strategies.""" - base_logger = get_logger(f'collector_{symbol.lower().replace("-", "_")}') - - if strategy == 'silent': - return OKXCollector(symbol, logger=None) - elif strategy == 'errors_only': - return OKXCollector(symbol, logger=base_logger, log_errors_only=True) - else: - return OKXCollector(symbol, logger=base_logger) - -# Usage -btc_collector = create_collector_with_logging_strategy('BTC-USDT', 'normal') -eth_collector = create_collector_with_logging_strategy('ETH-USDT', 'errors_only') -ada_collector = create_collector_with_logging_strategy('ADA-USDT', 'silent') -``` - -#### 4. Performance Optimization -```python -class OptimizedComponent: - def __init__(self, logger=None, log_errors_only=False): - self.logger = logger - self.log_errors_only = log_errors_only - - # Pre-compute logging capabilities for performance - self.can_log_debug = logger and not log_errors_only - self.can_log_info = logger and not log_errors_only - self.can_log_warning = logger and not log_errors_only - self.can_log_error = logger is not None - self.can_log_critical = logger is not None - - def process_data(self, data): - if self.can_log_debug: - self.logger.debug(f"Processing {len(data)} records") - - # ... processing logic ... - - if self.can_log_info: - self.logger.info("Data processing completed") -``` - -### Migration Guide - -#### From Standard Logging -```python -# Old approach -import logging -logger = logging.getLogger(__name__) - -class OldComponent: - def __init__(self): - self.logger = logger - -# New conditional approach -from utils.logger import get_logger - -class NewComponent: - def __init__(self, logger=None, log_errors_only=False): - self.logger = logger - self.log_errors_only = log_errors_only - - # Add conditional logging helpers - self._setup_conditional_logging() -``` - -#### Gradual Adoption -1. **Phase 1**: Add optional logger parameters to new components -2. **Phase 2**: Update existing components to support conditional logging -3. **Phase 3**: Implement hierarchical logging structure -4. 
**Phase 4**: Add error-only logging mode - -### Testing Conditional Logging - -#### Test Script Example -```python -# test_conditional_logging.py -from utils.logger import get_logger -from data.collector_manager import CollectorManager -from data.exchanges.okx.collector import OKXCollector - -def test_no_logging(): - """Test components work without loggers.""" - manager = CollectorManager(logger=None) - collector = OKXCollector("BTC-USDT", logger=None) - print("✓ No logging test passed") - -def test_with_logging(): - """Test components work with loggers.""" - logger = get_logger('test_system') - manager = CollectorManager(logger=logger) - collector = OKXCollector("BTC-USDT", logger=logger) - print("✓ With logging test passed") - -def test_error_only(): - """Test error-only logging mode.""" - logger = get_logger('test_errors') - collector = OKXCollector("BTC-USDT", logger=logger, log_errors_only=True) - print("✓ Error-only logging test passed") - -if __name__ == "__main__": - test_no_logging() - test_with_logging() - test_error_only() - print("✅ All conditional logging tests passed!") -``` - -## Log Format - -All log messages follow this unified format: -``` -[YYYY-MM-DD HH:MM:SS - LEVEL - message] -``` - -Example: -``` -[2024-01-15 14:30:25 - INFO - Bot started successfully] -[2024-01-15 14:30:26 - ERROR - Connection failed: timeout] -``` - -## File Organization - -Logs are organized in a hierarchical structure: -``` -logs/ -├── app/ -│ ├── 2024-01-15.txt -│ └── 2024-01-16.txt -├── bot_manager/ -│ ├── 2024-01-15.txt -│ └── 2024-01-16.txt -├── data_collector/ -│ └── 2024-01-15.txt -└── strategies/ - └── 2024-01-15.txt -``` - ## Basic Usage ### Import and Initialize @@ -414,6 +281,38 @@ class BotManager: self.logger.info(f"Bot {bot_id} stopped") ``` +## Log Format + +All log messages follow this unified format: +``` +[YYYY-MM-DD HH:MM:SS - LEVEL - message] +``` + +Example: +``` +[2024-01-15 14:30:25 - INFO - Bot started successfully] +[2024-01-15 14:30:26 - ERROR - Connection failed: timeout] +``` + +## File Organization + +Logs are organized in a hierarchical structure: +``` +logs/ +├── tcp_dashboard/ +│ ├── 2024-01-15.txt +│ └── 2024-01-16.txt +├── production_manager/ +│ ├── 2024-01-15.txt +│ └── 2024-01-16.txt +├── collector_manager/ +│ └── 2024-01-15.txt +├── okx_collector_btc_usdt/ +│ └── 2024-01-15.txt +└── okx_collector_eth_usdt/ + └── 2024-01-15.txt +``` + ## Configuration ### Logger Parameters @@ -487,6 +386,84 @@ logger = get_logger('bot_manager', max_log_files=14) - Deletes older files automatically - Based on file modification time, not filename +## Best Practices for Conditional Logging + +### 1. Logger Inheritance +```python +# Parent component creates logger +parent_logger = get_logger('parent_system') +parent = ParentComponent(logger=parent_logger) + +# Pass logger to children for consistent hierarchy +child1 = ChildComponent(logger=parent_logger) +child2 = ChildComponent(logger=parent_logger, log_errors_only=True) +child3 = ChildComponent(logger=None) # No logging +``` + +### 2. 
Environment-Based Configuration +```python +import os +from utils.logger import get_logger + +def create_system_logger(): + """Create logger based on environment.""" + env = os.getenv('ENVIRONMENT', 'development') + + if env == 'production': + return get_logger('production_system', log_level='INFO', verbose=False) + elif env == 'testing': + return None # No logging during tests + else: + return get_logger('dev_system', log_level='DEBUG', verbose=True) + +# Use in components +system_logger = create_system_logger() +manager = CollectorManager(logger=system_logger) +``` + +### 3. Conditional Error-Only Mode +```python +def create_collector_with_logging_strategy(symbol, strategy='normal'): + """Create collector with different logging strategies.""" + base_logger = get_logger(f'collector_{symbol.lower().replace("-", "_")}') + + if strategy == 'silent': + return OKXCollector(symbol, logger=None) + elif strategy == 'errors_only': + return OKXCollector(symbol, logger=base_logger, log_errors_only=True) + else: + return OKXCollector(symbol, logger=base_logger) + +# Usage +btc_collector = create_collector_with_logging_strategy('BTC-USDT', 'normal') +eth_collector = create_collector_with_logging_strategy('ETH-USDT', 'errors_only') +ada_collector = create_collector_with_logging_strategy('ADA-USDT', 'silent') +``` + +### 4. Performance Optimization +```python +class OptimizedComponent: + def __init__(self, logger=None, log_errors_only=False): + self.logger = logger + self.log_errors_only = log_errors_only + + # Pre-compute logging capabilities for performance + self.can_log_debug = logger and not log_errors_only + self.can_log_info = logger and not log_errors_only + self.can_log_warning = logger and not log_errors_only + self.can_log_error = logger is not None + self.can_log_critical = logger is not None + + def process_data(self, data): + if self.can_log_debug: + self.logger.debug(f"Processing {len(data)} records") + + # ... processing logic ... + + if self.can_log_info: + self.logger.info("Data processing completed") +``` + ## Advanced Features ### Manual Log Cleanup @@ -671,16 +648,37 @@ if logger.isEnabledFor(logging.DEBUG): logger.debug(f"Data: {expensive_serialization(data)}") ``` -## Integration with Existing Code +## Migration Guide -The logging system is designed to be gradually adopted: +### Updating Existing Components -1. **Start with new modules**: Use the unified logger in new code -2. **Replace existing logging**: Gradually migrate existing logging to the unified system -3. **No breaking changes**: Existing code continues to work +1. **Add logger parameter to constructor**: +```python +def __init__(self, ..., logger=None, log_errors_only=False): +``` -### Migration Example +2. **Add conditional logging helpers**: +```python +def _log_debug(self, message: str) -> None: + if self.logger and not self.log_errors_only: + self.logger.debug(message) +``` +3. **Update all logging calls**: +```python +# Before +self.logger.info("Message") + +# After +self._log_info("Message") +``` + +4. **Pass logger to child components**: +```python +child = ChildComponent(logger=self.logger) +``` + +### From Standard Logging ```python # Old logging (if any existed) import logging @@ -692,13 +690,113 @@ from utils.logger import get_logger logger = get_logger('component_name', verbose=True) ``` +### Gradual Adoption +1. **Phase 1**: Add optional logger parameters to new components +2. **Phase 2**: Update existing components to support conditional logging +3. 
**Phase 3**: Implement hierarchical logging structure +4. **Phase 4**: Add error-only logging mode + ## Testing +### Testing Conditional Logging + +#### Test Script Example +```python +# test_conditional_logging.py +from utils.logger import get_logger +from data.collector_manager import CollectorManager +from data.exchanges.okx.collector import OKXCollector + +def test_no_logging(): + """Test components work without loggers.""" + manager = CollectorManager(logger=None) + collector = OKXCollector("BTC-USDT", logger=None) + print("✓ No logging test passed") + +def test_with_logging(): + """Test components work with loggers.""" + logger = get_logger('test_system') + manager = CollectorManager(logger=logger) + collector = OKXCollector("BTC-USDT", logger=logger) + print("✓ With logging test passed") + +def test_error_only(): + """Test error-only logging mode.""" + logger = get_logger('test_errors') + collector = OKXCollector("BTC-USDT", logger=logger, log_errors_only=True) + print("✓ Error-only logging test passed") + +if __name__ == "__main__": + test_no_logging() + test_with_logging() + test_error_only() + print("✅ All conditional logging tests passed!") +``` + +### Testing Changes + +```python +# Test without logger +component = MyComponent(logger=None) +# Should work without errors, no logging + +# Test with logger +logger = get_logger('test_component') +component = MyComponent(logger=logger) +# Should log normally + +# Test error-only mode +component = MyComponent(logger=logger, log_errors_only=True) +# Should only log errors +``` + +### Basic System Test + Run a simple test to verify the logging system: ```bash python -c "from utils.logger import get_logger; logger = get_logger('test', verbose=True); logger.info('Test message'); print('Check logs/test/ directory')" ``` +## Troubleshooting + +### Common Issues + +1. **Permission errors**: Ensure the application has write permissions to the project directory +2. **Disk space**: Monitor disk usage and adjust log retention with `max_log_files` +3. **Threading issues**: The logger is thread-safe, but check for application-level concurrency issues +4. **Too many console messages**: Adjust `verbose` parameter or log levels + +### Debug Mode + +Enable debug logging to troubleshoot issues: +```python +logger = get_logger('component_name', 'DEBUG', verbose=True) +``` + +### Console Output Issues + +```python +# Force console output regardless of environment +logger = get_logger('component_name', verbose=True) + +# Check environment variables +import os +print(f"VERBOSE_LOGGING: {os.getenv('VERBOSE_LOGGING')}") +print(f"LOG_TO_CONSOLE: {os.getenv('LOG_TO_CONSOLE')}") +``` + +### Fallback Logging + +If file logging fails, the system automatically falls back to console logging with a warning message. + +## Integration with Existing Code + +The logging system is designed to be gradually adopted: + +1. **Start with new modules**: Use the unified logger in new code +2. **Replace existing logging**: Gradually migrate existing logging to the unified system +3. **No breaking changes**: Existing code continues to work + ## Maintenance ### Automatic Cleanup Benefits @@ -735,49 +833,4 @@ find logs/ -name "*.txt" -size +10M find logs/ -name "*.txt" | cut -d'/' -f2 | sort | uniq -c ``` -## Troubleshooting - -### Common Issues - -1. **Permission errors**: Ensure the application has write permissions to the project directory -2. **Disk space**: Monitor disk usage and adjust log retention with `max_log_files` -3. 
**Threading issues**: The logger is thread-safe, but check for application-level concurrency issues -4. **Too many console messages**: Adjust `verbose` parameter or log levels - -### Debug Mode - -Enable debug logging to troubleshoot issues: -```python -logger = get_logger('component_name', 'DEBUG', verbose=True) -``` - -### Console Output Issues - -```python -# Force console output regardless of environment -logger = get_logger('component_name', verbose=True) - -# Check environment variables -import os -print(f"VERBOSE_LOGGING: {os.getenv('VERBOSE_LOGGING')}") -print(f"LOG_TO_CONSOLE: {os.getenv('LOG_TO_CONSOLE')}") -``` - -### Fallback Logging - -If file logging fails, the system automatically falls back to console logging with a warning message. - -## New Features Summary - -### Verbose Parameter -- Controls console logging output -- Respects log levels (DEBUG shows all, ERROR shows only errors) -- Uses environment variables as default (`VERBOSE_LOGGING` or `LOG_TO_CONSOLE`) -- Can be explicitly set to `True`/`False` to override environment - -### Automatic Cleanup -- Enabled by default (`clean_old_logs=True`) -- Triggered when new log files are created (date changes) -- Keeps most recent `max_log_files` files (default: 30) -- Component-specific retention policies -- Non-blocking operation with error handling \ No newline at end of file +This conditional logging system provides maximum flexibility while maintaining clean, maintainable code that works in all scenarios. \ No newline at end of file diff --git a/docs/data-collection-service.md b/docs/data-collection-service.md deleted file mode 100644 index bb72b79..0000000 --- a/docs/data-collection-service.md +++ /dev/null @@ -1,481 +0,0 @@ -# Data Collection Service - -The Data Collection Service is a production-ready service for cryptocurrency market data collection with clean logging and robust error handling. It manages multiple data collectors for different trading pairs and exchanges. - -## Features - -- **Clean Logging**: Only essential information (connections, disconnections, errors) -- **Multi-Exchange Support**: Extensible architecture for multiple exchanges -- **Health Monitoring**: Built-in health checks and auto-recovery -- **Configurable**: JSON-based configuration with sensible defaults -- **Graceful Shutdown**: Proper signal handling and cleanup -- **Testing**: Comprehensive unit test coverage - -## Quick Start - -### Basic Usage - -```bash -# Start with default configuration (indefinite run) -python scripts/start_data_collection.py - -# Run for 8 hours -python scripts/start_data_collection.py --hours 8 - -# Use custom configuration -python scripts/start_data_collection.py --config config/my_config.json -``` - -### Monitoring - -```bash -# Check status once -python scripts/monitor_clean.py - -# Monitor continuously every 60 seconds -python scripts/monitor_clean.py --interval 60 -``` - -## Configuration - -The service uses JSON configuration files with automatic default creation if none exists. 
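
The load-on-startup behavior is equivalent to this sketch (the helper name and the trimmed-down defaults are illustrative, not the service's actual internals):

```python
import json
from pathlib import Path

def load_or_create_config(path: str = "config/data_collection.json") -> dict:
    """Load the service configuration, writing defaults on first run."""
    config_file = Path(path)
    if not config_file.exists():
        defaults = {
            "exchanges": {"okx": {"enabled": True, "trading_pairs": []}},
            "collection_settings": {"health_check_interval": 120},
            "logging": {"level": "INFO", "log_errors_only": True},
        }
        config_file.parent.mkdir(parents=True, exist_ok=True)
        config_file.write_text(json.dumps(defaults, indent=2))
    return json.loads(config_file.read_text())
```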
- -### Default Configuration Location - -`config/data_collection.json` - -### Configuration Structure - -```json -{ - "exchanges": { - "okx": { - "enabled": true, - "trading_pairs": [ - { - "symbol": "BTC-USDT", - "enabled": true, - "data_types": ["trade"], - "timeframes": ["1m", "5m", "15m", "1h"] - }, - { - "symbol": "ETH-USDT", - "enabled": true, - "data_types": ["trade"], - "timeframes": ["1m", "5m", "15m", "1h"] - } - ] - } - }, - "collection_settings": { - "health_check_interval": 120, - "store_raw_data": true, - "auto_restart": true, - "max_restart_attempts": 3 - }, - "logging": { - "level": "INFO", - "log_errors_only": true, - "verbose_data_logging": false - } -} -``` - -### Configuration Options - -#### Exchange Settings - -- **enabled**: Whether to enable this exchange -- **trading_pairs**: Array of trading pair configurations - -#### Trading Pair Settings - -- **symbol**: Trading pair symbol (e.g., "BTC-USDT") -- **enabled**: Whether to collect data for this pair -- **data_types**: Types of data to collect (["trade"], ["ticker"], etc.) -- **timeframes**: Candle timeframes to generate (["1m", "5m", "15m", "1h", "4h", "1d"]) - -#### Collection Settings - -- **health_check_interval**: Health check frequency in seconds -- **store_raw_data**: Whether to store raw trade data -- **auto_restart**: Enable automatic restart on failures -- **max_restart_attempts**: Maximum restart attempts before giving up - -#### Logging Settings - -- **level**: Log level ("DEBUG", "INFO", "WARNING", "ERROR") -- **log_errors_only**: Only log errors and essential events -- **verbose_data_logging**: Enable verbose logging of individual trades/candles - -## Service Architecture - -### Core Components - -1. **DataCollectionService**: Main service class managing the lifecycle -2. **CollectorManager**: Manages multiple data collectors with health monitoring -3. **ExchangeFactory**: Creates exchange-specific collectors -4. **BaseDataCollector**: Abstract base for all data collectors - -### Data Flow - -``` -Exchange API → Data Collector → Data Processor → Database - ↓ - Health Monitor → Service Manager -``` - -### Storage - -- **Raw Data**: PostgreSQL `raw_trades` table -- **Candles**: PostgreSQL `market_data` table with multiple timeframes -- **Real-time**: Redis pub/sub for live data distribution - -## Logging Philosophy - -The service implements **clean production logging** focused on operational needs: - -### What Gets Logged - -✅ **Service Lifecycle** -- Service start/stop -- Collector initialization -- Database connections - -✅ **Connection Events** -- WebSocket connect/disconnect -- Reconnection attempts -- API errors - -✅ **Health & Errors** -- Health check results -- Error conditions -- Recovery actions - -✅ **Statistics** -- Periodic uptime reports -- Collection summary - -### What Doesn't Get Logged - -❌ **Individual Data Points** -- Every trade received -- Every candle generated -- Raw market data - -❌ **Verbose Operations** -- Database queries -- Internal processing steps -- Routine heartbeats - -## API Reference - -### DataCollectionService - -The main service class for managing data collection. - -#### Constructor - -```python -DataCollectionService(config_path: str = "config/data_collection.json") -``` - -#### Methods - -##### `async run(duration_hours: Optional[float] = None) -> bool` - -Run the service for a specified duration or indefinitely. 
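
For example:

```python
# Run for eight hours, then shut down gracefully
success = await service.run(duration_hours=8.0)
```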
- -**Parameters:** -- `duration_hours`: Optional duration in hours (None = indefinite) - -**Returns:** -- `bool`: True if successful, False if error occurred - -##### `async start() -> bool` - -Start the data collection service. - -**Returns:** -- `bool`: True if started successfully - -##### `async stop() -> None` - -Stop the service gracefully. - -##### `get_status() -> Dict[str, Any]` - -Get current service status including uptime, collector counts, and errors. - -**Returns:** -- `dict`: Status information - -### Standalone Function - -#### `run_data_collection_service(config_path, duration_hours)` - -```python -async def run_data_collection_service( - config_path: str = "config/data_collection.json", - duration_hours: Optional[float] = None -) -> bool -``` - -Convenience function to run the service. - -## Integration Examples - -### Basic Integration - -```python -import asyncio -from data.collection_service import DataCollectionService - -async def main(): - service = DataCollectionService("config/my_config.json") - await service.run(duration_hours=24) # Run for 24 hours - -if __name__ == "__main__": - asyncio.run(main()) -``` - -### Custom Status Monitoring - -```python -import asyncio -from data.collection_service import DataCollectionService - -async def monitor_service(): - service = DataCollectionService() - - # Start service in background - start_task = asyncio.create_task(service.run()) - - # Monitor status every 5 minutes - while service.running: - status = service.get_status() - print(f"Uptime: {status['uptime_hours']:.1f}h, " - f"Collectors: {status['collectors_running']}, " - f"Errors: {status['errors_count']}") - - await asyncio.sleep(300) # 5 minutes - - await start_task - -asyncio.run(monitor_service()) -``` - -### Programmatic Control - -```python -import asyncio -from data.collection_service import DataCollectionService - -async def controlled_collection(): - service = DataCollectionService() - - # Initialize and start - await service.initialize_collectors() - await service.start() - - try: - # Run for 1 hour - await asyncio.sleep(3600) - finally: - # Graceful shutdown - await service.stop() - -asyncio.run(controlled_collection()) -``` - -## Error Handling - -The service implements robust error handling at multiple levels: - -### Service Level - -- **Configuration Errors**: Invalid JSON, missing files -- **Initialization Errors**: Database connection, collector creation -- **Runtime Errors**: Unexpected exceptions during operation - -### Collector Level - -- **Connection Errors**: WebSocket disconnections, API failures -- **Data Errors**: Invalid data formats, processing failures -- **Health Errors**: Failed health checks, timeout conditions - -### Recovery Strategies - -1. **Automatic Restart**: Collectors auto-restart on failures -2. **Exponential Backoff**: Increasing delays between retry attempts -3. **Circuit Breaker**: Stop retrying after max attempts exceeded -4. 
**Graceful Degradation**: Continue with healthy collectors - -## Testing - -### Running Tests - -```bash -# Run all data collection service tests -uv run pytest tests/test_data_collection_service.py -v - -# Run specific test -uv run pytest tests/test_data_collection_service.py::TestDataCollectionService::test_service_initialization -v - -# Run with coverage -uv run pytest tests/test_data_collection_service.py --cov=data.collection_service -``` - -### Test Coverage - -The test suite covers: -- Service initialization and configuration -- Collector creation and management -- Service lifecycle (start/stop) -- Error handling and recovery -- Configuration validation -- Signal handling -- Status reporting - -## Troubleshooting - -### Common Issues - -#### Configuration Not Found - -``` -❌ Failed to load config from config/data_collection.json: [Errno 2] No such file or directory -``` - -**Solution**: The service will create a default configuration. Check the created file and adjust as needed. - -#### Database Connection Failed - -``` -❌ Database connection failed: connection refused -``` - -**Solution**: Ensure PostgreSQL and Redis are running via Docker: - -```bash -docker-compose up -d postgres redis -``` - -#### No Collectors Created - -``` -❌ No collectors were successfully initialized -``` - -**Solution**: Check configuration - ensure at least one exchange is enabled with valid trading pairs. - -#### WebSocket Connection Issues - -``` -❌ Failed to start data collectors -``` - -**Solution**: Check network connectivity and API credentials. Verify exchange is accessible. - -### Debug Mode - -For verbose debugging, modify the logging configuration: - -```json -{ - "logging": { - "level": "DEBUG", - "log_errors_only": false, - "verbose_data_logging": true - } -} -``` - -⚠️ **Warning**: Debug mode generates extensive logs and should not be used in production. - -## Production Deployment - -### Docker - -The service can be containerized for production deployment: - -```dockerfile -FROM python:3.11-slim - -WORKDIR /app -COPY . . - -RUN pip install uv -RUN uv pip install -r requirements.txt - -CMD ["python", "scripts/start_data_collection.py", "--config", "config/production.json"] -``` - -### Systemd Service - -Create a systemd service for Linux deployment: - -```ini -[Unit] -Description=Cryptocurrency Data Collection Service -After=network.target postgres.service redis.service - -[Service] -Type=simple -User=crypto-collector -WorkingDirectory=/opt/crypto-dashboard -ExecStart=/usr/bin/python scripts/start_data_collection.py --config config/production.json -Restart=always -RestartSec=10 - -[Install] -WantedBy=multi-user.target -``` - -### Environment Variables - -Configure sensitive data via environment variables: - -```bash -export POSTGRES_HOST=localhost -export POSTGRES_PORT=5432 -export POSTGRES_DB=crypto_dashboard -export POSTGRES_USER=dashboard_user -export POSTGRES_PASSWORD=secure_password -export REDIS_HOST=localhost -export REDIS_PORT=6379 -``` - -## Performance Considerations - -### Resource Usage - -- **Memory**: ~100MB base + ~10MB per trading pair -- **CPU**: Low (async I/O bound) -- **Network**: ~1KB/s per trading pair -- **Storage**: ~1GB/day per trading pair (with raw data) - -### Scaling - -- **Vertical**: Increase timeframes and trading pairs -- **Horizontal**: Run multiple services with different configurations -- **Database**: Use TimescaleDB for time-series optimization - -### Optimization Tips - -1. **Disable Raw Data**: Set `store_raw_data: false` to reduce storage -2. 
**Limit Timeframes**: Only collect needed timeframes -3. **Batch Processing**: Use longer health check intervals -4. **Connection Pooling**: Database connections are automatically pooled - -## Changelog - -### v1.0.0 (Current) - -- Initial implementation -- OKX exchange support -- Clean logging system -- Comprehensive test coverage -- JSON configuration -- Health monitoring -- Graceful shutdown \ No newline at end of file diff --git a/docs/logging_system.md b/docs/logging_system.md deleted file mode 100644 index 7dc0b94..0000000 --- a/docs/logging_system.md +++ /dev/null @@ -1,292 +0,0 @@ -# Conditional Logging System - -## Overview - -The TCP Dashboard project implements a sophisticated conditional logging system that provides fine-grained control over logging behavior across all components. This system supports hierarchical logging, conditional logging, and error-only logging modes. - -## Key Features - -### 1. Conditional Logging -- **No Logger**: If no logger instance is passed to a component's constructor, that component performs no logging operations -- **Logger Provided**: If a logger instance is passed, the component uses it for logging -- **Error-Only Mode**: If `log_errors_only=True` is set, only error and critical level messages are logged - -### 2. Logger Inheritance -- Components that receive a logger pass the same logger instance down to child components -- This creates a hierarchical logging structure that follows the component hierarchy - -### 3. Hierarchical File Organization -- Log files are organized based on component hierarchy -- Each major component gets its own log directory -- Child components log to their parent's log file - -## Component Hierarchy - -``` -Top-level Application (individual logger) -├── ProductionManager (individual logger) -│ ├── DataSaver (receives logger from ProductionManager) -│ ├── DataValidator (receives logger from ProductionManager) -│ ├── DatabaseConnection (receives logger from ProductionManager) -│ └── CollectorManager (individual logger) -│ ├── OKX collector BTC-USD (individual logger) -│ │ ├── DataAggregator (receives logger from OKX collector) -│ │ ├── DataTransformer (receives logger from OKX collector) -│ │ └── DataProcessor (receives logger from OKX collector) -│ └── Another collector... 
-``` - -## Usage Examples - -### Basic Usage - -```python -from utils.logger import get_logger -from data.exchanges.okx.collector import OKXCollector - -# Create a logger for the collector -collector_logger = get_logger('okx_collector_btc_usdt', verbose=True) - -# Create collector with logger - all child components will use this logger -collector = OKXCollector( - symbol='BTC-USDT', - logger=collector_logger -) - -# Child components (data processor, validator, transformer) will automatically -# receive and use the same logger instance -``` - -### No Logging Mode - -```python -# Create collector without logger - no logging will be performed -collector = OKXCollector( - symbol='BTC-USDT', - logger=None # or simply omit the parameter -) - -# No log files will be created, no console output -``` - -### Error-Only Logging Mode - -```python -from utils.logger import get_logger -from data.collector_manager import CollectorManager - -# Create logger for manager -manager_logger = get_logger('collector_manager', verbose=True) - -# Create manager with error-only logging -manager = CollectorManager( - manager_name="production_manager", - logger=manager_logger, - log_errors_only=True # Only errors and critical messages will be logged -) - -# Manager will only log errors, but child collectors can have their own loggers -``` - -### Hierarchical Logging Setup - -```python -from utils.logger import get_logger -from data.collector_manager import CollectorManager -from data.exchanges.okx.collector import OKXCollector - -# Create manager with its own logger -manager_logger = get_logger('collector_manager', verbose=True) -manager = CollectorManager(logger=manager_logger) - -# Create individual collectors with their own loggers -btc_logger = get_logger('okx_collector_btc_usdt', verbose=True) -eth_logger = get_logger('okx_collector_eth_usdt', verbose=True) - -btc_collector = OKXCollector('BTC-USDT', logger=btc_logger) -eth_collector = OKXCollector('ETH-USDT', logger=eth_logger) - -# Add collectors to manager -manager.add_collector(btc_collector) -manager.add_collector(eth_collector) - -# Result: -# - Manager logs to: logs/collector_manager/YYYY-MM-DD.txt -# - BTC collector logs to: logs/okx_collector_btc_usdt/YYYY-MM-DD.txt -# - ETH collector logs to: logs/okx_collector_eth_usdt/YYYY-MM-DD.txt -# - All child components of each collector log to their parent's file -``` - -## Implementation Details - -### Base Classes - -All base classes support conditional logging: - -```python -class BaseDataCollector: - def __init__(self, ..., logger=None, log_errors_only=False): - self.logger = logger - self.log_errors_only = log_errors_only - - def _log_debug(self, message: str) -> None: - if self.logger and not self.log_errors_only: - self.logger.debug(message) - - def _log_error(self, message: str, exc_info: bool = False) -> None: - if self.logger: - self.logger.error(message, exc_info=exc_info) -``` - -### Child Component Pattern - -Child components receive logger from parent: - -```python -class OKXCollector(BaseDataCollector): - def __init__(self, symbol: str, logger=None): - super().__init__(..., logger=logger) - - # Pass logger to child components - self._data_processor = OKXDataProcessor( - symbol, - logger=self.logger # Pass parent's logger - ) -``` - -### Conditional Logging Helpers - -All components use helper methods for conditional logging: - -```python -def _log_debug(self, message: str) -> None: - """Log debug message if logger is available and not in errors-only mode.""" - if self.logger and not 
self.log_errors_only: - self.logger.debug(message) - -def _log_info(self, message: str) -> None: - """Log info message if logger is available and not in errors-only mode.""" - if self.logger and not self.log_errors_only: - self.logger.info(message) - -def _log_warning(self, message: str) -> None: - """Log warning message if logger is available and not in errors-only mode.""" - if self.logger and not self.log_errors_only: - self.logger.warning(message) - -def _log_error(self, message: str, exc_info: bool = False) -> None: - """Log error message if logger is available (always logs errors).""" - if self.logger: - self.logger.error(message, exc_info=exc_info) - -def _log_critical(self, message: str, exc_info: bool = False) -> None: - """Log critical message if logger is available (always logs critical).""" - if self.logger: - self.logger.critical(message, exc_info=exc_info) -``` - -## Log File Structure - -``` -logs/ -├── collector_manager/ -│ └── 2024-01-15.txt -├── okx_collector_btc_usdt/ -│ └── 2024-01-15.txt -├── okx_collector_eth_usdt/ -│ └── 2024-01-15.txt -└── production_manager/ - └── 2024-01-15.txt -``` - -## Configuration Options - -### Logger Parameters - -- `logger`: Logger instance or None -- `log_errors_only`: Boolean flag for error-only mode -- `verbose`: Console output (when creating new loggers) -- `clean_old_logs`: Automatic cleanup of old log files -- `max_log_files`: Maximum number of log files to keep - -### Environment Variables - -```bash -# Enable verbose console logging -VERBOSE_LOGGING=true - -# Enable console output -LOG_TO_CONSOLE=true -``` - -## Best Practices - -### 1. Component Design -- Always accept `logger=None` parameter in constructors -- Pass logger to all child components -- Use conditional logging helper methods -- Never assume logger is available - -### 2. Error Handling -- Always log errors regardless of `log_errors_only` setting -- Use appropriate log levels -- Include context in error messages - -### 3. Performance -- Conditional logging has minimal performance impact -- Logger checks are fast boolean operations -- No string formatting when logging is disabled - -### 4. Testing -- Test components with and without loggers -- Verify error-only mode works correctly -- Check that child components receive loggers properly - -## Migration Guide - -### Updating Existing Components - -1. **Add logger parameter to constructor**: -```python -def __init__(self, ..., logger=None, log_errors_only=False): -``` - -2. **Add conditional logging helpers**: -```python -def _log_debug(self, message: str) -> None: - if self.logger and not self.log_errors_only: - self.logger.debug(message) -``` - -3. **Update all logging calls**: -```python -# Before -self.logger.info("Message") - -# After -self._log_info("Message") -``` - -4. **Pass logger to child components**: -```python -child = ChildComponent(logger=self.logger) -``` - -### Testing Changes - -```python -# Test without logger -component = MyComponent(logger=None) -# Should work without errors, no logging - -# Test with logger -logger = get_logger('test_component') -component = MyComponent(logger=logger) -# Should log normally - -# Test error-only mode -component = MyComponent(logger=logger, log_errors_only=True) -# Should only log errors -``` - -This conditional logging system provides maximum flexibility while maintaining clean, maintainable code that works in all scenarios. 
\ No newline at end of file
diff --git a/docs/services/data_collection_service.md b/docs/services/data_collection_service.md
new file mode 100644
index 0000000..42be79d
--- /dev/null
+++ b/docs/services/data_collection_service.md
@@ -0,0 +1,782 @@
+# Data Collection Service
+
+The Data Collection Service is a production-ready service for cryptocurrency market data collection with clean logging and robust error handling. It provides a service layer that manages multiple data collectors for different trading pairs and exchanges.
+
+## Overview
+
+The service provides a high-level interface for managing the data collection system, handling configuration, lifecycle management, and monitoring. It acts as an orchestration layer on top of the core data collector components.
+
+## Features
+
+- **Service Lifecycle Management**: Start, stop, and monitor data collection operations
+- **JSON Configuration**: File-based configuration with automatic defaults
+- **Clean Production Logging**: Only essential operational information
+- **Health Monitoring**: Service-level health checks and auto-recovery
+- **Graceful Shutdown**: Proper signal handling and cleanup
+- **Multi-Exchange Orchestration**: Coordinate collectors across multiple exchanges
+- **Production Ready**: Designed for 24/7 operation with monitoring
+
+## Quick Start
+
+### Basic Usage
+
+```bash
+# Start with default configuration (indefinite run)
+python scripts/start_data_collection.py
+
+# Run for 8 hours
+python scripts/start_data_collection.py --hours 8
+
+# Use custom configuration
+python scripts/start_data_collection.py --config config/my_config.json
+```
+
+### Monitoring
+
+```bash
+# Check status once
+python scripts/monitor_clean.py
+
+# Monitor continuously every 60 seconds
+python scripts/monitor_clean.py --interval 60
+```
+
+## Configuration
+
+The service uses JSON configuration files and automatically creates a default configuration if none exists.
+
+### Default Configuration Location
+
+`config/data_collection.json`
+
+### Configuration Structure
+
+```json
+{
+  "exchanges": {
+    "okx": {
+      "enabled": true,
+      "trading_pairs": [
+        {
+          "symbol": "BTC-USDT",
+          "enabled": true,
+          "data_types": ["trade"],
+          "timeframes": ["1m", "5m", "15m", "1h"]
+        },
+        {
+          "symbol": "ETH-USDT",
+          "enabled": true,
+          "data_types": ["trade"],
+          "timeframes": ["1m", "5m", "15m", "1h"]
+        }
+      ]
+    }
+  },
+  "collection_settings": {
+    "health_check_interval": 120,
+    "store_raw_data": true,
+    "auto_restart": true,
+    "max_restart_attempts": 3
+  },
+  "logging": {
+    "level": "INFO",
+    "log_errors_only": true,
+    "verbose_data_logging": false
+  }
+}
+```
+
+### Configuration Options
+
+#### Exchange Settings
+
+- **enabled**: Whether to enable this exchange
+- **trading_pairs**: Array of trading pair configurations
+
+#### Trading Pair Settings
+
+- **symbol**: Trading pair symbol (e.g., "BTC-USDT")
+- **enabled**: Whether to collect data for this pair
+- **data_types**: Types of data to collect (["trade"], ["ticker"], etc.)
+- **timeframes**: Candle timeframes to generate (["1m", "5m", "15m", "1h", "4h", "1d"]) + +#### Collection Settings + +- **health_check_interval**: Health check frequency in seconds +- **store_raw_data**: Whether to store raw trade data +- **auto_restart**: Enable automatic restart on failures +- **max_restart_attempts**: Maximum restart attempts before giving up + +#### Logging Settings + +- **level**: Log level ("DEBUG", "INFO", "WARNING", "ERROR") +- **log_errors_only**: Only log errors and essential events +- **verbose_data_logging**: Enable verbose logging of individual trades/candles + +## Service Architecture + +### Service Layer Components + +``` +┌─────────────────────────────────────────────────┐ +│ DataCollectionService │ +│ ┌─────────────────────────────────────────┐ │ +│ │ Configuration Manager │ │ +│ │ • JSON config loading/validation │ │ +│ │ • Default config generation │ │ +│ │ • Runtime config updates │ │ +│ └─────────────────────────────────────────┘ │ +│ ┌─────────────────────────────────────────┐ │ +│ │ Service Monitor │ │ +│ │ • Service-level health checks │ │ +│ │ • Uptime tracking │ │ +│ │ • Error aggregation │ │ +│ └─────────────────────────────────────────┘ │ +│ │ │ +│ ┌─────────────────────────────────────────┐ │ +│ │ CollectorManager │ │ +│ │ • Individual collector management │ │ +│ │ • Health monitoring │ │ +│ │ • Auto-restart coordination │ │ +│ └─────────────────────────────────────────┘ │ +└─────────────────────────────────────────────────┘ + │ + ┌─────────────────────────────┐ + │ Core Data Collectors │ + │ (See data_collectors.md) │ + └─────────────────────────────┘ +``` + +### Data Flow + +``` +Configuration → Service → CollectorManager → Data Collectors → Database + ↓ ↓ + Service Monitor Health Monitor +``` + +### Storage Integration + +- **Raw Data**: PostgreSQL `raw_trades` table via repository pattern +- **Candles**: PostgreSQL `market_data` table with multiple timeframes +- **Real-time**: Redis pub/sub for live data distribution +- **Service Metrics**: Service uptime, error counts, collector statistics + +## Logging Philosophy + +The service implements **clean production logging** focused on operational needs: + +### What Gets Logged + +✅ **Service Lifecycle** +- Service start/stop events +- Configuration loading +- Service initialization + +✅ **Collector Orchestration** +- Collector creation and destruction +- Service-level health summaries +- Recovery operations + +✅ **Configuration Events** +- Config file changes +- Runtime configuration updates +- Validation errors + +✅ **Service Statistics** +- Periodic uptime reports +- Collection summary statistics +- Performance metrics + +### What Doesn't Get Logged + +❌ **Individual Data Points** +- Every trade received +- Every candle generated +- Raw market data + +❌ **Internal Operations** +- Individual collector heartbeats +- Routine database operations +- Internal processing steps + +## API Reference + +### DataCollectionService + +The main service class for managing data collection operations. + +#### Constructor + +```python +DataCollectionService(config_path: str = "config/data_collection.json") +``` + +**Parameters:** +- `config_path`: Path to JSON configuration file + +#### Methods + +##### `async run(duration_hours: Optional[float] = None) -> bool` + +Run the service for a specified duration or indefinitely. 
+ +**Parameters:** +- `duration_hours`: Optional duration in hours (None = indefinite) + +**Returns:** +- `bool`: True if successful, False if error occurred + +**Example:** +```python +service = DataCollectionService() +await service.run(duration_hours=24) # Run for 24 hours +``` + +##### `async start() -> bool` + +Start the data collection service and all configured collectors. + +**Returns:** +- `bool`: True if started successfully + +##### `async stop() -> None` + +Stop the service gracefully, including all collectors and cleanup. + +##### `get_status() -> Dict[str, Any]` + +Get current service status including uptime, collector counts, and errors. + +**Returns:** +```python +{ + 'service_running': True, + 'uptime_hours': 12.5, + 'collectors_total': 6, + 'collectors_running': 5, + 'collectors_failed': 1, + 'errors_count': 2, + 'last_error': 'Connection timeout for ETH-USDT', + 'configuration': { + 'config_file': 'config/data_collection.json', + 'exchanges_enabled': ['okx'], + 'total_trading_pairs': 6 + } +} +``` + +##### `async initialize_collectors() -> bool` + +Initialize all collectors based on configuration. + +**Returns:** +- `bool`: True if all collectors initialized successfully + +##### `load_configuration() -> Dict[str, Any]` + +Load and validate configuration from file. + +**Returns:** +- `dict`: Loaded configuration + +### Standalone Function + +#### `run_data_collection_service(config_path, duration_hours)` + +```python +async def run_data_collection_service( + config_path: str = "config/data_collection.json", + duration_hours: Optional[float] = None +) -> bool +``` + +Convenience function to run the service with minimal setup. + +**Parameters:** +- `config_path`: Path to configuration file +- `duration_hours`: Optional duration in hours + +**Returns:** +- `bool`: True if successful + +## Integration Examples + +### Basic Service Integration + +```python +import asyncio +from data.collection_service import DataCollectionService + +async def main(): + service = DataCollectionService("config/my_config.json") + + # Run for 24 hours + success = await service.run(duration_hours=24) + + if not success: + print("Service encountered errors") + +if __name__ == "__main__": + asyncio.run(main()) +``` + +### Custom Status Monitoring + +```python +import asyncio +from data.collection_service import DataCollectionService + +async def monitor_service(): + service = DataCollectionService() + + # Start service in background + start_task = asyncio.create_task(service.run()) + + # Monitor status every 5 minutes + while service.running: + status = service.get_status() + print(f"Service Uptime: {status['uptime_hours']:.1f}h") + print(f"Collectors: {status['collectors_running']}/{status['collectors_total']}") + print(f"Errors: {status['errors_count']}") + + await asyncio.sleep(300) # 5 minutes + + await start_task + +asyncio.run(monitor_service()) +``` + +### Programmatic Control + +```python +import asyncio +from data.collection_service import DataCollectionService + +async def controlled_collection(): + service = DataCollectionService() + + try: + # Initialize and start + await service.initialize_collectors() + await service.start() + + # Monitor and control + while True: + status = service.get_status() + + # Check if any collectors failed + if status['collectors_failed'] > 0: + print("Some collectors failed, checking health...") + # Service auto-restart will handle this + + await asyncio.sleep(60) # Check every minute + + except KeyboardInterrupt: + print("Shutting down service...") + 
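+    # Graceful-shutdown note: stop() below runs in `finally` so that
+    # collectors are stopped and connections are cleaned up on both normal
+    # completion and Ctrl+C (KeyboardInterrupt).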
finally:
+        await service.stop()
+
+asyncio.run(controlled_collection())
+```
+
+### Configuration Management
+
+```python
+import asyncio
+import json
+from data.collection_service import DataCollectionService
+
+async def dynamic_configuration():
+    service = DataCollectionService()
+
+    # Load and modify configuration
+    config = service.load_configuration()
+
+    # Add a new trading pair
+    config['exchanges']['okx']['trading_pairs'].append({
+        'symbol': 'SOL-USDT',
+        'enabled': True,
+        'data_types': ['trade'],
+        'timeframes': ['1m', '5m']
+    })
+
+    # Save updated configuration
+    with open('config/data_collection.json', 'w') as f:
+        json.dump(config, f, indent=2)
+
+    # Restart the service so it picks up the new configuration
+    # (assumes the service was already running)
+    await service.stop()
+    await service.start()
+
+asyncio.run(dynamic_configuration())
+```
+
+## Error Handling
+
+The service implements robust error handling at the service orchestration level:
+
+### Service Level Errors
+
+- **Configuration Errors**: Invalid JSON, missing required fields
+- **Initialization Errors**: Failed collector creation, database connectivity
+- **Runtime Errors**: Service-level exceptions, resource exhaustion
+
+### Error Recovery Strategies
+
+1. **Graceful Degradation**: Continue with healthy collectors
+2. **Configuration Validation**: Validate before applying changes
+3. **Service Restart**: Full service restart on critical errors
+4. **Error Aggregation**: Collect and report errors across all collectors
+
+### Error Reporting
+
+```python
+# Service status includes error information
+status = service.get_status()
+
+if status['errors_count'] > 0:
+    print(f"Service has {status['errors_count']} errors")
+    print(f"Last error: {status['last_error']}")
+
+    # Get detailed error information from collectors
+    for collector_name in service.manager.list_collectors():
+        collector_status = service.manager.get_collector_status(collector_name)
+        if collector_status['status'] == 'error':
+            print(f"Collector {collector_name}: {collector_status['statistics']['last_error']}")
+```
+
+## Testing
+
+### Running Service Tests
+
+```bash
+# Run all data collection service tests
+uv run pytest tests/test_data_collection_service.py -v
+
+# Run specific test categories
+uv run pytest tests/test_data_collection_service.py::TestDataCollectionService -v
+
+# Run with coverage
+uv run pytest tests/test_data_collection_service.py --cov=data.collection_service
+```
+
+### Test Coverage
+
+The service test suite covers:
+- Service initialization and configuration loading
+- Collector orchestration and management
+- Service lifecycle (start/stop/restart)
+- Configuration validation and error handling
+- Signal handling and graceful shutdown
+- Status reporting and monitoring
+- Error aggregation and recovery
+
+### Mock Testing
+
+```python
+import pytest
+from unittest.mock import AsyncMock, patch
+from data.collection_service import DataCollectionService
+
+@pytest.mark.asyncio
+async def test_service_with_mock_collectors():
+    with patch('data.collection_service.CollectorManager') as mock_manager:
+        # The service awaits CollectorManager.start(), so it must be mocked
+        # as a coroutine; a plain MagicMock return value is not awaitable
+        mock_manager.return_value.start = AsyncMock(return_value=True)
+
+        service = DataCollectionService()
+        result = await service.start()
+
+        assert result is True
+        mock_manager.return_value.start.assert_awaited_once()
+```
+
+## Production Deployment
+
+### Docker Deployment
+
+```dockerfile
+FROM python:3.11-slim
+
+WORKDIR /app
+COPY . .
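+
+# Optional build-cache tip (not required by the service): copying
+# requirements.txt and installing dependencies *before* `COPY . .` lets
+# Docker reuse the dependency layer when only source code changes.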
+ +# Install dependencies +RUN pip install uv +RUN uv pip install -r requirements.txt + +# Create logs and config directories +RUN mkdir -p logs config + +# Copy production configuration +COPY config/production.json config/data_collection.json + +# Health check +HEALTHCHECK --interval=60s --timeout=10s --start-period=30s --retries=3 \ + CMD python scripts/health_check.py || exit 1 + +# Run service +CMD ["python", "scripts/start_data_collection.py", "--config", "config/data_collection.json"] +``` + +### Kubernetes Deployment + +```yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: data-collection-service +spec: + replicas: 1 + selector: + matchLabels: + app: data-collection-service + template: + metadata: + labels: + app: data-collection-service + spec: + containers: + - name: data-collector + image: crypto-dashboard/data-collector:latest + ports: + - containerPort: 8080 + env: + - name: POSTGRES_HOST + value: "postgres-service" + - name: REDIS_HOST + value: "redis-service" + volumeMounts: + - name: config-volume + mountPath: /app/config + - name: logs-volume + mountPath: /app/logs + livenessProbe: + exec: + command: + - python + - scripts/health_check.py + initialDelaySeconds: 30 + periodSeconds: 60 + volumes: + - name: config-volume + configMap: + name: data-collection-config + - name: logs-volume + emptyDir: {} +``` + +### Systemd Service + +```ini +[Unit] +Description=Cryptocurrency Data Collection Service +After=network.target postgres.service redis.service +Requires=postgres.service redis.service + +[Service] +Type=simple +User=crypto-collector +Group=crypto-collector +WorkingDirectory=/opt/crypto-dashboard +ExecStart=/usr/bin/python scripts/start_data_collection.py --config config/production.json +ExecReload=/bin/kill -HUP $MAINPID +Restart=always +RestartSec=10 +KillMode=mixed +TimeoutStopSec=30 + +# Environment +Environment=PYTHONPATH=/opt/crypto-dashboard +Environment=LOG_LEVEL=INFO + +# Security +NoNewPrivileges=true +PrivateTmp=true +ProtectSystem=strict +ReadWritePaths=/opt/crypto-dashboard/logs + +[Install] +WantedBy=multi-user.target +``` + +### Environment Configuration + +```bash +# Production environment variables +export ENVIRONMENT=production +export POSTGRES_HOST=postgres.internal +export POSTGRES_PORT=5432 +export POSTGRES_DB=crypto_dashboard +export POSTGRES_USER=dashboard_user +export POSTGRES_PASSWORD=secure_password +export REDIS_HOST=redis.internal +export REDIS_PORT=6379 + +# Service configuration +export DATA_COLLECTION_CONFIG=/etc/crypto-dashboard/data_collection.json +export LOG_LEVEL=INFO +export HEALTH_CHECK_INTERVAL=120 +``` + +## Monitoring and Alerting + +### Metrics Collection + +The service exposes metrics for monitoring systems: + +```python +# Service metrics +service_uptime_hours = 24.5 +collectors_running = 5 +collectors_total = 6 +errors_per_hour = 0.2 +data_points_processed = 15000 +``` + +### Health Checks + +```python +# External health check endpoint +async def health_check(): + service = DataCollectionService() + status = service.get_status() + + if not status['service_running']: + return {'status': 'unhealthy', 'reason': 'service_stopped'} + + if status['collectors_failed'] > status['collectors_total'] * 0.5: + return {'status': 'degraded', 'reason': 'too_many_failed_collectors'} + + return {'status': 'healthy'} +``` + +### Alerting Rules + +```yaml +# Prometheus alerting rules +groups: +- name: data_collection_service + rules: + - alert: DataCollectionServiceDown + expr: up{job="data-collection-service"} == 0 + for: 5m + 
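+    # Illustrative labels (not part of the original rule): a severity label
+    # lets an Alertmanager route page on critical alerts
+    labels:
+      severity: critical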
annotations: + summary: "Data collection service is down" + + - alert: TooManyFailedCollectors + expr: collectors_failed / collectors_total > 0.5 + for: 10m + annotations: + summary: "More than 50% of collectors have failed" + + - alert: HighErrorRate + expr: rate(errors_total[5m]) > 0.1 + for: 15m + annotations: + summary: "High error rate in data collection service" +``` + +## Performance Considerations + +### Resource Usage + +- **Memory**: ~150MB base + ~15MB per trading pair (including service overhead) +- **CPU**: Low (async I/O bound, service orchestration) +- **Network**: ~1KB/s per trading pair +- **Storage**: Service logs ~10MB/day + +### Scaling Strategies + +1. **Horizontal Scaling**: Multiple service instances with different configurations +2. **Configuration Partitioning**: Separate services by exchange or asset class +3. **Load Balancing**: Distribute trading pairs across service instances +4. **Regional Deployment**: Deploy closer to exchange data centers + +### Optimization Tips + +1. **Configuration Tuning**: Optimize health check intervals and timeframes +2. **Resource Limits**: Set appropriate memory and CPU limits +3. **Batch Operations**: Use efficient database operations +4. **Monitoring Overhead**: Balance monitoring frequency with performance + +## Troubleshooting + +### Common Service Issues + +#### Service Won't Start + +``` +❌ Failed to start data collection service +``` + +**Solutions:** +1. Check configuration file validity +2. Verify database connectivity +3. Ensure no port conflicts +4. Check file permissions + +#### Configuration Loading Failed + +``` +❌ Failed to load config from config/data_collection.json: Invalid JSON +``` + +**Solutions:** +1. Validate JSON syntax +2. Check required fields +3. Verify file encoding (UTF-8) +4. Recreate default configuration + +#### No Collectors Created + +``` +❌ No collectors were successfully initialized +``` + +**Solutions:** +1. Check exchange configuration +2. Verify trading pair symbols +3. Check network connectivity +4. Review collector creation logs + +### Debug Mode + +Enable verbose service debugging: + +```json +{ + "logging": { + "level": "DEBUG", + "log_errors_only": false, + "verbose_data_logging": true + } +} +``` + +### Service Diagnostics + +```python +# Run diagnostic check +from data.collection_service import DataCollectionService + +service = DataCollectionService() +status = service.get_status() + +print(f"Service Running: {status['service_running']}") +print(f"Configuration File: {status['configuration']['config_file']}") +print(f"Collectors: {status['collectors_running']}/{status['collectors_total']}") + +# Check individual collector health +for collector_name in service.manager.list_collectors(): + collector_status = service.manager.get_collector_status(collector_name) + print(f"{collector_name}: {collector_status['status']}") +``` + +## Related Documentation + +- [Data Collectors System](../components/data_collectors.md) - Core collector components +- [Logging System](../components/logging.md) - Logging configuration +- [Database Operations](../database/operations.md) - Database integration +- [Monitoring Guide](../monitoring/README.md) - System monitoring setup \ No newline at end of file