- Deleted `example_complete_series_aggregation.py` as it is no longer needed.
- Introduced `data_collection_service.py`, a production-ready service for cryptocurrency market data collection with clean logging and robust error handling (see the usage sketch below).
- Added configuration management for multiple trading pairs and exchanges, with health monitoring and graceful shutdown.
- Created `data_collection.json` for service configuration, including exchange settings and logging preferences.
- Updated `CandleProcessingConfig` to reflect the revised candle-processing timeframes.
- Updated the documentation to cover the new data collection service and its configuration.
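The service can be launched through its built-in CLI (`python data_collection_service.py --config config/data_collection.json --hours 1`) or programmatically via `run_data_collection_service`. A minimal sketch of the programmatic path, assuming the module is importable as `data_collection_service` (its actual location in the repository may differ):

```python
# Minimal usage sketch. The import path is an assumption; adjust it to
# wherever data_collection_service.py lives in your project layout.
import asyncio

from data_collection_service import run_data_collection_service


async def main() -> None:
    # On first run the service writes config/data_collection.json with
    # defaults (see _create_default_config), then starts the collectors.
    ok = await run_data_collection_service(
        config_path="config/data_collection.json",
        duration_hours=1.0,  # None runs indefinitely until SIGINT/SIGTERM
    )
    print("clean shutdown" if ok else "service reported an error")


if __name__ == "__main__":
    asyncio.run(main())
```

Because a missing config file is created automatically, this sketch works on a fresh checkout, provided the database and exchange dependencies are available.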
`data_collection_service.py` (449 lines, 17 KiB, Python):
#!/usr/bin/env python3
"""
Data Collection Service

Production-ready service for cryptocurrency market data collection
with clean logging and robust error handling.

This service manages multiple data collectors for different trading pairs
and exchanges, with proper health monitoring and graceful shutdown.
"""

import asyncio
import signal
import sys
import time
import json
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Dict, Any
import logging

# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

# Set environment for clean production logging
import os
os.environ['DEBUG'] = 'false'

# Suppress verbose SQLAlchemy logging for production
logging.getLogger('sqlalchemy').setLevel(logging.WARNING)
logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING)
logging.getLogger('sqlalchemy.pool').setLevel(logging.WARNING)
logging.getLogger('sqlalchemy.dialects').setLevel(logging.WARNING)
logging.getLogger('sqlalchemy.orm').setLevel(logging.WARNING)

from data.exchanges.factory import ExchangeFactory
from data.collector_manager import CollectorManager
from data.base_collector import DataType
from database.connection import init_database
from utils.logger import get_logger


class DataCollectionService:
    """
    Production data collection service.

    Manages multiple data collectors with clean logging focused on:
    - Service lifecycle (start/stop/restart)
    - Connection status (connect/disconnect/reconnect)
    - Health status and errors
    - Basic collection statistics

    Excludes verbose logging of individual trades/candles for production clarity.
    """

    def __init__(self, config_path: str = "config/data_collection.json"):
        """Initialize the data collection service."""
        self.config_path = config_path

        # Initialize clean logging first - only essential information
        self.logger = get_logger(
            "data_collection_service",
            log_level="INFO",
            verbose=False  # Clean console output
        )

        # Load configuration after logger is initialized
        self.config = self._load_config()

        # Core components
        self.collector_manager = CollectorManager(
            logger=self.logger,
            log_errors_only=True  # Only log errors and essential events
        )
        self.collectors: List = []

        # Service state
        self.running = False
        self.start_time = None
        self.shutdown_event = asyncio.Event()

        # Statistics for monitoring
        self.stats = {
            'collectors_created': 0,
            'collectors_running': 0,
            'total_uptime_seconds': 0,
            'last_activity': None,
            'errors_count': 0
        }

        self.logger.info("🚀 Data Collection Service initialized")
        self.logger.info(f"📁 Configuration: {config_path}")

    def _load_config(self) -> Dict[str, Any]:
        """Load service configuration from JSON file."""
        try:
            config_file = Path(self.config_path)
            if not config_file.exists():
                # Create default config if it doesn't exist
                self._create_default_config(config_file)

            with open(config_file, 'r') as f:
                config = json.load(f)

            self.logger.info(f"✅ Configuration loaded from {self.config_path}")
            return config

        except Exception as e:
            self.logger.error(f"❌ Failed to load configuration: {e}")
            raise

    def _create_default_config(self, config_file: Path) -> None:
        """Create a default configuration file."""
        default_config = {
            "exchange": "okx",
            "connection": {
                "public_ws_url": "wss://ws.okx.com:8443/ws/v5/public",
                "private_ws_url": "wss://ws.okx.com:8443/ws/v5/private",
                "ping_interval": 25.0,
                "pong_timeout": 10.0,
                "max_reconnect_attempts": 5,
                "reconnect_delay": 5.0
            },
            "data_collection": {
                "store_raw_data": True,
                "health_check_interval": 120.0,
                "auto_restart": True,
                "buffer_size": 1000
            },
            "trading_pairs": [
                {
                    "symbol": "BTC-USDT",
                    "enabled": True,
                    "data_types": ["trade", "orderbook"],
                    "timeframes": ["1m", "5m", "15m", "1h"],
                    "channels": {
                        "trades": "trades",
                        "orderbook": "books5",
                        "ticker": "tickers"
                    }
                },
                {
                    "symbol": "ETH-USDT",
                    "enabled": True,
                    "data_types": ["trade", "orderbook"],
                    "timeframes": ["1m", "5m", "15m", "1h"],
                    "channels": {
                        "trades": "trades",
                        "orderbook": "books5",
                        "ticker": "tickers"
                    }
                }
            ],
            "logging": {
                "component_name_template": "okx_collector_{symbol}",
                "log_level": "INFO",
                "verbose": False
            },
            "database": {
                "store_processed_data": True,
                "store_raw_data": True,
                "force_update_candles": False,
                "batch_size": 100,
                "flush_interval": 5.0
            }
        }

        # Ensure directory exists
        config_file.parent.mkdir(parents=True, exist_ok=True)

        with open(config_file, 'w') as f:
            json.dump(default_config, f, indent=2)

        self.logger.info(f"📄 Created default configuration: {config_file}")

    async def initialize_collectors(self) -> bool:
        """Initialize all data collectors based on configuration."""
        try:
            # Get exchange configuration (now using okx_config.json structure)
            exchange_name = self.config.get('exchange', 'okx')
            trading_pairs = self.config.get('trading_pairs', [])
            data_collection_config = self.config.get('data_collection', {})

            enabled_pairs = [pair for pair in trading_pairs if pair.get('enabled', True)]

            if not enabled_pairs:
                self.logger.warning(f"⚠️ No enabled trading pairs for {exchange_name}")
                return False

            self.logger.info(f"🔧 Initializing {len(enabled_pairs)} collectors for {exchange_name.upper()}")

            total_collectors = 0

            # Create collectors for each trading pair
            for pair_config in enabled_pairs:
                if await self._create_collector(exchange_name, pair_config, data_collection_config):
                    total_collectors += 1
                else:
                    self.logger.error(f"❌ Failed to create collector for {pair_config.get('symbol', 'unknown')}")
                    self.stats['errors_count'] += 1

            self.stats['collectors_created'] = total_collectors

            if total_collectors > 0:
                self.logger.info(f"✅ Successfully initialized {total_collectors} data collectors")
                return True
            else:
                self.logger.error("❌ No collectors were successfully initialized")
                return False

        except Exception as e:
            self.logger.error(f"❌ Failed to initialize collectors: {e}")
            self.stats['errors_count'] += 1
            return False

    async def _create_collector(self, exchange_name: str, pair_config: Dict[str, Any], data_collection_config: Dict[str, Any]) -> bool:
        """Create a single data collector for a trading pair."""
        try:
            from data.exchanges.factory import ExchangeCollectorConfig

            symbol = pair_config['symbol']
            data_types = [DataType(dt) for dt in pair_config.get('data_types', ['trade'])]
            timeframes = pair_config.get('timeframes', ['1m', '5m'])

            # Create collector configuration using the proper structure
            collector_config = ExchangeCollectorConfig(
                exchange=exchange_name,
                symbol=symbol,
                data_types=data_types,
                auto_restart=data_collection_config.get('auto_restart', True),
                health_check_interval=data_collection_config.get('health_check_interval', 120.0),
                store_raw_data=data_collection_config.get('store_raw_data', True),
                custom_params={
                    'component_name': f"{exchange_name}_collector_{symbol.replace('-', '_').lower()}",
                    'logger': self.logger,
                    'log_errors_only': True,  # Clean logging - only errors and essential events
                    'force_update_candles': self.config.get('database', {}).get('force_update_candles', False)
                }
            )

            # Create collector using factory with proper config
            collector = ExchangeFactory.create_collector(collector_config)

            if collector:
                # Add to manager
                self.collector_manager.add_collector(collector)
                self.collectors.append(collector)

                self.logger.info(f"✅ Created collector: {symbol} [{'/'.join(timeframes)}]")
                return True
            else:
                self.logger.error(f"❌ Failed to create collector for {symbol}")
                return False

        except Exception as e:
            self.logger.error(f"❌ Error creating collector for {pair_config.get('symbol', 'unknown')}: {e}")
            return False

    async def start(self) -> bool:
        """Start the data collection service."""
        try:
            self.start_time = time.time()
            self.running = True

            self.logger.info("🚀 Starting Data Collection Service...")

            # Initialize database
            self.logger.info("📊 Initializing database connection...")
            init_database()
            self.logger.info("✅ Database connection established")

            # Start collector manager
            self.logger.info("🔌 Starting data collectors...")
            success = await self.collector_manager.start()

            if success:
                self.stats['collectors_running'] = len(self.collectors)
                self.stats['last_activity'] = datetime.now()

                self.logger.info("✅ Data Collection Service started successfully")
                self.logger.info(f"📈 Active collectors: {self.stats['collectors_running']}")
                return True
            else:
                self.logger.error("❌ Failed to start data collectors")
                self.stats['errors_count'] += 1
                return False

        except Exception as e:
            self.logger.error(f"❌ Failed to start service: {e}")
            self.stats['errors_count'] += 1
            return False

    async def stop(self) -> None:
        """Stop the data collection service gracefully."""
        try:
            self.logger.info("🛑 Stopping Data Collection Service...")
            self.running = False

            # Stop all collectors
            await self.collector_manager.stop()

            # Update statistics
            if self.start_time:
                self.stats['total_uptime_seconds'] = time.time() - self.start_time

            self.stats['collectors_running'] = 0

            self.logger.info("✅ Data Collection Service stopped gracefully")
            self.logger.info(f"📊 Total uptime: {self.stats['total_uptime_seconds']:.1f} seconds")

        except Exception as e:
            self.logger.error(f"❌ Error during service shutdown: {e}")
            self.stats['errors_count'] += 1

    def get_status(self) -> Dict[str, Any]:
        """Get current service status."""
        current_time = time.time()
        uptime = current_time - self.start_time if self.start_time else 0

        return {
            'running': self.running,
            'uptime_seconds': uptime,
            'uptime_hours': uptime / 3600,
            'collectors_total': len(self.collectors),
            'collectors_running': self.stats['collectors_running'],
            'errors_count': self.stats['errors_count'],
            'last_activity': self.stats['last_activity'],
            'start_time': datetime.fromtimestamp(self.start_time) if self.start_time else None
        }

    def setup_signal_handlers(self) -> None:
        """Setup signal handlers for graceful shutdown."""
        def signal_handler(signum, frame):
            self.logger.info(f"📡 Received shutdown signal ({signum}), stopping gracefully...")
            self.shutdown_event.set()

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

    async def run(self, duration_hours: Optional[float] = None) -> bool:
        """
        Run the data collection service.

        Args:
            duration_hours: Optional duration to run (None = indefinite)

        Returns:
            bool: True if successful, False if error occurred
        """
        self.setup_signal_handlers()

        try:
            # Initialize collectors
            if not await self.initialize_collectors():
                return False

            # Start service
            if not await self.start():
                return False

            # Service running notification
            status = self.get_status()
            if duration_hours:
                self.logger.info(f"⏱️ Service will run for {duration_hours} hours")
            else:
                self.logger.info("⏱️ Service running indefinitely (until stopped)")

            self.logger.info(f"📊 Active collectors: {status['collectors_running']}")
            self.logger.info("🔍 Monitor with: python scripts/monitor_clean.py")

            # Main service loop
            update_interval = 600  # Status update every 10 minutes
            last_update = time.time()

            while not self.shutdown_event.is_set():
                # Wait for shutdown signal or timeout
                try:
                    await asyncio.wait_for(self.shutdown_event.wait(), timeout=1.0)
                    break
                except asyncio.TimeoutError:
                    pass

                current_time = time.time()

                # Check duration limit
                if duration_hours:
                    elapsed_hours = (current_time - self.start_time) / 3600
                    if elapsed_hours >= duration_hours:
                        self.logger.info(f"⏰ Completed {duration_hours} hour run")
                        break

                # Periodic status update
                if current_time - last_update >= update_interval:
                    elapsed_hours = (current_time - self.start_time) / 3600
                    self.logger.info(f"⏱️ Service uptime: {elapsed_hours:.1f} hours")
                    last_update = current_time

            return True

        except Exception as e:
            self.logger.error(f"❌ Service error: {e}")
            self.stats['errors_count'] += 1
            return False

        finally:
            await self.stop()


# Service entry point function
async def run_data_collection_service(
    config_path: str = "config/data_collection.json",
    duration_hours: Optional[float] = None
) -> bool:
    """
    Run the data collection service.

    Args:
        config_path: Path to configuration file
        duration_hours: Optional duration in hours (None = indefinite)

    Returns:
        bool: True if successful, False otherwise
    """
    service = DataCollectionService(config_path)
    return await service.run(duration_hours)


if __name__ == "__main__":
    # Simple CLI when run directly
    import argparse

    parser = argparse.ArgumentParser(description="Data Collection Service")
    parser.add_argument('--config', default="config/data_collection.json",
                        help='Configuration file path')
    parser.add_argument('--hours', type=float,
                        help='Run duration in hours (default: indefinite)')

    args = parser.parse_args()

    try:
        success = asyncio.run(run_data_collection_service(args.config, args.hours))
        sys.exit(0 if success else 1)
    except KeyboardInterrupt:
        print("\n👋 Service interrupted by user")
        sys.exit(0)
    except Exception as e:
        print(f"❌ Fatal error: {e}")
        sys.exit(1)