Files
lowkey_backtest/live_trading/live_regime_strategy.py
Simon Moisy 35992ee374 Enhance SL/TP calculation and order handling in LiveRegimeStrategy and OKXClient
- Updated `calculate_sl_tp` method to handle invalid entry prices and sides, returning (None, None) when necessary.
- Improved logging for SL/TP values in `LiveTradingBot` to display "N/A" for invalid values.
- Refined order placement in `OKXClient` to ensure guaranteed fill price retrieval, with fallback mechanisms for fetching order details and ticker prices if needed.
- Added error handling for scenarios where fill prices cannot be determined.
2026-01-16 13:54:26 +08:00

301 lines
10 KiB
Python

"""
Live Regime Reversion Strategy.
Adapts the backtest regime strategy for live trading.
Uses a pre-trained ML model or trains on historical data.
"""
import logging
import pickle
from pathlib import Path
from typing import Optional
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from .config import TradingConfig, PathConfig
logger = logging.getLogger(__name__)
class LiveRegimeStrategy:
    """
    Live trading implementation of the ML-based regime detection
    and mean reversion strategy.

    Logic:
        1. Calculates BTC/ETH spread Z-Score
        2. Uses Random Forest to predict reversion probability
        3. Applies funding rate filter
        4. Generates long/short signals on ETH perpetual
    """

    # Raw inputs / label sources that must not be fed to the model as features.
    _NON_FEATURE_COLS = ('spread', 'btc_close', 'eth_close', 'eth_volume')
    # |z_score| below this value is reported as "reverted to mean".
    _EXIT_Z_THRESHOLD = 0.3
    # Minimum number of feature rows before on-the-fly training is attempted.
    _MIN_ROWS_FOR_AUTO_TRAIN = 200
    # Minimum number of labelled rows required to fit the model.
    _MIN_TRAINING_SAMPLES = 100

    def __init__(
        self,
        trading_config: "TradingConfig",
        path_config: "PathConfig"
    ):
        """
        Store the configuration objects and try to load a saved model.

        Args:
            trading_config: Strategy thresholds and sizing limits
            path_config: Provides ``model_path`` for model persistence
        """
        self.config = trading_config
        self.paths = path_config
        self.model: Optional[RandomForestClassifier] = None
        self.feature_cols: Optional[list] = None
        self._load_or_train_model()

    def _load_or_train_model(self) -> None:
        """
        Load a pre-trained model from ``paths.model_path`` if it exists.

        Nothing is trained here; when no usable model is found,
        ``generate_signal`` trains one on the first sufficiently large batch.
        """
        if self.paths.model_path.exists():
            try:
                # SECURITY: pickle can execute arbitrary code while loading.
                # Only point model_path at files written by save_model().
                with open(self.paths.model_path, 'rb') as f:
                    saved = pickle.load(f)
                # Read both entries before assigning either, so a malformed
                # file can never leave a model without its feature columns.
                model = saved['model']
                feature_cols = saved['feature_cols']
                self.model = model
                self.feature_cols = feature_cols
                logger.info(f"Loaded model from {self.paths.model_path}")
                return
            except Exception as e:
                logger.warning(f"Could not load model: {e}")
        logger.info("No pre-trained model found. Will train on first data batch.")

    def save_model(self) -> None:
        """Save trained model to file (best effort; failures are logged)."""
        if self.model is None:
            return
        try:
            # Create the target directory on first save so a fresh checkout
            # does not silently lose the trained model.
            self.paths.model_path.parent.mkdir(parents=True, exist_ok=True)
            with open(self.paths.model_path, 'wb') as f:
                pickle.dump({
                    'model': self.model,
                    'feature_cols': self.feature_cols,
                }, f)
            logger.info(f"Saved model to {self.paths.model_path}")
        except Exception as e:
            logger.error(f"Could not save model: {e}")

    def train_model(
        self,
        features: pd.DataFrame,
        horizon: int = 102,
        profit_target: float = 0.005
    ) -> None:
        """
        Train the Random Forest model on historical data.

        A row is labelled 1 when the z-score is beyond the entry threshold
        and the spread moves by at least ``profit_target`` in the reverting
        direction within the next ``horizon`` rows; otherwise 0.

        ``self.model`` and ``self.feature_cols`` are only replaced after a
        successful fit, so a failed training run leaves any previous model
        usable.

        Args:
            features: DataFrame with calculated features (must contain
                'spread' and 'z_score')
            horizon: Look-ahead window in rows (default 102, the value
                previously hard-coded as "optimal horizon from research")
            profit_target: Fractional spread move counted as success
                (default 0.005, i.e. 0.5%)
        """
        logger.info(f"Training model on {len(features)} samples...")
        z_thresh = self.config.z_entry_threshold
        spread = features['spread']
        z_score = features['z_score']

        # Extremes of the spread over the NEXT `horizon` rows.
        future_min = spread.rolling(window=horizon).min().shift(-horizon)
        future_max = spread.rolling(window=horizon).max().shift(-horizon)
        target_short = spread * (1 - profit_target)
        target_long = spread * (1 + profit_target)
        success_short = (z_score > z_thresh) & (future_min < target_short)
        success_long = (z_score < -z_thresh) & (future_max > target_long)
        targets = (success_short | success_long).astype(int).to_numpy()

        # Exclude non-feature columns
        feature_cols = [
            c for c in features.columns if c not in self._NON_FEATURE_COLS
        ]

        # Clean features: NaN and +/-inf both become 0
        X = features[feature_cols].fillna(0).replace([np.inf, -np.inf], 0)

        # Rows at the tail have no complete look-ahead window -> no label.
        valid_mask = (future_min.notna() & future_max.notna()).to_numpy()
        X_clean = X[valid_mask]
        y_clean = targets[valid_mask]

        if len(X_clean) < self._MIN_TRAINING_SAMPLES:
            logger.warning("Not enough data to train model")
            return
        if np.unique(y_clean).size < 2:
            # A single-class fit cannot produce a probability for class 1.
            logger.warning("Training labels contain a single class; skipping training")
            return

        # Train model
        model = RandomForestClassifier(
            n_estimators=300,
            max_depth=5,
            min_samples_leaf=30,
            class_weight={0: 1, 1: 3},
            random_state=42
        )
        model.fit(X_clean, y_clean)

        # Publish model and its columns together, only after a good fit.
        self.model = model
        self.feature_cols = feature_cols
        logger.info(f"Model trained on {len(X_clean)} samples")
        self.save_model()

    def _reversion_probability(self, X: pd.DataFrame):
        """Return the predicted probability of class 1 for the single row X."""
        classes = list(getattr(self.model, 'classes_', (0, 1)))
        if 1 not in classes:
            # Model was fitted without any positive sample.
            return 0.0
        return self.model.predict_proba(X)[0, classes.index(1)]

    def generate_signal(
        self,
        features: pd.DataFrame,
        current_funding: dict
    ) -> dict:
        """
        Generate trading signal from latest features.

        Args:
            features: DataFrame with calculated features
            current_funding: Dictionary with funding rate data; the
                'btc_funding' entry is used (missing/None is treated as 0)

        Returns:
            Signal dictionary. Early "hold" returns contain only 'action'
            and 'reason'; otherwise the keys are 'action' ('hold', 'entry'
            or 'check_exit'), 'side', 'probability', 'z_score',
            'eth_price', 'btc_price' and 'reason'.
        """
        if self.model is None:
            # Train model if not available
            if len(features) >= self._MIN_ROWS_FOR_AUTO_TRAIN:
                self.train_model(features)
            else:
                return {'action': 'hold', 'reason': 'model_not_trained'}
        if self.model is None:
            return {'action': 'hold', 'reason': 'insufficient_data_for_training'}
        if len(features) == 0:
            return {'action': 'hold', 'reason': 'no_feature_data'}

        missing = [c for c in (self.feature_cols or []) if c not in features.columns]
        if self.feature_cols is None or missing:
            # A loaded model can be out of sync with the current feature set.
            logger.error(f"Feature columns unavailable for prediction: {missing}")
            return {'action': 'hold', 'reason': 'feature_mismatch'}

        # Get latest row
        latest = features.iloc[-1]
        z_score = latest['z_score']
        eth_price = latest['eth_close']
        btc_price = latest['btc_close']

        # Prepare features for prediction (same cleaning as in training)
        X = features[self.feature_cols].iloc[[-1]].fillna(0)
        X = X.replace([np.inf, -np.inf], 0)

        # Get prediction probability
        prob = self._reversion_probability(X)

        # Apply thresholds
        z_thresh = self.config.z_entry_threshold
        prob_thresh = self.config.model_prob_threshold

        # Determine signal direction
        signal = {
            'action': 'hold',
            'side': None,
            'probability': prob,
            'z_score': z_score,
            'eth_price': eth_price,
            'btc_price': btc_price,
            'reason': '',
        }

        # Check for entry conditions
        if prob > prob_thresh:
            if z_score > z_thresh:
                # Spread high (ETH expensive relative to BTC) -> Short ETH
                signal['action'] = 'entry'
                signal['side'] = 'short'
                signal['reason'] = f'z_score={z_score:.2f}>threshold, prob={prob:.2f}'
            elif z_score < -z_thresh:
                # Spread low (ETH cheap relative to BTC) -> Long ETH
                signal['action'] = 'entry'
                signal['side'] = 'long'
                signal['reason'] = f'z_score={z_score:.2f}<-threshold, prob={prob:.2f}'
            else:
                signal['reason'] = f'z_score={z_score:.2f} within threshold'
        else:
            signal['reason'] = f'prob={prob:.2f}<threshold'

        # Apply funding rate filter
        if signal['action'] == 'entry':
            btc_funding = (current_funding or {}).get('btc_funding') or 0
            funding_thresh = self.config.funding_threshold
            if signal['side'] == 'long' and btc_funding > funding_thresh:
                # High positive funding = overheated, don't go long
                signal['action'] = 'hold'
                signal['reason'] = f'funding_filter_blocked_long (funding={btc_funding:.4f})'
            elif signal['side'] == 'short' and btc_funding < -funding_thresh:
                # High negative funding = oversold, don't go short
                signal['action'] = 'hold'
                signal['reason'] = f'funding_filter_blocked_short (funding={btc_funding:.4f})'

        # Check for exit conditions (mean reversion complete)
        if signal['action'] == 'hold':
            # Z-score is back near 0
            if abs(z_score) < self._EXIT_Z_THRESHOLD:
                signal['action'] = 'check_exit'
                signal['reason'] = f'z_score_reverted_to_mean ({z_score:.2f})'

        logger.info(
            f"Signal: {signal['action']} {signal['side'] or ''} "
            f"(prob={prob:.2f}, z={z_score:.2f}, reason={signal['reason']})"
        )
        return signal

    def calculate_position_size(
        self,
        signal: dict,
        available_usdt: float
    ) -> float:
        """
        Calculate position size based on signal confidence.

        The base size is scaled by 1x-2x depending on the model
        probability, capped at 95% of the available balance, and only then
        compared against the configured minimum. Note that the scaled size
        may exceed ``max_position_usdt``, which bounds the base size only.

        Args:
            signal: Signal dictionary with probability (0.5 if missing)
            available_usdt: Available USDT balance

        Returns:
            Position size in USDT, or 0.0 when the final size would be
            below ``min_position_usdt``
        """
        prob = signal.get('probability')
        if prob is None:
            prob = 0.5

        # Base size: if max_position_usdt <= 0, use all available funds
        if self.config.max_position_usdt <= 0:
            base_size = available_usdt
        else:
            base_size = min(available_usdt, self.config.max_position_usdt)

        # Scale by probability (1.0x at 0.5 prob, 1.6x at 0.8 prob)
        scale = 1.0 + (prob - 0.5) * 2.0
        scale = max(1.0, min(scale, 2.0))  # Clamp between 1x and 2x

        # Leave a 5% balance buffer BEFORE the minimum check, so the value
        # returned can never be a non-zero size below the minimum.
        size = min(base_size * scale, available_usdt * 0.95)
        if size < self.config.min_position_usdt:
            return 0.0
        return size

    def calculate_sl_tp(
        self,
        entry_price: Optional[float],
        side: str
    ) -> tuple[Optional[float], Optional[float]]:
        """
        Calculate stop-loss and take-profit prices.

        Args:
            entry_price: Entry price
            side: "long" or "short"

        Returns:
            Tuple of (stop_loss_price, take_profit_price), or (None, None) if
            entry_price is invalid (None, NaN, infinite, zero or negative)

        Raises:
            ValueError: If side is not "long" or "short" (only checked when
                entry_price is valid)
        """
        if (
            entry_price is None
            or not np.isfinite(entry_price)
            or entry_price <= 0
        ):
            logger.error(
                f"Invalid entry_price for SL/TP calculation: {entry_price}"
            )
            return None, None
        if side not in ("long", "short"):
            raise ValueError(f"Invalid side: {side}. Must be 'long' or 'short'")

        sl_pct = self.config.stop_loss_pct
        tp_pct = self.config.take_profit_pct
        if side == "long":
            stop_loss = entry_price * (1 - sl_pct)
            take_profit = entry_price * (1 + tp_pct)
        else:  # short
            stop_loss = entry_price * (1 + sl_pct)
            take_profit = entry_price * (1 - tp_pct)
        return stop_loss, take_profit