## Architecture Components

### 1. Data Collector

**Responsibility**: Unified data collection from multiple exchanges

```python
from typing import List


class DataCollector:
    def __init__(self):
        self.providers = {}  # Registry of data providers, keyed by exchange name

    def register_provider(self, name: str, provider: DataProvider):
        """Register a new data provider"""

    def start_collection(self, symbols: List[str]):
        """Start collecting data for specified symbols"""

    def process_raw_data(self, raw_data: dict):
        """Process raw data into OHLCV format"""

    def send_signal_to_bots(self, processed_data: dict):
        """Send Redis signal to active bots"""
```

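For illustration, a minimal wiring sketch of the collector; `BinanceProvider` is a hypothetical `DataProvider` implementation and is not part of the interface above:

```python
# Hypothetical usage sketch: BinanceProvider stands in for a concrete
# DataProvider implementation defined elsewhere in the project.
collector = DataCollector()
collector.register_provider("binance", BinanceProvider())
collector.start_collection(["BTCUSDT", "ETHUSDT"])
```
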
### 2. Strategy Engine

**Responsibility**: Unified interface for all trading strategies

```python
import pandas as pd


class BaseStrategy:
    def __init__(self, parameters: dict):
        self.parameters = parameters

    def process_data(self, data: pd.DataFrame) -> Signal:
        """Process market data and generate signals"""
        raise NotImplementedError

    def get_indicators(self) -> dict:
        """Return calculated indicators for plotting"""
        return {}
```

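For illustration, a hedged sketch of a concrete strategy built on this interface; the `Signal(action=...)` constructor and the `fast_period`/`slow_period` parameter names are assumptions about the project's models:

```python
class SmaCrossStrategy(BaseStrategy):
    """Example subclass: trade on fast/slow moving-average crossovers."""

    def process_data(self, data: pd.DataFrame) -> Signal:
        fast = data["close"].rolling(self.parameters["fast_period"]).mean()
        slow = data["close"].rolling(self.parameters["slow_period"]).mean()
        self._indicators = {"sma_fast": fast, "sma_slow": slow}

        # Assumed Signal fields; adapt to the project's actual Signal model
        if fast.iloc[-1] > slow.iloc[-1] and fast.iloc[-2] <= slow.iloc[-2]:
            return Signal(action="buy")
        if fast.iloc[-1] < slow.iloc[-1] and fast.iloc[-2] >= slow.iloc[-2]:
            return Signal(action="sell")
        return Signal(action="hold")

    def get_indicators(self) -> dict:
        return getattr(self, "_indicators", {})
```
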
### 3. Bot Manager

**Responsibility**: Orchestrate bot execution and state management

```python
class BotManager:
    def __init__(self):
        self.active_bots = {}  # Running bot instances, keyed by bot_id

    def start_bot(self, bot_id: int):
        """Start a bot instance"""

    def stop_bot(self, bot_id: int):
        """Stop a bot instance"""

    def process_signal(self, bot_id: int, signal: Signal):
        """Process signal and make trading decision"""

    def update_bot_state(self, bot_id: int, state: dict):
        """Update bot state in database"""
```

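A brief, assumed usage sketch of the manager lifecycle; the `Signal` construction and the state payload are illustrative only:

```python
# Hypothetical orchestration sketch
manager = BotManager()
manager.start_bot(bot_id=42)

manager.process_signal(42, Signal(action="buy"))         # assumed Signal model
manager.update_bot_state(42, {"status": "in_position"})  # illustrative state
manager.stop_bot(bot_id=42)
```
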
## Communication Architecture

### Redis Pub/Sub Patterns

```python
# Real-time market data
MARKET_DATA_CHANNEL = "market_data:{symbol}"

# Bot-specific signals
BOT_SIGNAL_CHANNEL = "bot_signals:{bot_id}"

# Trade updates
TRADE_UPDATE_CHANNEL = "trade_updates:{bot_id}"

# System events
SYSTEM_EVENT_CHANNEL = "system_events"
```

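A minimal publish/subscribe sketch with the `redis` Python client using the channel templates above; the connection settings and the JSON payload shape are assumptions:

```python
import json

import redis

r = redis.Redis(host="localhost", port=6379, db=0)

# Publisher side (e.g. the Data Collector): push a processed candle
r.publish(
    MARKET_DATA_CHANNEL.format(symbol="BTCUSDT"),
    json.dumps({"close": 65000.0, "volume": 12.3}),
)

# Subscriber side (e.g. a bot): block on messages for its own channel
pubsub = r.pubsub()
pubsub.subscribe(BOT_SIGNAL_CHANNEL.format(bot_id=42))
for message in pubsub.listen():
    if message["type"] == "message":
        payload = json.loads(message["data"])
        # hand the payload to the strategy / bot manager here
```
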
### WebSocket Communication

```python
# Frontend real-time updates
WS_BOT_STATUS = "/ws/bot/{bot_id}/status"
WS_MARKET_DATA = "/ws/market/{symbol}"
WS_PORTFOLIO = "/ws/portfolio/{bot_id}"
```

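A hedged endpoint sketch for the first route; FastAPI is assumed here as the serving framework (the document does not name one), and the status payload is a placeholder:

```python
import asyncio

from fastapi import FastAPI, WebSocket

app = FastAPI()


@app.websocket("/ws/bot/{bot_id}/status")
async def bot_status(websocket: WebSocket, bot_id: int):
    """Stream bot status updates to the frontend."""
    await websocket.accept()
    while True:
        # In practice this would relay messages from the Redis channels above;
        # the payload below is a placeholder.
        await websocket.send_json({"bot_id": bot_id, "status": "running"})
        await asyncio.sleep(1)
```
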
## Time Aggregation Strategy

### Candlestick Alignment

- **Use RIGHT-ALIGNED timestamps** (industry standard)
- A 5-minute candle with timestamp 09:05:00 represents data from 09:00:01 to 09:05:00 (see the sketch below)
- Timestamp = close time of the candle
- Aligns with major exchanges (Binance, OKX, Coinbase)

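A small sketch of the bucket arithmetic these rules imply; `candle_close_time` is an illustrative helper, not an existing project function:

```python
from datetime import datetime, timedelta, timezone


def candle_close_time(ts: datetime, interval: timedelta) -> datetime:
    """Return the right-aligned timestamp of the candle containing ts."""
    seconds = int(interval.total_seconds())
    epoch = int(ts.timestamp())
    # Round up to the next interval boundary; a tick exactly on a boundary
    # belongs to the candle that closes at that boundary.
    bucket_end = ((epoch + seconds - 1) // seconds) * seconds
    return datetime.fromtimestamp(bucket_end, tz=timezone.utc)


# A trade at 09:03:27 falls in the 5-minute candle stamped 09:05:00
trade_ts = datetime(2024, 1, 1, 9, 3, 27, tzinfo=timezone.utc)
print(candle_close_time(trade_ts, timedelta(minutes=5)))  # 2024-01-01 09:05:00+00:00
```
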
### Aggregation Logic

```python
from typing import Iterator, List


def aggregate_to_timeframe(ticks: List[dict], timeframe: str) -> Iterator[dict]:
    """
    Aggregate tick data to the specified timeframe.

    timeframe: '1m', '5m', '15m', '1h', '4h', '1d'
    """
    # Convert timeframe to seconds (parse_timeframe is a project helper)
    interval_seconds = parse_timeframe(timeframe)

    # Group ticks by time intervals, right-aligned
    # (group_by_interval is a project helper yielding one group per interval)
    for group in group_by_interval(ticks, interval_seconds):
        candle = {
            'timestamp': group.end_time,  # Right-aligned: close time of the candle
            'open': group.first_price,
            'high': group.max_price,
            'low': group.min_price,
            'close': group.last_price,
            'volume': group.total_volume,
        }
        yield candle
```

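An alternative, self-contained sketch of the same right-aligned aggregation using pandas `resample(label='right', closed='right')`; note that pandas frequency aliases (e.g. `'5min'`) differ from the `'5m'`-style strings above, and the `price`/`volume` tick fields are assumptions:

```python
import pandas as pd


def aggregate_with_pandas(ticks: list, timeframe: str = "5min") -> pd.DataFrame:
    """Right-aligned OHLCV candles from raw ticks via pandas resample."""
    df = pd.DataFrame(ticks)
    df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
    df = df.set_index("timestamp").sort_index()

    # label/closed = 'right' stamps each candle with its close time,
    # matching the right-aligned convention above
    kwargs = dict(label="right", closed="right")
    ohlc = df["price"].resample(timeframe, **kwargs).ohlc()
    volume = df["volume"].resample(timeframe, **kwargs).sum().rename("volume")
    return ohlc.join(volume).dropna(subset=["open"])  # drop empty intervals
```
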
## Backtesting Optimization

### Parallel Processing Strategy

```python
import numba
import numpy as np
from joblib import Parallel, delayed


@numba.jit(nopython=True)
def calculate_signals_vectorized(prices, parameters):
    """Vectorized signal calculation using Numba"""
    # High-performance signal calculation
    # (placeholder: fill in strategy-specific logic)
    signals = np.zeros(prices.shape[0])
    return signals


def backtest_strategy_batch(data_batch, strategy_params):
    """Backtest a batch of data in parallel"""
    # Process batch of signals
    signals = calculate_signals_vectorized(data_batch, strategy_params)

    # Simulate trades incrementally (simulate_trades is a project helper)
    portfolio = simulate_trades(signals, data_batch)
    return portfolio


# Parallel backtesting
def run_parallel_backtest(data, strategy_params, n_jobs=4):
    # split_data_into_batches and combine_results are project helpers
    data_batches = split_data_into_batches(data, n_jobs)

    results = Parallel(n_jobs=n_jobs)(
        delayed(backtest_strategy_batch)(batch, strategy_params)
        for batch in data_batches
    )

    return combine_results(results)
```

### Optimization Techniques

1. **Vectorized Operations**: Use NumPy/Pandas for bulk calculations (see the sketch after this list)
2. **Numba JIT**: Compile critical loops for C-like performance
3. **Batch Processing**: Process signals in batches, simulate trades incrementally
4. **Memory Management**: Use efficient data structures (arrays vs. lists)
5. **Parallel Execution**: Utilize multiple CPU cores for independent calculations

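To make point 1 concrete, a minimal sketch of crossover signal detection with NumPy array operations instead of a Python loop; the +1/-1 encoding is illustrative only:

```python
import numpy as np


def crossover_signals(fast: np.ndarray, slow: np.ndarray) -> np.ndarray:
    """+1 where fast crosses above slow, -1 where it crosses below, 0 otherwise."""
    above = fast > slow
    cross_up = above[1:] & ~above[:-1]      # False -> True transition
    cross_down = ~above[1:] & above[:-1]    # True -> False transition

    signals = np.zeros(fast.shape[0], dtype=np.int8)
    signals[1:][cross_up] = 1
    signals[1:][cross_down] = -1
    return signals
```
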
## Key Design Principles

1. **Data Separation**: Raw and processed data stored separately for audit trail
2. **Signal Tracking**: All signals recorded (executed or not) for analysis
3. **Real-time State**: Bot states updated in real-time for monitoring
4. **Audit Trail**: Complete record of all trading activities
5. **Scalability**: Architecture supports multiple bots and strategies
6. **Modularity**: Clear separation between data collection, strategy execution, and trading
7. **Fault Tolerance**: Redis for reliable message delivery, database transactions for consistency