revert b71faa97589d220e5046b4d530d1660f6f0abe9a

revert refactor for modularity
This commit is contained in:
vasily 2025-05-23 12:47:59 +00:00
parent b71faa9758
commit 65ae3060de
11 changed files with 416 additions and 1669 deletions

343
README.md
View File

@@ -1,27 +1,25 @@
# Cycles - Advanced Trading Strategy Backtesting Framework
A Python framework for backtesting cryptocurrency trading strategies with multi-timeframe analysis, strategy combination, and advanced signal processing. **Recently refactored** for improved modularity, reusability, and maintainability.
A sophisticated Python framework for backtesting cryptocurrency trading strategies with multi-timeframe analysis, strategy combination, and advanced signal processing.
## ✨ Key Features
## Features
- **🏗️ Modular Architecture**: Clean separation of concerns with reusable components
- **🔧 Multi-Strategy System**: Combine multiple trading strategies with configurable weights and rules
- **⏱️ Multi-Timeframe Analysis**: Strategies operate on different timeframes (1min, 5min, 15min, 1h, etc.)
- **📊 Advanced Strategies**:
- **Multi-Strategy Architecture**: Combine multiple trading strategies with configurable weights and rules
- **Multi-Timeframe Analysis**: Strategies can operate on different timeframes (1min, 5min, 15min, 1h, etc.)
- **Advanced Strategies**:
- **Default Strategy**: Meta-trend analysis using multiple Supertrend indicators
- **BBRS Strategy**: Bollinger Bands + RSI with market regime detection
- **🎯 Flexible Signal Combination**: Weighted consensus, majority voting, any/all combinations
- **⚡ Precise Execution**: 1-minute precision for accurate risk management
- **📈 Comprehensive Analysis**: Detailed performance metrics and trade analysis
- **📱 Enhanced CLI**: Improved command-line interface with better error handling
- **🔍 Debug Mode**: Sequential execution with interactive plotting
- **Flexible Signal Combination**: Weighted consensus, majority voting, any/all combinations
- **Precise Stop-Loss**: 1-minute precision for accurate risk management
- **Comprehensive Backtesting**: Detailed performance metrics and trade analysis
- **Data Visualization**: Interactive charts and performance plots
## 🚀 Quick Start
## Quick Start
### Prerequisites
- Python 3.8+
- [uv](https://github.com/astral-sh/uv) package manager (recommended) or pip
- [uv](https://github.com/astral-sh/uv) package manager (recommended)
### Installation
@@ -30,7 +28,7 @@ A Python framework for backtesting cryptocurrency trading strategies with multi-
git clone <repository-url>
cd Cycles
# Install dependencies with uv (recommended)
# Install dependencies with uv
uv sync
# Or install with pip
@@ -39,72 +37,40 @@ pip install -r requirements.txt
### Running Backtests
The new CLI provides a clean interface with automatic config discovery:
Use the `uv run` command to execute backtests with different configurations:
```bash
# Use default configuration
python main.py
# Run default strategy on 5-minute timeframe
uv run .\main.py .\configs\config_default_5min.json
# Use specific config (searches configs/ directory automatically)
python main.py config_bbrs.json
# Run default strategy on 15-minute timeframe
uv run .\main.py .\configs\config_default.json
# Debug mode with interactive plotting
python main.py --debug config_combined.json
# Run BBRS strategy with market regime detection
uv run .\main.py .\configs\config_bbrs.json
# Full path also supported
python main.py configs/config_default_5min.json
# Get help
python main.py --help
# Run combined strategies
uv run .\main.py .\configs\config_combined.json
```
### Available Configurations
- **`config_default.json`**: Default meta-trend strategy (15min)
- **`config_default_5min.json`**: Default strategy on 5-minute timeframe
- **`config_bbrs.json`**: BBRS strategy with market regime detection
- **`config_bbrs_multi_timeframe.json`**: BBRS with multiple timeframes
- **`config_combined.json`**: Multi-strategy combination with weighted consensus
## 🏛️ Architecture Overview
The framework follows a **clean, modular architecture** for maximum reusability:
```
cycles/
├── application.py # 🎯 Main application orchestration
├── utils/
│ ├── config_manager.py # ⚙️ Centralized configuration management
│ ├── results_processor.py # 📊 Results processing & metrics calculation
│ ├── backtest_runner.py # 🚀 Backtest execution logic
│ ├── storage.py # 💾 Data storage utilities
│ └── system.py # 🖥️ System utilities
├── strategies/ # 📈 Strategy implementations
│ ├── base.py # 🏗️ Base strategy framework
│ ├── default_strategy.py # 📊 Meta-trend strategy
│ ├── bbrs_strategy.py # 🎯 BBRS strategy
│ └── manager.py # 🎛️ Strategy orchestration
├── backtest.py # ⚡ Core backtesting engine
└── charts.py # 📈 Visualization components
```
## 📱 Enhanced CLI Interface
### Configuration Examples
#### Default Strategy (5-minute timeframe)
```bash
python main.py [-h] [--debug] [config]
# Examples:
python main.py # Use default config
python main.py config_bbrs.json # Use specific config
python main.py --debug config_combined.json # Debug mode with plotting
# Available configs:
# - config_default.json: Default meta-trend strategy
# - config_bbrs.json: BBRS strategy
# - config_combined.json: Multi-strategy combination
uv run .\main.py .\configs\config_default_5min.json
```
## ⚙️ Configuration
#### BBRS Strategy with Multi-timeframe Analysis
```bash
uv run .\main.py .\configs\config_bbrs_multi_timeframe.json
```
#### Combined Strategies with Weighted Consensus
```bash
uv run .\main.py .\configs\config_combined.json
```
## Configuration
Strategies are configured using JSON files in the `configs/` directory:
@@ -114,219 +80,98 @@ Strategies are configured using JSON files in the `configs/` directory:
"stop_date": "2024-01-31",
"initial_usd": 10000,
"timeframes": ["15min"],
"stop_loss_pcts": [0.03, 0.05],
"strategies": [
{
"name": "default",
"weight": 1.0,
"params": {
"timeframe": "15min",
"stop_loss_pct": 0.03
"timeframe": "15min"
}
}
],
"combination_rules": {
"entry": "weighted_consensus",
"entry": "any",
"exit": "any",
"min_confidence": 0.6
"min_confidence": 0.5
}
}
```
### Strategy Configuration
### Available Strategies
#### Single Strategy
```json
{
"strategies": [
{
"name": "default",
"weight": 1.0,
"params": {
"timeframe": "15min",
"stop_loss_pct": 0.03
}
}
]
}
```
#### Multi-Strategy Combination
```json
{
"strategies": [
{
"name": "default",
"weight": 0.6,
"params": {"timeframe": "15min"}
},
{
"name": "bbrs",
"weight": 0.4,
"params": {"bb_width": 0.05}
}
],
"combination_rules": {
"entry": "weighted_consensus",
"exit": "any",
"min_confidence": 0.6
}
}
```
1. **Default Strategy**: Meta-trend analysis using Supertrend indicators
2. **BBRS Strategy**: Bollinger Bands + RSI with market regime detection
### Combination Rules
- **Entry Methods**: `any`, `all`, `majority`, `weighted_consensus`
- **Exit Methods**: `any`, `all`, `priority` (prioritizes stop-loss signals)
- **Min Confidence**: Threshold for signal acceptance (0.0 - 1.0)
- **Entry**: `any`, `all`, `majority`, `weighted_consensus`
- **Exit**: `any`, `all`, `priority` (prioritizes stop-loss signals)
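For illustration, a minimal sketch of a `combination_rules` block using majority entry voting and priority exits; the field names follow the configuration format shown above, and the values are illustrative:
```json
{
  "combination_rules": {
    "entry": "majority",
    "exit": "priority",
    "min_confidence": 0.5
  }
}
```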
## 🛠️ Programmatic Usage
## Project Structure
The new modular architecture enables programmatic usage:
```python
from cycles.application import BacktestApplication
# Simple usage
app = BacktestApplication("configs/config_bbrs.json")
app.run(debug=False)
# Custom workflow
from cycles.utils import ConfigManager, BacktestRunner
config_manager = ConfigManager("my_config.json")
runner = BacktestRunner()
for timeframe in config_manager.timeframes:
config = config_manager.get_timeframe_task_config(timeframe)
results = runner.run_single_timeframe(data, timeframe, config)
# Process results...
```
```
Cycles/
├── configs/ # Configuration files
├── cycles/ # Core framework
│ ├── strategies/ # Strategy implementations
│ │ ├── base.py # Base strategy classes
│ │ ├── default_strategy.py
│ │ ├── bbrs_strategy.py
│ │ └── manager.py # Strategy manager
│ ├── Analysis/ # Technical analysis
│ ├── utils/ # Utilities
│ └── charts.py # Visualization
├── docs/ # Documentation
├── data/ # Market data
├── results/ # Backtest results
└── main.py # Main entry point
```
## 📈 Available Strategies
## Documentation
### 1. Default Strategy (Meta-Trend Analysis)
- **Type**: Meta-trend analysis using multiple Supertrend indicators
- **Timeframes**: Configurable (5min, 15min, 1h, etc.)
- **Features**: Triple Supertrend confirmation, precise stop-loss
Detailed documentation is available in the `docs/` directory:
```json
{
"name": "default",
"params": {
"timeframe": "15min",
"stop_loss_pct": 0.03
}
}
```
- **[Strategy Manager](./docs/strategy_manager.md)** - Multi-strategy orchestration and signal combination
- **[Strategies](./docs/strategies.md)** - Individual strategy implementations and usage
- **[Timeframe System](./docs/timeframe_system.md)** - Advanced timeframe management and multi-timeframe strategies
- **[Analysis](./docs/analysis.md)** - Technical analysis components
- **[Storage Utils](./docs/utils_storage.md)** - Data storage and retrieval
- **[System Utils](./docs/utils_system.md)** - System utilities
## Examples
### Single Strategy Backtest
```bash
# Test default strategy on different timeframes
uv run .\main.py .\configs\config_default.json # 15min
uv run .\main.py .\configs\config_default_5min.json # 5min
```
### 2. BBRS Strategy (Bollinger Bands + RSI)
- **Type**: Bollinger Bands + RSI with market regime detection
- **Features**: Adaptive parameters, volume confirmation, multi-timeframe analysis
- **Market Regimes**: Trending vs sideways market detection
```json
{
"name": "bbrs",
"params": {
"bb_width": 0.05,
"strategy_name": "MarketRegimeStrategy",
"stop_loss_pct": 0.05
}
}
```
### Multi-Strategy Backtest
```bash
# Combine multiple strategies with different weights
uv run .\main.py .\configs\config_combined.json
```
## 📊 Output & Results
### Generated Files
- **`YYYY_MM_DD_HH_MM_backtest.csv`**: Performance summary per timeframe
- **`YYYY_MM_DD_HH_MM_trades.csv`**: Individual trade records
- **`backtest.log`**: Detailed execution logs
### Debug Mode
- **Interactive Charts**: Real-time plotting of strategy signals and trades
- **Sequential Execution**: Step-by-step analysis
- **Enhanced Logging**: Detailed strategy information
### Performance Metrics
- **Trade Statistics**: Win rate, profit ratio, drawdown analysis
- **Portfolio Metrics**: Initial/final USD, total returns, fees
- **Risk Metrics**: Maximum drawdown, stop-loss frequency
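As a rough sketch, these metrics can also be computed directly from trade records using the `BacktestMetrics` helpers from the pre-revert `cycles.utils` module (its implementation appears later in this diff); the trade dictionaries below are illustrative:
```python
from cycles.utils import BacktestMetrics

# Illustrative trade records; keys match those read by the metrics helpers
trades = [
    {"entry": 42000.0, "exit": 43260.0, "profit_pct": 0.03},
    {"entry": 43100.0, "exit": 41810.0, "profit_pct": -0.03},
]

trade_metrics = BacktestMetrics.calculate_trade_metrics(trades)        # win_rate, avg_trade, max_drawdown, ...
portfolio_metrics = BacktestMetrics.calculate_portfolio_metrics(trades, 10000)  # final_usd, total_return, ...
print(trade_metrics["win_rate"], portfolio_metrics["final_usd"])
```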
## 📚 Documentation
Comprehensive documentation is available in the `docs/` directory:
- **[🏗️ Refactoring Summary](./docs/refactoring_summary.md)** - Overview of new architecture improvements
- **[🎛️ Strategy Manager](./docs/strategy_manager.md)** - Multi-strategy orchestration
- **[📈 Strategies](./docs/strategies.md)** - Individual strategy implementations
- **[⏱️ Timeframe System](./docs/timeframe_system.md)** - Multi-timeframe management
- **[📊 Analysis](./docs/analysis.md)** - Technical analysis components
- **[💾 Storage Utils](./docs/utils_storage.md)** - Data management
- **[🖥️ System Utils](./docs/utils_system.md)** - System utilities
## 🎯 Migration & Compatibility
The refactored framework is **100% backward compatible**:
- ✅ Same command-line interface
- ✅ Same configuration file format
- ✅ Same output file format
- ✅ Same functionality
**Zero breaking changes** while providing a much cleaner, more maintainable architecture.
## 🔧 Advanced Usage
### Custom Strategy Development
```python
from cycles.strategies.base import StrategyBase, StrategySignal
class MyStrategy(StrategyBase):
    def get_timeframes(self):
        return ["1h"]

    def initialize(self, backtester):
        self._resample_data(backtester.original_df)
        # Setup indicators...
        self.initialized = True

    def get_entry_signal(self, backtester, df_index):
        # Your entry logic...
        return StrategySignal("ENTRY", confidence=0.8)

    def get_exit_signal(self, backtester, df_index):
        # Your exit logic (hypothetical sketch): return an exit StrategySignal, or None to hold
        return None
```
### Custom Configuration
Create your own configuration file and run:
```bash
uv run .\main.py .\configs\your_config.json
```
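A minimal sketch of such a configuration, assuming the required fields validated by `ConfigManager` (`start_date`, `initial_usd`, `timeframes`, `strategies`); all values are illustrative:
```json
{
  "start_date": "2024-01-01",
  "stop_date": "2024-01-31",
  "initial_usd": 10000,
  "timeframes": ["15min"],
  "strategies": [
    {
      "name": "default",
      "weight": 1.0,
      "params": {"timeframe": "15min", "stop_loss_pct": 0.03}
    }
  ],
  "combination_rules": {"entry": "any", "exit": "any", "min_confidence": 0.5}
}
```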
### Custom Results Processing
```python
from cycles.utils import BacktestMetrics, ResultsProcessor
## Output
class CustomProcessor(ResultsProcessor):
def process_backtest_results(self, results, timeframe, config, summary):
# Custom processing logic...
return custom_summary, custom_trades
```
Backtests generate:
- **CSV Results**: Detailed performance metrics per timeframe/strategy
- **Trade Log**: Individual trade records with entry/exit details
- **Performance Charts**: Visual analysis of strategy performance (in debug mode)
- **Log Files**: Detailed execution logs
## 🚀 Future Enhancements
The new architecture enables easy addition of:
- **🌐 Web API Interface**: REST API for remote backtesting
- **⚡ Real-time Trading**: Live trading integration
- **📊 Advanced Analytics**: Enhanced performance analysis
- **🔌 Strategy Marketplace**: Plugin system for custom strategies
- **📈 Multiple Asset Classes**: Beyond cryptocurrency support
## 🤝 Contributing
We welcome contributions! The new modular architecture makes it easier to:
- Add new strategies
- Enhance analysis capabilities
- Improve visualization
- Extend data sources
## 📄 License
## License
[Add your license information here]
---
## Contributing
**Recent Updates**: The framework has been significantly refactored for improved modularity and reusability while maintaining full backward compatibility. See [Refactoring Summary](./docs/refactoring_summary.md) for details.
[Add contributing guidelines here]

View File

@@ -1,13 +0,0 @@
"""
This module contains the analysis classes for the cycles project.
"""
from .boillinger_band import BollingerBands
from .rsi import RSI
from .bb_rsi import BollingerBandsStrategy
__all__ = ["BollingerBands", "RSI", "BollingerBandsStrategy"]
__version__ = "0.1.0"
__author__ = 'TCP Cycles Team'

View File

@@ -1,214 +0,0 @@
"""
Backtesting Application
This module provides the main application class that orchestrates the entire
backtesting workflow. It coordinates configuration management, data loading,
backtest execution, and result output.
"""
import logging
import datetime
import concurrent.futures
from pathlib import Path
from typing import Optional, List, Dict, Any
from cycles.utils.storage import Storage
from cycles.utils.system import SystemUtils
from cycles.utils.config_manager import ConfigManager
from cycles.utils.results_processor import ResultsProcessor
from cycles.utils.backtest_runner import create_timeframe_tasks
class BacktestApplication:
"""
Main application class for coordinating backtesting workflow.
Orchestrates configuration management, data loading, backtest execution,
and result output in a clean, modular way.
"""
def __init__(self, config_path: Optional[str] = None):
"""
Initialize the backtesting application.
Args:
config_path: Optional path to configuration file
"""
self.config_manager = ConfigManager(config_path)
self.storage = Storage(logging=logging)
self.system_utils = SystemUtils(logging=logging)
self.results_processor = ResultsProcessor()
self.logger = logging.getLogger(__name__)
def load_data(self):
"""Load market data based on configuration."""
self.logger.info("Loading market data...")
data_1min = self.storage.load_data(
'btcusd_1-min_data.csv',
self.config_manager.start_date,
self.config_manager.stop_date
)
self.logger.info(f"Loaded {len(data_1min)} rows of 1-minute data")
return data_1min
def create_tasks(self, data_1min) -> List:
"""Create backtest tasks from configuration."""
self.logger.info("Creating backtest tasks...")
tasks = create_timeframe_tasks(
self.config_manager.timeframes,
data_1min,
self.config_manager
)
self.logger.info(f"Created {len(tasks)} backtest tasks")
return tasks
def execute_tasks(self, tasks: List, debug: bool = False) -> tuple:
"""
Execute backtest tasks.
Args:
tasks: List of TimeframeTask objects
debug: Whether to run in debug mode (sequential with plotting)
Returns:
Tuple of (results_rows, trade_rows)
"""
if debug:
return self._execute_tasks_debug(tasks)
else:
return self._execute_tasks_parallel(tasks)
def _execute_tasks_debug(self, tasks: List) -> tuple:
"""Execute tasks in debug mode (sequential)."""
self.logger.info("Executing tasks in debug mode (sequential)")
all_results_rows = []
all_trade_rows = []
for task in tasks:
self.logger.info(f"Processing timeframe: {task.timeframe}")
results, trades = task.execute(debug=True)
if results:
all_results_rows.append(results)
if trades:
all_trade_rows.extend(trades)
return all_results_rows, all_trade_rows
def _execute_tasks_parallel(self, tasks: List) -> tuple:
"""Execute tasks in parallel."""
workers = self.system_utils.get_optimal_workers()
self.logger.info(f"Executing tasks in parallel with {workers} workers")
all_results_rows = []
all_trade_rows = []
with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
# Submit all tasks
futures = {
executor.submit(task.execute, False): task
for task in tasks
}
# Collect results
for future in concurrent.futures.as_completed(futures):
task = futures[future]
try:
results, trades = future.result()
if results:
all_results_rows.append(results)
if trades:
all_trade_rows.extend(trades)
self.logger.info(f"Completed timeframe: {task.timeframe}")
except Exception as e:
self.logger.error(f"Task failed for timeframe {task.timeframe}: {e}")
return all_results_rows, all_trade_rows
def save_results(self, results_rows: List[Dict[str, Any]], trade_rows: List[Dict[str, Any]],
data_1min) -> None:
"""
Save backtest results to files.
Args:
results_rows: List of result summary rows
trade_rows: List of individual trade rows
data_1min: Original 1-minute data for metadata
"""
timestamp = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M")
# Create metadata
metadata_lines = self.results_processor.create_metadata_lines(
self.config_manager, data_1min
)
# Save backtest results
backtest_filename = f"{timestamp}_backtest.csv"
backtest_fieldnames = [
"timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate",
"max_drawdown", "avg_trade", "profit_ratio", "final_usd", "total_fees_usd"
]
self.storage.write_backtest_results(
backtest_filename, backtest_fieldnames, results_rows, metadata_lines
)
# Save trade details
trades_fieldnames = [
"entry_time", "exit_time", "entry_price", "exit_price",
"profit_pct", "type", "fee_usd"
]
self.storage.write_trades(trade_rows, trades_fieldnames)
self.logger.info(f"Results saved to {backtest_filename}")
def run(self, debug: bool = False) -> None:
"""
Run the complete backtesting workflow.
Args:
debug: Whether to run in debug mode
"""
try:
self.logger.info("Starting backtesting workflow")
self.logger.info(f"Configuration: {self.config_manager}")
# Load data
data_1min = self.load_data()
# Create and execute tasks
tasks = self.create_tasks(data_1min)
results_rows, trade_rows = self.execute_tasks(tasks, debug)
# Save results
if results_rows or trade_rows:
self.save_results(results_rows, trade_rows, data_1min)
self.logger.info("Backtesting workflow completed successfully")
else:
self.logger.warning("No results generated")
except Exception as e:
self.logger.error(f"Backtesting workflow failed: {e}")
raise
def setup_logging() -> None:
"""Setup application logging configuration."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("backtest.log"),
logging.StreamHandler()
]
)

View File

@@ -1,23 +0,0 @@
"""
Utilities Module
This module provides utility classes and functions for the backtesting framework.
"""
from .storage import Storage
from .system import SystemUtils
from .data_utils import *
from .config_manager import ConfigManager
from .results_processor import ResultsProcessor, BacktestMetrics
from .backtest_runner import BacktestRunner, TimeframeTask, create_timeframe_tasks
__all__ = [
'Storage',
'SystemUtils',
'ConfigManager',
'ResultsProcessor',
'BacktestMetrics',
'BacktestRunner',
'TimeframeTask',
'create_timeframe_tasks'
]

View File

@@ -1,224 +0,0 @@
"""
Backtest Runner
This module provides a high-level interface for running backtests with strategy
management. It encapsulates the backtesting workflow and provides a clean
interface for executing tests across different configurations.
"""
import pandas as pd
import logging
from typing import Dict, List, Tuple, Any, Optional
from cycles.backtest import Backtest
from cycles.charts import BacktestCharts
from cycles.strategies import create_strategy_manager
from .results_processor import ResultsProcessor
class BacktestRunner:
"""
High-level backtest execution interface.
Encapsulates the backtesting workflow, strategy management, and result
processing into a clean, reusable interface.
"""
def __init__(self, results_processor: Optional[ResultsProcessor] = None):
"""
Initialize backtest runner.
Args:
results_processor: Optional results processor instance
"""
self.logger = logging.getLogger(__name__)
self.results_processor = results_processor or ResultsProcessor()
def run_single_timeframe(self, data_1min: pd.DataFrame, timeframe: str,
config: Dict[str, Any], debug: bool = False) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
"""
Run backtest for a single timeframe configuration.
Args:
data_1min: 1-minute OHLCV data
timeframe: Timeframe identifier
config: Configuration dictionary
debug: Whether to enable debug mode
Returns:
Tuple[Dict, List]: (summary_row, trade_rows)
"""
try:
# Create and initialize strategy manager
strategy_manager = self._create_strategy_manager(config)
# Setup backtester with appropriate data
backtester = self._setup_backtester(data_1min, strategy_manager, config)
# Run backtest
results = self._execute_backtest(backtester, debug)
# Process results
strategy_summary = strategy_manager.get_strategy_summary()
summary_row, trade_rows = self.results_processor.process_backtest_results(
results, timeframe, config, strategy_summary
)
# Handle debug plotting
if debug:
self._handle_debug_plotting(backtester, results)
return summary_row, trade_rows
except Exception as e:
self.logger.error(f"Backtest failed for timeframe {timeframe}: {e}")
raise
def _create_strategy_manager(self, config: Dict[str, Any]):
"""Create and validate strategy manager from configuration."""
strategy_config = {
"strategies": config['strategies'],
"combination_rules": config['combination_rules']
}
if not strategy_config['strategies']:
raise ValueError("No strategy configuration provided")
return create_strategy_manager(strategy_config)
def _setup_backtester(self, data_1min: pd.DataFrame, strategy_manager, config: Dict[str, Any]) -> Backtest:
"""Setup backtester with appropriate data and strategy manager."""
# Get primary strategy for backtester setup
primary_strategy = strategy_manager.strategies[0]
# Determine working dataframe based on strategy type
if primary_strategy.name == "bbrs":
# BBRS strategy processes 1-minute data and handles internal resampling
working_df = data_1min.copy()
else:
# Other strategies specify their preferred timeframe
primary_strategy._resample_data(data_1min)
working_df = primary_strategy.get_primary_timeframe_data()
# Prepare working dataframe for backtester
working_df_for_backtest = working_df.copy().reset_index()
if 'index' in working_df_for_backtest.columns:
working_df_for_backtest = working_df_for_backtest.rename(columns={'index': 'timestamp'})
# Initialize backtest
backtester = Backtest(
config['initial_usd'],
working_df_for_backtest,
working_df_for_backtest,
self._strategy_manager_init
)
# Store original data and attach strategy manager
backtester.original_df = data_1min
backtester.strategy_manager = strategy_manager
# Initialize strategy manager
strategy_manager.initialize(backtester)
return backtester
def _execute_backtest(self, backtester: Backtest, debug: bool = False) -> Dict[str, Any]:
"""Execute the backtest using strategy manager functions."""
return backtester.run(
self._strategy_manager_entry,
self._strategy_manager_exit,
debug
)
def _handle_debug_plotting(self, backtester: Backtest, results: Dict[str, Any]) -> None:
"""Handle debug plotting if enabled."""
try:
# Check if any strategy has processed_data for universal plotting
processed_data = None
for strategy in backtester.strategy_manager.strategies:
if hasattr(backtester, 'processed_data') and backtester.processed_data is not None:
processed_data = backtester.processed_data
break
if processed_data is not None and not processed_data.empty:
# Format strategy data with actual executed trades for universal plotting
formatted_data = BacktestCharts.format_strategy_data_with_trades(processed_data, results)
# Plot using universal function
BacktestCharts.plot_data(formatted_data)
else:
# Fallback to meta_trend plot if available
if "meta_trend" in backtester.strategies:
meta_trend = backtester.strategies["meta_trend"]
working_df = backtester.df.set_index('timestamp')
BacktestCharts.plot(working_df, meta_trend)
else:
self.logger.info("No plotting data available")
except Exception as e:
self.logger.warning(f"Plotting failed: {e}")
# Strategy manager interface functions
@staticmethod
def _strategy_manager_init(backtester: Backtest):
"""Strategy Manager initialization function."""
# Actual initialization happens in strategy_manager.initialize()
pass
@staticmethod
def _strategy_manager_entry(backtester: Backtest, df_index: int) -> bool:
"""Strategy Manager entry function."""
return backtester.strategy_manager.get_entry_signal(backtester, df_index)
@staticmethod
def _strategy_manager_exit(backtester: Backtest, df_index: int) -> Tuple[Optional[str], Optional[float]]:
"""Strategy Manager exit function."""
return backtester.strategy_manager.get_exit_signal(backtester, df_index)
class TimeframeTask:
"""Encapsulates a single timeframe backtest task."""
def __init__(self, timeframe: str, data_1min: pd.DataFrame, config: Dict[str, Any]):
"""
Initialize timeframe task.
Args:
timeframe: Timeframe identifier
data_1min: 1-minute OHLCV data
config: Configuration for this task
"""
self.timeframe = timeframe
self.data_1min = data_1min
self.config = config
def execute(self, debug: bool = False) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
"""
Execute the timeframe task.
Args:
debug: Whether to enable debug mode
Returns:
Tuple[Dict, List]: (summary_row, trade_rows)
"""
runner = BacktestRunner()
return runner.run_single_timeframe(self.data_1min, self.timeframe, self.config, debug)
def create_timeframe_tasks(timeframes: List[str], data_1min: pd.DataFrame,
config_manager) -> List[TimeframeTask]:
"""
Create timeframe tasks from configuration.
Args:
timeframes: List of timeframes to test
data_1min: 1-minute OHLCV data
config_manager: Configuration manager instance
Returns:
List[TimeframeTask]: List of timeframe tasks
"""
tasks = []
for timeframe in timeframes:
task_config = config_manager.get_timeframe_task_config(timeframe)
tasks.append(TimeframeTask(timeframe, data_1min, task_config))
return tasks

View File

@@ -1,129 +0,0 @@
"""
Configuration Manager
This module provides centralized configuration handling for the backtesting system.
It handles loading, validation, and provides a clean interface for accessing
configuration data.
"""
import json
import datetime
import logging
from typing import Dict, List, Optional, Any
from pathlib import Path
class ConfigManager:
"""
Manages configuration loading, validation, and access.
Provides a centralized way to handle configuration files with validation
and convenient access methods.
"""
def __init__(self, config_path: Optional[str] = None):
"""
Initialize configuration manager.
Args:
config_path: Path to configuration file. If None, uses default.
"""
self.config_path = config_path or "configs/config_default.json"
self.config = self._load_config()
self._validate_config()
def _load_config(self) -> Dict[str, Any]:
"""Load configuration from file."""
try:
with open(self.config_path, 'r') as f:
config = json.load(f)
logging.info(f"Loaded configuration from: {self.config_path}")
return config
except FileNotFoundError:
available_configs = list(Path("configs").glob("*.json"))
raise FileNotFoundError(
f"Config file '{self.config_path}' not found. "
f"Available configs: {[str(c) for c in available_configs]}"
)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in config file '{self.config_path}': {e}")
def _validate_config(self) -> None:
"""Validate configuration structure and values."""
required_fields = ['start_date', 'initial_usd', 'timeframes', 'strategies']
for field in required_fields:
if field not in self.config:
raise ValueError(f"Missing required field '{field}' in configuration")
# Validate strategies
if not self.config['strategies']:
raise ValueError("At least one strategy must be specified")
for strategy in self.config['strategies']:
if 'name' not in strategy:
raise ValueError("Strategy must have a 'name' field")
# Validate timeframes
if not self.config['timeframes']:
raise ValueError("At least one timeframe must be specified")
logging.info("Configuration validation successful")
@property
def start_date(self) -> str:
"""Get start date."""
return self.config['start_date']
@property
def stop_date(self) -> str:
"""Get stop date, defaulting to current date if None."""
stop_date = self.config.get('stop_date')
if stop_date is None:
return datetime.datetime.now().strftime("%Y-%m-%d")
return stop_date
@property
def initial_usd(self) -> float:
"""Get initial USD amount."""
return self.config['initial_usd']
@property
def timeframes(self) -> List[str]:
"""Get list of timeframes to test."""
return self.config['timeframes']
@property
def strategies_config(self) -> List[Dict[str, Any]]:
"""Get strategies configuration."""
return self.config['strategies']
@property
def combination_rules(self) -> Dict[str, Any]:
"""Get combination rules for strategy manager."""
return self.config.get('combination_rules', {
"entry": "any",
"exit": "any",
"min_confidence": 0.5
})
def get_strategy_manager_config(self) -> Dict[str, Any]:
"""Get configuration for strategy manager."""
return {
"strategies": self.strategies_config,
"combination_rules": self.combination_rules
}
def get_timeframe_task_config(self, timeframe: str) -> Dict[str, Any]:
"""Get configuration for a specific timeframe task."""
return {
"initial_usd": self.initial_usd,
"strategies": self.strategies_config,
"combination_rules": self.combination_rules
}
def __repr__(self) -> str:
"""String representation of configuration."""
return (f"ConfigManager(config_path='{self.config_path}', "
f"strategies={len(self.strategies_config)}, "
f"timeframes={len(self.timeframes)})")

View File

@@ -1,239 +0,0 @@
"""
Results Processor
This module handles processing, aggregation, and analysis of backtest results.
It provides utilities for calculating metrics, aggregating results across
timeframes, and formatting output data.
"""
import pandas as pd
import numpy as np
import logging
from typing import Dict, List, Tuple, Any, Optional
from collections import defaultdict
class BacktestMetrics:
"""Container for backtest metrics calculation."""
@staticmethod
def calculate_trade_metrics(trades: List[Dict[str, Any]]) -> Dict[str, float]:
"""Calculate trade-level metrics."""
if not trades:
return {
"n_trades": 0,
"n_winning_trades": 0,
"win_rate": 0.0,
"total_profit": 0.0,
"total_loss": 0.0,
"avg_trade": 0.0,
"profit_ratio": 0.0,
"max_drawdown": 0.0
}
n_trades = len(trades)
wins = [t for t in trades if t.get('exit') and t['exit'] > t['entry']]
n_winning_trades = len(wins)
win_rate = n_winning_trades / n_trades if n_trades > 0 else 0
total_profit = sum(trade['profit_pct'] for trade in trades)
total_loss = sum(-trade['profit_pct'] for trade in trades if trade['profit_pct'] < 0)
avg_trade = total_profit / n_trades if n_trades > 0 else 0
profit_ratio = total_profit / total_loss if total_loss > 0 else float('inf')
# Calculate max drawdown
cumulative_profit = 0
max_drawdown = 0
peak = 0
for trade in trades:
cumulative_profit += trade['profit_pct']
if cumulative_profit > peak:
peak = cumulative_profit
drawdown = peak - cumulative_profit
if drawdown > max_drawdown:
max_drawdown = drawdown
return {
"n_trades": n_trades,
"n_winning_trades": n_winning_trades,
"win_rate": win_rate,
"total_profit": total_profit,
"total_loss": total_loss,
"avg_trade": avg_trade,
"profit_ratio": profit_ratio,
"max_drawdown": max_drawdown
}
@staticmethod
def calculate_portfolio_metrics(trades: List[Dict[str, Any]], initial_usd: float) -> Dict[str, float]:
"""Calculate portfolio-level metrics."""
final_usd = initial_usd
for trade in trades:
final_usd *= (1 + trade['profit_pct'])
total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades)
return {
"initial_usd": initial_usd,
"final_usd": final_usd,
"total_fees_usd": total_fees_usd,
"total_return": (final_usd - initial_usd) / initial_usd
}
class ResultsProcessor:
"""
Processes and aggregates backtest results.
Handles result processing, metric calculation, and aggregation across
multiple timeframes and configurations.
"""
def __init__(self):
"""Initialize results processor."""
self.logger = logging.getLogger(__name__)
def process_backtest_results(self, results: Dict[str, Any], timeframe: str,
config: Dict[str, Any], strategy_summary: Dict[str, Any]) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
"""
Process results from a single backtest run.
Args:
results: Raw backtest results
timeframe: Timeframe identifier
config: Configuration used for the test
strategy_summary: Summary of strategies used
Returns:
Tuple[Dict, List]: (summary_row, trade_rows)
"""
trades = results.get('trades', [])
initial_usd = config['initial_usd']
# Calculate metrics
trade_metrics = BacktestMetrics.calculate_trade_metrics(trades)
portfolio_metrics = BacktestMetrics.calculate_portfolio_metrics(trades, initial_usd)
# Get primary strategy info for reporting
primary_strategy = strategy_summary['strategies'][0] if strategy_summary['strategies'] else {}
primary_timeframe = primary_strategy.get('timeframes', ['unknown'])[0]
stop_loss_pct = primary_strategy.get('params', {}).get('stop_loss_pct', 'N/A')
# Create summary row
summary_row = {
"timeframe": f"{timeframe}({primary_timeframe})",
"stop_loss_pct": stop_loss_pct,
"n_stop_loss": sum(1 for trade in trades if trade.get('type') == 'STOP_LOSS'),
**trade_metrics,
**portfolio_metrics
}
# Create trade rows
trade_rows = []
for trade in trades:
trade_rows.append({
"timeframe": f"{timeframe}({primary_timeframe})",
"stop_loss_pct": stop_loss_pct,
"entry_time": trade.get("entry_time"),
"exit_time": trade.get("exit_time"),
"entry_price": trade.get("entry"),
"exit_price": trade.get("exit"),
"profit_pct": trade.get("profit_pct"),
"type": trade.get("type"),
"fee_usd": trade.get("fee_usd"),
})
# Log results
strategy_names = [s['name'] for s in strategy_summary['strategies']]
self.logger.info(
f"Timeframe: {timeframe}({primary_timeframe}), Stop Loss: {stop_loss_pct}, "
f"Trades: {trade_metrics['n_trades']}, Strategies: {strategy_names}"
)
return summary_row, trade_rows
def aggregate_results(self, all_rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Aggregate results per timeframe and stop_loss_pct.
Args:
all_rows: List of result rows to aggregate
Returns:
List[Dict]: Aggregated summary rows
"""
grouped = defaultdict(list)
for row in all_rows:
key = (row['timeframe'], row['stop_loss_pct'])
grouped[key].append(row)
summary_rows = []
for (timeframe, stop_loss_pct), rows in grouped.items():
if not rows:
continue
# Aggregate metrics
total_trades = sum(r['n_trades'] for r in rows)
total_stop_loss = sum(r['n_stop_loss'] for r in rows)
# Average metrics across runs
avg_win_rate = np.mean([r['win_rate'] for r in rows])
avg_max_drawdown = np.mean([r['max_drawdown'] for r in rows])
avg_avg_trade = np.mean([r['avg_trade'] for r in rows])
avg_profit_ratio = np.mean([r['profit_ratio'] for r in rows if r['profit_ratio'] != float('inf')])
# Portfolio metrics
initial_usd = rows[0]['initial_usd'] # Should be same for all
avg_final_usd = np.mean([r['final_usd'] for r in rows])
avg_total_fees_usd = np.mean([r['total_fees_usd'] for r in rows])
summary_rows.append({
"timeframe": timeframe,
"stop_loss_pct": stop_loss_pct,
"n_trades": total_trades,
"n_stop_loss": total_stop_loss,
"win_rate": avg_win_rate,
"max_drawdown": avg_max_drawdown,
"avg_trade": avg_avg_trade,
"profit_ratio": avg_profit_ratio,
"initial_usd": initial_usd,
"final_usd": avg_final_usd,
"total_fees_usd": avg_total_fees_usd,
})
return summary_rows
def create_metadata_lines(self, config_manager, data_1min: pd.DataFrame) -> List[str]:
"""
Create metadata lines for result files.
Args:
config_manager: Configuration manager instance
data_1min: 1-minute data for price lookups
Returns:
List[str]: Metadata lines
"""
start_date = config_manager.start_date
stop_date = config_manager.stop_date
initial_usd = config_manager.initial_usd
def get_nearest_price(df: pd.DataFrame, target_date: str) -> Tuple[Optional[str], Optional[float]]:
"""Get nearest price for a given date."""
if len(df) == 0:
return None, None
target_ts = pd.to_datetime(target_date)
nearest_idx = df.index.get_indexer([target_ts], method='nearest')[0]
nearest_time = df.index[nearest_idx]
price = df.iloc[nearest_idx]['close']
return str(nearest_time), price
nearest_start_time, start_price = get_nearest_price(data_1min, start_date)
nearest_stop_time, stop_price = get_nearest_price(data_1min, stop_date)
return [
f"Start date\t{start_date}\tPrice\t{start_price}",
f"Stop date\t{stop_date}\tPrice\t{stop_price}",
f"Initial USD\t{initial_usd}"
]

View File

@@ -1,452 +0,0 @@
# Architecture Components Documentation
## Overview
The Cycles framework has been refactored into a modular architecture with specialized components for different aspects of the backtesting workflow. This document provides detailed information about the new architectural components and how to use them.
## 🏗️ Component Architecture
```
New Components:
├── 🎯 BacktestApplication # Main workflow orchestration
├── ⚙️ ConfigManager # Configuration management
├── 📊 ResultsProcessor # Results processing & metrics
├── 🚀 BacktestRunner # Backtest execution logic
└── 📦 TimeframeTask # Individual task encapsulation
```
---
## ⚙️ ConfigManager
**Purpose**: Centralized configuration loading, validation, and access
### Features
- **Automatic Validation**: Validates configuration structure and required fields
- **Type-Safe Access**: Property-based access to configuration values
- **Smart Defaults**: Automatic fallbacks (e.g., current date for stop_date)
- **Reusable Configs**: Generate configurations for different components
### Basic Usage
```python
from cycles.utils import ConfigManager
# Initialize with config file
config_manager = ConfigManager("configs/config_bbrs.json")
# Access configuration properties
start_date = config_manager.start_date
initial_usd = config_manager.initial_usd
timeframes = config_manager.timeframes
# Get specialized configurations
strategy_config = config_manager.get_strategy_manager_config()
task_config = config_manager.get_timeframe_task_config("15min")
```
### Configuration Properties
| Property | Type | Description |
|----------|------|-------------|
| `start_date` | `str` | Backtest start date |
| `stop_date` | `str` | Backtest end date (auto-defaults to current) |
| `initial_usd` | `float` | Initial portfolio value |
| `timeframes` | `List[str]` | List of timeframes to test |
| `strategies_config` | `List[Dict]` | Strategy configurations |
| `combination_rules` | `Dict` | Signal combination rules |
### Configuration Methods
```python
# Get strategy manager configuration
strategy_config = config_manager.get_strategy_manager_config()
# Returns: {"strategies": [...], "combination_rules": {...}}
# Get timeframe-specific task configuration
task_config = config_manager.get_timeframe_task_config("15min")
# Returns: {"initial_usd": 10000, "strategies": [...], "combination_rules": {...}}
```
### Error Handling
```python
try:
config_manager = ConfigManager("invalid_config.json")
except FileNotFoundError as e:
print(f"Config file not found: {e}")
except ValueError as e:
print(f"Invalid configuration: {e}")
```
---
## 📊 ResultsProcessor & BacktestMetrics
**Purpose**: Unified processing, aggregation, and analysis of backtest results
### BacktestMetrics (Static Utilities)
```python
from cycles.utils import BacktestMetrics
# Calculate trade-level metrics
trades = [{"profit_pct": 0.05}, {"profit_pct": -0.02}]
trade_metrics = BacktestMetrics.calculate_trade_metrics(trades)
# Returns: {n_trades, win_rate, max_drawdown, avg_trade, ...}
# Calculate portfolio-level metrics
portfolio_metrics = BacktestMetrics.calculate_portfolio_metrics(trades, 10000)
# Returns: {initial_usd, final_usd, total_fees_usd, total_return}
```
### ResultsProcessor (Instance-Based)
```python
from cycles.utils import ResultsProcessor
processor = ResultsProcessor()
# Process single backtest results
summary_row, trade_rows = processor.process_backtest_results(
results=backtest_results,
timeframe="15min",
config=task_config,
strategy_summary=strategy_summary
)
# Aggregate multiple results
aggregated = processor.aggregate_results(all_result_rows)
# Create metadata for output files
metadata_lines = processor.create_metadata_lines(config_manager, data_1min)
```
### Available Metrics
#### Trade Metrics
- `n_trades`: Total number of trades
- `n_winning_trades`: Number of profitable trades
- `win_rate`: Percentage of winning trades
- `total_profit`: Sum of all profitable trades
- `total_loss`: Sum of all losing trades
- `avg_trade`: Average trade return
- `profit_ratio`: Ratio of total profit to total loss
- `max_drawdown`: Maximum portfolio drawdown
#### Portfolio Metrics
- `initial_usd`: Starting portfolio value
- `final_usd`: Ending portfolio value
- `total_fees_usd`: Total trading fees
- `total_return`: Overall portfolio return percentage
---
## 🚀 BacktestRunner & TimeframeTask
**Purpose**: Modular backtest execution with clean separation of concerns
### BacktestRunner
```python
from cycles.utils import BacktestRunner
runner = BacktestRunner()
# Run single timeframe backtest
summary_row, trade_rows = runner.run_single_timeframe(
data_1min=market_data,
timeframe="15min",
config=task_config,
debug=False
)
```
#### BacktestRunner Methods
| Method | Purpose | Returns |
|--------|---------|---------|
| `run_single_timeframe()` | Execute backtest for one timeframe | `(summary_row, trade_rows)` |
| `_create_strategy_manager()` | Create strategy manager from config | `StrategyManager` |
| `_setup_backtester()` | Setup backtester with data and strategies | `Backtest` |
| `_execute_backtest()` | Run the backtest | `Dict[results]` |
| `_handle_debug_plotting()` | Handle debug mode plotting | `None` |
### TimeframeTask
**Purpose**: Encapsulates a single timeframe backtest task for easy execution and parallelization
```python
from cycles.utils import TimeframeTask, create_timeframe_tasks
# Create individual task
task = TimeframeTask(
timeframe="15min",
data_1min=market_data,
config=task_config
)
# Execute task
summary_row, trade_rows = task.execute(debug=False)
# Create multiple tasks from configuration
tasks = create_timeframe_tasks(
timeframes=["5min", "15min", "1h"],
data_1min=market_data,
config_manager=config_manager
)
# Execute all tasks
for task in tasks:
results = task.execute()
```
### Parallel Execution
```python
import concurrent.futures
from cycles.utils import create_timeframe_tasks
# Create tasks
tasks = create_timeframe_tasks(timeframes, data_1min, config_manager)
# Execute in parallel
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(task.execute, False): task for task in tasks}
for future in concurrent.futures.as_completed(futures):
task = futures[future]
try:
summary_row, trade_rows = future.result()
print(f"Completed: {task.timeframe}")
except Exception as e:
print(f"Failed: {task.timeframe} - {e}")
```
---
## 🎯 BacktestApplication
**Purpose**: Main application orchestration class that coordinates the entire workflow
### Complete Workflow
```python
from cycles.application import BacktestApplication
# Simple usage
app = BacktestApplication("configs/config_combined.json")
app.run(debug=False)
```
### Workflow Steps
```python
class BacktestApplication:
def run(self, debug=False):
# 1. Load data
data_1min = self.load_data()
# 2. Create tasks
tasks = self.create_tasks(data_1min)
# 3. Execute tasks (parallel or debug mode)
results_rows, trade_rows = self.execute_tasks(tasks, debug)
# 4. Save results
self.save_results(results_rows, trade_rows, data_1min)
```
### Custom Application
```python
from cycles.application import BacktestApplication
class CustomBacktestApplication(BacktestApplication):
def execute_tasks(self, tasks, debug=False):
# Custom execution logic
# Maybe with custom progress tracking
results = []
for i, task in enumerate(tasks):
print(f"Processing {i+1}/{len(tasks)}: {task.timeframe}")
result = task.execute(debug)
results.append(result)
return results
def save_results(self, results_rows, trade_rows, data_1min):
# Custom result saving
super().save_results(results_rows, trade_rows, data_1min)
# Additional custom processing
self.send_email_notification(results_rows)
# Usage
app = CustomBacktestApplication("config.json")
app.run()
```
---
## 🔧 Component Integration Examples
### Simple Integration
```python
# All-in-one approach
from cycles.application import BacktestApplication
app = BacktestApplication("config.json")
app.run(debug=False)
```
### Modular Integration
```python
# Step-by-step approach using individual components
from cycles.utils import ConfigManager, BacktestRunner, ResultsProcessor
# 1. Configuration
config_manager = ConfigManager("config.json")
# 2. Data loading (using existing storage utilities)
from cycles.utils import Storage
storage = Storage()
data_1min = storage.load_data('btcusd_1-min_data.csv',
config_manager.start_date,
config_manager.stop_date)
# 3. Execution
runner = BacktestRunner()
all_results = []
for timeframe in config_manager.timeframes:
task_config = config_manager.get_timeframe_task_config(timeframe)
summary_row, trade_rows = runner.run_single_timeframe(
data_1min, timeframe, task_config
)
all_results.extend(trade_rows)
# 4. Processing
processor = ResultsProcessor()
final_results = processor.aggregate_results(all_results)
```
### Custom Workflow
```python
# Custom workflow for specific needs
from cycles.utils import ConfigManager, BacktestRunner
config_manager = ConfigManager("config.json")
runner = BacktestRunner()
# Custom data preparation
custom_data = prepare_custom_data(config_manager.start_date)
# Custom configuration modification
for strategy in config_manager.strategies_config:
if strategy['name'] == 'default':
strategy['params']['stop_loss_pct'] = 0.02 # Custom stop loss
# Custom execution with monitoring
for timeframe in config_manager.timeframes:
print(f"Starting backtest for {timeframe}")
config = config_manager.get_timeframe_task_config(timeframe)
try:
results = runner.run_single_timeframe(custom_data, timeframe, config)
process_custom_results(results, timeframe)
except Exception as e:
handle_custom_error(e, timeframe)
```
---
## 🎨 Extension Points
### Custom Configuration Manager
```python
from cycles.utils import ConfigManager
class CustomConfigManager(ConfigManager):
def _validate_config(self):
super()._validate_config()
# Additional custom validation
if self.config.get('custom_field') is None:
raise ValueError("Custom field is required")
@property
def custom_setting(self):
return self.config.get('custom_field', 'default_value')
```
### Custom Results Processor
```python
from cycles.utils import ResultsProcessor
class CustomResultsProcessor(ResultsProcessor):
def process_backtest_results(self, results, timeframe, config, strategy_summary):
# Call parent method
summary_row, trade_rows = super().process_backtest_results(
results, timeframe, config, strategy_summary
)
# Add custom metrics
summary_row['custom_metric'] = self.calculate_custom_metric(trade_rows)
return summary_row, trade_rows
def calculate_custom_metric(self, trades):
# Custom metric calculation
return sum(t['profit_pct'] for t in trades if t['profit_pct'] > 0.05)
```
---
## 🚀 Migration Guide
### From Old main.py
**Before (Old main.py)**:
```python
# Scattered configuration
config_file = args.config or "configs/config_default.json"
with open(config_file, 'r') as f:
config = json.load(f)
# Complex processing function
results = process_timeframe_data(data_1min, timeframe, config, debug)
# Manual result aggregation
all_results = []
for task in tasks:
results = process(task, debug)
all_results.extend(results)
```
**After (New Architecture)**:
```python
# Clean application approach
from cycles.application import BacktestApplication
app = BacktestApplication(config_file)
app.run(debug=debug)
# Or modular approach
from cycles.utils import ConfigManager, BacktestRunner
config_manager = ConfigManager(config_file)
runner = BacktestRunner()
results = runner.run_single_timeframe(data, timeframe, config)
```
### Benefits of Migration
1. **🧹 Cleaner Code**: Reduced complexity and better organization
2. **🔄 Reusability**: Components can be used independently
3. **🧪 Testability**: Each component can be tested in isolation
4. **🛠️ Extensibility**: Easy to extend and customize components
5. **📈 Maintainability**: Clear separation of concerns
---
**Note**: All new components maintain full backward compatibility with existing configuration files and output formats.

View File

@@ -4,48 +4,6 @@
The Strategy Manager is a sophisticated orchestration system that enables the combination of multiple trading strategies with configurable signal aggregation rules. It supports multi-timeframe analysis, weighted consensus voting, and flexible signal combination methods.
> **🏗️ New Architecture**: The Strategy Manager has been enhanced as part of the framework's modular refactoring. It now integrates seamlessly with the new `BacktestRunner`, `ConfigManager`, and `ResultsProcessor` components while maintaining full backward compatibility.
## New Framework Integration
### Modular Architecture Benefits
The refactored framework provides several advantages for strategy management:
- **🔧 Simplified Configuration**: Unified configuration through `ConfigManager`
- **⚡ Enhanced Execution**: Streamlined execution via `BacktestRunner`
- **📊 Better Processing**: Integrated results processing with `ResultsProcessor`
- **🔄 Improved Reusability**: Strategy manager can be used independently
### Usage in New Framework
```python
# Direct usage with new components
from cycles.utils import ConfigManager, BacktestRunner
config_manager = ConfigManager("config.json")
runner = BacktestRunner()
# Configuration is automatically prepared
strategy_config = config_manager.get_strategy_manager_config()
task_config = config_manager.get_timeframe_task_config("15min")
# Execution is handled cleanly
results = runner.run_single_timeframe(data, "15min", task_config)
```
### Integration with BacktestApplication
The Strategy Manager seamlessly integrates with the new `BacktestApplication`:
```python
from cycles.application import BacktestApplication
# Strategy manager is automatically created and managed
app = BacktestApplication("config.json")
app.run(debug=False) # Strategy manager handles all strategy coordination
```
## Architecture
### Core Components

402
main.py
View File

@@ -1,99 +1,337 @@
"""
Main entry point for the backtesting application.
This module provides a clean command-line interface for running backtests
using the modular backtesting framework.
"""
import pandas as pd
import numpy as np
import logging
import concurrent.futures
import os
import datetime
import argparse
from pathlib import Path
import json
from cycles.application import BacktestApplication, setup_logging
from cycles.utils.storage import Storage
from cycles.utils.system import SystemUtils
from cycles.backtest import Backtest
from cycles.charts import BacktestCharts
from cycles.strategies import create_strategy_manager
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("backtest.log"),
logging.StreamHandler()
]
)
def parse_arguments():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description="Run cryptocurrency backtests with configurable strategies.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python main.py # Use default config
python main.py config_bbrs.json # Use specific config
python main.py --debug config_combined.json # Debug mode with plotting
def strategy_manager_init(backtester: Backtest):
"""Strategy Manager initialization function"""
# This will be called by Backtest.__init__, but actual initialization
# happens in strategy_manager.initialize()
pass
Available configs:
- config_default.json: Default meta-trend strategy
- config_bbrs.json: BBRS strategy
- config_combined.json: Multi-strategy combination
"""
def strategy_manager_entry(backtester: Backtest, df_index: int):
"""Strategy Manager entry function"""
return backtester.strategy_manager.get_entry_signal(backtester, df_index)
def strategy_manager_exit(backtester: Backtest, df_index: int):
"""Strategy Manager exit function"""
return backtester.strategy_manager.get_exit_signal(backtester, df_index)
def process_timeframe_data(data_1min, timeframe, config, debug=False):
"""Process a timeframe using Strategy Manager with configuration"""
results_rows = []
trade_rows = []
# Extract values from config
initial_usd = config['initial_usd']
strategy_config = {
"strategies": config['strategies'],
"combination_rules": config['combination_rules']
}
# Create and initialize strategy manager
if not strategy_config:
logging.error("No strategy configuration provided")
return results_rows, trade_rows
strategy_manager = create_strategy_manager(strategy_config)
# Get the primary timeframe from the first strategy for backtester setup
primary_strategy = strategy_manager.strategies[0]
primary_timeframe = primary_strategy.get_timeframes()[0]
# For BBRS strategy, it works with 1-minute data directly and handles internal resampling
# For other strategies, use their preferred timeframe
if primary_strategy.name == "bbrs":
# BBRS strategy processes 1-minute data and outputs signals on its internal timeframes
# Use 1-minute data for backtester working dataframe
working_df = data_1min.copy()
else:
# Other strategies specify their preferred timeframe
# Let the primary strategy resample the data to get the working dataframe
primary_strategy._resample_data(data_1min)
working_df = primary_strategy.get_primary_timeframe_data()
# Prepare working dataframe for backtester (ensure timestamp column)
working_df_for_backtest = working_df.copy().reset_index()
if 'index' in working_df_for_backtest.columns:
working_df_for_backtest = working_df_for_backtest.rename(columns={'index': 'timestamp'})
# Initialize backtest with strategy manager initialization
backtester = Backtest(initial_usd, working_df_for_backtest, working_df_for_backtest, strategy_manager_init)
# Store original min1_df for strategy processing
backtester.original_df = data_1min
# Attach strategy manager to backtester and initialize
backtester.strategy_manager = strategy_manager
strategy_manager.initialize(backtester)
# Run backtest with strategy manager functions
results = backtester.run(
strategy_manager_entry,
strategy_manager_exit,
debug
)
n_trades = results["n_trades"]
trades = results.get('trades', [])
wins = [1 for t in trades if t['exit'] is not None and t['exit'] > t['entry']]
n_winning_trades = len(wins)
total_profit = sum(trade['profit_pct'] for trade in trades)
total_loss = sum(-trade['profit_pct'] for trade in trades if trade['profit_pct'] < 0)
win_rate = n_winning_trades / n_trades if n_trades > 0 else 0
avg_trade = total_profit / n_trades if n_trades > 0 else 0
profit_ratio = total_profit / total_loss if total_loss > 0 else float('inf')
cumulative_profit = 0
max_drawdown = 0
peak = 0
for trade in trades:
cumulative_profit += trade['profit_pct']
if cumulative_profit > peak:
peak = cumulative_profit
drawdown = peak - cumulative_profit
if drawdown > max_drawdown:
max_drawdown = drawdown
final_usd = initial_usd
for trade in trades:
final_usd *= (1 + trade['profit_pct'])
total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades)
# Get stop_loss_pct from the first strategy for reporting
# In multi-strategy setups, strategies can have different stop_loss_pct values
stop_loss_pct = primary_strategy.params.get("stop_loss_pct", "N/A")
# Update row to include timeframe information
row = {
"timeframe": f"{timeframe}({primary_timeframe})", # Show actual timeframe used
"stop_loss_pct": stop_loss_pct,
"n_trades": n_trades,
"n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP_LOSS'),
"win_rate": win_rate,
"max_drawdown": max_drawdown,
"avg_trade": avg_trade,
"total_profit": total_profit,
"total_loss": total_loss,
"profit_ratio": profit_ratio,
"initial_usd": initial_usd,
"final_usd": final_usd,
"total_fees_usd": total_fees_usd,
}
results_rows.append(row)
for trade in trades:
trade_rows.append({
"timeframe": f"{timeframe}({primary_timeframe})",
"stop_loss_pct": stop_loss_pct,
"entry_time": trade.get("entry_time"),
"exit_time": trade.get("exit_time"),
"entry_price": trade.get("entry"),
"exit_price": trade.get("exit"),
"profit_pct": trade.get("profit_pct"),
"type": trade.get("type"),
"fee_usd": trade.get("fee_usd"),
})
# Log strategy summary
strategy_summary = strategy_manager.get_strategy_summary()
logging.info(f"Timeframe: {timeframe}({primary_timeframe}), Stop Loss: {stop_loss_pct}, "
f"Trades: {n_trades}, Strategies: {[s['name'] for s in strategy_summary['strategies']]}")
if debug:
# Plot after each backtest run
try:
# Use the backtester's processed_data (if a strategy populated it) for universal plotting
processed_data = getattr(backtester, 'processed_data', None)
if processed_data is not None and not processed_data.empty:
# Format strategy data with actual executed trades for universal plotting
formatted_data = BacktestCharts.format_strategy_data_with_trades(processed_data, results)
# Plot using universal function
BacktestCharts.plot_data(formatted_data)
else:
# Fallback to meta_trend plot if available
if "meta_trend" in backtester.strategies:
meta_trend = backtester.strategies["meta_trend"]
# Use the working dataframe for plotting
BacktestCharts.plot(working_df, meta_trend)
else:
print("No plotting data available")
except Exception as e:
print(f"Plotting failed: {e}")
return results_rows, trade_rows
def process(timeframe_info, debug=False):
"""Process a single timeframe with strategy config"""
timeframe, data_1min, config = timeframe_info
# Pass the essential data and full config
results_rows, all_trade_rows = process_timeframe_data(
data_1min, timeframe, config, debug=debug
)
return results_rows, all_trade_rows
def parse_arguments():
"""Parse command-line arguments."""
parser = argparse.ArgumentParser()
parser.add_argument(
"config",
type=str,
nargs="?",
help="Path to config JSON file (default: configs/config_default.json)"
)
parser.add_argument(
"--debug",
action="store_true",
help="Enable debug mode (sequential execution with plotting)"
)
return parser.parse_args()
def aggregate_results(all_rows):
"""Aggregate results per stop_loss_pct and per rule (timeframe)"""
from collections import defaultdict
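# Group the per-month result rows by (timeframe, stop_loss_pct); counts are summed and the
# remaining metrics are averaged across the months in each group.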
grouped = defaultdict(list)
for row in all_rows:
key = (row['timeframe'], row['stop_loss_pct'])
grouped[key].append(row)
summary_rows = []
for (rule, stop_loss_pct), rows in grouped.items():
n_months = len(rows)
total_trades = sum(r['n_trades'] for r in rows)
total_stop_loss = sum(r['n_stop_loss'] for r in rows)
avg_win_rate = np.mean([r['win_rate'] for r in rows])
avg_max_drawdown = np.mean([r['max_drawdown'] for r in rows])
avg_avg_trade = np.mean([r['avg_trade'] for r in rows])
avg_profit_ratio = np.mean([r['profit_ratio'] for r in rows])
# Average final USD and total fees across the grouped rows
final_usd = np.mean([r.get('final_usd', initial_usd) for r in rows])
total_fees_usd = np.mean([r.get('total_fees_usd', 0.0) for r in rows])
summary_rows.append({
"timeframe": rule,
"stop_loss_pct": stop_loss_pct,
"n_trades": total_trades,
"n_stop_loss": total_stop_loss,
"win_rate": avg_win_rate,
"max_drawdown": avg_max_drawdown,
"avg_trade": avg_avg_trade,
"profit_ratio": avg_profit_ratio,
"initial_usd": initial_usd,
"final_usd": final_usd,
"total_fees_usd": total_fees_usd,
})
return summary_rows
def validate_config_path(config_path: str) -> str:
"""Validate and resolve configuration path."""
if config_path is None:
return "configs/config_default.json"
# If just a filename is provided, look in the configs directory
if "/" not in config_path and "\\" not in config_path:
config_path = f"configs/{config_path}"
# Validate that the file exists
if not Path(config_path).exists():
available_configs = list(Path("configs").glob("*.json"))
available_names = [c.name for c in available_configs]
raise FileNotFoundError(
f"Config file '{config_path}' not found.\n"
f"Available configs: {', '.join(available_names)}"
)
return config_path
def main():
"""Main application entry point."""
# Setup logging
setup_logging()
# Parse arguments
args = parse_arguments()
try:
# Validate configuration path
config_path = validate_config_path(args.config)
# Create and run application
app = BacktestApplication(config_path)
app.run(debug=args.debug)
except FileNotFoundError as e:
print(f"Error: {e}")
return 1
except Exception as e:
print(f"Application failed: {e}")
return 1
return 0
def get_nearest_price(df, target_date):
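"""Return the (timestamp, close price) pair nearest to target_date via nearest-index lookup on the DatetimeIndex."""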
if len(df) == 0:
return None, None
target_ts = pd.to_datetime(target_date)
nearest_idx = df.index.get_indexer([target_ts], method='nearest')[0]
nearest_time = df.index[nearest_idx]
price = df.iloc[nearest_idx]['close']
return nearest_time, price
if __name__ == "__main__":
exit(main())
if __name__ == "__main__":
debug = True
parser = argparse.ArgumentParser(description="Run backtest with config file.")
parser.add_argument("config", type=str, nargs="?", help="Path to config JSON file.")
args = parser.parse_args()
# Use config_default.json as fallback if no config provided
config_file = args.config or "configs/config_default.json"
try:
with open(config_file, 'r') as f:
config = json.load(f)
print(f"Using config: {config_file}")
except FileNotFoundError:
print(f"Error: Config file '{config_file}' not found.")
print("Available configs: configs/config_default.json, configs/config_bbrs.json, configs/config_combined.json")
exit(1)
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON in config file '{config_file}': {e}")
exit(1)
start_date = config['start_date']
if config['stop_date'] is None:
stop_date = datetime.datetime.now().strftime("%Y-%m-%d")
else:
stop_date = config['stop_date']
initial_usd = config['initial_usd']
timeframes = config['timeframes']
timestamp = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M")
storage = Storage(logging=logging)
system_utils = SystemUtils(logging=logging)
data_1min = storage.load_data('btcusd_1-min_data.csv', start_date, stop_date)
nearest_start_time, start_price = get_nearest_price(data_1min, start_date)
nearest_stop_time, stop_price = get_nearest_price(data_1min, stop_date)
metadata_lines = [
f"Start date\t{start_date}\tPrice\t{start_price}",
f"Stop date\t{stop_date}\tPrice\t{stop_price}",
f"Initial USD\t{initial_usd}"
]
# Create tasks for each timeframe
tasks = [
(name, data_1min, config)
for name in timeframes
]
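# Debug mode runs the timeframes sequentially so interactive plots can be shown after each run;
# otherwise the tasks are distributed across worker processes.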
if debug:
all_results_rows = []
all_trade_rows = []
for task in tasks:
results, trades = process(task, debug)
if results or trades:
all_results_rows.extend(results)
all_trade_rows.extend(trades)
else:
workers = system_utils.get_optimal_workers()
with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
futures = {executor.submit(process, task, debug): task for task in tasks}
all_results_rows = []
all_trade_rows = []
for future in concurrent.futures.as_completed(futures):
results, trades = future.result()
if results or trades:
all_results_rows.extend(results)
all_trade_rows.extend(trades)
backtest_filename = f"{timestamp}_backtest.csv"
backtest_fieldnames = [
"timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate",
"max_drawdown", "avg_trade", "profit_ratio", "final_usd", "total_fees_usd"
]
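# Note: the per-run rows also carry total_profit, total_loss and initial_usd; only the columns
# listed above are written to the summary CSV (assuming the writer ignores extra keys).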
storage.write_backtest_results(backtest_filename, backtest_fieldnames, all_results_rows, metadata_lines)
trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"]
storage.write_trades(all_trade_rows, trades_fieldnames)

View File

@ -5,7 +5,7 @@ import pandas as pd
import datetime
from cycles.utils.storage import Storage
from cycles.Analysis.bb_rsi import BollingerBandsStrategy
from cycles.Analysis.strategies import Strategy
logging.basicConfig(
level=logging.INFO,
@ -47,7 +47,7 @@ if __name__ == "__main__":
data = storage.load_data(config["data_file"], config["start_date"], config["stop_date"])
# Run strategy
strategy = BollingerBandsStrategy(config=config_strategy, logging=logging)
strategy = Strategy(config=config_strategy, logging=logging)
processed_data = strategy.run(data.copy(), config_strategy["strategy_name"])
# Get buy and sell signals