Remove complete time series aggregation example and add data collection service implementation

- Deleted `example_complete_series_aggregation.py` as it is no longer needed.
- Introduced `data_collection_service.py`, a production-ready service for cryptocurrency market data collection with clean logging and robust error handling.
- Added configuration management for multiple trading pairs and exchanges, supporting health monitoring and graceful shutdown.
- Created `data_collection.json` for service configuration, including exchange settings and logging preferences.
- Updated `CandleProcessingConfig` to reflect changes in timeframes for candle processing.
- Enhanced documentation to cover the new data collection service and its configuration, ensuring clarity for users.
This commit is contained in:
Vasily.onl 2025-06-02 14:23:08 +08:00
parent 24b6a3feed
commit 1cca8cda16
9 changed files with 1161 additions and 244 deletions

View File

@ -0,0 +1,69 @@
{
"exchange": "okx",
"connection": {
"public_ws_url": "wss://ws.okx.com:8443/ws/v5/public",
"private_ws_url": "wss://ws.okx.com:8443/ws/v5/private",
"ping_interval": 25.0,
"pong_timeout": 10.0,
"max_reconnect_attempts": 5,
"reconnect_delay": 5.0
},
"data_collection": {
"store_raw_data": true,
"health_check_interval": 120.0,
"auto_restart": true,
"buffer_size": 1000
},
"trading_pairs": [
{
"symbol": "BTC-USDT",
"enabled": true,
"data_types": [
"trade",
"orderbook"
],
"timeframes": [
"1m",
"5m",
"15m",
"1h"
],
"channels": {
"trades": "trades",
"orderbook": "books5",
"ticker": "tickers"
}
},
{
"symbol": "ETH-USDT",
"enabled": true,
"data_types": [
"trade",
"orderbook"
],
"timeframes": [
"1m",
"5m",
"15m",
"1h"
],
"channels": {
"trades": "trades",
"orderbook": "books5",
"ticker": "tickers"
}
}
],
"logging": {
"component_name_template": "okx_collector_{symbol}",
"log_level": "INFO",
"verbose": false
},
"database": {
"store_processed_data": true,
"store_raw_data": true,
"force_update_candles": false,
"batch_size": 100,
"flush_interval": 5.0
}
}

449
data/collection_service.py Normal file
View File

@ -0,0 +1,449 @@
#!/usr/bin/env python3
"""
Data Collection Service
Production-ready service for cryptocurrency market data collection
with clean logging and robust error handling.
This service manages multiple data collectors for different trading pairs
and exchanges, with proper health monitoring and graceful shutdown.
"""
import asyncio
import signal
import sys
import time
import json
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Dict, Any
import logging
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
# Set environment for clean production logging
import os
os.environ['DEBUG'] = 'false'
# Suppress verbose SQLAlchemy logging for production
logging.getLogger('sqlalchemy').setLevel(logging.WARNING)
logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING)
logging.getLogger('sqlalchemy.pool').setLevel(logging.WARNING)
logging.getLogger('sqlalchemy.dialects').setLevel(logging.WARNING)
logging.getLogger('sqlalchemy.orm').setLevel(logging.WARNING)
from data.exchanges.factory import ExchangeFactory
from data.collector_manager import CollectorManager
from data.base_collector import DataType
from database.connection import init_database
from utils.logger import get_logger
class DataCollectionService:
"""
Production data collection service.
Manages multiple data collectors with clean logging focused on:
- Service lifecycle (start/stop/restart)
- Connection status (connect/disconnect/reconnect)
- Health status and errors
- Basic collection statistics
Excludes verbose logging of individual trades/candles for production clarity.
"""
def __init__(self, config_path: str = "config/data_collection.json"):
"""Initialize the data collection service."""
self.config_path = config_path
# Initialize clean logging first - only essential information
self.logger = get_logger(
"data_collection_service",
log_level="INFO",
verbose=False # Clean console output
)
# Load configuration after logger is initialized
self.config = self._load_config()
# Core components
self.collector_manager = CollectorManager(
logger=self.logger,
log_errors_only=True # Only log errors and essential events
)
self.collectors: List = []
# Service state
self.running = False
self.start_time = None
self.shutdown_event = asyncio.Event()
# Statistics for monitoring
self.stats = {
'collectors_created': 0,
'collectors_running': 0,
'total_uptime_seconds': 0,
'last_activity': None,
'errors_count': 0
}
self.logger.info("🚀 Data Collection Service initialized")
self.logger.info(f"📁 Configuration: {config_path}")
def _load_config(self) -> Dict[str, Any]:
"""Load service configuration from JSON file."""
try:
config_file = Path(self.config_path)
if not config_file.exists():
# Create default config if it doesn't exist
self._create_default_config(config_file)
with open(config_file, 'r') as f:
config = json.load(f)
self.logger.info(f"✅ Configuration loaded from {self.config_path}")
return config
except Exception as e:
self.logger.error(f"❌ Failed to load configuration: {e}")
raise
def _create_default_config(self, config_file: Path) -> None:
"""Create a default configuration file."""
default_config = {
"exchange": "okx",
"connection": {
"public_ws_url": "wss://ws.okx.com:8443/ws/v5/public",
"private_ws_url": "wss://ws.okx.com:8443/ws/v5/private",
"ping_interval": 25.0,
"pong_timeout": 10.0,
"max_reconnect_attempts": 5,
"reconnect_delay": 5.0
},
"data_collection": {
"store_raw_data": True,
"health_check_interval": 120.0,
"auto_restart": True,
"buffer_size": 1000
},
"trading_pairs": [
{
"symbol": "BTC-USDT",
"enabled": True,
"data_types": ["trade", "orderbook"],
"timeframes": ["1m", "5m", "15m", "1h"],
"channels": {
"trades": "trades",
"orderbook": "books5",
"ticker": "tickers"
}
},
{
"symbol": "ETH-USDT",
"enabled": True,
"data_types": ["trade", "orderbook"],
"timeframes": ["1m", "5m", "15m", "1h"],
"channels": {
"trades": "trades",
"orderbook": "books5",
"ticker": "tickers"
}
}
],
"logging": {
"component_name_template": "okx_collector_{symbol}",
"log_level": "INFO",
"verbose": False
},
"database": {
"store_processed_data": True,
"store_raw_data": True,
"force_update_candles": False,
"batch_size": 100,
"flush_interval": 5.0
}
}
# Ensure directory exists
config_file.parent.mkdir(parents=True, exist_ok=True)
with open(config_file, 'w') as f:
json.dump(default_config, f, indent=2)
self.logger.info(f"📄 Created default configuration: {config_file}")
async def initialize_collectors(self) -> bool:
"""Initialize all data collectors based on configuration."""
try:
# Get exchange configuration (now using okx_config.json structure)
exchange_name = self.config.get('exchange', 'okx')
trading_pairs = self.config.get('trading_pairs', [])
data_collection_config = self.config.get('data_collection', {})
enabled_pairs = [pair for pair in trading_pairs if pair.get('enabled', True)]
if not enabled_pairs:
self.logger.warning(f"⚠️ No enabled trading pairs for {exchange_name}")
return False
self.logger.info(f"🔧 Initializing {len(enabled_pairs)} collectors for {exchange_name.upper()}")
total_collectors = 0
# Create collectors for each trading pair
for pair_config in enabled_pairs:
if await self._create_collector(exchange_name, pair_config, data_collection_config):
total_collectors += 1
else:
self.logger.error(f"❌ Failed to create collector for {pair_config.get('symbol', 'unknown')}")
self.stats['errors_count'] += 1
self.stats['collectors_created'] = total_collectors
if total_collectors > 0:
self.logger.info(f"✅ Successfully initialized {total_collectors} data collectors")
return True
else:
self.logger.error("❌ No collectors were successfully initialized")
return False
except Exception as e:
self.logger.error(f"❌ Failed to initialize collectors: {e}")
self.stats['errors_count'] += 1
return False
async def _create_collector(self, exchange_name: str, pair_config: Dict[str, Any], data_collection_config: Dict[str, Any]) -> bool:
"""Create a single data collector for a trading pair."""
try:
from data.exchanges.factory import ExchangeCollectorConfig
symbol = pair_config['symbol']
data_types = [DataType(dt) for dt in pair_config.get('data_types', ['trade'])]
timeframes = pair_config.get('timeframes', ['1m', '5m'])
# Create collector configuration using the proper structure
collector_config = ExchangeCollectorConfig(
exchange=exchange_name,
symbol=symbol,
data_types=data_types,
auto_restart=data_collection_config.get('auto_restart', True),
health_check_interval=data_collection_config.get('health_check_interval', 120.0),
store_raw_data=data_collection_config.get('store_raw_data', True),
custom_params={
'component_name': f"{exchange_name}_collector_{symbol.replace('-', '_').lower()}",
'logger': self.logger,
'log_errors_only': True, # Clean logging - only errors and essential events
'force_update_candles': self.config.get('database', {}).get('force_update_candles', False)
}
)
# Create collector using factory with proper config
collector = ExchangeFactory.create_collector(collector_config)
if collector:
# Add to manager
self.collector_manager.add_collector(collector)
self.collectors.append(collector)
self.logger.info(f"✅ Created collector: {symbol} [{'/'.join(timeframes)}]")
return True
else:
self.logger.error(f"❌ Failed to create collector for {symbol}")
return False
except Exception as e:
self.logger.error(f"❌ Error creating collector for {pair_config.get('symbol', 'unknown')}: {e}")
return False
async def start(self) -> bool:
"""Start the data collection service."""
try:
self.start_time = time.time()
self.running = True
self.logger.info("🚀 Starting Data Collection Service...")
# Initialize database
self.logger.info("📊 Initializing database connection...")
init_database()
self.logger.info("✅ Database connection established")
# Start collector manager
self.logger.info("🔌 Starting data collectors...")
success = await self.collector_manager.start()
if success:
self.stats['collectors_running'] = len(self.collectors)
self.stats['last_activity'] = datetime.now()
self.logger.info("✅ Data Collection Service started successfully")
self.logger.info(f"📈 Active collectors: {self.stats['collectors_running']}")
return True
else:
self.logger.error("❌ Failed to start data collectors")
self.stats['errors_count'] += 1
return False
except Exception as e:
self.logger.error(f"❌ Failed to start service: {e}")
self.stats['errors_count'] += 1
return False
async def stop(self) -> None:
"""Stop the data collection service gracefully."""
try:
self.logger.info("🛑 Stopping Data Collection Service...")
self.running = False
# Stop all collectors
await self.collector_manager.stop()
# Update statistics
if self.start_time:
self.stats['total_uptime_seconds'] = time.time() - self.start_time
self.stats['collectors_running'] = 0
self.logger.info("✅ Data Collection Service stopped gracefully")
self.logger.info(f"📊 Total uptime: {self.stats['total_uptime_seconds']:.1f} seconds")
except Exception as e:
self.logger.error(f"❌ Error during service shutdown: {e}")
self.stats['errors_count'] += 1
def get_status(self) -> Dict[str, Any]:
"""Get current service status."""
current_time = time.time()
uptime = current_time - self.start_time if self.start_time else 0
return {
'running': self.running,
'uptime_seconds': uptime,
'uptime_hours': uptime / 3600,
'collectors_total': len(self.collectors),
'collectors_running': self.stats['collectors_running'],
'errors_count': self.stats['errors_count'],
'last_activity': self.stats['last_activity'],
'start_time': datetime.fromtimestamp(self.start_time) if self.start_time else None
}
def setup_signal_handlers(self) -> None:
"""Setup signal handlers for graceful shutdown."""
def signal_handler(signum, frame):
self.logger.info(f"📡 Received shutdown signal ({signum}), stopping gracefully...")
self.shutdown_event.set()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
async def run(self, duration_hours: Optional[float] = None) -> bool:
"""
Run the data collection service.
Args:
duration_hours: Optional duration to run (None = indefinite)
Returns:
bool: True if successful, False if error occurred
"""
self.setup_signal_handlers()
try:
# Initialize collectors
if not await self.initialize_collectors():
return False
# Start service
if not await self.start():
return False
# Service running notification
status = self.get_status()
if duration_hours:
self.logger.info(f"⏱️ Service will run for {duration_hours} hours")
else:
self.logger.info("⏱️ Service running indefinitely (until stopped)")
self.logger.info(f"📊 Active collectors: {status['collectors_running']}")
self.logger.info("🔍 Monitor with: python scripts/monitor_clean.py")
# Main service loop
update_interval = 600 # Status update every 10 minutes
last_update = time.time()
while not self.shutdown_event.is_set():
# Wait for shutdown signal or timeout
try:
await asyncio.wait_for(self.shutdown_event.wait(), timeout=1.0)
break
except asyncio.TimeoutError:
pass
current_time = time.time()
# Check duration limit
if duration_hours:
elapsed_hours = (current_time - self.start_time) / 3600
if elapsed_hours >= duration_hours:
self.logger.info(f"⏰ Completed {duration_hours} hour run")
break
# Periodic status update
if current_time - last_update >= update_interval:
elapsed_hours = (current_time - self.start_time) / 3600
self.logger.info(f"⏱️ Service uptime: {elapsed_hours:.1f} hours")
last_update = current_time
return True
except Exception as e:
self.logger.error(f"❌ Service error: {e}")
self.stats['errors_count'] += 1
return False
finally:
await self.stop()
# Service entry point function
async def run_data_collection_service(
config_path: str = "config/data_collection.json",
duration_hours: Optional[float] = None
) -> bool:
"""
Run the data collection service.
Args:
config_path: Path to configuration file
duration_hours: Optional duration in hours (None = indefinite)
Returns:
bool: True if successful, False otherwise
"""
service = DataCollectionService(config_path)
return await service.run(duration_hours)
if __name__ == "__main__":
# Simple CLI when run directly
import argparse
parser = argparse.ArgumentParser(description="Data Collection Service")
parser.add_argument('--config', default="config/data_collection.json",
help='Configuration file path')
parser.add_argument('--hours', type=float,
help='Run duration in hours (default: indefinite)')
args = parser.parse_args()
try:
success = asyncio.run(run_data_collection_service(args.config, args.hours))
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print("\n👋 Service interrupted by user")
sys.exit(0)
except Exception as e:
print(f"❌ Fatal error: {e}")
sys.exit(1)

View File

@ -118,7 +118,7 @@ class OHLCVCandle:
@dataclass @dataclass
class CandleProcessingConfig: class CandleProcessingConfig:
"""Configuration for candle processing - shared across exchanges.""" """Configuration for candle processing - shared across exchanges."""
timeframes: List[str] = field(default_factory=lambda: ['1s', '5s', '1m', '5m', '15m', '1h']) timeframes: List[str] = field(default_factory=lambda: ['5s', '1m', '5m', '15m', '1h'])
auto_save_candles: bool = True auto_save_candles: bool = True
emit_incomplete_candles: bool = False emit_incomplete_candles: bool = False
max_trades_per_candle: int = 100000 # Safety limit max_trades_per_candle: int = 100000 # Safety limit

View File

@ -402,7 +402,7 @@ class OKXCollector(BaseDataCollector):
if success and self.logger: if success and self.logger:
action = "Updated" if self.force_update_candles else "Stored" action = "Updated" if self.force_update_candles else "Stored"
self.logger.info(f"{self.component_name}: {action} candle: {candle.symbol} {candle.timeframe} at {candle.end_time} (force_update={self.force_update_candles}) - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}, Vol: {candle.volume}, Trades: {candle.trade_count}") self.logger.debug(f"{self.component_name}: {action} candle: {candle.symbol} {candle.timeframe} at {candle.end_time} (force_update={self.force_update_candles}) - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}, Vol: {candle.volume}, Trades: {candle.trade_count}")
except DatabaseOperationError as e: except DatabaseOperationError as e:
if self.logger: if self.logger:
@ -488,7 +488,7 @@ class OKXCollector(BaseDataCollector):
""" """
self._processed_candles += 1 self._processed_candles += 1
if self.logger: if self.logger:
self.logger.info(f"{self.component_name}: Completed candle: {candle.symbol} {candle.timeframe} O:{candle.open} H:{candle.high} L:{candle.low} C:{candle.close} V:{candle.volume}") self.logger.debug(f"{self.component_name}: Completed candle: {candle.symbol} {candle.timeframe} O:{candle.open} H:{candle.high} L:{candle.low} C:{candle.close} V:{candle.volume}")
# Store completed candle in market_data table # Store completed candle in market_data table
if candle.is_complete: if candle.is_complete:

View File

@ -45,6 +45,11 @@ class BaseRepository:
if self.logger: if self.logger:
self.logger.info(message) self.logger.info(message)
def log_debug(self, message: str) -> None:
"""Log debug message if logger is available."""
if self.logger:
self.logger.debug(message)
def log_error(self, message: str) -> None: def log_error(self, message: str) -> None:
"""Log error message if logger is available.""" """Log error message if logger is available."""
if self.logger: if self.logger:
@ -133,7 +138,7 @@ class MarketDataRepository(BaseRepository):
session.commit() session.commit()
self.log_info(f"{action} candle: {candle.symbol} {candle.timeframe} at {candle_timestamp} (force_update={force_update})") self.log_debug(f"{action} candle: {candle.symbol} {candle.timeframe} at {candle_timestamp} (force_update={force_update})")
return True return True
except Exception as e: except Exception as e:
@ -294,7 +299,7 @@ class RawTradeRepository(BaseRepository):
session.commit() session.commit()
self.log_info(f"Stored raw {data_point.data_type.value} data for {data_point.symbol}") self.log_debug(f"Stored raw {data_point.data_type.value} data for {data_point.symbol}")
return True return True
except Exception as e: except Exception as e:
@ -343,7 +348,7 @@ class RawTradeRepository(BaseRepository):
session.commit() session.commit()
self.log_info(f"Stored raw WebSocket data: {data_type} for {symbol}") self.log_debug(f"Stored raw WebSocket data: {data_type} for {symbol}")
return True return True
except Exception as e: except Exception as e:

View File

@ -0,0 +1,481 @@
# Data Collection Service
The Data Collection Service is a production-ready service for cryptocurrency market data collection with clean logging and robust error handling. It manages multiple data collectors for different trading pairs and exchanges.
## Features
- **Clean Logging**: Only essential information (connections, disconnections, errors)
- **Multi-Exchange Support**: Extensible architecture for multiple exchanges
- **Health Monitoring**: Built-in health checks and auto-recovery
- **Configurable**: JSON-based configuration with sensible defaults
- **Graceful Shutdown**: Proper signal handling and cleanup
- **Testing**: Comprehensive unit test coverage
## Quick Start
### Basic Usage
```bash
# Start with default configuration (indefinite run)
python scripts/start_data_collection.py
# Run for 8 hours
python scripts/start_data_collection.py --hours 8
# Use custom configuration
python scripts/start_data_collection.py --config config/my_config.json
```
### Monitoring
```bash
# Check status once
python scripts/monitor_clean.py
# Monitor continuously every 60 seconds
python scripts/monitor_clean.py --interval 60
```
## Configuration
The service uses JSON configuration files with automatic default creation if none exists.
### Default Configuration Location
`config/data_collection.json`
### Configuration Structure
```json
{
"exchanges": {
"okx": {
"enabled": true,
"trading_pairs": [
{
"symbol": "BTC-USDT",
"enabled": true,
"data_types": ["trade"],
"timeframes": ["1m", "5m", "15m", "1h"]
},
{
"symbol": "ETH-USDT",
"enabled": true,
"data_types": ["trade"],
"timeframes": ["1m", "5m", "15m", "1h"]
}
]
}
},
"collection_settings": {
"health_check_interval": 120,
"store_raw_data": true,
"auto_restart": true,
"max_restart_attempts": 3
},
"logging": {
"level": "INFO",
"log_errors_only": true,
"verbose_data_logging": false
}
}
```
### Configuration Options
#### Exchange Settings
- **enabled**: Whether to enable this exchange
- **trading_pairs**: Array of trading pair configurations
#### Trading Pair Settings
- **symbol**: Trading pair symbol (e.g., "BTC-USDT")
- **enabled**: Whether to collect data for this pair
- **data_types**: Types of data to collect (["trade"], ["ticker"], etc.)
- **timeframes**: Candle timeframes to generate (["1m", "5m", "15m", "1h", "4h", "1d"])
#### Collection Settings
- **health_check_interval**: Health check frequency in seconds
- **store_raw_data**: Whether to store raw trade data
- **auto_restart**: Enable automatic restart on failures
- **max_restart_attempts**: Maximum restart attempts before giving up
#### Logging Settings
- **level**: Log level ("DEBUG", "INFO", "WARNING", "ERROR")
- **log_errors_only**: Only log errors and essential events
- **verbose_data_logging**: Enable verbose logging of individual trades/candles
## Service Architecture
### Core Components
1. **DataCollectionService**: Main service class managing the lifecycle
2. **CollectorManager**: Manages multiple data collectors with health monitoring
3. **ExchangeFactory**: Creates exchange-specific collectors
4. **BaseDataCollector**: Abstract base for all data collectors
### Data Flow
```
Exchange API → Data Collector → Data Processor → Database
Health Monitor → Service Manager
```
### Storage
- **Raw Data**: PostgreSQL `raw_trades` table
- **Candles**: PostgreSQL `market_data` table with multiple timeframes
- **Real-time**: Redis pub/sub for live data distribution
## Logging Philosophy
The service implements **clean production logging** focused on operational needs:
### What Gets Logged
✅ **Service Lifecycle**
- Service start/stop
- Collector initialization
- Database connections
✅ **Connection Events**
- WebSocket connect/disconnect
- Reconnection attempts
- API errors
✅ **Health & Errors**
- Health check results
- Error conditions
- Recovery actions
✅ **Statistics**
- Periodic uptime reports
- Collection summary
### What Doesn't Get Logged
❌ **Individual Data Points**
- Every trade received
- Every candle generated
- Raw market data
❌ **Verbose Operations**
- Database queries
- Internal processing steps
- Routine heartbeats
## API Reference
### DataCollectionService
The main service class for managing data collection.
#### Constructor
```python
DataCollectionService(config_path: str = "config/data_collection.json")
```
#### Methods
##### `async run(duration_hours: Optional[float] = None) -> bool`
Run the service for a specified duration or indefinitely.
**Parameters:**
- `duration_hours`: Optional duration in hours (None = indefinite)
**Returns:**
- `bool`: True if successful, False if error occurred
##### `async start() -> bool`
Start the data collection service.
**Returns:**
- `bool`: True if started successfully
##### `async stop() -> None`
Stop the service gracefully.
##### `get_status() -> Dict[str, Any]`
Get current service status including uptime, collector counts, and errors.
**Returns:**
- `dict`: Status information
### Standalone Function
#### `run_data_collection_service(config_path, duration_hours)`
```python
async def run_data_collection_service(
config_path: str = "config/data_collection.json",
duration_hours: Optional[float] = None
) -> bool
```
Convenience function to run the service.
## Integration Examples
### Basic Integration
```python
import asyncio
from data.collection_service import DataCollectionService
async def main():
service = DataCollectionService("config/my_config.json")
await service.run(duration_hours=24) # Run for 24 hours
if __name__ == "__main__":
asyncio.run(main())
```
### Custom Status Monitoring
```python
import asyncio
from data.collection_service import DataCollectionService
async def monitor_service():
service = DataCollectionService()
# Start service in background
start_task = asyncio.create_task(service.run())
# Monitor status every 5 minutes
while service.running:
status = service.get_status()
print(f"Uptime: {status['uptime_hours']:.1f}h, "
f"Collectors: {status['collectors_running']}, "
f"Errors: {status['errors_count']}")
await asyncio.sleep(300) # 5 minutes
await start_task
asyncio.run(monitor_service())
```
### Programmatic Control
```python
import asyncio
from data.collection_service import DataCollectionService
async def controlled_collection():
service = DataCollectionService()
# Initialize and start
await service.initialize_collectors()
await service.start()
try:
# Run for 1 hour
await asyncio.sleep(3600)
finally:
# Graceful shutdown
await service.stop()
asyncio.run(controlled_collection())
```
## Error Handling
The service implements robust error handling at multiple levels:
### Service Level
- **Configuration Errors**: Invalid JSON, missing files
- **Initialization Errors**: Database connection, collector creation
- **Runtime Errors**: Unexpected exceptions during operation
### Collector Level
- **Connection Errors**: WebSocket disconnections, API failures
- **Data Errors**: Invalid data formats, processing failures
- **Health Errors**: Failed health checks, timeout conditions
### Recovery Strategies
1. **Automatic Restart**: Collectors auto-restart on failures
2. **Exponential Backoff**: Increasing delays between retry attempts
3. **Circuit Breaker**: Stop retrying after max attempts exceeded
4. **Graceful Degradation**: Continue with healthy collectors
## Testing
### Running Tests
```bash
# Run all data collection service tests
uv run pytest tests/test_data_collection_service.py -v
# Run specific test
uv run pytest tests/test_data_collection_service.py::TestDataCollectionService::test_service_initialization -v
# Run with coverage
uv run pytest tests/test_data_collection_service.py --cov=data.collection_service
```
### Test Coverage
The test suite covers:
- Service initialization and configuration
- Collector creation and management
- Service lifecycle (start/stop)
- Error handling and recovery
- Configuration validation
- Signal handling
- Status reporting
## Troubleshooting
### Common Issues
#### Configuration Not Found
```
❌ Failed to load config from config/data_collection.json: [Errno 2] No such file or directory
```
**Solution**: The service will create a default configuration. Check the created file and adjust as needed.
#### Database Connection Failed
```
❌ Database connection failed: connection refused
```
**Solution**: Ensure PostgreSQL and Redis are running via Docker:
```bash
docker-compose up -d postgres redis
```
#### No Collectors Created
```
❌ No collectors were successfully initialized
```
**Solution**: Check configuration - ensure at least one exchange is enabled with valid trading pairs.
#### WebSocket Connection Issues
```
❌ Failed to start data collectors
```
**Solution**: Check network connectivity and API credentials. Verify exchange is accessible.
### Debug Mode
For verbose debugging, modify the logging configuration:
```json
{
"logging": {
"level": "DEBUG",
"log_errors_only": false,
"verbose_data_logging": true
}
}
```
⚠️ **Warning**: Debug mode generates extensive logs and should not be used in production.
## Production Deployment
### Docker
The service can be containerized for production deployment:
```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY . .
RUN pip install uv
RUN uv pip install -r requirements.txt
CMD ["python", "scripts/start_data_collection.py", "--config", "config/production.json"]
```
### Systemd Service
Create a systemd service for Linux deployment:
```ini
[Unit]
Description=Cryptocurrency Data Collection Service
After=network.target postgres.service redis.service
[Service]
Type=simple
User=crypto-collector
WorkingDirectory=/opt/crypto-dashboard
ExecStart=/usr/bin/python scripts/start_data_collection.py --config config/production.json
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
```
### Environment Variables
Configure sensitive data via environment variables:
```bash
export POSTGRES_HOST=localhost
export POSTGRES_PORT=5432
export POSTGRES_DB=crypto_dashboard
export POSTGRES_USER=dashboard_user
export POSTGRES_PASSWORD=secure_password
export REDIS_HOST=localhost
export REDIS_PORT=6379
```
## Performance Considerations
### Resource Usage
- **Memory**: ~100MB base + ~10MB per trading pair
- **CPU**: Low (async I/O bound)
- **Network**: ~1KB/s per trading pair
- **Storage**: ~1GB/day per trading pair (with raw data)
### Scaling
- **Vertical**: Increase timeframes and trading pairs
- **Horizontal**: Run multiple services with different configurations
- **Database**: Use TimescaleDB for time-series optimization
### Optimization Tips
1. **Disable Raw Data**: Set `store_raw_data: false` to reduce storage
2. **Limit Timeframes**: Only collect needed timeframes
3. **Batch Processing**: Use longer health check intervals
4. **Connection Pooling**: Database connections are automatically pooled
## Changelog
### v1.0.0 (Current)
- Initial implementation
- OKX exchange support
- Clean logging system
- Comprehensive test coverage
- JSON configuration
- Health monitoring
- Graceful shutdown

View File

@ -1,236 +0,0 @@
#!/usr/bin/env python3
"""
Example: Complete Time Series Aggregation
This example shows how to modify the aggregation system to emit candles
for every time period, even when there are no trades.
"""
import asyncio
from datetime import datetime, timezone, timedelta
from decimal import Decimal
from typing import Dict, List, Optional
from data.common.data_types import StandardizedTrade, OHLCVCandle, CandleProcessingConfig
from data.common.aggregation import RealTimeCandleProcessor
class CompleteSeriesProcessor(RealTimeCandleProcessor):
"""
Extended processor that emits candles for every time period,
filling gaps with previous close prices when no trades occur.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.last_prices = {} # Track last known price for each timeframe
self.timers = {} # Timer tasks for each timeframe
async def start_time_based_emission(self):
"""Start timers to emit candles on time boundaries regardless of trades."""
for timeframe in self.config.timeframes:
self.timers[timeframe] = asyncio.create_task(
self._time_based_candle_emitter(timeframe)
)
async def stop_time_based_emission(self):
"""Stop all timers."""
for task in self.timers.values():
task.cancel()
self.timers.clear()
async def _time_based_candle_emitter(self, timeframe: str):
"""Emit candles on time boundaries for a specific timeframe."""
try:
while True:
# Calculate next boundary
now = datetime.now(timezone.utc)
next_boundary = self._get_next_time_boundary(now, timeframe)
# Wait until next boundary
wait_seconds = (next_boundary - now).total_seconds()
if wait_seconds > 0:
await asyncio.sleep(wait_seconds)
# Check if we have an active bucket with trades
current_bucket = self.current_buckets.get(timeframe)
if current_bucket is None or current_bucket.trade_count == 0:
# No trades during this period - create empty candle
await self._emit_empty_candle(timeframe, next_boundary)
# If there are trades, they will be handled by normal trade processing
except asyncio.CancelledError:
pass # Timer was cancelled
async def _emit_empty_candle(self, timeframe: str, end_time: datetime):
"""Emit an empty candle when no trades occurred during the period."""
try:
# Calculate start time
start_time = self._get_bucket_start_time(end_time - timedelta(seconds=1), timeframe)
# Use last known price or default
last_price = self.last_prices.get(timeframe, Decimal('0'))
# Create empty candle with last known price as OHLC
empty_candle = OHLCVCandle(
symbol=self.symbol,
timeframe=timeframe,
start_time=start_time,
end_time=end_time,
open=last_price,
high=last_price,
low=last_price,
close=last_price,
volume=Decimal('0'),
trade_count=0,
exchange=self.exchange,
is_complete=True,
first_trade_time=None,
last_trade_time=None
)
# Emit the empty candle
self._emit_candle(empty_candle)
if self.logger:
self.logger.info(
f"{timeframe.upper()} EMPTY CANDLE at {end_time.strftime('%H:%M:%S')}: "
f"No trades, using last price ${last_price}"
)
except Exception as e:
if self.logger:
self.logger.error(f"Error emitting empty candle: {e}")
def _emit_candle(self, candle: OHLCVCandle) -> None:
"""Override to track last prices."""
# Update last known price
if candle.close > 0:
self.last_prices[candle.timeframe] = candle.close
# Call parent implementation
super()._emit_candle(candle)
def _get_next_time_boundary(self, current_time: datetime, timeframe: str) -> datetime:
"""Calculate the next time boundary for a timeframe."""
if timeframe == '1s':
# Next second boundary
return (current_time + timedelta(seconds=1)).replace(microsecond=0)
elif timeframe == '5s':
# Next 5-second boundary
next_sec = (current_time.second // 5 + 1) * 5
if next_sec >= 60:
return current_time.replace(second=0, microsecond=0, minute=current_time.minute + 1)
return current_time.replace(second=next_sec, microsecond=0)
elif timeframe == '10s':
# Next 10-second boundary
next_sec = (current_time.second // 10 + 1) * 10
if next_sec >= 60:
return current_time.replace(second=0, microsecond=0, minute=current_time.minute + 1)
return current_time.replace(second=next_sec, microsecond=0)
elif timeframe == '15s':
# Next 15-second boundary
next_sec = (current_time.second // 15 + 1) * 15
if next_sec >= 60:
return current_time.replace(second=0, microsecond=0, minute=current_time.minute + 1)
return current_time.replace(second=next_sec, microsecond=0)
elif timeframe == '30s':
# Next 30-second boundary
next_sec = (current_time.second // 30 + 1) * 30
if next_sec >= 60:
return current_time.replace(second=0, microsecond=0, minute=current_time.minute + 1)
return current_time.replace(second=next_sec, microsecond=0)
elif timeframe == '1m':
# Next minute boundary
return (current_time + timedelta(minutes=1)).replace(second=0, microsecond=0)
elif timeframe == '5m':
# Next 5-minute boundary
next_min = (current_time.minute // 5 + 1) * 5
if next_min >= 60:
return current_time.replace(minute=0, second=0, microsecond=0, hour=current_time.hour + 1)
return current_time.replace(minute=next_min, second=0, microsecond=0)
else:
# For other timeframes, add appropriate logic
return current_time + timedelta(minutes=1)
# Example usage
async def demo_complete_series():
"""Demonstrate complete time series aggregation."""
print("🕐 Complete Time Series Aggregation Demo")
print("This will emit candles even when no trades occur\n")
# Create processor with complete series capability
config = CandleProcessingConfig(timeframes=['1s', '5s', '30s'])
processor = CompleteSeriesProcessor(
symbol="BTC-USDT",
exchange="demo",
config=config,
component_name="complete_series_demo"
)
# Set initial price
processor.last_prices = {'1s': Decimal('50000'), '5s': Decimal('50000'), '30s': Decimal('50000')}
# Add callback to see emitted candles
def on_candle(candle: OHLCVCandle):
candle_type = "TRADE" if candle.trade_count > 0 else "EMPTY"
print(f"📊 {candle_type} {candle.timeframe.upper()} at {candle.end_time.strftime('%H:%M:%S')}: "
f"${candle.close} (T={candle.trade_count})")
processor.add_candle_callback(on_candle)
# Start time-based emission
await processor.start_time_based_emission()
try:
# Simulate some trades with gaps
print("Simulating trades with gaps...\n")
base_time = datetime.now(timezone.utc)
# Trade at T+0
trade1 = StandardizedTrade(
symbol="BTC-USDT",
trade_id="1",
price=Decimal('50100'),
size=Decimal('0.1'),
side="buy",
timestamp=base_time,
exchange="demo"
)
processor.process_trade(trade1)
# Wait 3 seconds (should see empty candles for missing periods)
await asyncio.sleep(3)
# Trade at T+3
trade2 = StandardizedTrade(
symbol="BTC-USDT",
trade_id="2",
price=Decimal('50200'),
size=Decimal('0.2'),
side="sell",
timestamp=base_time + timedelta(seconds=3),
exchange="demo"
)
processor.process_trade(trade2)
# Wait more to see more empty candles
await asyncio.sleep(5)
print("\n✅ Demo completed - You can see both trade candles and empty candles")
finally:
await processor.stop_time_based_emission()
if __name__ == "__main__":
print("Complete Time Series Aggregation Example")
print("=" * 50)
print("This shows how to emit candles even when no trades occur.")
print("Uncomment the line below to run the demo:\n")
# Uncomment to run the demo:
# asyncio.run(demo_complete_series())

View File

@ -0,0 +1,140 @@
#!/usr/bin/env python3
"""
Start Data Collection Service
Simple script to start the cryptocurrency data collection service
with clean console output and proper configuration.
Usage:
python scripts/start_data_collection.py [options]
Examples:
# Start with default configuration (indefinite run)
python scripts/start_data_collection.py
# Run for 8 hours with default config
python scripts/start_data_collection.py --hours 8
# Use custom configuration file
python scripts/start_data_collection.py --config config/my_config.json
# Run for 24 hours with custom config
python scripts/start_data_collection.py --config config/production.json --hours 24
"""
import asyncio
import argparse
import sys
from pathlib import Path
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from data.collection_service import run_data_collection_service
def display_banner(config_path: str, duration_hours: float = None):
"""Display service startup banner."""
print("🚀 CRYPTOCURRENCY DATA COLLECTION SERVICE")
print("=" * 55)
print(f"📁 Configuration: {config_path}")
if duration_hours:
print(f"⏱️ Duration: {duration_hours} hours")
else:
print("⏱️ Duration: Indefinite (until stopped)")
print("📊 Logging: Essential events only (connections, errors)")
print("💾 Storage: PostgreSQL + Redis")
print("🔍 Monitor: python scripts/monitor_clean.py")
print("⏹️ Stop: Ctrl+C")
print("=" * 55)
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(
description="Start Cryptocurrency Data Collection Service",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Start with default configuration (indefinite)
python scripts/start_data_collection.py
# Run for 8 hours
python scripts/start_data_collection.py --hours 8
# Use custom configuration
python scripts/start_data_collection.py --config config/custom.json
# Production run for 24 hours
python scripts/start_data_collection.py --config config/production.json --hours 24
Configuration:
The service will create a default configuration file if none exists.
Default location: config/data_collection.json
The configuration includes:
- Exchange settings (OKX by default)
- Trading pairs (BTC-USDT, ETH-USDT by default)
- Data types and timeframes
- Health monitoring settings
"""
)
parser.add_argument(
'--config',
default="config/data_collection.json",
help='Configuration file path (default: config/data_collection.json)'
)
parser.add_argument(
'--hours',
type=float,
help='Collection duration in hours (default: indefinite until Ctrl+C)'
)
parser.add_argument(
'--quiet',
action='store_true',
help='Suppress banner and start directly'
)
args = parser.parse_args()
# Validate arguments
if args.hours is not None and args.hours <= 0:
print("❌ Duration must be positive")
sys.exit(1)
# Display banner unless quiet mode
if not args.quiet:
display_banner(args.config, args.hours)
try:
# Start the service
print("🎯 Starting service..." if not args.quiet else "")
success = asyncio.run(run_data_collection_service(
config_path=args.config,
duration_hours=args.hours
))
if success:
print("✅ Service completed successfully" if not args.quiet else "")
sys.exit(0)
else:
print("❌ Service failed" if not args.quiet else "")
sys.exit(1)
except KeyboardInterrupt:
print("\n👋 Service interrupted by user")
sys.exit(0)
except Exception as e:
print(f"❌ Fatal error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -12,6 +12,7 @@
- `database/init/schema_clean.sql` - Copy of clean schema for Docker initialization - `database/init/schema_clean.sql` - Copy of clean schema for Docker initialization
- `data/base_collector.py` - Abstract base class for all data collectors with standardized interface, error handling, data validation, health monitoring, and auto-restart capabilities - `data/base_collector.py` - Abstract base class for all data collectors with standardized interface, error handling, data validation, health monitoring, and auto-restart capabilities
- `data/collector_manager.py` - Centralized collector management with health monitoring, auto-recovery, and coordinated lifecycle management - `data/collector_manager.py` - Centralized collector management with health monitoring, auto-recovery, and coordinated lifecycle management
- `data/collection_service.py` - Production-ready data collection service with clean logging, multi-exchange support, and robust error handling
- `data/__init__.py` - Data collection package initialization - `data/__init__.py` - Data collection package initialization
- `data/okx_collector.py` - OKX API integration for real-time market data collection - `data/okx_collector.py` - OKX API integration for real-time market data collection
- `data/aggregator.py` - OHLCV candle aggregation and processing - `data/aggregator.py` - OHLCV candle aggregation and processing
@ -26,6 +27,9 @@
- `config/strategies/` - Directory for JSON strategy parameter files - `config/strategies/` - Directory for JSON strategy parameter files
- `config/settings.py` - Centralized configuration settings using Pydantic - `config/settings.py` - Centralized configuration settings using Pydantic
- `scripts/dev.py` - Development setup and management script - `scripts/dev.py` - Development setup and management script
- `scripts/start_data_collection.py` - Simple script to start the data collection service with clean output
- `scripts/production_clean.py` - Clean production OKX data collector script (adapted for service development)
- `scripts/monitor_clean.py` - Clean database monitor for production data collection status
- `scripts/init_database.py` - Database initialization and verification script - `scripts/init_database.py` - Database initialization and verification script
- `scripts/test_models.py` - Test script for SQLAlchemy models integration verification - `scripts/test_models.py` - Test script for SQLAlchemy models integration verification
- `utils/logger.py` - Enhanced unified logging system with verbose console output, automatic cleanup, and configurable retention [USE THIS FOR ALL LOGGING] - `utils/logger.py` - Enhanced unified logging system with verbose console output, automatic cleanup, and configurable retention [USE THIS FOR ALL LOGGING]
@ -35,12 +39,14 @@
- `tests/test_strategies.py` - Unit tests for strategy implementations - `tests/test_strategies.py` - Unit tests for strategy implementations
- `tests/test_bot_manager.py` - Unit tests for bot management functionality - `tests/test_bot_manager.py` - Unit tests for bot management functionality
- `tests/test_data_collection.py` - Unit tests for data collection and aggregation - `tests/test_data_collection.py` - Unit tests for data collection and aggregation
- `tests/test_data_collection_service.py` - Comprehensive unit tests for the DataCollectionService (25 tests)
- `tests/test_base_collector.py` - Comprehensive unit tests for the BaseDataCollector abstract class (13 tests) - `tests/test_base_collector.py` - Comprehensive unit tests for the BaseDataCollector abstract class (13 tests)
- `tests/test_collector_manager.py` - Comprehensive unit tests for the CollectorManager with health monitoring (14 tests) - `tests/test_collector_manager.py` - Comprehensive unit tests for the CollectorManager with health monitoring (14 tests)
- `tests/test_logging_enhanced.py` - Comprehensive unit tests for enhanced logging features (16 tests) - `tests/test_logging_enhanced.py` - Comprehensive unit tests for enhanced logging features (16 tests)
- `tests/test_indicators.py` - Comprehensive unit tests for technical indicators module (18 tests) - `tests/test_indicators.py` - Comprehensive unit tests for technical indicators module (18 tests)
- `docs/setup.md` - Comprehensive setup guide for new machines and environments - `docs/setup.md` - Comprehensive setup guide for new machines and environments
- `docs/logging.md` - Complete documentation for the enhanced unified logging system - `docs/logging.md` - Complete documentation for the enhanced unified logging system
- `docs/data-collection-service.md` - Complete documentation for the data collection service with usage examples, configuration, and deployment guide
- `docs/components/technical-indicators.md` - Complete documentation for the technical indicators module with usage examples and integration guide - `docs/components/technical-indicators.md` - Complete documentation for the technical indicators module with usage examples and integration guide
## Tasks ## Tasks
@ -66,8 +72,8 @@
- [x] 2.4 Implement Redis channels for real-time data distribution - [x] 2.4 Implement Redis channels for real-time data distribution
- [x] 2.5 Create data storage layer for OHLCV data in PostgreSQL - [x] 2.5 Create data storage layer for OHLCV data in PostgreSQL
- [x] 2.6 Add technical indicators calculation (SMA, EMA, RSI, MACD, Bollinger Bands) - [x] 2.6 Add technical indicators calculation (SMA, EMA, RSI, MACD, Bollinger Bands)
- [ ] 2.7 Implement data recovery and reconnection logic for API failures - [x] 2.7 Implement data recovery and reconnection logic for API failures (DEFERRED: Basic reconnection exists, comprehensive historical data recovery moved to section 13.0 for future implementation)
- [ ] 2.8 Create data collection service with proper logging - [x] 2.8 Create data collection service with proper logging
- [ ] 2.9 Unit test data collection and aggregation logic - [ ] 2.9 Unit test data collection and aggregation logic
- [ ] 3.0 Basic Dashboard for Data Visualization and Analysis - [ ] 3.0 Basic Dashboard for Data Visualization and Analysis
@ -176,6 +182,9 @@
- [ ] 13.5 Add caching layer for frequently accessed market data - [ ] 13.5 Add caching layer for frequently accessed market data
- [ ] 13.6 Optimize data retention and archival strategies - [ ] 13.6 Optimize data retention and archival strategies
- [ ] 13.7 Implement horizontal scaling for high-volume trading scenarios - [ ] 13.7 Implement horizontal scaling for high-volume trading scenarios
- [ ] 13.8 Implement comprehensive data recovery with OKX REST API for historical backfill
- [ ] 13.9 Add gap detection and automatic data recovery during reconnections
- [ ] 13.10 Implement data integrity validation and conflict resolution for recovered data