Add force_update_candles configuration to OKX data collector

- Introduced `force_update_candles` option in `okx_config.json` to control candle update behavior.
- Updated `OKXCollector` to handle candle storage based on the `force_update_candles` setting, allowing for either updating existing records or preserving them.
- Enhanced logging to reflect the action taken during candle storage, improving traceability.
- Updated database schema to include `updated_at` timestamp for better tracking of data changes.
Vasily.onl 2025-06-02 12:41:31 +08:00
parent 02a51521a0
commit 5b4547edd5
3 changed files with 72 additions and 16 deletions

View File

@@ -52,6 +52,7 @@
     "database": {
         "store_processed_data": true,
         "store_raw_data": true,
+        "force_update_candles": false,
         "batch_size": 100,
         "flush_interval": 5.0
     },
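
The new flag lives under the `database` block of `okx_config.json`. Below is a minimal wiring sketch of how the value might be read and handed to the collector; the import path, config loader, and symbol are assumptions made for illustration, while the parameter names come from the collector diff that follows.

```python
# Hypothetical wiring sketch: read okx_config.json and pass the new flag
# to the collector. The import path and symbol are assumptions; the
# constructor parameters match the OKXCollector diff below.
import json

from okx_collector import OKXCollector  # module path is an assumption

with open("okx_config.json") as f:
    config = json.load(f)

db_cfg = config.get("database", {})

collector = OKXCollector(
    symbol="BTC-USDT",  # example symbol, not taken from this commit
    store_raw_data=db_cfg.get("store_raw_data", True),
    force_update_candles=db_cfg.get("force_update_candles", False),
)
```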

View File

@@ -52,6 +52,7 @@ class OKXCollector(BaseDataCollector):
                  auto_restart: bool = True,
                  health_check_interval: float = 30.0,
                  store_raw_data: bool = True,
+                 force_update_candles: bool = False,
                  logger = None,
                  log_errors_only: bool = False):
         """
@@ -64,6 +65,7 @@ class OKXCollector(BaseDataCollector):
             auto_restart: Enable automatic restart on failures
             health_check_interval: Seconds between health checks
             store_raw_data: Whether to store raw data for debugging
+            force_update_candles: If True, update existing candles; if False, keep existing candles unchanged
             logger: Logger instance for conditional logging (None for no logging)
             log_errors_only: If True and logger provided, only log error-level messages
         """
@@ -90,6 +92,7 @@ class OKXCollector(BaseDataCollector):
         # OKX-specific settings
         self.symbol = symbol
         self.store_raw_data = store_raw_data
+        self.force_update_candles = force_update_candles
 
         # WebSocket client
         self._ws_client: Optional[OKXWebSocketClient] = None
@@ -391,6 +394,10 @@ class OKXCollector(BaseDataCollector):
         """
         Store completed OHLCV candle in the market_data table.
 
+        Handles duplicate candles based on force_update_candles setting:
+        - If force_update_candles=True: UPDATE existing records with latest values
+        - If force_update_candles=False: IGNORE duplicates, keep existing records unchanged
+
         Args:
             candle: Completed OHLCV candle
         """
@@ -398,27 +405,73 @@ class OKXCollector(BaseDataCollector):
             if not self._db_manager:
                 return
 
-            # Store completed candles in market_data table
+            # Use right-aligned timestamp (end_time) following industry standard
+            candle_timestamp = candle.end_time
+
+            # Store completed candles in market_data table with configurable duplicate handling
             with self._db_manager.get_session() as session:
-                market_data = MarketData(
-                    exchange=candle.exchange,
-                    symbol=candle.symbol,
-                    timeframe=candle.timeframe,
-                    timestamp=candle.start_time,  # Use start_time as the candle timestamp
-                    open=candle.open,
-                    high=candle.high,
-                    low=candle.low,
-                    close=candle.close,
-                    volume=candle.volume,
-                    trades_count=candle.trade_count
-                )
-                session.add(market_data)
+                if self.force_update_candles:
+                    # Force update: Overwrite existing candles with new data
+                    upsert_query = """
+                        INSERT INTO market_data (
+                            exchange, symbol, timeframe, timestamp,
+                            open, high, low, close, volume, trades_count,
+                            created_at, updated_at
+                        ) VALUES (
+                            :exchange, :symbol, :timeframe, :timestamp,
+                            :open, :high, :low, :close, :volume, :trades_count,
+                            NOW(), NOW()
+                        )
+                        ON CONFLICT (exchange, symbol, timeframe, timestamp)
+                        DO UPDATE SET
+                            open = EXCLUDED.open,
+                            high = EXCLUDED.high,
+                            low = EXCLUDED.low,
+                            close = EXCLUDED.close,
+                            volume = EXCLUDED.volume,
+                            trades_count = EXCLUDED.trades_count,
+                            updated_at = NOW()
+                    """
+                    action_type = "Updated"
+                else:
+                    # Keep existing: Ignore duplicates, preserve first candle
+                    upsert_query = """
+                        INSERT INTO market_data (
+                            exchange, symbol, timeframe, timestamp,
+                            open, high, low, close, volume, trades_count,
+                            created_at, updated_at
+                        ) VALUES (
+                            :exchange, :symbol, :timeframe, :timestamp,
+                            :open, :high, :low, :close, :volume, :trades_count,
+                            NOW(), NOW()
+                        )
+                        ON CONFLICT (exchange, symbol, timeframe, timestamp)
+                        DO NOTHING
+                    """
+                    action_type = "Stored"
+
+                session.execute(upsert_query, {
+                    'exchange': candle.exchange,
+                    'symbol': candle.symbol,
+                    'timeframe': candle.timeframe,
+                    'timestamp': candle_timestamp,
+                    'open': float(candle.open),
+                    'high': float(candle.high),
+                    'low': float(candle.low),
+                    'close': float(candle.close),
+                    'volume': float(candle.volume),
+                    'trades_count': candle.trade_count
+                })
 
             if self.logger:
-                self.logger.info(f"{self.component_name}: Stored completed candle: {candle.symbol} {candle.timeframe} at {candle.start_time}")
+                self.logger.info(f"{self.component_name}: {action_type} candle: {candle.symbol} {candle.timeframe} at {candle_timestamp} (force_update={self.force_update_candles}) - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}, Vol: {candle.volume}, Trades: {candle.trade_count}")
 
         except Exception as e:
             if self.logger:
                 self.logger.error(f"{self.component_name}: Error storing completed candle: {e}")
+                # Log candle details for debugging
+                self.logger.error(f"{self.component_name}: Failed candle details: {candle.symbol} {candle.timeframe} {candle.end_time} - OHLCV: {candle.open}/{candle.high}/{candle.low}/{candle.close}")
+            self._error_count += 1
 
     async def _store_raw_data(self, channel: str, raw_message: Dict[str, Any]) -> None:
         """
@@ -507,6 +560,7 @@ class OKXCollector(BaseDataCollector):
             "websocket_connected": self._ws_client.is_connected if self._ws_client else False,
             "websocket_state": self._ws_client.connection_state.value if self._ws_client else "disconnected",
             "store_raw_data": self.store_raw_data,
+            "force_update_candles": self.force_update_candles,
             "processing_stats": {
                 "messages_received": self._message_count,
                 "trades_processed": self._processed_trades,

View File

@@ -18,7 +18,7 @@ CREATE TABLE market_data (
     id SERIAL PRIMARY KEY,
     exchange VARCHAR(50) NOT NULL DEFAULT 'okx',
     symbol VARCHAR(20) NOT NULL,
-    timeframe VARCHAR(5) NOT NULL,  -- 1m, 5m, 15m, 1h, 4h, 1d
+    timeframe VARCHAR(5) NOT NULL,  -- 1s, 5s, 10s, 15s, 30s, 1m, 5m, 15m, 1h, 4h, 1d
     timestamp TIMESTAMPTZ NOT NULL,
     open DECIMAL(18,8) NOT NULL,
     high DECIMAL(18,8) NOT NULL,
@@ -27,6 +27,7 @@ CREATE TABLE market_data (
     volume DECIMAL(18,8) NOT NULL,
     trades_count INTEGER,  -- number of trades in this candle
     created_at TIMESTAMPTZ DEFAULT NOW(),
+    updated_at TIMESTAMPTZ DEFAULT NOW(),
     CONSTRAINT unique_market_data UNIQUE(exchange, symbol, timeframe, timestamp)
 );
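
The `CREATE TABLE` change above only covers freshly created databases. For a deployment that already has the table, the new column would need a migration; a minimal sketch, assuming PostgreSQL 9.6+ (for `ADD COLUMN IF NOT EXISTS`):

```sql
-- Hypothetical migration for databases created before this commit;
-- idempotent on PostgreSQL 9.6+ thanks to IF NOT EXISTS.
ALTER TABLE market_data
    ADD COLUMN IF NOT EXISTS updated_at TIMESTAMPTZ DEFAULT NOW();
```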