diff --git a/config/okx_config.json b/config/okx_config.json index 2a41d9f..056bed3 100644 --- a/config/okx_config.json +++ b/config/okx_config.json @@ -52,6 +52,7 @@ "database": { "store_processed_data": true, "store_raw_data": true, + "force_update_candles": false, "batch_size": 100, "flush_interval": 5.0 }, diff --git a/data/exchanges/okx/collector.py b/data/exchanges/okx/collector.py index a72ece3..24bdf00 100644 --- a/data/exchanges/okx/collector.py +++ b/data/exchanges/okx/collector.py @@ -52,6 +52,7 @@ class OKXCollector(BaseDataCollector): auto_restart: bool = True, health_check_interval: float = 30.0, store_raw_data: bool = True, + force_update_candles: bool = False, logger = None, log_errors_only: bool = False): """ @@ -64,6 +65,7 @@ class OKXCollector(BaseDataCollector): auto_restart: Enable automatic restart on failures health_check_interval: Seconds between health checks store_raw_data: Whether to store raw data for debugging + force_update_candles: If True, update existing candles; if False, keep existing candles unchanged logger: Logger instance for conditional logging (None for no logging) log_errors_only: If True and logger provided, only log error-level messages """ @@ -90,6 +92,7 @@ class OKXCollector(BaseDataCollector): # OKX-specific settings self.symbol = symbol self.store_raw_data = store_raw_data + self.force_update_candles = force_update_candles # WebSocket client self._ws_client: Optional[OKXWebSocketClient] = None @@ -391,6 +394,10 @@ class OKXCollector(BaseDataCollector): """ Store completed OHLCV candle in the market_data table. + Handles duplicate candles based on force_update_candles setting: + - If force_update_candles=True: UPDATE existing records with latest values + - If force_update_candles=False: IGNORE duplicates, keep existing records unchanged + Args: candle: Completed OHLCV candle """ @@ -398,27 +405,73 @@ class OKXCollector(BaseDataCollector): if not self._db_manager: return - # Store completed candles in market_data table + # Use right-aligned timestamp (end_time) following industry standard + candle_timestamp = candle.end_time + + # Store completed candles in market_data table with configurable duplicate handling with self._db_manager.get_session() as session: - market_data = MarketData( - exchange=candle.exchange, - symbol=candle.symbol, - timeframe=candle.timeframe, - timestamp=candle.start_time, # Use start_time as the candle timestamp - open=candle.open, - high=candle.high, - low=candle.low, - close=candle.close, - volume=candle.volume, - trades_count=candle.trade_count - ) - session.add(market_data) + if self.force_update_candles: + # Force update: Overwrite existing candles with new data + upsert_query = """ + INSERT INTO market_data ( + exchange, symbol, timeframe, timestamp, + open, high, low, close, volume, trades_count, + created_at, updated_at + ) VALUES ( + :exchange, :symbol, :timeframe, :timestamp, + :open, :high, :low, :close, :volume, :trades_count, + NOW(), NOW() + ) + ON CONFLICT (exchange, symbol, timeframe, timestamp) + DO UPDATE SET + open = EXCLUDED.open, + high = EXCLUDED.high, + low = EXCLUDED.low, + close = EXCLUDED.close, + volume = EXCLUDED.volume, + trades_count = EXCLUDED.trades_count, + updated_at = NOW() + """ + action_type = "Updated" + else: + # Keep existing: Ignore duplicates, preserve first candle + upsert_query = """ + INSERT INTO market_data ( + exchange, symbol, timeframe, timestamp, + open, high, low, close, volume, trades_count, + created_at, updated_at + ) VALUES ( + :exchange, :symbol, :timeframe, :timestamp, + :open, :high, :low, :close, :volume, :trades_count, + NOW(), NOW() + ) + ON CONFLICT (exchange, symbol, timeframe, timestamp) + DO NOTHING + """ + action_type = "Stored" + + session.execute(upsert_query, { + 'exchange': candle.exchange, + 'symbol': candle.symbol, + 'timeframe': candle.timeframe, + 'timestamp': candle_timestamp, + 'open': float(candle.open), + 'high': float(candle.high), + 'low': float(candle.low), + 'close': float(candle.close), + 'volume': float(candle.volume), + 'trades_count': candle.trade_count + }) + if self.logger: - self.logger.info(f"{self.component_name}: Stored completed candle: {candle.symbol} {candle.timeframe} at {candle.start_time}") + self.logger.info(f"{self.component_name}: {action_type} candle: {candle.symbol} {candle.timeframe} at {candle_timestamp} (force_update={self.force_update_candles}) - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}, Vol: {candle.volume}, Trades: {candle.trade_count}") except Exception as e: if self.logger: self.logger.error(f"{self.component_name}: Error storing completed candle: {e}") + # Log candle details for debugging + self.logger.error(f"{self.component_name}: Failed candle details: {candle.symbol} {candle.timeframe} {candle.end_time} - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}") + self._error_count += 1 async def _store_raw_data(self, channel: str, raw_message: Dict[str, Any]) -> None: """ @@ -507,6 +560,7 @@ class OKXCollector(BaseDataCollector): "websocket_connected": self._ws_client.is_connected if self._ws_client else False, "websocket_state": self._ws_client.connection_state.value if self._ws_client else "disconnected", "store_raw_data": self.store_raw_data, + "force_update_candles": self.force_update_candles, "processing_stats": { "messages_received": self._message_count, "trades_processed": self._processed_trades, diff --git a/database/schema_clean.sql b/database/schema_clean.sql index 09eaaeb..109b343 100644 --- a/database/schema_clean.sql +++ b/database/schema_clean.sql @@ -18,7 +18,7 @@ CREATE TABLE market_data ( id SERIAL PRIMARY KEY, exchange VARCHAR(50) NOT NULL DEFAULT 'okx', symbol VARCHAR(20) NOT NULL, - timeframe VARCHAR(5) NOT NULL, -- 1m, 5m, 15m, 1h, 4h, 1d + timeframe VARCHAR(5) NOT NULL, -- 1s, 5s, 10s, 15s, 30s, 1m, 5m, 15m, 1h, 4h, 1d timestamp TIMESTAMPTZ NOT NULL, open DECIMAL(18,8) NOT NULL, high DECIMAL(18,8) NOT NULL, @@ -27,6 +27,7 @@ CREATE TABLE market_data ( volume DECIMAL(18,8) NOT NULL, trades_count INTEGER, -- number of trades in this candle created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW(), CONSTRAINT unique_market_data UNIQUE(exchange, symbol, timeframe, timestamp) );