- Introduced `example_complete_series_aggregation.py` to demonstrate time series aggregation, emitting candles even when no trades occur.
- Implemented `CompleteSeriesProcessor` extending `RealTimeCandleProcessor` to handle time-based candle emission and empty candle creation.
- Refactored `OKXCollector` to use the new repository pattern for database operations, improving modularity and maintainability.
- Updated database operations to centralize data handling through `DatabaseOperations`, improving error handling and logging.
- Enhanced documentation to cover the new aggregation example and the repository pattern implementation.
10 KiB
10 KiB
Architecture Components
1. Data Collector
Responsibility: OHLCV data collection and aggregation from exchanges
class DataCollector:
    """Collects OHLCV data from registered exchange providers.

    Design sketch: aggregates raw trades into candles, persists them to
    PostgreSQL, and fans updates out to active bots over Redis. Method
    bodies are intentionally omitted in this architecture document.
    """

    def __init__(self) -> None:
        self.providers = {}  # Registry of data providers, keyed by provider name
        self.store_raw_data = False  # Optional raw data storage (off by default)

    def register_provider(self, name: str, provider: DataProvider):
        """Register a new data provider"""

    def start_collection(self, symbols: List[str], timeframes: List[str]):
        """Start collecting OHLCV data for specified symbols and timeframes"""

    def process_raw_trades(self, raw_trades: List[dict]) -> dict:
        """Aggregate raw trades into OHLCV candles"""

    def store_ohlcv_data(self, ohlcv_data: dict):
        """Store OHLCV data in PostgreSQL market_data table"""

    def send_market_update(self, symbol: str, ohlcv_data: dict):
        """Send Redis signal with OHLCV update to active bots"""

    def store_raw_data_optional(self, raw_data: dict):
        """Optionally store raw data for detailed backtesting"""
2. Strategy Engine
Responsibility: Unified interface for all trading strategies
class BaseStrategy:
    """Common contract shared by every trading strategy.

    A concrete strategy receives its tuning knobs as a plain dict and
    must implement process_data(); publishing indicators for plotting
    is optional.
    """

    def __init__(self, parameters: dict):
        # Strategy-specific tuning parameters supplied by the bot config.
        self.parameters = parameters

    def get_indicators(self) -> dict:
        """Return calculated indicators for plotting (none by default)."""
        return {}

    def process_data(self, data: pd.DataFrame) -> Signal:
        """Process market data and generate a trading signal.

        Must be overridden by every concrete strategy.
        """
        raise NotImplementedError
3. Bot Manager
Responsibility: Orchestrate bot execution and state management
class BotManager:
    """Orchestrates bot execution, lifecycle, and crash recovery.

    Bot configs live as JSON files under config_path; runtime status and
    heartbeats are mirrored to the database for monitoring. Method bodies
    are intentionally omitted in this architecture document.
    """

    def __init__(self) -> None:
        self.active_bots = {}  # bot_id -> running bot instance
        self.config_path = "config/bots/"  # Directory of per-bot JSON configs

    def load_bot_config(self, bot_id: int) -> dict:
        """Load bot configuration from JSON file"""

    def start_bot(self, bot_id: int):
        """Start a bot instance with crash recovery monitoring"""

    def stop_bot(self, bot_id: int):
        """Stop a bot instance and update database status"""

    def process_signal(self, bot_id: int, signal: Signal):
        """Process signal and make virtual trading decision"""

    def update_bot_heartbeat(self, bot_id: int):
        """Update bot heartbeat in database for monitoring"""

    def restart_crashed_bots(self):
        """Monitor and restart crashed bots (max 3 attempts/hour)"""

    def restore_active_bots_on_startup(self):
        """Restore active bot states after application restart"""
Communication Architecture
Redis Pub/Sub Patterns
# Real-time market data distribution.
# These are channel-name *templates*: format with the concrete symbol or
# bot id before publishing/subscribing, e.g.
# MARKET_DATA_CHANNEL.format(symbol="BTC-USDT").
MARKET_DATA_CHANNEL = "market:{symbol}"  # OHLCV updates
BOT_SIGNALS_CHANNEL = "signals:{bot_id}"  # Trading decisions
BOT_STATUS_CHANNEL = "status:{bot_id}"  # Bot lifecycle events
SYSTEM_EVENTS_CHANNEL = "system:events"  # Global notifications
Time Aggregation Strategy
Candlestick Alignment
- Use RIGHT-ALIGNED timestamps (industry standard)
- A 5-minute candle stamped 09:05:00 covers the interval from 09:00:00 (exclusive) to 09:05:00 (inclusive)
- Timestamp = close time of the candle
- Matches the close-time convention shown in many exchange UIs; note that some REST APIs (e.g. Binance klines) report the candle *open* time instead — confirm the convention per provider (Binance, OKX, Coinbase)
Aggregation Logic
def aggregate_to_timeframe(ticks: List[dict], timeframe: str) -> "Iterator[dict]":
    """Aggregate tick data into right-aligned OHLCV candles.

    Fix: the body yields, so this is a generator — the original ``-> dict``
    annotation was wrong. Callers must iterate the result, not treat it as
    a single dict.

    Parameters:
    - ticks: raw trade/tick dicts, assumed ordered by time
    - timeframe: '1m', '5m', '15m', '1h', '4h', '1d'

    Yields:
    - one candle dict per interval: timestamp (close time), open, high,
      low, close, volume
    """
    # Convert timeframe string to its length in seconds
    interval_seconds = parse_timeframe(timeframe)
    # Group ticks by time intervals (right-aligned: each candle carries
    # the *close* time of its interval)
    for group in group_by_interval(ticks, interval_seconds):
        yield {
            'timestamp': group.end_time,  # Right-aligned
            'open': group.first_price,
            'high': group.max_price,
            'low': group.min_price,
            'close': group.last_price,
            'volume': group.total_volume
        }
Backtesting Strategy
Backtest Processing Approach (the example below iterates bar-by-bar; see Optimization Techniques for vectorization opportunities)
import pandas as pd
import numpy as np
def backtest_strategy_simple(strategy, market_data: pd.DataFrame, initial_balance: float = 10000):
    """
    Simple event-loop backtest over OHLCV candles.

    Parameters:
    - strategy: Strategy instance with process_data(df) -> Signal
    - market_data: DataFrame with OHLCV data (must contain a 'close' column)
    - initial_balance: Starting portfolio value

    Returns:
    - dict with final_value, total_return (%), per-bar signals, and the
      portfolio value progression (one entry per bar)
    """
    signals = []
    portfolio_value = []
    current_balance = initial_balance
    position = 0.0

    # Bug fix: iterrows() yields index *labels*, not positions. The old
    # `market_data.iloc[:idx+1]` crashed on a DatetimeIndex (Timestamp + 1)
    # and mis-sliced on any non-default integer index. Use a positional
    # counter so any index type works.
    for i, (_, row) in enumerate(market_data.iterrows()):
        # The strategy only sees data up to and including the current bar
        # (no look-ahead bias).
        signal = strategy.process_data(market_data.iloc[:i + 1])

        # Simulate trade execution: all-in buy, all-out sell with 0.1% fee
        if signal.action == 'buy' and position == 0:
            position = current_balance / row['close']
            current_balance = 0
        elif signal.action == 'sell' and position > 0:
            current_balance = position * row['close'] * 0.999  # 0.1% fee
            position = 0

        # Track mark-to-market portfolio value
        total_value = current_balance + (position * row['close'])
        portfolio_value.append(total_value)
        signals.append(signal)

    # Bug fix: empty market_data previously raised IndexError on
    # portfolio_value[-1]; an empty backtest just returns the start balance.
    final_value = portfolio_value[-1] if portfolio_value else initial_balance
    return {
        'final_value': final_value,
        'total_return': (final_value / initial_balance - 1) * 100,
        'signals': signals,
        'portfolio_progression': portfolio_value
    }
def calculate_performance_metrics(portfolio_values: List[float]) -> dict:
    """Calculate standard performance metrics from a portfolio value series.

    Parameters:
    - portfolio_values: mark-to-market portfolio values, one per bar

    Returns:
    - dict with sharpe_ratio (per-bar, not annualized), max_drawdown
      (absolute currency units, largest peak-to-trough decline), win_rate
      (fraction of positive bars), and total_trades
    """
    # Robustness fix: with fewer than two values there are no returns, and
    # the original produced NaNs (e.g. win_rate on an empty series).
    if len(portfolio_values) < 2:
        return {
            'sharpe_ratio': 0,
            'max_drawdown': 0,
            'win_rate': 0,
            'total_trades': 0
        }

    values = pd.Series(portfolio_values)  # built once instead of twice
    returns = values.pct_change().dropna()
    std = returns.std()
    return {
        # Guard: zero/NaN volatility would divide by zero
        'sharpe_ratio': returns.mean() / std if std > 0 else 0,
        'max_drawdown': (values.cummax() - values).max(),
        'win_rate': (returns > 0).mean(),
        # NOTE(review): this counts return observations (bars - 1), not
        # executed trades — rename or recompute if true trade count is needed
        'total_trades': len(returns)
    }
Optimization Techniques
- Vectorized Operations: Use pandas for bulk data processing
- Efficient Indexing: Pre-calculate indicators where possible
- Memory Management: Process data in chunks for large datasets
- Simple Parallelization: Run multiple strategy tests independently
Key Design Principles
- OHLCV-First Data Strategy: Primary focus on aggregated candle data, optional raw data storage
- Signal Tracking: All trading signals recorded in database for analysis and debugging
- JSON Configuration: Strategy parameters and bot configs in JSON for rapid testing
- Real-time State Management: Bot states updated via Redis and PostgreSQL for monitoring
- Crash Recovery: Automatic bot restart and application state recovery
- Virtual Trading: Simulation-first approach with fee modeling
- Simplified Architecture: Monolithic design with clear component boundaries for future scaling
Repository Pattern for Database Operations
Database Abstraction Layer
The system uses the Repository Pattern to abstract database operations from business logic, providing a clean, maintainable, and testable interface for all data access.
# Centralized database operations
from database.operations import get_database_operations
class DataCollector:
    """Data collector wired to the repository layer.

    Illustrates the repository pattern: all persistence goes through
    DatabaseOperations instead of inline SQL.
    """

    def __init__(self) -> None:
        # Use repository pattern instead of direct SQL
        self.db = get_database_operations()

    def store_candle(self, candle: OHLCVCandle):
        """Store candle using repository pattern"""
        # NOTE(review): success flag is currently unused — log or propagate it?
        success = self.db.market_data.upsert_candle(candle, force_update=False)

    def store_raw_trade(self, data_point: MarketDataPoint):
        """Store raw trade data using repository pattern"""
        # NOTE(review): success flag is currently unused — log or propagate it?
        success = self.db.raw_trades.insert_market_data_point(data_point)
Repository Structure
# Clean API for database operations
class DatabaseOperations:
    """Facade over the individual repositories.

    Exposes one repository per table family so business code never
    touches SQL directly.
    """

    def __init__(self) -> None:
        self.market_data = MarketDataRepository()  # Candle operations
        self.raw_trades = RawTradeRepository()  # Raw data operations

    def health_check(self) -> bool:
        """Check database connection health"""

    def get_stats(self) -> dict:
        """Get database statistics and metrics"""
class MarketDataRepository:
    """Repository for OHLCV candle rows in the market_data table."""

    def upsert_candle(self, candle: OHLCVCandle, force_update: bool = False) -> bool:
        """Store or update candle with duplicate handling"""

    def get_candles(self, symbol: str, timeframe: str, start: datetime, end: datetime) -> List[dict]:
        """Retrieve historical candle data"""

    def get_latest_candle(self, symbol: str, timeframe: str) -> Optional[dict]:
        """Get most recent candle for symbol/timeframe"""
class RawTradeRepository:
    """Repository for raw tick/trade rows in the raw_trades table."""

    def insert_market_data_point(self, data_point: MarketDataPoint) -> bool:
        """Store raw WebSocket data"""

    def get_raw_trades(self, symbol: str, data_type: str, start: datetime, end: datetime) -> List[dict]:
        """Retrieve raw trade data for analysis"""
Benefits of Repository Pattern
- No Raw SQL: Business logic never contains direct SQL queries
- Centralized Operations: All database interactions go through well-defined APIs
- Easy Testing: Repository methods can be easily mocked for unit tests
- Database Agnostic: Can change database implementations without affecting business logic
- Automatic Transaction Management: Sessions, commits, and rollbacks handled automatically
- Consistent Error Handling: Custom exceptions with proper context
- Type Safety: Full type hints for better IDE support and error detection
Database Architecture
Core Tables
- market_data: OHLCV candles for bot operations and backtesting (primary table)
- bots: Bot instances with JSON config references and status tracking
- signals: Trading decisions with confidence scores and indicator values
- trades: Virtual trade execution records with P&L tracking
- bot_performance: Portfolio snapshots for performance visualization
Optional Tables
- raw_trades: Raw tick data for advanced backtesting (partitioned by month)
Data Access Patterns
- Real-time: Bots read recent OHLCV data via indexes on (symbol, timeframe, timestamp)
- Historical: Dashboard queries aggregated performance data for charts
- Backtesting: Sequential access to historical OHLCV data by date range