diff --git a/config/data_collection.json b/config/data_collection.json new file mode 100644 index 0000000..b61bbe4 --- /dev/null +++ b/config/data_collection.json @@ -0,0 +1,69 @@ +{ + "exchange": "okx", + "connection": { + "public_ws_url": "wss://ws.okx.com:8443/ws/v5/public", + "private_ws_url": "wss://ws.okx.com:8443/ws/v5/private", + "ping_interval": 25.0, + "pong_timeout": 10.0, + "max_reconnect_attempts": 5, + "reconnect_delay": 5.0 + }, + "data_collection": { + "store_raw_data": true, + "health_check_interval": 120.0, + "auto_restart": true, + "buffer_size": 1000 + }, + "trading_pairs": [ + { + "symbol": "BTC-USDT", + "enabled": true, + "data_types": [ + "trade", + "orderbook" + ], + "timeframes": [ + "1m", + "5m", + "15m", + "1h" + ], + "channels": { + "trades": "trades", + "orderbook": "books5", + "ticker": "tickers" + } + }, + { + "symbol": "ETH-USDT", + "enabled": true, + "data_types": [ + "trade", + "orderbook" + ], + "timeframes": [ + "1m", + "5m", + "15m", + "1h" + ], + "channels": { + "trades": "trades", + "orderbook": "books5", + "ticker": "tickers" + } + } + ], + "logging": { + "component_name_template": "okx_collector_{symbol}", + "log_level": "INFO", + "verbose": false + }, + "database": { + "store_processed_data": true, + "store_raw_data": true, + "force_update_candles": false, + "batch_size": 100, + "flush_interval": 5.0 + } +} \ No newline at end of file diff --git a/data/collection_service.py b/data/collection_service.py new file mode 100644 index 0000000..62d774d --- /dev/null +++ b/data/collection_service.py @@ -0,0 +1,449 @@ +#!/usr/bin/env python3 +""" +Data Collection Service + +Production-ready service for cryptocurrency market data collection +with clean logging and robust error handling. + +This service manages multiple data collectors for different trading pairs +and exchanges, with proper health monitoring and graceful shutdown. +""" + +import asyncio +import signal +import sys +import time +import json +from datetime import datetime +from pathlib import Path +from typing import List, Optional, Dict, Any +import logging + +# Add project root to path +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +# Set environment for clean production logging +import os +os.environ['DEBUG'] = 'false' + +# Suppress verbose SQLAlchemy logging for production +logging.getLogger('sqlalchemy').setLevel(logging.WARNING) +logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING) +logging.getLogger('sqlalchemy.pool').setLevel(logging.WARNING) +logging.getLogger('sqlalchemy.dialects').setLevel(logging.WARNING) +logging.getLogger('sqlalchemy.orm').setLevel(logging.WARNING) + +from data.exchanges.factory import ExchangeFactory +from data.collector_manager import CollectorManager +from data.base_collector import DataType +from database.connection import init_database +from utils.logger import get_logger + + +class DataCollectionService: + """ + Production data collection service. + + Manages multiple data collectors with clean logging focused on: + - Service lifecycle (start/stop/restart) + - Connection status (connect/disconnect/reconnect) + - Health status and errors + - Basic collection statistics + + Excludes verbose logging of individual trades/candles for production clarity. 
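+
+    Minimal usage sketch (the config path and duration shown are illustrative):
+
+        service = DataCollectionService("config/data_collection.json")
+        asyncio.run(service.run(duration_hours=8))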
+ """ + + def __init__(self, config_path: str = "config/data_collection.json"): + """Initialize the data collection service.""" + self.config_path = config_path + + # Initialize clean logging first - only essential information + self.logger = get_logger( + "data_collection_service", + log_level="INFO", + verbose=False # Clean console output + ) + + # Load configuration after logger is initialized + self.config = self._load_config() + + # Core components + self.collector_manager = CollectorManager( + logger=self.logger, + log_errors_only=True # Only log errors and essential events + ) + self.collectors: List = [] + + # Service state + self.running = False + self.start_time = None + self.shutdown_event = asyncio.Event() + + # Statistics for monitoring + self.stats = { + 'collectors_created': 0, + 'collectors_running': 0, + 'total_uptime_seconds': 0, + 'last_activity': None, + 'errors_count': 0 + } + + self.logger.info("šŸš€ Data Collection Service initialized") + self.logger.info(f"šŸ“ Configuration: {config_path}") + + def _load_config(self) -> Dict[str, Any]: + """Load service configuration from JSON file.""" + try: + config_file = Path(self.config_path) + if not config_file.exists(): + # Create default config if it doesn't exist + self._create_default_config(config_file) + + with open(config_file, 'r') as f: + config = json.load(f) + + self.logger.info(f"āœ… Configuration loaded from {self.config_path}") + return config + + except Exception as e: + self.logger.error(f"āŒ Failed to load configuration: {e}") + raise + + def _create_default_config(self, config_file: Path) -> None: + """Create a default configuration file.""" + default_config = { + "exchange": "okx", + "connection": { + "public_ws_url": "wss://ws.okx.com:8443/ws/v5/public", + "private_ws_url": "wss://ws.okx.com:8443/ws/v5/private", + "ping_interval": 25.0, + "pong_timeout": 10.0, + "max_reconnect_attempts": 5, + "reconnect_delay": 5.0 + }, + "data_collection": { + "store_raw_data": True, + "health_check_interval": 120.0, + "auto_restart": True, + "buffer_size": 1000 + }, + "trading_pairs": [ + { + "symbol": "BTC-USDT", + "enabled": True, + "data_types": ["trade", "orderbook"], + "timeframes": ["1m", "5m", "15m", "1h"], + "channels": { + "trades": "trades", + "orderbook": "books5", + "ticker": "tickers" + } + }, + { + "symbol": "ETH-USDT", + "enabled": True, + "data_types": ["trade", "orderbook"], + "timeframes": ["1m", "5m", "15m", "1h"], + "channels": { + "trades": "trades", + "orderbook": "books5", + "ticker": "tickers" + } + } + ], + "logging": { + "component_name_template": "okx_collector_{symbol}", + "log_level": "INFO", + "verbose": False + }, + "database": { + "store_processed_data": True, + "store_raw_data": True, + "force_update_candles": False, + "batch_size": 100, + "flush_interval": 5.0 + } + } + + # Ensure directory exists + config_file.parent.mkdir(parents=True, exist_ok=True) + + with open(config_file, 'w') as f: + json.dump(default_config, f, indent=2) + + self.logger.info(f"šŸ“„ Created default configuration: {config_file}") + + async def initialize_collectors(self) -> bool: + """Initialize all data collectors based on configuration.""" + try: + # Get exchange configuration (now using okx_config.json structure) + exchange_name = self.config.get('exchange', 'okx') + trading_pairs = self.config.get('trading_pairs', []) + data_collection_config = self.config.get('data_collection', {}) + + enabled_pairs = [pair for pair in trading_pairs if pair.get('enabled', True)] + + if not enabled_pairs: + 
self.logger.warning(f"āš ļø No enabled trading pairs for {exchange_name}") + return False + + self.logger.info(f"šŸ”§ Initializing {len(enabled_pairs)} collectors for {exchange_name.upper()}") + + total_collectors = 0 + + # Create collectors for each trading pair + for pair_config in enabled_pairs: + if await self._create_collector(exchange_name, pair_config, data_collection_config): + total_collectors += 1 + else: + self.logger.error(f"āŒ Failed to create collector for {pair_config.get('symbol', 'unknown')}") + self.stats['errors_count'] += 1 + + self.stats['collectors_created'] = total_collectors + + if total_collectors > 0: + self.logger.info(f"āœ… Successfully initialized {total_collectors} data collectors") + return True + else: + self.logger.error("āŒ No collectors were successfully initialized") + return False + + except Exception as e: + self.logger.error(f"āŒ Failed to initialize collectors: {e}") + self.stats['errors_count'] += 1 + return False + + async def _create_collector(self, exchange_name: str, pair_config: Dict[str, Any], data_collection_config: Dict[str, Any]) -> bool: + """Create a single data collector for a trading pair.""" + try: + from data.exchanges.factory import ExchangeCollectorConfig + + symbol = pair_config['symbol'] + data_types = [DataType(dt) for dt in pair_config.get('data_types', ['trade'])] + timeframes = pair_config.get('timeframes', ['1m', '5m']) + + # Create collector configuration using the proper structure + collector_config = ExchangeCollectorConfig( + exchange=exchange_name, + symbol=symbol, + data_types=data_types, + auto_restart=data_collection_config.get('auto_restart', True), + health_check_interval=data_collection_config.get('health_check_interval', 120.0), + store_raw_data=data_collection_config.get('store_raw_data', True), + custom_params={ + 'component_name': f"{exchange_name}_collector_{symbol.replace('-', '_').lower()}", + 'logger': self.logger, + 'log_errors_only': True, # Clean logging - only errors and essential events + 'force_update_candles': self.config.get('database', {}).get('force_update_candles', False) + } + ) + + # Create collector using factory with proper config + collector = ExchangeFactory.create_collector(collector_config) + + if collector: + # Add to manager + self.collector_manager.add_collector(collector) + self.collectors.append(collector) + + self.logger.info(f"āœ… Created collector: {symbol} [{'/'.join(timeframes)}]") + return True + else: + self.logger.error(f"āŒ Failed to create collector for {symbol}") + return False + + except Exception as e: + self.logger.error(f"āŒ Error creating collector for {pair_config.get('symbol', 'unknown')}: {e}") + return False + + async def start(self) -> bool: + """Start the data collection service.""" + try: + self.start_time = time.time() + self.running = True + + self.logger.info("šŸš€ Starting Data Collection Service...") + + # Initialize database + self.logger.info("šŸ“Š Initializing database connection...") + init_database() + self.logger.info("āœ… Database connection established") + + # Start collector manager + self.logger.info("šŸ”Œ Starting data collectors...") + success = await self.collector_manager.start() + + if success: + self.stats['collectors_running'] = len(self.collectors) + self.stats['last_activity'] = datetime.now() + + self.logger.info("āœ… Data Collection Service started successfully") + self.logger.info(f"šŸ“ˆ Active collectors: {self.stats['collectors_running']}") + return True + else: + self.logger.error("āŒ Failed to start data collectors") + 
self.stats['errors_count'] += 1 + return False + + except Exception as e: + self.logger.error(f"āŒ Failed to start service: {e}") + self.stats['errors_count'] += 1 + return False + + async def stop(self) -> None: + """Stop the data collection service gracefully.""" + try: + self.logger.info("šŸ›‘ Stopping Data Collection Service...") + self.running = False + + # Stop all collectors + await self.collector_manager.stop() + + # Update statistics + if self.start_time: + self.stats['total_uptime_seconds'] = time.time() - self.start_time + + self.stats['collectors_running'] = 0 + + self.logger.info("āœ… Data Collection Service stopped gracefully") + self.logger.info(f"šŸ“Š Total uptime: {self.stats['total_uptime_seconds']:.1f} seconds") + + except Exception as e: + self.logger.error(f"āŒ Error during service shutdown: {e}") + self.stats['errors_count'] += 1 + + def get_status(self) -> Dict[str, Any]: + """Get current service status.""" + current_time = time.time() + uptime = current_time - self.start_time if self.start_time else 0 + + return { + 'running': self.running, + 'uptime_seconds': uptime, + 'uptime_hours': uptime / 3600, + 'collectors_total': len(self.collectors), + 'collectors_running': self.stats['collectors_running'], + 'errors_count': self.stats['errors_count'], + 'last_activity': self.stats['last_activity'], + 'start_time': datetime.fromtimestamp(self.start_time) if self.start_time else None + } + + def setup_signal_handlers(self) -> None: + """Setup signal handlers for graceful shutdown.""" + def signal_handler(signum, frame): + self.logger.info(f"šŸ“” Received shutdown signal ({signum}), stopping gracefully...") + self.shutdown_event.set() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + async def run(self, duration_hours: Optional[float] = None) -> bool: + """ + Run the data collection service. 
+ + Args: + duration_hours: Optional duration to run (None = indefinite) + + Returns: + bool: True if successful, False if error occurred + """ + self.setup_signal_handlers() + + try: + # Initialize collectors + if not await self.initialize_collectors(): + return False + + # Start service + if not await self.start(): + return False + + # Service running notification + status = self.get_status() + if duration_hours: + self.logger.info(f"ā±ļø Service will run for {duration_hours} hours") + else: + self.logger.info("ā±ļø Service running indefinitely (until stopped)") + + self.logger.info(f"šŸ“Š Active collectors: {status['collectors_running']}") + self.logger.info("šŸ” Monitor with: python scripts/monitor_clean.py") + + # Main service loop + update_interval = 600 # Status update every 10 minutes + last_update = time.time() + + while not self.shutdown_event.is_set(): + # Wait for shutdown signal or timeout + try: + await asyncio.wait_for(self.shutdown_event.wait(), timeout=1.0) + break + except asyncio.TimeoutError: + pass + + current_time = time.time() + + # Check duration limit + if duration_hours: + elapsed_hours = (current_time - self.start_time) / 3600 + if elapsed_hours >= duration_hours: + self.logger.info(f"ā° Completed {duration_hours} hour run") + break + + # Periodic status update + if current_time - last_update >= update_interval: + elapsed_hours = (current_time - self.start_time) / 3600 + self.logger.info(f"ā±ļø Service uptime: {elapsed_hours:.1f} hours") + last_update = current_time + + return True + + except Exception as e: + self.logger.error(f"āŒ Service error: {e}") + self.stats['errors_count'] += 1 + return False + + finally: + await self.stop() + + +# Service entry point function +async def run_data_collection_service( + config_path: str = "config/data_collection.json", + duration_hours: Optional[float] = None +) -> bool: + """ + Run the data collection service. 
+ + Args: + config_path: Path to configuration file + duration_hours: Optional duration in hours (None = indefinite) + + Returns: + bool: True if successful, False otherwise + """ + service = DataCollectionService(config_path) + return await service.run(duration_hours) + + +if __name__ == "__main__": + # Simple CLI when run directly + import argparse + + parser = argparse.ArgumentParser(description="Data Collection Service") + parser.add_argument('--config', default="config/data_collection.json", + help='Configuration file path') + parser.add_argument('--hours', type=float, + help='Run duration in hours (default: indefinite)') + + args = parser.parse_args() + + try: + success = asyncio.run(run_data_collection_service(args.config, args.hours)) + sys.exit(0 if success else 1) + except KeyboardInterrupt: + print("\nšŸ‘‹ Service interrupted by user") + sys.exit(0) + except Exception as e: + print(f"āŒ Fatal error: {e}") + sys.exit(1) \ No newline at end of file diff --git a/data/common/data_types.py b/data/common/data_types.py index 46074a8..f4b38f2 100644 --- a/data/common/data_types.py +++ b/data/common/data_types.py @@ -118,7 +118,7 @@ class OHLCVCandle: @dataclass class CandleProcessingConfig: """Configuration for candle processing - shared across exchanges.""" - timeframes: List[str] = field(default_factory=lambda: ['1s', '5s', '1m', '5m', '15m', '1h']) + timeframes: List[str] = field(default_factory=lambda: ['5s', '1m', '5m', '15m', '1h']) auto_save_candles: bool = True emit_incomplete_candles: bool = False max_trades_per_candle: int = 100000 # Safety limit diff --git a/data/exchanges/okx/collector.py b/data/exchanges/okx/collector.py index b6689f3..e6746d3 100644 --- a/data/exchanges/okx/collector.py +++ b/data/exchanges/okx/collector.py @@ -402,7 +402,7 @@ class OKXCollector(BaseDataCollector): if success and self.logger: action = "Updated" if self.force_update_candles else "Stored" - self.logger.info(f"{self.component_name}: {action} candle: {candle.symbol} {candle.timeframe} at {candle.end_time} (force_update={self.force_update_candles}) - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}, Vol: {candle.volume}, Trades: {candle.trade_count}") + self.logger.debug(f"{self.component_name}: {action} candle: {candle.symbol} {candle.timeframe} at {candle.end_time} (force_update={self.force_update_candles}) - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}, Vol: {candle.volume}, Trades: {candle.trade_count}") except DatabaseOperationError as e: if self.logger: @@ -488,7 +488,7 @@ class OKXCollector(BaseDataCollector): """ self._processed_candles += 1 if self.logger: - self.logger.info(f"{self.component_name}: Completed candle: {candle.symbol} {candle.timeframe} O:{candle.open} H:{candle.high} L:{candle.low} C:{candle.close} V:{candle.volume}") + self.logger.debug(f"{self.component_name}: Completed candle: {candle.symbol} {candle.timeframe} O:{candle.open} H:{candle.high} L:{candle.low} C:{candle.close} V:{candle.volume}") # Store completed candle in market_data table if candle.is_complete: diff --git a/database/operations.py b/database/operations.py index c3dd10e..6b57775 100644 --- a/database/operations.py +++ b/database/operations.py @@ -45,6 +45,11 @@ class BaseRepository: if self.logger: self.logger.info(message) + def log_debug(self, message: str) -> None: + """Log debug message if logger is available.""" + if self.logger: + self.logger.debug(message) + def log_error(self, message: str) -> None: """Log error message if logger is available.""" if self.logger: 
@@ -133,7 +138,7 @@ class MarketDataRepository(BaseRepository): session.commit() - self.log_info(f"{action} candle: {candle.symbol} {candle.timeframe} at {candle_timestamp} (force_update={force_update})") + self.log_debug(f"{action} candle: {candle.symbol} {candle.timeframe} at {candle_timestamp} (force_update={force_update})") return True except Exception as e: @@ -294,7 +299,7 @@ class RawTradeRepository(BaseRepository): session.commit() - self.log_info(f"Stored raw {data_point.data_type.value} data for {data_point.symbol}") + self.log_debug(f"Stored raw {data_point.data_type.value} data for {data_point.symbol}") return True except Exception as e: @@ -343,7 +348,7 @@ class RawTradeRepository(BaseRepository): session.commit() - self.log_info(f"Stored raw WebSocket data: {data_type} for {symbol}") + self.log_debug(f"Stored raw WebSocket data: {data_type} for {symbol}") return True except Exception as e: diff --git a/docs/data-collection-service.md b/docs/data-collection-service.md new file mode 100644 index 0000000..bb72b79 --- /dev/null +++ b/docs/data-collection-service.md @@ -0,0 +1,481 @@ +# Data Collection Service + +The Data Collection Service is a production-ready service for cryptocurrency market data collection with clean logging and robust error handling. It manages multiple data collectors for different trading pairs and exchanges. + +## Features + +- **Clean Logging**: Only essential information (connections, disconnections, errors) +- **Multi-Exchange Support**: Extensible architecture for multiple exchanges +- **Health Monitoring**: Built-in health checks and auto-recovery +- **Configurable**: JSON-based configuration with sensible defaults +- **Graceful Shutdown**: Proper signal handling and cleanup +- **Testing**: Comprehensive unit test coverage + +## Quick Start + +### Basic Usage + +```bash +# Start with default configuration (indefinite run) +python scripts/start_data_collection.py + +# Run for 8 hours +python scripts/start_data_collection.py --hours 8 + +# Use custom configuration +python scripts/start_data_collection.py --config config/my_config.json +``` + +### Monitoring + +```bash +# Check status once +python scripts/monitor_clean.py + +# Monitor continuously every 60 seconds +python scripts/monitor_clean.py --interval 60 +``` + +## Configuration + +The service uses JSON configuration files with automatic default creation if none exists. + +### Default Configuration Location + +`config/data_collection.json` + +### Configuration Structure + +```json +{ + "exchanges": { + "okx": { + "enabled": true, + "trading_pairs": [ + { + "symbol": "BTC-USDT", + "enabled": true, + "data_types": ["trade"], + "timeframes": ["1m", "5m", "15m", "1h"] + }, + { + "symbol": "ETH-USDT", + "enabled": true, + "data_types": ["trade"], + "timeframes": ["1m", "5m", "15m", "1h"] + } + ] + } + }, + "collection_settings": { + "health_check_interval": 120, + "store_raw_data": true, + "auto_restart": true, + "max_restart_attempts": 3 + }, + "logging": { + "level": "INFO", + "log_errors_only": true, + "verbose_data_logging": false + } +} +``` + +### Configuration Options + +#### Exchange Settings + +- **enabled**: Whether to enable this exchange +- **trading_pairs**: Array of trading pair configurations + +#### Trading Pair Settings + +- **symbol**: Trading pair symbol (e.g., "BTC-USDT") +- **enabled**: Whether to collect data for this pair +- **data_types**: Types of data to collect (["trade"], ["ticker"], etc.) 
+- **timeframes**: Candle timeframes to generate (["1m", "5m", "15m", "1h", "4h", "1d"]) + +#### Collection Settings + +- **health_check_interval**: Health check frequency in seconds +- **store_raw_data**: Whether to store raw trade data +- **auto_restart**: Enable automatic restart on failures +- **max_restart_attempts**: Maximum restart attempts before giving up + +#### Logging Settings + +- **level**: Log level ("DEBUG", "INFO", "WARNING", "ERROR") +- **log_errors_only**: Only log errors and essential events +- **verbose_data_logging**: Enable verbose logging of individual trades/candles + +## Service Architecture + +### Core Components + +1. **DataCollectionService**: Main service class managing the lifecycle +2. **CollectorManager**: Manages multiple data collectors with health monitoring +3. **ExchangeFactory**: Creates exchange-specific collectors +4. **BaseDataCollector**: Abstract base for all data collectors + +### Data Flow + +``` +Exchange API → Data Collector → Data Processor → Database + ↓ + Health Monitor → Service Manager +``` + +### Storage + +- **Raw Data**: PostgreSQL `raw_trades` table +- **Candles**: PostgreSQL `market_data` table with multiple timeframes +- **Real-time**: Redis pub/sub for live data distribution + +## Logging Philosophy + +The service implements **clean production logging** focused on operational needs: + +### What Gets Logged + +āœ… **Service Lifecycle** +- Service start/stop +- Collector initialization +- Database connections + +āœ… **Connection Events** +- WebSocket connect/disconnect +- Reconnection attempts +- API errors + +āœ… **Health & Errors** +- Health check results +- Error conditions +- Recovery actions + +āœ… **Statistics** +- Periodic uptime reports +- Collection summary + +### What Doesn't Get Logged + +āŒ **Individual Data Points** +- Every trade received +- Every candle generated +- Raw market data + +āŒ **Verbose Operations** +- Database queries +- Internal processing steps +- Routine heartbeats + +## API Reference + +### DataCollectionService + +The main service class for managing data collection. + +#### Constructor + +```python +DataCollectionService(config_path: str = "config/data_collection.json") +``` + +#### Methods + +##### `async run(duration_hours: Optional[float] = None) -> bool` + +Run the service for a specified duration or indefinitely. + +**Parameters:** +- `duration_hours`: Optional duration in hours (None = indefinite) + +**Returns:** +- `bool`: True if successful, False if error occurred + +##### `async start() -> bool` + +Start the data collection service. + +**Returns:** +- `bool`: True if started successfully + +##### `async stop() -> None` + +Stop the service gracefully. + +##### `get_status() -> Dict[str, Any]` + +Get current service status including uptime, collector counts, and errors. + +**Returns:** +- `dict`: Status information + +### Standalone Function + +#### `run_data_collection_service(config_path, duration_hours)` + +```python +async def run_data_collection_service( + config_path: str = "config/data_collection.json", + duration_hours: Optional[float] = None +) -> bool +``` + +Convenience function to run the service. 
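+
+For example, a minimal invocation (the duration value is illustrative):
+
+```python
+import asyncio
+from data.collection_service import run_data_collection_service
+
+# Collect data for two hours using the default configuration file
+asyncio.run(run_data_collection_service(duration_hours=2))
+```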
+ +## Integration Examples + +### Basic Integration + +```python +import asyncio +from data.collection_service import DataCollectionService + +async def main(): + service = DataCollectionService("config/my_config.json") + await service.run(duration_hours=24) # Run for 24 hours + +if __name__ == "__main__": + asyncio.run(main()) +``` + +### Custom Status Monitoring + +```python +import asyncio +from data.collection_service import DataCollectionService + +async def monitor_service(): + service = DataCollectionService() + + # Start service in background + start_task = asyncio.create_task(service.run()) + + # Monitor status every 5 minutes + while service.running: + status = service.get_status() + print(f"Uptime: {status['uptime_hours']:.1f}h, " + f"Collectors: {status['collectors_running']}, " + f"Errors: {status['errors_count']}") + + await asyncio.sleep(300) # 5 minutes + + await start_task + +asyncio.run(monitor_service()) +``` + +### Programmatic Control + +```python +import asyncio +from data.collection_service import DataCollectionService + +async def controlled_collection(): + service = DataCollectionService() + + # Initialize and start + await service.initialize_collectors() + await service.start() + + try: + # Run for 1 hour + await asyncio.sleep(3600) + finally: + # Graceful shutdown + await service.stop() + +asyncio.run(controlled_collection()) +``` + +## Error Handling + +The service implements robust error handling at multiple levels: + +### Service Level + +- **Configuration Errors**: Invalid JSON, missing files +- **Initialization Errors**: Database connection, collector creation +- **Runtime Errors**: Unexpected exceptions during operation + +### Collector Level + +- **Connection Errors**: WebSocket disconnections, API failures +- **Data Errors**: Invalid data formats, processing failures +- **Health Errors**: Failed health checks, timeout conditions + +### Recovery Strategies + +1. **Automatic Restart**: Collectors auto-restart on failures +2. **Exponential Backoff**: Increasing delays between retry attempts +3. **Circuit Breaker**: Stop retrying after max attempts exceeded +4. **Graceful Degradation**: Continue with healthy collectors + +## Testing + +### Running Tests + +```bash +# Run all data collection service tests +uv run pytest tests/test_data_collection_service.py -v + +# Run specific test +uv run pytest tests/test_data_collection_service.py::TestDataCollectionService::test_service_initialization -v + +# Run with coverage +uv run pytest tests/test_data_collection_service.py --cov=data.collection_service +``` + +### Test Coverage + +The test suite covers: +- Service initialization and configuration +- Collector creation and management +- Service lifecycle (start/stop) +- Error handling and recovery +- Configuration validation +- Signal handling +- Status reporting + +## Troubleshooting + +### Common Issues + +#### Configuration Not Found + +``` +āŒ Failed to load config from config/data_collection.json: [Errno 2] No such file or directory +``` + +**Solution**: The service will create a default configuration. Check the created file and adjust as needed. + +#### Database Connection Failed + +``` +āŒ Database connection failed: connection refused +``` + +**Solution**: Ensure PostgreSQL and Redis are running via Docker: + +```bash +docker-compose up -d postgres redis +``` + +#### No Collectors Created + +``` +āŒ No collectors were successfully initialized +``` + +**Solution**: Check configuration - ensure at least one exchange is enabled with valid trading pairs. 
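+
+A minimal enabled pair entry looks like this (symbol, data types, and timeframes are just examples):
+
+```json
+{
+  "symbol": "BTC-USDT",
+  "enabled": true,
+  "data_types": ["trade"],
+  "timeframes": ["1m", "5m"]
+}
+```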
+ +#### WebSocket Connection Issues + +``` +āŒ Failed to start data collectors +``` + +**Solution**: Check network connectivity and API credentials. Verify exchange is accessible. + +### Debug Mode + +For verbose debugging, modify the logging configuration: + +```json +{ + "logging": { + "level": "DEBUG", + "log_errors_only": false, + "verbose_data_logging": true + } +} +``` + +āš ļø **Warning**: Debug mode generates extensive logs and should not be used in production. + +## Production Deployment + +### Docker + +The service can be containerized for production deployment: + +```dockerfile +FROM python:3.11-slim + +WORKDIR /app +COPY . . + +RUN pip install uv +RUN uv pip install -r requirements.txt + +CMD ["python", "scripts/start_data_collection.py", "--config", "config/production.json"] +``` + +### Systemd Service + +Create a systemd service for Linux deployment: + +```ini +[Unit] +Description=Cryptocurrency Data Collection Service +After=network.target postgres.service redis.service + +[Service] +Type=simple +User=crypto-collector +WorkingDirectory=/opt/crypto-dashboard +ExecStart=/usr/bin/python scripts/start_data_collection.py --config config/production.json +Restart=always +RestartSec=10 + +[Install] +WantedBy=multi-user.target +``` + +### Environment Variables + +Configure sensitive data via environment variables: + +```bash +export POSTGRES_HOST=localhost +export POSTGRES_PORT=5432 +export POSTGRES_DB=crypto_dashboard +export POSTGRES_USER=dashboard_user +export POSTGRES_PASSWORD=secure_password +export REDIS_HOST=localhost +export REDIS_PORT=6379 +``` + +## Performance Considerations + +### Resource Usage + +- **Memory**: ~100MB base + ~10MB per trading pair +- **CPU**: Low (async I/O bound) +- **Network**: ~1KB/s per trading pair +- **Storage**: ~1GB/day per trading pair (with raw data) + +### Scaling + +- **Vertical**: Increase timeframes and trading pairs +- **Horizontal**: Run multiple services with different configurations +- **Database**: Use TimescaleDB for time-series optimization + +### Optimization Tips + +1. **Disable Raw Data**: Set `store_raw_data: false` to reduce storage +2. **Limit Timeframes**: Only collect needed timeframes +3. **Batch Processing**: Use longer health check intervals +4. **Connection Pooling**: Database connections are automatically pooled + +## Changelog + +### v1.0.0 (Current) + +- Initial implementation +- OKX exchange support +- Clean logging system +- Comprehensive test coverage +- JSON configuration +- Health monitoring +- Graceful shutdown \ No newline at end of file diff --git a/example_complete_series_aggregation.py b/example_complete_series_aggregation.py deleted file mode 100644 index e5b170d..0000000 --- a/example_complete_series_aggregation.py +++ /dev/null @@ -1,236 +0,0 @@ -#!/usr/bin/env python3 -""" -Example: Complete Time Series Aggregation - -This example shows how to modify the aggregation system to emit candles -for every time period, even when there are no trades. -""" - -import asyncio -from datetime import datetime, timezone, timedelta -from decimal import Decimal -from typing import Dict, List, Optional - -from data.common.data_types import StandardizedTrade, OHLCVCandle, CandleProcessingConfig -from data.common.aggregation import RealTimeCandleProcessor - - -class CompleteSeriesProcessor(RealTimeCandleProcessor): - """ - Extended processor that emits candles for every time period, - filling gaps with previous close prices when no trades occur. 
- """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.last_prices = {} # Track last known price for each timeframe - self.timers = {} # Timer tasks for each timeframe - - async def start_time_based_emission(self): - """Start timers to emit candles on time boundaries regardless of trades.""" - for timeframe in self.config.timeframes: - self.timers[timeframe] = asyncio.create_task( - self._time_based_candle_emitter(timeframe) - ) - - async def stop_time_based_emission(self): - """Stop all timers.""" - for task in self.timers.values(): - task.cancel() - self.timers.clear() - - async def _time_based_candle_emitter(self, timeframe: str): - """Emit candles on time boundaries for a specific timeframe.""" - try: - while True: - # Calculate next boundary - now = datetime.now(timezone.utc) - next_boundary = self._get_next_time_boundary(now, timeframe) - - # Wait until next boundary - wait_seconds = (next_boundary - now).total_seconds() - if wait_seconds > 0: - await asyncio.sleep(wait_seconds) - - # Check if we have an active bucket with trades - current_bucket = self.current_buckets.get(timeframe) - - if current_bucket is None or current_bucket.trade_count == 0: - # No trades during this period - create empty candle - await self._emit_empty_candle(timeframe, next_boundary) - # If there are trades, they will be handled by normal trade processing - - except asyncio.CancelledError: - pass # Timer was cancelled - - async def _emit_empty_candle(self, timeframe: str, end_time: datetime): - """Emit an empty candle when no trades occurred during the period.""" - try: - # Calculate start time - start_time = self._get_bucket_start_time(end_time - timedelta(seconds=1), timeframe) - - # Use last known price or default - last_price = self.last_prices.get(timeframe, Decimal('0')) - - # Create empty candle with last known price as OHLC - empty_candle = OHLCVCandle( - symbol=self.symbol, - timeframe=timeframe, - start_time=start_time, - end_time=end_time, - open=last_price, - high=last_price, - low=last_price, - close=last_price, - volume=Decimal('0'), - trade_count=0, - exchange=self.exchange, - is_complete=True, - first_trade_time=None, - last_trade_time=None - ) - - # Emit the empty candle - self._emit_candle(empty_candle) - - if self.logger: - self.logger.info( - f"ā­• {timeframe.upper()} EMPTY CANDLE at {end_time.strftime('%H:%M:%S')}: " - f"No trades, using last price ${last_price}" - ) - - except Exception as e: - if self.logger: - self.logger.error(f"Error emitting empty candle: {e}") - - def _emit_candle(self, candle: OHLCVCandle) -> None: - """Override to track last prices.""" - # Update last known price - if candle.close > 0: - self.last_prices[candle.timeframe] = candle.close - - # Call parent implementation - super()._emit_candle(candle) - - def _get_next_time_boundary(self, current_time: datetime, timeframe: str) -> datetime: - """Calculate the next time boundary for a timeframe.""" - if timeframe == '1s': - # Next second boundary - return (current_time + timedelta(seconds=1)).replace(microsecond=0) - elif timeframe == '5s': - # Next 5-second boundary - next_sec = (current_time.second // 5 + 1) * 5 - if next_sec >= 60: - return current_time.replace(second=0, microsecond=0, minute=current_time.minute + 1) - return current_time.replace(second=next_sec, microsecond=0) - elif timeframe == '10s': - # Next 10-second boundary - next_sec = (current_time.second // 10 + 1) * 10 - if next_sec >= 60: - return current_time.replace(second=0, microsecond=0, 
minute=current_time.minute + 1) - return current_time.replace(second=next_sec, microsecond=0) - elif timeframe == '15s': - # Next 15-second boundary - next_sec = (current_time.second // 15 + 1) * 15 - if next_sec >= 60: - return current_time.replace(second=0, microsecond=0, minute=current_time.minute + 1) - return current_time.replace(second=next_sec, microsecond=0) - elif timeframe == '30s': - # Next 30-second boundary - next_sec = (current_time.second // 30 + 1) * 30 - if next_sec >= 60: - return current_time.replace(second=0, microsecond=0, minute=current_time.minute + 1) - return current_time.replace(second=next_sec, microsecond=0) - elif timeframe == '1m': - # Next minute boundary - return (current_time + timedelta(minutes=1)).replace(second=0, microsecond=0) - elif timeframe == '5m': - # Next 5-minute boundary - next_min = (current_time.minute // 5 + 1) * 5 - if next_min >= 60: - return current_time.replace(minute=0, second=0, microsecond=0, hour=current_time.hour + 1) - return current_time.replace(minute=next_min, second=0, microsecond=0) - else: - # For other timeframes, add appropriate logic - return current_time + timedelta(minutes=1) - - -# Example usage -async def demo_complete_series(): - """Demonstrate complete time series aggregation.""" - print("šŸ• Complete Time Series Aggregation Demo") - print("This will emit candles even when no trades occur\n") - - # Create processor with complete series capability - config = CandleProcessingConfig(timeframes=['1s', '5s', '30s']) - processor = CompleteSeriesProcessor( - symbol="BTC-USDT", - exchange="demo", - config=config, - component_name="complete_series_demo" - ) - - # Set initial price - processor.last_prices = {'1s': Decimal('50000'), '5s': Decimal('50000'), '30s': Decimal('50000')} - - # Add callback to see emitted candles - def on_candle(candle: OHLCVCandle): - candle_type = "TRADE" if candle.trade_count > 0 else "EMPTY" - print(f"šŸ“Š {candle_type} {candle.timeframe.upper()} at {candle.end_time.strftime('%H:%M:%S')}: " - f"${candle.close} (T={candle.trade_count})") - - processor.add_candle_callback(on_candle) - - # Start time-based emission - await processor.start_time_based_emission() - - try: - # Simulate some trades with gaps - print("Simulating trades with gaps...\n") - - base_time = datetime.now(timezone.utc) - - # Trade at T+0 - trade1 = StandardizedTrade( - symbol="BTC-USDT", - trade_id="1", - price=Decimal('50100'), - size=Decimal('0.1'), - side="buy", - timestamp=base_time, - exchange="demo" - ) - processor.process_trade(trade1) - - # Wait 3 seconds (should see empty candles for missing periods) - await asyncio.sleep(3) - - # Trade at T+3 - trade2 = StandardizedTrade( - symbol="BTC-USDT", - trade_id="2", - price=Decimal('50200'), - size=Decimal('0.2'), - side="sell", - timestamp=base_time + timedelta(seconds=3), - exchange="demo" - ) - processor.process_trade(trade2) - - # Wait more to see more empty candles - await asyncio.sleep(5) - - print("\nāœ… Demo completed - You can see both trade candles and empty candles") - - finally: - await processor.stop_time_based_emission() - - -if __name__ == "__main__": - print("Complete Time Series Aggregation Example") - print("=" * 50) - print("This shows how to emit candles even when no trades occur.") - print("Uncomment the line below to run the demo:\n") - - # Uncomment to run the demo: - # asyncio.run(demo_complete_series()) \ No newline at end of file diff --git a/scripts/start_data_collection.py b/scripts/start_data_collection.py new file mode 100644 index 0000000..b6fab4a 
--- /dev/null +++ b/scripts/start_data_collection.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python3 +""" +Start Data Collection Service + +Simple script to start the cryptocurrency data collection service +with clean console output and proper configuration. + +Usage: + python scripts/start_data_collection.py [options] + +Examples: + # Start with default configuration (indefinite run) + python scripts/start_data_collection.py + + # Run for 8 hours with default config + python scripts/start_data_collection.py --hours 8 + + # Use custom configuration file + python scripts/start_data_collection.py --config config/my_config.json + + # Run for 24 hours with custom config + python scripts/start_data_collection.py --config config/production.json --hours 24 +""" + +import asyncio +import argparse +import sys +from pathlib import Path + +# Add project root to path +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +from data.collection_service import run_data_collection_service + + +def display_banner(config_path: str, duration_hours: float = None): + """Display service startup banner.""" + print("šŸš€ CRYPTOCURRENCY DATA COLLECTION SERVICE") + print("=" * 55) + print(f"šŸ“ Configuration: {config_path}") + + if duration_hours: + print(f"ā±ļø Duration: {duration_hours} hours") + else: + print("ā±ļø Duration: Indefinite (until stopped)") + + print("šŸ“Š Logging: Essential events only (connections, errors)") + print("šŸ’¾ Storage: PostgreSQL + Redis") + print("šŸ” Monitor: python scripts/monitor_clean.py") + print("ā¹ļø Stop: Ctrl+C") + print("=" * 55) + + +def main(): + """Main entry point.""" + parser = argparse.ArgumentParser( + description="Start Cryptocurrency Data Collection Service", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Start with default configuration (indefinite) + python scripts/start_data_collection.py + + # Run for 8 hours + python scripts/start_data_collection.py --hours 8 + + # Use custom configuration + python scripts/start_data_collection.py --config config/custom.json + + # Production run for 24 hours + python scripts/start_data_collection.py --config config/production.json --hours 24 + +Configuration: + The service will create a default configuration file if none exists. + Default location: config/data_collection.json + + The configuration includes: + - Exchange settings (OKX by default) + - Trading pairs (BTC-USDT, ETH-USDT by default) + - Data types and timeframes + - Health monitoring settings + """ + ) + + parser.add_argument( + '--config', + default="config/data_collection.json", + help='Configuration file path (default: config/data_collection.json)' + ) + + parser.add_argument( + '--hours', + type=float, + help='Collection duration in hours (default: indefinite until Ctrl+C)' + ) + + parser.add_argument( + '--quiet', + action='store_true', + help='Suppress banner and start directly' + ) + + args = parser.parse_args() + + # Validate arguments + if args.hours is not None and args.hours <= 0: + print("āŒ Duration must be positive") + sys.exit(1) + + # Display banner unless quiet mode + if not args.quiet: + display_banner(args.config, args.hours) + + try: + # Start the service + print("šŸŽÆ Starting service..." 
if not args.quiet else "") + + success = asyncio.run(run_data_collection_service( + config_path=args.config, + duration_hours=args.hours + )) + + if success: + print("āœ… Service completed successfully" if not args.quiet else "") + sys.exit(0) + else: + print("āŒ Service failed" if not args.quiet else "") + sys.exit(1) + + except KeyboardInterrupt: + print("\nšŸ‘‹ Service interrupted by user") + sys.exit(0) + except Exception as e: + print(f"āŒ Fatal error: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/tasks/tasks-crypto-bot-prd.md b/tasks/tasks-crypto-bot-prd.md index dbf37d9..1174d4c 100644 --- a/tasks/tasks-crypto-bot-prd.md +++ b/tasks/tasks-crypto-bot-prd.md @@ -12,6 +12,7 @@ - `database/init/schema_clean.sql` - Copy of clean schema for Docker initialization - `data/base_collector.py` - Abstract base class for all data collectors with standardized interface, error handling, data validation, health monitoring, and auto-restart capabilities - `data/collector_manager.py` - Centralized collector management with health monitoring, auto-recovery, and coordinated lifecycle management +- `data/collection_service.py` - Production-ready data collection service with clean logging, multi-exchange support, and robust error handling - `data/__init__.py` - Data collection package initialization - `data/okx_collector.py` - OKX API integration for real-time market data collection - `data/aggregator.py` - OHLCV candle aggregation and processing @@ -26,6 +27,9 @@ - `config/strategies/` - Directory for JSON strategy parameter files - `config/settings.py` - Centralized configuration settings using Pydantic - `scripts/dev.py` - Development setup and management script +- `scripts/start_data_collection.py` - Simple script to start the data collection service with clean output +- `scripts/production_clean.py` - Clean production OKX data collector script (adapted for service development) +- `scripts/monitor_clean.py` - Clean database monitor for production data collection status - `scripts/init_database.py` - Database initialization and verification script - `scripts/test_models.py` - Test script for SQLAlchemy models integration verification - `utils/logger.py` - Enhanced unified logging system with verbose console output, automatic cleanup, and configurable retention [USE THIS FOR ALL LOGGING] @@ -35,12 +39,14 @@ - `tests/test_strategies.py` - Unit tests for strategy implementations - `tests/test_bot_manager.py` - Unit tests for bot management functionality - `tests/test_data_collection.py` - Unit tests for data collection and aggregation +- `tests/test_data_collection_service.py` - Comprehensive unit tests for the DataCollectionService (25 tests) - `tests/test_base_collector.py` - Comprehensive unit tests for the BaseDataCollector abstract class (13 tests) - `tests/test_collector_manager.py` - Comprehensive unit tests for the CollectorManager with health monitoring (14 tests) - `tests/test_logging_enhanced.py` - Comprehensive unit tests for enhanced logging features (16 tests) - `tests/test_indicators.py` - Comprehensive unit tests for technical indicators module (18 tests) - `docs/setup.md` - Comprehensive setup guide for new machines and environments - `docs/logging.md` - Complete documentation for the enhanced unified logging system +- `docs/data-collection-service.md` - Complete documentation for the data collection service with usage examples, configuration, and deployment guide - `docs/components/technical-indicators.md` - Complete 
documentation for the technical indicators module with usage examples and integration guide ## Tasks @@ -66,8 +72,8 @@ - [x] 2.4 Implement Redis channels for real-time data distribution - [x] 2.5 Create data storage layer for OHLCV data in PostgreSQL - [x] 2.6 Add technical indicators calculation (SMA, EMA, RSI, MACD, Bollinger Bands) - - [ ] 2.7 Implement data recovery and reconnection logic for API failures - - [ ] 2.8 Create data collection service with proper logging + - [x] 2.7 Implement data recovery and reconnection logic for API failures (DEFERRED: Basic reconnection exists, comprehensive historical data recovery moved to section 13.0 for future implementation) + - [x] 2.8 Create data collection service with proper logging - [ ] 2.9 Unit test data collection and aggregation logic - [ ] 3.0 Basic Dashboard for Data Visualization and Analysis @@ -176,6 +182,9 @@ - [ ] 13.5 Add caching layer for frequently accessed market data - [ ] 13.6 Optimize data retention and archival strategies - [ ] 13.7 Implement horizontal scaling for high-volume trading scenarios + - [ ] 13.8 Implement comprehensive data recovery with OKX REST API for historical backfill + - [ ] 13.9 Add gap detection and automatic data recovery during reconnections + - [ ] 13.10 Implement data integrity validation and conflict resolution for recovered data