## Architecture Components ### 1. Data Collector **Responsibility**: OHLCV data collection and aggregation from exchanges ```python class DataCollector: def __init__(self): self.providers = {} # Registry of data providers self.store_raw_data = False # Optional raw data storage def register_provider(self, name: str, provider: DataProvider): """Register a new data provider""" def start_collection(self, symbols: List[str], timeframes: List[str]): """Start collecting OHLCV data for specified symbols and timeframes""" def process_raw_trades(self, raw_trades: List[dict]) -> dict: """Aggregate raw trades into OHLCV candles""" def store_ohlcv_data(self, ohlcv_data: dict): """Store OHLCV data in PostgreSQL market_data table""" def send_market_update(self, symbol: str, ohlcv_data: dict): """Send Redis signal with OHLCV update to active bots""" def store_raw_data_optional(self, raw_data: dict): """Optionally store raw data for detailed backtesting""" ``` ### 2. Strategy Engine **Responsibility**: Unified interface for all trading strategies ```python class BaseStrategy: def __init__(self, parameters: dict): self.parameters = parameters def process_data(self, data: pd.DataFrame) -> Signal: """Process market data and generate signals""" raise NotImplementedError def get_indicators(self) -> dict: """Return calculated indicators for plotting""" return {} ``` ### 3. Bot Manager **Responsibility**: Orchestrate bot execution and state management ```python class BotManager: def __init__(self): self.active_bots = {} self.config_path = "config/bots/" def load_bot_config(self, bot_id: int) -> dict: """Load bot configuration from JSON file""" def start_bot(self, bot_id: int): """Start a bot instance with crash recovery monitoring""" def stop_bot(self, bot_id: int): """Stop a bot instance and update database status""" def process_signal(self, bot_id: int, signal: Signal): """Process signal and make virtual trading decision""" def update_bot_heartbeat(self, bot_id: int): """Update bot heartbeat in database for monitoring""" def restart_crashed_bots(self): """Monitor and restart crashed bots (max 3 attempts/hour)""" def restore_active_bots_on_startup(self): """Restore active bot states after application restart""" ``` ## Communication Architecture ### Redis Pub/Sub Patterns ```python # Real-time market data distribution MARKET_DATA_CHANNEL = "market:{symbol}" # OHLCV updates BOT_SIGNALS_CHANNEL = "signals:{bot_id}" # Trading decisions BOT_STATUS_CHANNEL = "status:{bot_id}" # Bot lifecycle events SYSTEM_EVENTS_CHANNEL = "system:events" # Global notifications ``` ## Time Aggregation Strategy ### Candlestick Alignment - **Use RIGHT-ALIGNED timestamps** (industry standard) - 5-minute candle with timestamp 09:05:00 represents data from 09:00:01 to 09:05:00 - Timestamp = close time of the candle - Aligns with major exchanges (Binance, OKX, Coinbase) ### Aggregation Logic ```python def aggregate_to_timeframe(ticks: List[dict], timeframe: str) -> dict: """ Aggregate tick data to specified timeframe timeframe: '1m', '5m', '15m', '1h', '4h', '1d' """ # Convert timeframe to seconds interval_seconds = parse_timeframe(timeframe) # Group ticks by time intervals (right-aligned) for group in group_by_interval(ticks, interval_seconds): candle = { 'timestamp': group.end_time, # Right-aligned 'open': group.first_price, 'high': group.max_price, 'low': group.min_price, 'close': group.last_price, 'volume': group.total_volume } yield candle ``` ## Backtesting Strategy ### Vectorized Processing Approach ```python import pandas as pd import numpy as np def backtest_strategy_simple(strategy, market_data: pd.DataFrame, initial_balance: float = 10000): """ Simple vectorized backtesting using pandas operations Parameters: - strategy: Strategy instance with process_data method - market_data: DataFrame with OHLCV data - initial_balance: Starting portfolio value Returns: - Portfolio performance metrics and trade history """ # Calculate all signals at once using vectorized operations signals = [] portfolio_value = [] current_balance = initial_balance position = 0 for idx, row in market_data.iterrows(): # Get signal from strategy signal = strategy.process_data(market_data.iloc[:idx+1]) # Simulate trade execution if signal.action == 'buy' and position == 0: position = current_balance / row['close'] current_balance = 0 elif signal.action == 'sell' and position > 0: current_balance = position * row['close'] * 0.999 # 0.1% fee position = 0 # Track portfolio value total_value = current_balance + (position * row['close']) portfolio_value.append(total_value) signals.append(signal) return { 'final_value': portfolio_value[-1], 'total_return': (portfolio_value[-1] / initial_balance - 1) * 100, 'signals': signals, 'portfolio_progression': portfolio_value } def calculate_performance_metrics(portfolio_values: List[float]) -> dict: """Calculate standard performance metrics""" returns = pd.Series(portfolio_values).pct_change().dropna() return { 'sharpe_ratio': returns.mean() / returns.std() if returns.std() > 0 else 0, 'max_drawdown': (pd.Series(portfolio_values).cummax() - pd.Series(portfolio_values)).max(), 'win_rate': (returns > 0).mean(), 'total_trades': len(returns) } ``` ### Optimization Techniques 1. **Vectorized Operations**: Use pandas for bulk data processing 2. **Efficient Indexing**: Pre-calculate indicators where possible 3. **Memory Management**: Process data in chunks for large datasets 4. **Simple Parallelization**: Run multiple strategy tests independently ## Key Design Principles 1. **OHLCV-First Data Strategy**: Primary focus on aggregated candle data, optional raw data storage 2. **Signal Tracking**: All trading signals recorded in database for analysis and debugging 3. **JSON Configuration**: Strategy parameters and bot configs in JSON for rapid testing 4. **Real-time State Management**: Bot states updated via Redis and PostgreSQL for monitoring 5. **Crash Recovery**: Automatic bot restart and application state recovery 6. **Virtual Trading**: Simulation-first approach with fee modeling 7. **Simplified Architecture**: Monolithic design with clear component boundaries for future scaling ## Database Architecture ### Core Tables - **market_data**: OHLCV candles for bot operations and backtesting (primary table) - **bots**: Bot instances with JSON config references and status tracking - **signals**: Trading decisions with confidence scores and indicator values - **trades**: Virtual trade execution records with P&L tracking - **bot_performance**: Portfolio snapshots for performance visualization ### Optional Tables - **raw_trades**: Raw tick data for advanced backtesting (partitioned by month) ### Data Access Patterns - **Real-time**: Bots read recent OHLCV data via indexes on (symbol, timeframe, timestamp) - **Historical**: Dashboard queries aggregated performance data for charts - **Backtesting**: Sequential access to historical OHLCV data by date range