12 KiB
Architecture Components Documentation
Overview
The Cycles framework has been refactored into a modular architecture with specialized components for different aspects of the backtesting workflow. This document provides detailed information about the new architectural components and how to use them.
🏗️ Component Architecture
New Components:
├── 🎯 BacktestApplication # Main workflow orchestration
├── ⚙️ ConfigManager # Configuration management
├── 📊 ResultsProcessor # Results processing & metrics
├── 🚀 BacktestRunner # Backtest execution logic
└── 📦 TimeframeTask # Individual task encapsulation
⚙️ ConfigManager
Purpose: Centralized configuration loading, validation, and access
Features
- Automatic Validation: Validates configuration structure and required fields
- Type-Safe Access: Property-based access to configuration values
- Smart Defaults: Automatic fallbacks (e.g., current date for stop_date)
- Reusable Configs: Generate configurations for different components
Basic Usage
from cycles.utils import ConfigManager
# Initialize with config file
config_manager = ConfigManager("configs/config_bbrs.json")
# Access configuration properties
start_date = config_manager.start_date
initial_usd = config_manager.initial_usd
timeframes = config_manager.timeframes
# Get specialized configurations
strategy_config = config_manager.get_strategy_manager_config()
task_config = config_manager.get_timeframe_task_config("15min")
Configuration Properties
| Property | Type | Description |
|---|---|---|
start_date |
str |
Backtest start date |
stop_date |
str |
Backtest end date (auto-defaults to current) |
initial_usd |
float |
Initial portfolio value |
timeframes |
List[str] |
List of timeframes to test |
strategies_config |
List[Dict] |
Strategy configurations |
combination_rules |
Dict |
Signal combination rules |
Configuration Methods
# Get strategy manager configuration
strategy_config = config_manager.get_strategy_manager_config()
# Returns: {"strategies": [...], "combination_rules": {...}}
# Get timeframe-specific task configuration
task_config = config_manager.get_timeframe_task_config("15min")
# Returns: {"initial_usd": 10000, "strategies": [...], "combination_rules": {...}}
Error Handling
try:
config_manager = ConfigManager("invalid_config.json")
except FileNotFoundError as e:
print(f"Config file not found: {e}")
except ValueError as e:
print(f"Invalid configuration: {e}")
📊 ResultsProcessor & BacktestMetrics
Purpose: Unified processing, aggregation, and analysis of backtest results
BacktestMetrics (Static Utilities)
from cycles.utils import BacktestMetrics
# Calculate trade-level metrics
trades = [{"profit_pct": 0.05}, {"profit_pct": -0.02}]
trade_metrics = BacktestMetrics.calculate_trade_metrics(trades)
# Returns: {n_trades, win_rate, max_drawdown, avg_trade, ...}
# Calculate portfolio-level metrics
portfolio_metrics = BacktestMetrics.calculate_portfolio_metrics(trades, 10000)
# Returns: {initial_usd, final_usd, total_fees_usd, total_return}
ResultsProcessor (Instance-Based)
from cycles.utils import ResultsProcessor
processor = ResultsProcessor()
# Process single backtest results
summary_row, trade_rows = processor.process_backtest_results(
results=backtest_results,
timeframe="15min",
config=task_config,
strategy_summary=strategy_summary
)
# Aggregate multiple results
aggregated = processor.aggregate_results(all_result_rows)
# Create metadata for output files
metadata_lines = processor.create_metadata_lines(config_manager, data_1min)
Available Metrics
Trade Metrics
n_trades: Total number of tradesn_winning_trades: Number of profitable tradeswin_rate: Percentage of winning tradestotal_profit: Sum of all profitable tradestotal_loss: Sum of all losing tradesavg_trade: Average trade returnprofit_ratio: Ratio of total profit to total lossmax_drawdown: Maximum portfolio drawdown
Portfolio Metrics
initial_usd: Starting portfolio valuefinal_usd: Ending portfolio valuetotal_fees_usd: Total trading feestotal_return: Overall portfolio return percentage
🚀 BacktestRunner & TimeframeTask
Purpose: Modular backtest execution with clean separation of concerns
BacktestRunner
from cycles.utils import BacktestRunner
runner = BacktestRunner()
# Run single timeframe backtest
summary_row, trade_rows = runner.run_single_timeframe(
data_1min=market_data,
timeframe="15min",
config=task_config,
debug=False
)
BacktestRunner Methods
| Method | Purpose | Returns |
|---|---|---|
run_single_timeframe() |
Execute backtest for one timeframe | (summary_row, trade_rows) |
_create_strategy_manager() |
Create strategy manager from config | StrategyManager |
_setup_backtester() |
Setup backtester with data and strategies | Backtest |
_execute_backtest() |
Run the backtest | Dict[results] |
_handle_debug_plotting() |
Handle debug mode plotting | None |
TimeframeTask
Purpose: Encapsulates a single timeframe backtest task for easy execution and parallelization
from cycles.utils import TimeframeTask, create_timeframe_tasks
# Create individual task
task = TimeframeTask(
timeframe="15min",
data_1min=market_data,
config=task_config
)
# Execute task
summary_row, trade_rows = task.execute(debug=False)
# Create multiple tasks from configuration
tasks = create_timeframe_tasks(
timeframes=["5min", "15min", "1h"],
data_1min=market_data,
config_manager=config_manager
)
# Execute all tasks
for task in tasks:
results = task.execute()
Parallel Execution
import concurrent.futures
from cycles.utils import create_timeframe_tasks
# Create tasks
tasks = create_timeframe_tasks(timeframes, data_1min, config_manager)
# Execute in parallel
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(task.execute, False): task for task in tasks}
for future in concurrent.futures.as_completed(futures):
task = futures[future]
try:
summary_row, trade_rows = future.result()
print(f"Completed: {task.timeframe}")
except Exception as e:
print(f"Failed: {task.timeframe} - {e}")
🎯 BacktestApplication
Purpose: Main application orchestration class that coordinates the entire workflow
Complete Workflow
from cycles.application import BacktestApplication
# Simple usage
app = BacktestApplication("configs/config_combined.json")
app.run(debug=False)
Workflow Steps
class BacktestApplication:
def run(self, debug=False):
# 1. Load data
data_1min = self.load_data()
# 2. Create tasks
tasks = self.create_tasks(data_1min)
# 3. Execute tasks (parallel or debug mode)
results_rows, trade_rows = self.execute_tasks(tasks, debug)
# 4. Save results
self.save_results(results_rows, trade_rows, data_1min)
Custom Application
from cycles.application import BacktestApplication
class CustomBacktestApplication(BacktestApplication):
def execute_tasks(self, tasks, debug=False):
# Custom execution logic
# Maybe with custom progress tracking
results = []
for i, task in enumerate(tasks):
print(f"Processing {i+1}/{len(tasks)}: {task.timeframe}")
result = task.execute(debug)
results.append(result)
return results
def save_results(self, results_rows, trade_rows, data_1min):
# Custom result saving
super().save_results(results_rows, trade_rows, data_1min)
# Additional custom processing
self.send_email_notification(results_rows)
# Usage
app = CustomBacktestApplication("config.json")
app.run()
🔧 Component Integration Examples
Simple Integration
# All-in-one approach
from cycles.application import BacktestApplication
app = BacktestApplication("config.json")
app.run(debug=False)
Modular Integration
# Step-by-step approach using individual components
from cycles.utils import ConfigManager, BacktestRunner, ResultsProcessor
# 1. Configuration
config_manager = ConfigManager("config.json")
# 2. Data loading (using existing storage utilities)
from cycles.utils import Storage
storage = Storage()
data_1min = storage.load_data('btcusd_1-min_data.csv',
config_manager.start_date,
config_manager.stop_date)
# 3. Execution
runner = BacktestRunner()
all_results = []
for timeframe in config_manager.timeframes:
task_config = config_manager.get_timeframe_task_config(timeframe)
summary_row, trade_rows = runner.run_single_timeframe(
data_1min, timeframe, task_config
)
all_results.extend(trade_rows)
# 4. Processing
processor = ResultsProcessor()
final_results = processor.aggregate_results(all_results)
Custom Workflow
# Custom workflow for specific needs
from cycles.utils import ConfigManager, BacktestRunner
config_manager = ConfigManager("config.json")
runner = BacktestRunner()
# Custom data preparation
custom_data = prepare_custom_data(config_manager.start_date)
# Custom configuration modification
for strategy in config_manager.strategies_config:
if strategy['name'] == 'default':
strategy['params']['stop_loss_pct'] = 0.02 # Custom stop loss
# Custom execution with monitoring
for timeframe in config_manager.timeframes:
print(f"Starting backtest for {timeframe}")
config = config_manager.get_timeframe_task_config(timeframe)
try:
results = runner.run_single_timeframe(custom_data, timeframe, config)
process_custom_results(results, timeframe)
except Exception as e:
handle_custom_error(e, timeframe)
🎨 Extension Points
Custom Configuration Manager
from cycles.utils import ConfigManager
class CustomConfigManager(ConfigManager):
def _validate_config(self):
super()._validate_config()
# Additional custom validation
if self.config.get('custom_field') is None:
raise ValueError("Custom field is required")
@property
def custom_setting(self):
return self.config.get('custom_field', 'default_value')
Custom Results Processor
from cycles.utils import ResultsProcessor
class CustomResultsProcessor(ResultsProcessor):
def process_backtest_results(self, results, timeframe, config, strategy_summary):
# Call parent method
summary_row, trade_rows = super().process_backtest_results(
results, timeframe, config, strategy_summary
)
# Add custom metrics
summary_row['custom_metric'] = self.calculate_custom_metric(trade_rows)
return summary_row, trade_rows
def calculate_custom_metric(self, trades):
# Custom metric calculation
return sum(t['profit_pct'] for t in trades if t['profit_pct'] > 0.05)
🚀 Migration Guide
From Old main.py
Before (Old main.py):
# Scattered configuration
config_file = args.config or "configs/config_default.json"
with open(config_file, 'r') as f:
config = json.load(f)
# Complex processing function
results = process_timeframe_data(data_1min, timeframe, config, debug)
# Manual result aggregation
all_results = []
for task in tasks:
results = process(task, debug)
all_results.extend(results)
After (New Architecture):
# Clean application approach
from cycles.application import BacktestApplication
app = BacktestApplication(config_file)
app.run(debug=debug)
# Or modular approach
from cycles.utils import ConfigManager, BacktestRunner
config_manager = ConfigManager(config_file)
runner = BacktestRunner()
results = runner.run_single_timeframe(data, timeframe, config)
Benefits of Migration
- 🧹 Cleaner Code: Reduced complexity and better organization
- 🔄 Reusability: Components can be used independently
- 🧪 Testability: Each component can be tested in isolation
- 🛠️ Extensibility: Easy to extend and customize components
- 📈 Maintainability: Clear separation of concerns
Note: All new components maintain full backward compatibility with existing configuration files and output formats.