Vasily.onl c9ae507bb7 Implement Incremental Trading Framework
- Introduced a comprehensive framework for incremental trading strategies, including modules for strategy execution, backtesting, and data processing.
- Added key components such as `IncTrader`, `IncBacktester`, and various trading strategies (e.g., `MetaTrendStrategy`, `BBRSStrategy`, `RandomStrategy`) to facilitate real-time trading and backtesting.
- Implemented a robust backtesting framework with configuration management, parallel execution, and result analysis capabilities.
- Developed an incremental indicators framework to support real-time data processing with constant memory usage.
- Added documentation with usage examples and an architecture overview to support future development.
- Kept the interface compatible with existing strategies while targeting real-time performance and bounded memory usage.
2025-05-28 16:29:48 +08:00


"""
Base classes for the incremental strategy system.
This module contains the fundamental building blocks for all incremental trading strategies:
- IncStrategySignal: Represents trading signals with confidence and metadata
- IncStrategyBase: Abstract base class that all incremental strategies must inherit from
- TimeframeAggregator: Built-in timeframe aggregation for minute-level data processing
The incremental approach allows strategies to:
- Process new data points without full recalculation
- Maintain bounded memory usage regardless of data history length
- Provide real-time performance with minimal latency
- Support both initialization and incremental modes
- Accept minute-level data and internally aggregate to any timeframe
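A minimal end-to-end sketch (illustrative only; MyIncStrategy and live_stream
are placeholder names, not part of this module):
    strategy = MyIncStrategy(name="demo", params={"timeframe": "15min"})
    for bar in live_stream:  # minute-level OHLCV dicts with a 'timestamp' key
        signal = strategy.process_data_point(bar['timestamp'], bar)
        if signal and signal.signal_type == "ENTRY":
            ...  # route the entry to execution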
"""
import pandas as pd
from abc import ABC, abstractmethod
from typing import Dict, Optional, List, Union, Any
from collections import deque
import logging
import time
logger = logging.getLogger(__name__)
class IncStrategySignal:
"""
Represents a trading signal from an incremental strategy.
A signal encapsulates the strategy's recommendation along with confidence
level, optional price target, and additional metadata.
Attributes:
signal_type (str): Type of signal - "ENTRY", "EXIT", or "HOLD"
confidence (float): Confidence level from 0.0 to 1.0
price (Optional[float]): Optional specific price for the signal
metadata (Dict): Additional signal data and context
Example:
# Entry signal with high confidence
signal = IncStrategySignal("ENTRY", confidence=0.8)
# Exit signal with stop loss price
signal = IncStrategySignal("EXIT", confidence=1.0, price=50000,
metadata={"type": "STOP_LOSS"})
"""
def __init__(self, signal_type: str, confidence: float = 1.0,
price: Optional[float] = None, metadata: Optional[Dict] = None):
"""
Initialize a strategy signal.
Args:
signal_type: Type of signal ("ENTRY", "EXIT", "HOLD")
confidence: Confidence level (0.0 to 1.0)
price: Optional specific price for the signal
metadata: Additional signal data and context
"""
self.signal_type = signal_type
self.confidence = max(0.0, min(1.0, confidence)) # Clamp to [0,1]
self.price = price
self.metadata = metadata or {}
@classmethod
def BUY(cls, confidence: float = 1.0, price: Optional[float] = None, **metadata):
"""Create a BUY signal."""
return cls("ENTRY", confidence, price, metadata)
@classmethod
def SELL(cls, confidence: float = 1.0, price: Optional[float] = None, **metadata):
"""Create a SELL signal."""
return cls("EXIT", confidence, price, metadata)
@classmethod
def HOLD(cls, confidence: float = 0.0, **metadata):
"""Create a HOLD signal."""
return cls("HOLD", confidence, None, metadata)
def __repr__(self) -> str:
"""String representation of the signal."""
return (f"IncStrategySignal(type={self.signal_type}, "
f"confidence={self.confidence:.2f}, "
f"price={self.price}, metadata={self.metadata})")
class TimeframeAggregator:
"""
Handles real-time aggregation of minute data to higher timeframes.
This class accumulates minute-level OHLCV data and produces complete
bars when a timeframe period is completed. Integrated into IncStrategyBase
to provide consistent minute-level data processing across all strategies.
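Example (minimal sketch; minute_bars and handle_bar are hypothetical names):
    agg = TimeframeAggregator(timeframe_minutes=15)
    for ts, bar in minute_bars:  # (pd.Timestamp, OHLCV dict) pairs
        completed = agg.update(ts, bar)
        if completed is not None:
            handle_bar(completed)  # a full 15-minute bar just closed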
"""
def __init__(self, timeframe_minutes: int = 15):
"""
Initialize timeframe aggregator.
Args:
timeframe_minutes: Target timeframe in minutes (e.g., 60 for 1h, 15 for 15min)
"""
self.timeframe_minutes = timeframe_minutes
self.current_bar = None
self.current_bar_start = None
self.last_completed_bar = None
def update(self, timestamp: pd.Timestamp, ohlcv_data: Dict[str, float]) -> Optional[Dict[str, float]]:
"""
Update with new minute data and return completed bar if timeframe is complete.
Args:
timestamp: Timestamp of the data
ohlcv_data: OHLCV data dictionary
Returns:
Completed OHLCV bar if timeframe period ended, None otherwise
"""
# Calculate which timeframe bar this timestamp belongs to
bar_start = self._get_bar_start_time(timestamp)
# Check if we're starting a new bar
if self.current_bar_start != bar_start:
# Save the completed bar (if any)
completed_bar = self.current_bar.copy() if self.current_bar is not None else None
# Start new bar
self.current_bar_start = bar_start
self.current_bar = {
'timestamp': bar_start,
'open': ohlcv_data['close'], # Use current close as open for new bar
'high': ohlcv_data['close'],
'low': ohlcv_data['close'],
'close': ohlcv_data['close'],
'volume': ohlcv_data['volume']
}
# Return the completed bar (if any)
if completed_bar is not None:
self.last_completed_bar = completed_bar
return completed_bar
else:
# Update current bar with new data
if self.current_bar is not None:
self.current_bar['high'] = max(self.current_bar['high'], ohlcv_data['high'])
self.current_bar['low'] = min(self.current_bar['low'], ohlcv_data['low'])
self.current_bar['close'] = ohlcv_data['close']
self.current_bar['volume'] += ohlcv_data['volume']
return None # No completed bar yet
def _get_bar_start_time(self, timestamp: pd.Timestamp) -> pd.Timestamp:
"""Calculate the start time of the timeframe bar for given timestamp.
This method aligns with pandas resampling to ensure consistency
with the original strategy's bar boundaries.
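Example (worked case, assuming timeframe_minutes=15):
    12:07 maps to the bar starting at 12:00, 12:15:30 maps to 12:15,
    and 12:59 maps to 12:45.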
"""
# Use pandas-style resampling alignment
# This ensures bars align to standard boundaries (e.g., 00:00, 00:15, 00:30, 00:45)
freq_str = f'{self.timeframe_minutes}min'
try:
# Create a temporary series with the timestamp and resample to get the bar start
temp_series = pd.Series([1], index=[timestamp])
resampled = temp_series.resample(freq_str)
# Get the first group's name (which is the bar start time)
for bar_start, _ in resampled:
return bar_start
except Exception:
# Fallback to original method if resampling fails
pass
# Fallback method
minutes_since_midnight = timestamp.hour * 60 + timestamp.minute
bar_minutes = (minutes_since_midnight // self.timeframe_minutes) * self.timeframe_minutes
return timestamp.replace(
hour=bar_minutes // 60,
minute=bar_minutes % 60,
second=0,
microsecond=0
)
def get_current_bar(self) -> Optional[Dict[str, float]]:
"""Get the current incomplete bar (for debugging)."""
return self.current_bar.copy() if self.current_bar is not None else None
def reset(self):
"""Reset aggregator state."""
self.current_bar = None
self.current_bar_start = None
self.last_completed_bar = None
class IncStrategyBase(ABC):
"""
Abstract base class for all incremental trading strategies.
This class defines the interface that all incremental strategies must implement:
- get_minimum_buffer_size(): Specify minimum data requirements
- calculate_on_data(): Update internal indicators for each new timeframe bar
- supports_incremental_calculation(): Whether strategy supports incremental mode
- get_entry_signal(): Generate entry signals
- get_exit_signal(): Generate exit signals
The incremental approach allows strategies to:
- Process new data points without full recalculation
- Maintain bounded memory usage regardless of data history length
- Provide real-time performance with minimal latency
- Support both initialization and incremental modes
- Accept minute-level data and internally aggregate to any timeframe
New Features:
- Built-in TimeframeAggregator for minute-level data processing
- process_data_point() entry point for real-time minute-level data
- Automatic timeframe detection and aggregation
- Backward compatibility with existing update() methods
Attributes:
name (str): Strategy name
weight (float): Strategy weight for combination
params (Dict): Strategy parameters
calculation_mode (str): Current mode ('initialization' or 'incremental')
is_warmed_up (bool): Whether strategy has sufficient data for reliable signals
timeframe_buffers (Dict): Rolling buffers for different timeframes
indicator_states (Dict): Internal indicator calculation states
timeframe_aggregator (TimeframeAggregator): Built-in aggregator for minute data
Example:
class MyIncStrategy(IncStrategyBase):
def get_minimum_buffer_size(self):
return {"15min": 50} # Strategy works on 15min timeframe
def calculate_on_data(self, new_data_point, timestamp):
    # Update internal indicators incrementally for each completed bar
    self._update_indicators(new_data_point)
def get_entry_signal(self):
# Generate signal based on current state
if self._should_enter():
return IncStrategySignal.BUY(confidence=0.8)
return IncStrategySignal.HOLD()
# Usage with minute-level data:
strategy = MyIncStrategy(name="my_strategy", params={"timeframe": "15min"})
for minute_data in live_stream:
signal = strategy.process_data_point(minute_data['timestamp'], minute_data)
"""
def __init__(self, name: str, weight: float = 1.0, params: Optional[Dict] = None):
"""
Initialize the incremental strategy base.
Args:
name: Strategy name/identifier
weight: Strategy weight for combination (default: 1.0)
params: Strategy-specific parameters
"""
self.name = name
self.weight = weight
self.params = params or {}
# Calculation state
self._calculation_mode = "initialization"
self._is_warmed_up = False
self._data_points_received = 0
# Data management
self._timeframe_buffers = {}
self._timeframe_last_update = {}
self._indicator_states = {}
self._last_signals = {}
self._signal_history = deque(maxlen=100) # Keep last 100 signals
# Performance tracking
self._performance_metrics = {
'update_times': deque(maxlen=1000),
'signal_generation_times': deque(maxlen=1000),
'state_validation_failures': 0,
'data_gaps_handled': 0,
'minute_data_points_processed': 0,
'timeframe_bars_completed': 0
}
# Configuration
self._buffer_size_multiplier = 1.5 # Extra buffer for safety
self._state_validation_enabled = True
self._max_acceptable_gap = pd.Timedelta(minutes=5)
# Timeframe aggregation
self._primary_timeframe_minutes = self._extract_timeframe_minutes()
self._timeframe_aggregator = None
if self._primary_timeframe_minutes > 1:
self._timeframe_aggregator = TimeframeAggregator(self._primary_timeframe_minutes)
logger.info(f"Initialized incremental strategy: {self.name}")
def _extract_timeframe_minutes(self) -> int:
"""Extract timeframe in minutes from strategy parameters."""
timeframe = self.params.get("timeframe", "1min")
if isinstance(timeframe, str):
if timeframe.endswith("min"):
return int(timeframe[:-3])
elif timeframe.endswith("h"):
return int(timeframe[:-1]) * 60
elif timeframe.endswith("d"):
return int(timeframe[:-1]) * 24 * 60
elif isinstance(timeframe, int):
return timeframe
# Default to 1 minute
return 1
def process_data_point(self, timestamp: pd.Timestamp, ohlcv_data: Dict[str, float]) -> Optional[IncStrategySignal]:
"""
Process a new data point and return signal if generated.
This is the main entry point for incremental processing. It handles
timeframe aggregation, buffer updates, and signal generation.
Args:
timestamp: Timestamp of the data point
ohlcv_data: OHLCV data dictionary
Returns:
IncStrategySignal if a signal is generated, None otherwise
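Example (illustrative; feed is a hypothetical source of minute bars):
    for ts, bar in feed:
        signal = strategy.process_data_point(ts, bar)
        if signal is None:
            continue  # incomplete timeframe bar, or strategy not warmed up yet
        if signal.signal_type == "ENTRY":
            ...  # act on the entry signal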
"""
start_time = time.time()
try:
# Update performance metrics
self._performance_metrics['minute_data_points_processed'] += 1
self._data_points_received += 1
# Handle timeframe aggregation if needed
if self._timeframe_aggregator is not None:
completed_bar = self._timeframe_aggregator.update(timestamp, ohlcv_data)
if completed_bar is not None:
# Process the completed timeframe bar
self._performance_metrics['timeframe_bars_completed'] += 1
return self._process_timeframe_bar(completed_bar['timestamp'], completed_bar)
else:
# No complete bar yet, return None
return None
else:
# Process minute data directly
return self._process_timeframe_bar(timestamp, ohlcv_data)
except Exception as e:
logger.error(f"Error processing data point in {self.name}: {e}")
return None
finally:
# Track processing time
processing_time = time.time() - start_time
self._performance_metrics['update_times'].append(processing_time)
def _process_timeframe_bar(self, timestamp: pd.Timestamp, ohlcv_data: Dict[str, float]) -> Optional[IncStrategySignal]:
"""Process a complete timeframe bar and generate signals."""
# Update timeframe buffers
self._update_timeframe_buffers(ohlcv_data, timestamp)
# Call strategy-specific calculation
self.calculate_on_data(ohlcv_data, timestamp)
# Check if strategy is warmed up
if not self._is_warmed_up:
self._check_warmup_status()
# Generate signal if warmed up
if self._is_warmed_up:
signal_start = time.time()
signal = self.get_current_signal()
signal_time = time.time() - signal_start
self._performance_metrics['signal_generation_times'].append(signal_time)
# Store signal in history
if signal and signal.signal_type != "HOLD":
self._signal_history.append({
'timestamp': timestamp,
'signal': signal,
'strategy_state': self.get_current_state_summary()
})
return signal
return None
def _check_warmup_status(self):
"""Check if strategy has enough data to be considered warmed up."""
min_buffer_sizes = self.get_minimum_buffer_size()
for timeframe, min_size in min_buffer_sizes.items():
buffer = self._timeframe_buffers.get(timeframe, deque())
if len(buffer) < min_size:
return # Not enough data yet
# All buffers have sufficient data
self._is_warmed_up = True
self._calculation_mode = "incremental"
logger.info(f"Strategy {self.name} is now warmed up after {self._data_points_received} data points")
def get_current_signal(self) -> IncStrategySignal:
"""Get the current signal based on strategy state."""
# Try entry signal first
entry_signal = self.get_entry_signal()
if entry_signal and entry_signal.signal_type != "HOLD":
return entry_signal
# Check exit signal
exit_signal = self.get_exit_signal()
if exit_signal and exit_signal.signal_type != "HOLD":
return exit_signal
# Default to hold
return IncStrategySignal.HOLD()
def get_current_incomplete_bar(self) -> Optional[Dict[str, float]]:
"""Get current incomplete timeframe bar (for debugging)."""
if self._timeframe_aggregator is not None:
return self._timeframe_aggregator.get_current_bar()
return None
# Properties
@property
def calculation_mode(self) -> str:
"""Get current calculation mode."""
return self._calculation_mode
@property
def is_warmed_up(self) -> bool:
"""Check if strategy is warmed up."""
return self._is_warmed_up
# Abstract methods that must be implemented by strategies
@abstractmethod
def get_minimum_buffer_size(self) -> Dict[str, int]:
"""
Get minimum buffer sizes for each timeframe.
This method specifies how much historical data the strategy needs
for each timeframe to generate reliable signals.
Returns:
Dict[str, int]: Mapping of timeframe to minimum buffer size
Example:
return {"15min": 50, "1h": 24} # 50 15min bars, 24 1h bars
"""
pass
@abstractmethod
def calculate_on_data(self, new_data_point: Dict[str, float], timestamp: pd.Timestamp) -> None:
"""
Process new data point and update internal indicators.
This method is called for each new timeframe bar and should update
all internal indicators and strategy state incrementally.
Args:
new_data_point: New OHLCV data point
timestamp: Timestamp of the data point
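Example (sketch of a constant-memory EMA update; self._ema and period are
illustrative names, not provided by the base class):
    alpha = 2.0 / (period + 1)
    price = new_data_point['close']
    self._ema = price if self._ema is None else alpha * price + (1 - alpha) * self._ema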
"""
pass
@abstractmethod
def supports_incremental_calculation(self) -> bool:
"""
Check if strategy supports incremental calculation.
Returns:
bool: True if strategy can process data incrementally
"""
pass
@abstractmethod
def get_entry_signal(self) -> IncStrategySignal:
"""
Generate entry signal based on current strategy state.
This method should use the current internal state to determine
whether an entry signal should be generated.
Returns:
IncStrategySignal: Entry signal with confidence level
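Example (illustrative; _ema_fast and _ema_slow are hypothetical strategy state):
    if self._ema_fast is not None and self._ema_fast > self._ema_slow:
        return IncStrategySignal.BUY(confidence=0.7)
    return IncStrategySignal.HOLD()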
"""
pass
@abstractmethod
def get_exit_signal(self) -> IncStrategySignal:
"""
Generate exit signal based on current strategy state.
This method should use the current internal state to determine
whether an exit signal should be generated.
Returns:
IncStrategySignal: Exit signal with confidence level
"""
pass
# Utility methods
def get_confidence(self) -> float:
"""
Get strategy confidence for the current market state.
Default implementation returns 1.0. Strategies can override
this to provide dynamic confidence based on market conditions.
Returns:
float: Confidence level (0.0 to 1.0)
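Example (illustrative override; _trend_strength is a hypothetical value in [0, 1]):
    return max(0.0, min(1.0, self._trend_strength))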
"""
return 1.0
def reset_calculation_state(self) -> None:
"""Reset internal calculation state for reinitialization."""
self._calculation_mode = "initialization"
self._is_warmed_up = False
self._data_points_received = 0
self._timeframe_buffers.clear()
self._timeframe_last_update.clear()
self._indicator_states.clear()
self._last_signals.clear()
self._signal_history.clear()
# Reset timeframe aggregator
if self._timeframe_aggregator is not None:
self._timeframe_aggregator.reset()
# Reset performance metrics
for key in self._performance_metrics:
if isinstance(self._performance_metrics[key], deque):
self._performance_metrics[key].clear()
else:
self._performance_metrics[key] = 0
def get_current_state_summary(self) -> Dict[str, Any]:
"""Get summary of current calculation state for debugging."""
return {
'strategy_name': self.name,
'calculation_mode': self._calculation_mode,
'is_warmed_up': self._is_warmed_up,
'data_points_received': self._data_points_received,
'timeframes': list(self._timeframe_buffers.keys()),
'buffer_sizes': {tf: len(buf) for tf, buf in self._timeframe_buffers.items()},
'indicator_states': {name: state.get_state_summary() if hasattr(state, 'get_state_summary') else str(state)
for name, state in self._indicator_states.items()},
'last_signals': self._last_signals,
'timeframe_aggregator': {
'enabled': self._timeframe_aggregator is not None,
'primary_timeframe_minutes': self._primary_timeframe_minutes,
'current_incomplete_bar': self.get_current_incomplete_bar()
},
'performance_metrics': {
'avg_update_time': sum(self._performance_metrics['update_times']) / len(self._performance_metrics['update_times'])
if self._performance_metrics['update_times'] else 0,
'avg_signal_time': sum(self._performance_metrics['signal_generation_times']) / len(self._performance_metrics['signal_generation_times'])
if self._performance_metrics['signal_generation_times'] else 0,
'validation_failures': self._performance_metrics['state_validation_failures'],
'data_gaps_handled': self._performance_metrics['data_gaps_handled'],
'minute_data_points_processed': self._performance_metrics['minute_data_points_processed'],
'timeframe_bars_completed': self._performance_metrics['timeframe_bars_completed']
}
}
def _update_timeframe_buffers(self, new_data_point: Dict[str, float], timestamp: pd.Timestamp) -> None:
"""Update all timeframe buffers with new data point."""
# Get minimum buffer sizes
min_buffer_sizes = self.get_minimum_buffer_size()
for timeframe in min_buffer_sizes.keys():
# Calculate actual buffer size with multiplier
min_size = min_buffer_sizes[timeframe]
actual_buffer_size = int(min_size * self._buffer_size_multiplier)
# Initialize buffer if needed
if timeframe not in self._timeframe_buffers:
self._timeframe_buffers[timeframe] = deque(maxlen=actual_buffer_size)
self._timeframe_last_update[timeframe] = None
# Add data point to buffer
data_point = new_data_point.copy()
data_point['timestamp'] = timestamp
self._timeframe_buffers[timeframe].append(data_point)
self._timeframe_last_update[timeframe] = timestamp
def _get_timeframe_buffer(self, timeframe: str) -> pd.DataFrame:
"""Get current buffer for specific timeframe as DataFrame."""
if timeframe not in self._timeframe_buffers:
return pd.DataFrame()
buffer_data = list(self._timeframe_buffers[timeframe])
if not buffer_data:
return pd.DataFrame()
df = pd.DataFrame(buffer_data)
if 'timestamp' in df.columns:
df = df.set_index('timestamp')
return df
def handle_data_gap(self, gap_duration: pd.Timedelta) -> None:
"""Handle gaps in data stream."""
self._performance_metrics['data_gaps_handled'] += 1
if gap_duration > self._max_acceptable_gap:
logger.warning(f"Data gap {gap_duration} exceeds maximum acceptable gap {self._max_acceptable_gap}")
self._trigger_reinitialization()
else:
logger.info(f"Handling acceptable data gap: {gap_duration}")
# For small gaps, continue with current state
def _trigger_reinitialization(self) -> None:
"""Trigger strategy reinitialization due to data gap or corruption."""
logger.info(f"Triggering reinitialization for strategy {self.name}")
self.reset_calculation_state()
# Compatibility methods for original strategy interface
def get_timeframes(self) -> List[str]:
"""Get required timeframes (compatibility method)."""
return list(self.get_minimum_buffer_size().keys())
def initialize(self, backtester) -> None:
"""Initialize strategy (compatibility method)."""
# This method provides compatibility with the original strategy interface
# The actual initialization happens through the incremental interface
self.initialized = True
logger.info(f"Incremental strategy {self.name} initialized in compatibility mode")
def __repr__(self) -> str:
"""String representation of the strategy."""
return (f"{self.__class__.__name__}(name={self.name}, "
f"weight={self.weight}, mode={self._calculation_mode}, "
f"warmed_up={self._is_warmed_up}, "
f"data_points={self._data_points_received})")