"""
Risk management utilities for backtesting.

Handles funding rate calculations and liquidation detection.
"""

from dataclasses import dataclass

import pandas as pd

from engine.logging_config import get_logger
from engine.market import MarketConfig, calculate_liquidation_price

logger = get_logger(__name__)

# Log templates, keyed by trade direction. Kept as complete literals so the
# emitted records (and any log grouping keyed on the template) stay stable.
_INJECT_LOG_TEMPLATES = {
    "long": (
        "LIQUIDATION (Long): Entry %s ($%.2f) -> Liquidated %s "
        "(liq=$%.2f, low=$%.2f)"
    ),
    "short": (
        "LIQUIDATION (Short): Entry %s ($%.2f) -> Liquidated %s "
        "(liq=$%.2f, high=$%.2f)"
    ),
}
_WARNING_LOG_TEMPLATES = {
    "long": (
        "LIQUIDATION WARNING (Long): Entry at %s ($%.2f), "
        "would liquidate at %s (liq_price=$%.2f, low=$%.2f)"
    ),
    "short": (
        "LIQUIDATION WARNING (Short): Entry at %s ($%.2f), "
        "would liquidate at %s (liq_price=$%.2f, high=$%.2f)"
    ),
}


@dataclass
class LiquidationEvent:
    """
    Record of a liquidation event during backtesting.

    Attributes:
        entry_time: Timestamp when position was opened
        entry_price: Price at position entry
        liquidation_time: Timestamp when liquidation occurred
        liquidation_price: Calculated liquidation price
        actual_price: Actual price that triggered liquidation (high/low)
        direction: 'long' or 'short'
        margin_lost_pct: Share of margin lost, expressed as a fraction
            (the default 1.0 corresponds to the full margin)
    """

    entry_time: pd.Timestamp
    entry_price: float
    liquidation_time: pd.Timestamp
    liquidation_price: float
    actual_price: float
    direction: str
    margin_lost_pct: float = 1.0


def _position_open_flags(
    entries: pd.DataFrame | pd.Series,
    exits: pd.DataFrame | pd.Series,
) -> pd.Series:
    """Return a per-bar boolean Series: True where cumulative entries exceed cumulative exits."""
    is_open = (entries.cumsum() - exits.cumsum()) > 0
    if isinstance(is_open, pd.DataFrame):
        # Several signal columns: the side counts as open if any column is open.
        is_open = is_open.any(axis=1)
    return is_open


def calculate_funding(
    close: pd.Series,
    long_entries: pd.DataFrame,
    long_exits: pd.DataFrame,
    short_entries: pd.DataFrame,
    short_exits: pd.DataFrame,
    market_config: MarketConfig,
    leverage: int
) -> float:
    """
    Calculate total funding paid/received for perpetual positions.

    Simplified model: at every bar whose hour-of-day is a multiple of
    ``market_config.funding_interval_hours`` the funding rate is applied to
    ``price * leverage`` for each side that is open at that bar. Positive
    rate means longs pay shorts.

    Funding is charged at most once per funding hour: when the index has
    several bars inside the same clock hour (e.g. 15-minute data), only the
    first of them is charged. Hourly and coarser data are unaffected.

    Args:
        close: Price series; its index must expose ``.hour`` (DatetimeIndex)
        long_entries: Long entry signals
        long_exits: Long exit signals
        short_entries: Short entry signals
        short_exits: Short exit signals
        market_config: Market configuration with funding parameters
        leverage: Position leverage

    Returns:
        Total funding paid (positive) or received (negative)
    """
    interval_hours = market_config.funding_interval_hours
    if interval_hours == 0:
        # A zero interval means the market has no funding mechanism.
        return 0.0

    funding_rate = market_config.funding_rate

    # Position state at each bar (True = side is open).
    long_open_flags = _position_open_flags(long_entries, long_exits)
    short_open_flags = _position_open_flags(short_entries, short_exits)

    # Bars falling in a funding hour (hour-of-day divisible by the interval).
    funding_times = close.index[close.index.hour % interval_hours == 0]

    total_funding = 0.0
    charged_hours: set[tuple[int, int, int, int]] = set()
    for ts in funding_times:
        # Charge each funding hour once, even if it contains several bars.
        hour_key = (ts.year, ts.month, ts.day, ts.hour)
        if hour_key in charged_hours:
            continue
        charged_hours.add(hour_key)

        position_value = close.loc[ts] * leverage

        # Long pays funding, short receives (when rate > 0).
        if long_open_flags.loc[ts]:
            total_funding += position_value * funding_rate
        if short_open_flags.loc[ts]:
            total_funding -= position_value * funding_rate

    return total_funding


def _first_breach(extremes: pd.Series, liq_price: float, is_long: bool):
    """Return the index label of the first bar strictly beyond ``liq_price``, or None."""
    # Longs are breached from below (bar low), shorts from above (bar high).
    breached = extremes < liq_price if is_long else extremes > liq_price
    if not breached.any():
        return None
    return extremes[breached].index[0]


def _inject_side(
    close: pd.Series,
    extremes: pd.Series,
    entries_df: pd.DataFrame,
    exits_df: pd.DataFrame,
    modified_exits: pd.DataFrame,
    leverage: int,
    maintenance_margin_rate: float,
    is_long: bool,
) -> list[LiquidationEvent]:
    """Force exits into ``modified_exits`` for one side and return its liquidation events."""
    direction = "long" if is_long else "short"
    events: list[LiquidationEvent] = []

    # Row-wise "any exit" flags of the ORIGINAL exits; computed once because
    # injected exits must not influence where later entries normally exit.
    exit_flags = exits_df.any(axis=1)

    for entry_idx in close.index[entries_df.any(axis=1)]:
        entry_price = close.loc[entry_idx]
        liq_price = calculate_liquidation_price(
            entry_price,
            leverage,
            is_long=is_long,
            maintenance_margin_rate=maintenance_margin_rate,
        )

        # Normal exit = first exit signal at or after the entry bar,
        # falling back to the last bar when the position is never closed.
        later_exits = exit_flags.loc[entry_idx:]
        exit_labels = later_exits[later_exits].index
        normal_exit_idx = exit_labels[0] if len(exit_labels) > 0 else close.index[-1]

        # Label slicing is inclusive, so both the entry bar and the normal
        # exit bar are part of the scanned window.
        liq_bar = _first_breach(
            extremes.loc[entry_idx:normal_exit_idx], liq_price, is_long
        )
        if liq_bar is None:
            continue

        # Inject forced exit at the liquidation bar (all signal columns).
        modified_exits.loc[liq_bar, :] = True

        actual_price = extremes.loc[liq_bar]
        events.append(LiquidationEvent(
            entry_time=entry_idx,
            entry_price=entry_price,
            liquidation_time=liq_bar,
            liquidation_price=liq_price,
            actual_price=actual_price,
            direction=direction,
            margin_lost_pct=1.0,
        ))
        logger.warning(
            _INJECT_LOG_TEMPLATES[direction],
            entry_idx.strftime('%Y-%m-%d'),
            entry_price,
            liq_bar.strftime('%Y-%m-%d'),
            liq_price,
            actual_price,
        )

    return events


def inject_liquidation_exits(
    close: pd.Series,
    high: pd.Series,
    low: pd.Series,
    long_entries: pd.DataFrame | pd.Series,
    long_exits: pd.DataFrame | pd.Series,
    short_entries: pd.DataFrame | pd.Series,
    short_exits: pd.DataFrame | pd.Series,
    leverage: int,
    maintenance_margin_rate: float
) -> tuple[pd.DataFrame | pd.Series, pd.DataFrame | pd.Series, list[LiquidationEvent]]:
    """
    Modify exit signals to force position closure at liquidation points.

    This function simulates liquidation behavior by:
    1. Finding positions that would be liquidated before their normal exit
    2. Injecting forced exit signals at the liquidation bar
    3. Recording all liquidation events

    Long positions are checked against ``low`` (strictly below the
    liquidation price), short positions against ``high`` (strictly above).
    The entry price is the close of the entry bar. The caller's exit signals
    are never mutated; modified copies are returned. With ``leverage <= 1``
    nothing is checked and the original exit objects are returned as-is.

    Args:
        close: Close price series
        high: High price series
        low: Low price series
        long_entries: Long entry signals
        long_exits: Long exit signals
        short_entries: Short entry signals
        short_exits: Short exit signals
        leverage: Position leverage
        maintenance_margin_rate: Maintenance margin rate for liquidation

    Returns:
        Tuple of (modified_long_exits, modified_short_exits, liquidation_events).
        The exits are Series when ``long_entries`` was a Series, DataFrames
        otherwise. Events list all long liquidations first, then all shorts.
    """
    if leverage <= 1:
        return long_exits, short_exits, []

    # Work on DataFrames throughout; the input kind is detected from
    # long_entries and the other signals are assumed to match it.
    is_series = isinstance(long_entries, pd.Series)
    if is_series:
        long_entries_df = long_entries.to_frame()
        long_exits_df = long_exits.to_frame()
        short_entries_df = short_entries.to_frame()
        short_exits_df = short_exits.to_frame()
    else:
        long_entries_df = long_entries
        long_exits_df = long_exits
        short_entries_df = short_entries
        short_exits_df = short_exits

    # Copies receive the injected exits so the caller's data stays untouched.
    modified_long_exits = long_exits_df.copy()
    modified_short_exits = short_exits_df.copy()

    liquidation_events: list[LiquidationEvent] = []
    liquidation_events.extend(_inject_side(
        close, low, long_entries_df, long_exits_df, modified_long_exits,
        leverage, maintenance_margin_rate, is_long=True,
    ))
    liquidation_events.extend(_inject_side(
        close, high, short_entries_df, short_exits_df, modified_short_exits,
        leverage, maintenance_margin_rate, is_long=False,
    ))

    # Convert back to Series if input was Series.
    if is_series:
        modified_long_exits = modified_long_exits.iloc[:, 0]
        modified_short_exits = modified_short_exits.iloc[:, 0]

    return modified_long_exits, modified_short_exits, liquidation_events


def calculate_liquidation_adjustment(
    liquidation_events: list[LiquidationEvent],
    init_cash: float,
    leverage: int
) -> tuple[float, float]:
    """
    Estimate the margin lost to liquidations and the matching return adjustment.

    The estimate is count-based; the individual events are not inspected.
    Each liquidation is charged ``(init_cash / 2) / leverage``: half of the
    initial cash is treated as the margin of one side (long or short), and
    ``1 / leverage`` is used as the approximate loss rate per liquidated
    trade. The total is capped at ``init_cash``.

    NOTE: with a non-empty event list, a zero ``leverage`` or ``init_cash``
    raises ZeroDivisionError.

    Args:
        liquidation_events: List of liquidation events
        init_cash: Initial portfolio cash (before leverage)
        leverage: Position leverage used

    Returns:
        Tuple of (total_margin_lost, adjustment_pct)
        - total_margin_lost: Estimated total margin lost from liquidations
        - adjustment_pct: total_margin_lost as a percentage of init_cash
    """
    if not liquidation_events:
        return 0.0, 0.0

    # Each side (long/short) is assumed to hold half of the capital.
    margin_per_side = init_cash / 2

    # Approximate loss per liquidated trade as a fraction of the side margin.
    liq_loss_rate = 1.0 / leverage

    n_liquidations = len(liquidation_events)

    # Never report more than the available capital as lost.
    total_margin_lost = min(n_liquidations * margin_per_side * liq_loss_rate, init_cash)

    # Express as percentage of initial capital.
    adjustment_pct = (total_margin_lost / init_cash) * 100

    return total_margin_lost, adjustment_pct


def _count_liquidation_warnings(
    close: pd.Series,
    extremes: pd.Series,
    entries: pd.DataFrame | pd.Series,
    leverage: int,
    maintenance_margin_rate: float,
    is_long: bool,
) -> int:
    """Log a warning for each entry of one side that is later breached; return the count."""
    direction = "long" if is_long else "short"
    entry_mask = entries.any(axis=1) if isinstance(entries, pd.DataFrame) else entries

    count = 0
    for entry_idx in close.index[entry_mask]:
        entry_price = close.loc[entry_idx]
        liq_price = calculate_liquidation_price(
            entry_price,
            leverage,
            is_long=is_long,
            maintenance_margin_rate=maintenance_margin_rate,
        )

        # Scan every bar from the entry to the end of the data.
        liq_bar = _first_breach(extremes.loc[entry_idx:], liq_price, is_long)
        if liq_bar is None:
            continue

        logger.warning(
            _WARNING_LOG_TEMPLATES[direction],
            entry_idx,
            entry_price,
            liq_bar,
            liq_price,
            extremes.loc[liq_bar],
        )
        count += 1

    return count


def check_liquidations(
    close: pd.Series,
    high: pd.Series,
    low: pd.Series,
    long_entries: pd.DataFrame,
    long_exits: pd.DataFrame,
    short_entries: pd.DataFrame,
    short_exits: pd.DataFrame,
    leverage: int,
    maintenance_margin_rate: float
) -> int:
    """
    Check for liquidation events and log warnings.

    Every entry is compared against all bars from the entry to the end of
    the data. ``long_exits`` and ``short_exits`` are accepted but not
    consulted, so a breach occurring after the strategy's own exit is still
    reported. Each entry contributes at most one warning.

    Args:
        close: Close price series
        high: High price series
        low: Low price series
        long_entries: Long entry signals
        long_exits: Long exit signals (currently unused)
        short_entries: Short entry signals
        short_exits: Short exit signals (currently unused)
        leverage: Position leverage
        maintenance_margin_rate: Maintenance margin rate for liquidation

    Returns:
        Count of liquidation warnings
    """
    long_warnings = _count_liquidation_warnings(
        close, low, long_entries, leverage, maintenance_margin_rate, is_long=True
    )
    short_warnings = _count_liquidation_warnings(
        close, high, short_entries, leverage, maintenance_margin_rate, is_long=False
    )
    return long_warnings + short_warnings