diff --git a/config/okx_config.json b/config/okx_config.json deleted file mode 100644 index 45b2cb6..0000000 --- a/config/okx_config.json +++ /dev/null @@ -1,69 +0,0 @@ -{ - "exchange": "okx", - "connection": { - "public_ws_url": "wss://ws.okx.com:8443/ws/v5/public", - "private_ws_url": "wss://ws.okx.com:8443/ws/v5/private", - "ping_interval": 25.0, - "pong_timeout": 10.0, - "max_reconnect_attempts": 5, - "reconnect_delay": 5.0 - }, - "data_collection": { - "store_raw_data": true, - "health_check_interval": 120.0, - "auto_restart": true, - "buffer_size": 1000 - }, - "factory": { - "use_factory_pattern": true, - "default_data_types": ["trade", "orderbook"], - "default_timeframes": ["1s", "5s", "1m", "5m", "15m", "1h"], - "batch_create": true - }, - "trading_pairs": [ - { - "symbol": "BTC-USDT", - "enabled": true, - "data_types": ["trade", "orderbook"], - "timeframes": ["1s", "5s", "1m", "5m", "15m", "1h"], - "channels": { - "trades": "trades", - "orderbook": "books5", - "ticker": "tickers" - } - }, - { - "symbol": "ETH-USDT", - "enabled": true, - "data_types": ["trade", "orderbook"], - "timeframes": ["1s", "5s", "1m", "5m", "15m", "1h"], - "channels": { - "trades": "trades", - "orderbook": "books5", - "ticker": "tickers" - } - } - ], - "logging": { - "component_name_template": "okx_collector_{symbol}", - "log_level": "INFO", - "verbose": false - }, - "database": { - "store_processed_data": true, - "store_raw_data": false, - "force_update_candles": false, - "batch_size": 100, - "flush_interval": 5.0 - }, - "rate_limiting": { - "max_subscriptions_per_connection": 100, - "max_messages_per_second": 1000 - }, - "monitoring": { - "enable_health_checks": true, - "health_check_interval": 30.0, - "alert_on_connection_loss": true, - "max_consecutive_errors": 5 - } -} \ No newline at end of file diff --git a/data/base_collector.py b/data/base_collector.py index 6263642..02a7ea7 100644 --- a/data/base_collector.py +++ b/data/base_collector.py @@ -115,9 +115,11 @@ class BaseDataCollector(ABC): exchange_name: str, symbols: List[str], data_types: Optional[List[DataType]] = None, + timeframes: Optional[List[str]] = None, component_name: Optional[str] = None, auto_restart: bool = True, health_check_interval: float = 30.0, + logger = None, log_errors_only: bool = False): """ @@ -127,6 +129,7 @@ class BaseDataCollector(ABC): exchange_name: Name of the exchange (e.g., 'okx', 'binance') symbols: List of trading symbols to collect data for data_types: Types of data to collect (default: [DataType.CANDLE]) + timeframes: List of timeframes to collect (e.g., ['1s', '1m', '5m']) component_name: Name for logging (default: based on exchange_name) auto_restart: Enable automatic restart on failures (default: True) health_check_interval: Seconds between health checks (default: 30.0) @@ -136,6 +139,7 @@ class BaseDataCollector(ABC): self.exchange_name = exchange_name.lower() self.symbols = set(symbols) self.data_types = data_types or [DataType.CANDLE] + self.timeframes = timeframes or ['1m', '5m'] # Default timeframes if none provided self.auto_restart = auto_restart self.health_check_interval = health_check_interval self.log_errors_only = log_errors_only @@ -187,6 +191,7 @@ class BaseDataCollector(ABC): self.component_name = component if not self.log_errors_only: self.logger.info(f"{self.component_name}: Initialized {self.exchange_name} data collector for symbols: {', '.join(symbols)}") + self.logger.info(f"{self.component_name}: Using timeframes: {', '.join(self.timeframes)}") else: self.component_name = component_name or f"{self.exchange_name}_collector" @@ -581,6 +586,7 @@ class BaseDataCollector(ABC): 'should_be_running': self._should_be_running, 'symbols': list(self.symbols), 'data_types': [dt.value for dt in self.data_types], + 'timeframes': self.timeframes, 'auto_restart': self.auto_restart, 'health': { 'time_since_heartbeat': time_since_heartbeat, @@ -637,7 +643,9 @@ class BaseDataCollector(ABC): 'last_heartbeat': self._last_heartbeat.isoformat() if self._last_heartbeat else None, 'last_data_received': self._last_data_received.isoformat() if self._last_data_received else None, 'should_be_running': self._should_be_running, - 'is_running': self._running + 'is_running': self._running, + 'timeframes': self.timeframes, + 'data_types': [dt.value for dt in self.data_types] } def add_symbol(self, symbol: str) -> None: diff --git a/data/collection_service.py b/data/collection_service.py index 62d774d..36ed575 100644 --- a/data/collection_service.py +++ b/data/collection_service.py @@ -228,6 +228,7 @@ class DataCollectionService: exchange=exchange_name, symbol=symbol, data_types=data_types, + timeframes=timeframes, # Pass timeframes to config auto_restart=data_collection_config.get('auto_restart', True), health_check_interval=data_collection_config.get('health_check_interval', 120.0), store_raw_data=data_collection_config.get('store_raw_data', True), @@ -235,7 +236,8 @@ class DataCollectionService: 'component_name': f"{exchange_name}_collector_{symbol.replace('-', '_').lower()}", 'logger': self.logger, 'log_errors_only': True, # Clean logging - only errors and essential events - 'force_update_candles': self.config.get('database', {}).get('force_update_candles', False) + 'force_update_candles': self.config.get('database', {}).get('force_update_candles', False), + 'timeframes': timeframes # Pass timeframes to collector } ) diff --git a/data/exchanges/factory.py b/data/exchanges/factory.py index 7e7b673..53b8f97 100644 --- a/data/exchanges/factory.py +++ b/data/exchanges/factory.py @@ -7,10 +7,11 @@ from different exchanges based on configuration. import importlib from typing import Dict, List, Optional, Any, Type, Tuple -from dataclasses import dataclass +from dataclasses import dataclass, field from utils.logger import get_logger from ..base_collector import BaseDataCollector, DataType +from ..common import CandleProcessingConfig from .registry import EXCHANGE_REGISTRY, get_supported_exchanges, get_exchange_info from .exceptions import ( ExchangeError, @@ -29,6 +30,7 @@ class ExchangeCollectorConfig: exchange: str symbol: str data_types: List[DataType] + timeframes: List[str] = field(default_factory=lambda: ['1m', '5m']) # Default timeframes auto_restart: bool = True health_check_interval: float = 30.0 store_raw_data: bool = True @@ -42,6 +44,8 @@ class ExchangeCollectorConfig: raise InvalidConfigurationError("Symbol cannot be empty") if not self.data_types: raise InvalidConfigurationError("At least one data type must be specified") + if not self.timeframes: + raise InvalidConfigurationError("At least one timeframe must be specified") logger.debug(f"Created collector config for {self.exchange} {self.symbol}") @@ -92,12 +96,23 @@ class ExchangeFactory: 'data_types': config.data_types, 'auto_restart': config.auto_restart, 'health_check_interval': config.health_check_interval, - 'store_raw_data': config.store_raw_data + 'store_raw_data': config.store_raw_data, + 'timeframes': config.timeframes # Pass timeframes to collector } # Add any custom parameters if config.custom_params: + # If custom_params contains a candle_config key, use it, otherwise create one + if 'candle_config' not in config.custom_params: + config.custom_params['candle_config'] = CandleProcessingConfig( + timeframes=config.timeframes + ) collector_args.update(config.custom_params) + else: + # Create default candle config if no custom params + collector_args['candle_config'] = CandleProcessingConfig( + timeframes=config.timeframes + ) # Create and return the collector instance logger.info(f"Successfully created collector for {exchange_name} {config.symbol}") diff --git a/data/exchanges/okx/collector.py b/data/exchanges/okx/collector.py index 3d87730..14df8d1 100644 --- a/data/exchanges/okx/collector.py +++ b/data/exchanges/okx/collector.py @@ -15,7 +15,9 @@ from ...base_collector import ( BaseDataCollector, DataType, CollectorStatus, MarketDataPoint, OHLCVData, DataValidationError, ConnectionError ) -from ...common import StandardizedTrade, OHLCVCandle +from ...common import ( + StandardizedTrade, OHLCVCandle, CandleProcessingConfig +) from .websocket import ( OKXWebSocketClient, OKXSubscription, OKXChannelType, ConnectionState, OKXWebSocketError @@ -53,6 +55,8 @@ class OKXCollector(BaseDataCollector): health_check_interval: float = 30.0, store_raw_data: bool = True, force_update_candles: bool = False, + timeframes: Optional[List[str]] = None, + candle_config: Optional[CandleProcessingConfig] = None, logger = None, log_errors_only: bool = False): """ @@ -66,6 +70,8 @@ class OKXCollector(BaseDataCollector): health_check_interval: Seconds between health checks store_raw_data: Whether to store raw data for debugging force_update_candles: If True, update existing candles; if False, keep existing candles unchanged + timeframes: List of timeframes to collect (e.g., ['1s', '5s', '1m']) + candle_config: Optional CandleProcessingConfig instance (will create one if not provided) logger: Logger instance for conditional logging (None for no logging) log_errors_only: If True and logger provided, only log error-level messages """ @@ -82,6 +88,7 @@ class OKXCollector(BaseDataCollector): exchange_name="okx", symbols=[symbol], data_types=data_types, + timeframes=timeframes, # Pass timeframes to base collector component_name=component_name, auto_restart=auto_restart, health_check_interval=health_check_interval, @@ -98,7 +105,12 @@ class OKXCollector(BaseDataCollector): self._ws_client: Optional[OKXWebSocketClient] = None # Data processor using new common framework - self._data_processor = OKXDataProcessor(symbol, component_name=f"{component_name}_processor", logger=logger) + self._data_processor = OKXDataProcessor( + symbol, + config=candle_config or CandleProcessingConfig(timeframes=self.timeframes), # Use provided config or create new one + component_name=f"{component_name}_processor", + logger=logger + ) # Add callbacks for processed data self._data_processor.add_trade_callback(self._on_trade_processed) @@ -122,6 +134,7 @@ class OKXCollector(BaseDataCollector): if logger: logger.info(f"{component_name}: Initialized OKX collector for {symbol} with data types: {[dt.value for dt in data_types]}") + logger.info(f"{component_name}: Using timeframes: {self.timeframes}") logger.info(f"{component_name}: Using common data processing framework") async def connect(self) -> bool: @@ -511,6 +524,7 @@ class OKXCollector(BaseDataCollector): "websocket_state": self._ws_client.connection_state.value if self._ws_client else "disconnected", "store_raw_data": self.store_raw_data, "force_update_candles": self.force_update_candles, + "timeframes": self.timeframes, "processing_stats": { "messages_received": self._message_count, "trades_processed": self._processed_trades, diff --git a/docs/modules/exchanges/README.md b/docs/modules/exchanges/README.md index c686a34..17ad243 100644 --- a/docs/modules/exchanges/README.md +++ b/docs/modules/exchanges/README.md @@ -19,10 +19,26 @@ This module provides a standardized interface for collecting real-time data from | Exchange | Status | Features | Documentation | |----------|---------|-----------|---------------| -| OKX | ✅ Production | Trades, Order Book, Ticker, Candles | [Guide](okx_collector.md) | +| OKX | ✅ Production | Trades, Order Book, Ticker, Configurable Timeframes (1s+) | [Guide](okx_collector.md) | | Binance | 🔄 Planned | TBD | - | | Coinbase | 🔄 Planned | TBD | - | +## Features + +### Core Features +- Real-time data collection +- Robust error handling +- Automatic reconnection +- Health monitoring +- Configurable timeframes + - Support for 1-second intervals + - Flexible timeframe configuration + - Custom timeframe aggregation + +### Exchange-Specific Features +- OKX: Full WebSocket support with configurable timeframes (1s+) +- More exchanges coming soon + ## Adding New Exchanges See [Technical Documentation](exchanges.md) for detailed implementation guide. diff --git a/docs/modules/exchanges/exchanges.md b/docs/modules/exchanges/exchanges.md index a572180..f6b1755 100644 --- a/docs/modules/exchanges/exchanges.md +++ b/docs/modules/exchanges/exchanges.md @@ -30,6 +30,7 @@ class ExchangeCollectorConfig: exchange: str symbol: str data_types: List[DataType] + timeframes: Optional[List[str]] = None # Timeframes for candle collection auto_restart: bool = True health_check_interval: float = 30.0 store_raw_data: bool = True @@ -43,6 +44,11 @@ class ExchangeCollectorConfig: raise InvalidConfigurationError("Symbol cannot be empty") if not self.data_types: raise InvalidConfigurationError("At least one data type must be specified") + if self.timeframes is not None: + if not all(isinstance(tf, str) for tf in self.timeframes): + raise InvalidConfigurationError("All timeframes must be strings") + if not self.timeframes: + raise InvalidConfigurationError("Timeframes list cannot be empty if provided") ``` ### Registry Configuration @@ -56,11 +62,26 @@ EXCHANGE_REGISTRY = { 'websocket': 'data.exchanges.okx.websocket.OKXWebSocketClient', 'name': 'OKX', 'supported_pairs': ['BTC-USDT', 'ETH-USDT', 'SOL-USDT', 'DOGE-USDT', 'TON-USDT'], - 'supported_data_types': ['trade', 'orderbook', 'ticker', 'candles'] + 'supported_data_types': ['trade', 'orderbook', 'ticker', 'candles'], + 'supported_timeframes': ['1s', '5s', '1m', '5m', '15m', '1h', '4h', '1d'] # Available timeframes } } ``` +### Example Usage with Timeframes + +```python +# Create collector with specific timeframes +config = ExchangeCollectorConfig( + exchange="okx", + symbol="BTC-USDT", + data_types=[DataType.TRADE, DataType.CANDLE], + timeframes=['1s', '5s', '1m', '5m'] # Specify desired timeframes +) + +collector = ExchangeFactory.create_collector(config) +``` + ### Error Handling Custom exceptions hierarchy for precise error handling: diff --git a/docs/modules/exchanges/okx_collector.md b/docs/modules/exchanges/okx_collector.md index 50d33de..3ff1710 100644 --- a/docs/modules/exchanges/okx_collector.md +++ b/docs/modules/exchanges/okx_collector.md @@ -17,7 +17,10 @@ The OKX Data Collector provides real-time market data collection from OKX exchan - **Trades**: Real-time trade executions (`trades` channel) - **Orderbook**: 5-level order book depth (`books5` channel) - **Ticker**: 24h ticker statistics (`tickers` channel) -- **Candles**: Real-time OHLCV aggregation (1s, 5s, 10s, 15s, 30s, 1m, 5m, 15m, 1h, 4h, 1d) +- **Candles**: Real-time OHLCV aggregation with configurable timeframes + - Supports any timeframe from 1s upwards + - Common timeframes: 1s, 5s, 1m, 5m, 15m, 1h, 4h, 1d + - Custom timeframes can be configured in data_collection.json ### 🔧 **Configuration Options** - Auto-restart on failures @@ -25,7 +28,8 @@ The OKX Data Collector provides real-time market data collection from OKX exchan - Raw data storage toggle - Custom ping/pong timing - Reconnection attempts configuration -- Multi-timeframe candle aggregation +- Flexible timeframe configuration (1s, 5s, 1m, 5m, 15m, 1h, etc.) +- Configurable candle aggregation settings ## Quick Start @@ -173,9 +177,9 @@ from data.base_collector import DataType from data.common import CandleProcessingConfig async def main(): - # Configure multi-timeframe candle processing + # Configure multi-timeframe candle processing with 1s support candle_config = CandleProcessingConfig( - timeframes=['1s', '5s', '10s', '15s', '30s', '1m', '5m', '15m', '1h'], + timeframes=['1s', '5s', '1m', '5m', '15m', '1h'], # Including 1s timeframe auto_save_candles=True, emit_incomplete_candles=False ) @@ -184,6 +188,7 @@ async def main(): collector = OKXCollector( symbol='BTC-USDT', data_types=[DataType.TRADE], # Trades needed for candle aggregation + timeframes=['1s', '5s', '1m', '5m', '15m', '1h'], # Specify desired timeframes candle_config=candle_config, auto_restart=True, store_raw_data=False # Disable raw storage for production