- Introduced a comprehensive data collection framework, including `CollectorServiceConfig`, `BaseDataCollector`, and `CollectorManager`, enhancing modularity and maintainability.
- Developed `CollectorFactory` for streamlined collector creation, promoting separation of concerns and improved configuration handling.
- Enhanced `DataCollectionService` to utilize the new architecture, ensuring robust error handling and logging practices.
- Added `TaskManager` for efficient management of asynchronous tasks, improving performance and resource management.
- Implemented health monitoring and auto-recovery features in `CollectorManager`, ensuring reliable operation of data collectors.
- Updated imports across the codebase to reflect the new structure, ensuring consistent access to components.

These changes significantly improve the architecture and maintainability of the data collection service, aligning with project standards for modularity, performance, and error handling.
119 lines · 3.6 KiB · Python
#!/usr/bin/env python3
"""
Clean Production OKX Data Collector

Simplified production script using the new DataCollectionService architecture.
Provides clean console output with minimal logging for production environments.
"""

import asyncio
import json
import logging
import os
import signal
import sys
import time
from pathlib import Path
from typing import Optional

# Add project root to path so the `data` package resolves when this script
# is run directly (the script lives one level below the repository root).
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

# Set environment for clean production logging — must happen before any
# project module reads the DEBUG flag at import time.
os.environ['DEBUG'] = 'false'

# Suppress verbose SQLAlchemy logging (engine/pool/dialect/ORM chatter)
# so production console output stays readable.
for _sqla_logger in (
    'sqlalchemy',
    'sqlalchemy.engine',
    'sqlalchemy.pool',
    'sqlalchemy.dialects',
    'sqlalchemy.orm',
):
    logging.getLogger(_sqla_logger).setLevel(logging.CRITICAL)

# Project import — must come after the sys.path adjustment above.
from data.collector.collection_service import run_data_collection_service
|
|
|
|
|
|
async def get_config_timeframes(config_path: str) -> str:
    """Return a display string of the unique timeframes in the config.

    Reads the JSON config at *config_path* and unions the ``timeframes``
    of every enabled trading pair. A pair with no ``enabled`` key counts
    as enabled; a pair with no ``timeframes`` key falls back to
    ``['1m', '5m']``.

    Returns:
        Comma-separated, sorted timeframes (e.g. ``"1m, 5m"``), or the
        placeholder ``"configured timeframes"`` if the config cannot be
        read or parsed, so callers can always print a banner.
    """
    try:
        with open(config_path, 'r') as f:
            config = json.load(f)

        # Get unique timeframes from all enabled trading pairs
        all_timeframes = set()
        for pair in config.get('trading_pairs', []):
            if pair.get('enabled', True):
                all_timeframes.update(pair.get('timeframes', ['1m', '5m']))

        return ', '.join(sorted(all_timeframes))
    except (OSError, ValueError, TypeError, AttributeError):
        # Narrowed from a bare `except:` which also swallowed
        # KeyboardInterrupt/SystemExit. OSError covers missing/unreadable
        # files; ValueError covers json.JSONDecodeError; TypeError /
        # AttributeError cover a malformed config structure.
        return "configured timeframes"
|
|
|
|
|
|
async def run_clean_production(duration_hours: Optional[float] = None) -> bool:
    """Run the production collector with clean console output.

    Args:
        duration_hours: Collection duration in hours; ``None`` means run
            indefinitely until stopped.

    Returns:
        True if the data collection service completed successfully.
    """
    # Configuration path - use the new service config format
    config_path = "config/data_collection.json"

    try:
        # Timeframes are fetched only for operator-facing banner display.
        timeframes_str = await get_config_timeframes(config_path)

        # Header banner
        separator = "=" * 50
        print("OKX PRODUCTION DATA COLLECTOR")
        print(separator)
        if duration_hours:
            print(f"Duration: {duration_hours} hours")
        else:
            print("Duration: Indefinite (until stopped)")
        print(f"Timeframes: {timeframes_str}")
        print("Database: Raw trades + aggregated candles")
        print("Logs: logs/ directory")
        print(separator)

        # Delegate the actual work to the DataCollectionService architecture.
        print("Starting data collection service...")
        success = await run_data_collection_service(config_path, duration_hours)

        if success:
            print("Data collection completed successfully")
            return True
        print("Data collection failed")
        return False

    except Exception as e:
        # Top-level boundary: report the failure and signal it via the
        # return value instead of crashing the process.
        print(f"Error: {e}")
        return False
|
|
|
|
|
|
def main():
    """Main entry point."""
    import argparse

    arg_parser = argparse.ArgumentParser(description="Clean Production OKX Data Collector")
    arg_parser.add_argument(
        "--hours",
        type=float,
        help="Collection duration in hours (default: indefinite until stopped manually)",
    )
    cli_args = arg_parser.parse_args()

    # Reject non-positive durations up front (None = run indefinitely).
    hours = cli_args.hours
    if hours is not None and hours <= 0:
        print("Duration must be positive")
        sys.exit(1)

    # Default to failure; each outcome below overwrites the exit code.
    exit_code = 1
    try:
        collection_ok = asyncio.run(run_clean_production(hours))
        exit_code = 0 if collection_ok else 1
    except KeyboardInterrupt:
        # Ctrl-C is a deliberate stop, not a failure.
        print("\nInterrupted by user")
        exit_code = 0
    except Exception as e:
        print(f"Fatal error: {e}")
        exit_code = 1
    sys.exit(exit_code)
|
|
|
|
|
|
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()