diff --git a/README.md b/README.md index 37dd47a..3528dc1 100644 --- a/README.md +++ b/README.md @@ -1,27 +1,25 @@ # Cycles - Advanced Trading Strategy Backtesting Framework -A Python framework for backtesting cryptocurrency trading strategies with multi-timeframe analysis, strategy combination, and advanced signal processing. **Recently refactored** for improved modularity, reusability, and maintainability. +A sophisticated Python framework for backtesting cryptocurrency trading strategies with multi-timeframe analysis, strategy combination, and advanced signal processing. -## โœจ Key Features +## Features -- **๐Ÿ—๏ธ Modular Architecture**: Clean separation of concerns with reusable components -- **๐Ÿ”ง Multi-Strategy System**: Combine multiple trading strategies with configurable weights and rules -- **โฑ๏ธ Multi-Timeframe Analysis**: Strategies operate on different timeframes (1min, 5min, 15min, 1h, etc.) -- **๐Ÿ“Š Advanced Strategies**: +- **Multi-Strategy Architecture**: Combine multiple trading strategies with configurable weights and rules +- **Multi-Timeframe Analysis**: Strategies can operate on different timeframes (1min, 5min, 15min, 1h, etc.) +- **Advanced Strategies**: - **Default Strategy**: Meta-trend analysis using multiple Supertrend indicators - **BBRS Strategy**: Bollinger Bands + RSI with market regime detection -- **๐ŸŽฏ Flexible Signal Combination**: Weighted consensus, majority voting, any/all combinations -- **โšก Precise Execution**: 1-minute precision for accurate risk management -- **๐Ÿ“ˆ Comprehensive Analysis**: Detailed performance metrics and trade analysis -- **๐Ÿ“ฑ Enhanced CLI**: Improved command-line interface with better error handling -- **๐Ÿ” Debug Mode**: Sequential execution with interactive plotting +- **Flexible Signal Combination**: Weighted consensus, majority voting, any/all combinations +- **Precise Stop-Loss**: 1-minute precision for accurate risk management +- **Comprehensive Backtesting**: Detailed performance metrics and trade analysis +- **Data Visualization**: Interactive charts and performance plots -## ๐Ÿš€ Quick Start +## Quick Start ### Prerequisites - Python 3.8+ -- [uv](https://github.com/astral-sh/uv) package manager (recommended) or pip +- [uv](https://github.com/astral-sh/uv) package manager (recommended) ### Installation @@ -30,7 +28,7 @@ A Python framework for backtesting cryptocurrency trading strategies with multi- git clone cd Cycles -# Install dependencies with uv (recommended) +# Install dependencies with uv uv sync # Or install with pip @@ -39,72 +37,40 @@ pip install -r requirements.txt ### Running Backtests -The new CLI provides a clean interface with automatic config discovery: +Use the `uv run` command to execute backtests with different configurations: ```bash -# Use default configuration -python main.py +# Run default strategy on 5-minute timeframe +uv run .\main.py .\configs\config_default_5min.json -# Use specific config (searches configs/ directory automatically) -python main.py config_bbrs.json +# Run default strategy on 15-minute timeframe +uv run .\main.py .\configs\config_default.json -# Debug mode with interactive plotting -python main.py --debug config_combined.json +# Run BBRS strategy with market regime detection +uv run .\main.py .\configs\config_bbrs.json -# Full path also supported -python main.py configs/config_default_5min.json - -# Get help -python main.py --help +# Run combined strategies +uv run .\main.py .\configs\config_combined.json ``` -### Available Configurations - -- **`config_default.json`**: Default meta-trend strategy (15min) -- **`config_default_5min.json`**: Default strategy on 5-minute timeframe -- **`config_bbrs.json`**: BBRS strategy with market regime detection -- **`config_bbrs_multi_timeframe.json`**: BBRS with multiple timeframes -- **`config_combined.json`**: Multi-strategy combination with weighted consensus - -## ๐Ÿ›๏ธ Architecture Overview - -The framework follows a **clean, modular architecture** for maximum reusability: - -``` -cycles/ -โ”œโ”€โ”€ application.py # ๐ŸŽฏ Main application orchestration -โ”œโ”€โ”€ utils/ -โ”‚ โ”œโ”€โ”€ config_manager.py # โš™๏ธ Centralized configuration management -โ”‚ โ”œโ”€โ”€ results_processor.py # ๐Ÿ“Š Results processing & metrics calculation -โ”‚ โ”œโ”€โ”€ backtest_runner.py # ๐Ÿš€ Backtest execution logic -โ”‚ โ”œโ”€โ”€ storage.py # ๐Ÿ’พ Data storage utilities -โ”‚ โ””โ”€โ”€ system.py # ๐Ÿ–ฅ๏ธ System utilities -โ”œโ”€โ”€ strategies/ # ๐Ÿ“ˆ Strategy implementations -โ”‚ โ”œโ”€โ”€ base.py # ๐Ÿ—๏ธ Base strategy framework -โ”‚ โ”œโ”€โ”€ default_strategy.py # ๐Ÿ“Š Meta-trend strategy -โ”‚ โ”œโ”€โ”€ bbrs_strategy.py # ๐ŸŽฏ BBRS strategy -โ”‚ โ””โ”€โ”€ manager.py # ๐ŸŽ›๏ธ Strategy orchestration -โ”œโ”€โ”€ backtest.py # โšก Core backtesting engine -โ””โ”€โ”€ charts.py # ๐Ÿ“ˆ Visualization components -``` - -## ๐Ÿ“ฑ Enhanced CLI Interface +### Configuration Examples +#### Default Strategy (5-minute timeframe) ```bash -python main.py [-h] [--debug] [config] - -# Examples: -python main.py # Use default config -python main.py config_bbrs.json # Use specific config -python main.py --debug config_combined.json # Debug mode with plotting - -# Available configs: -# - config_default.json: Default meta-trend strategy -# - config_bbrs.json: BBRS strategy -# - config_combined.json: Multi-strategy combination +uv run .\main.py .\configs\config_default_5min.json ``` -## โš™๏ธ Configuration +#### BBRS Strategy with Multi-timeframe Analysis +```bash +uv run .\main.py .\configs\config_bbrs_multi_timeframe.json +``` + +#### Combined Strategies with Weighted Consensus +```bash +uv run .\main.py .\configs\config_combined.json +``` + +## Configuration Strategies are configured using JSON files in the `configs/` directory: @@ -114,219 +80,98 @@ Strategies are configured using JSON files in the `configs/` directory: "stop_date": "2024-01-31", "initial_usd": 10000, "timeframes": ["15min"], + "stop_loss_pcts": [0.03, 0.05], "strategies": [ { "name": "default", "weight": 1.0, "params": { - "timeframe": "15min", - "stop_loss_pct": 0.03 + "timeframe": "15min" } } ], "combination_rules": { - "entry": "weighted_consensus", + "entry": "any", "exit": "any", - "min_confidence": 0.6 + "min_confidence": 0.5 } } ``` -### Strategy Configuration +### Available Strategies -#### Single Strategy -```json -{ - "strategies": [ - { - "name": "default", - "weight": 1.0, - "params": { - "timeframe": "15min", - "stop_loss_pct": 0.03 - } - } - ] -} -``` - -#### Multi-Strategy Combination -```json -{ - "strategies": [ - { - "name": "default", - "weight": 0.6, - "params": {"timeframe": "15min"} - }, - { - "name": "bbrs", - "weight": 0.4, - "params": {"bb_width": 0.05} - } - ], - "combination_rules": { - "entry": "weighted_consensus", - "exit": "any", - "min_confidence": 0.6 - } -} -``` +1. **Default Strategy**: Meta-trend analysis using Supertrend indicators +2. **BBRS Strategy**: Bollinger Bands + RSI with market regime detection ### Combination Rules -- **Entry Methods**: `any`, `all`, `majority`, `weighted_consensus` -- **Exit Methods**: `any`, `all`, `priority` (prioritizes stop-loss signals) -- **Min Confidence**: Threshold for signal acceptance (0.0 - 1.0) +- **Entry**: `any`, `all`, `majority`, `weighted_consensus` +- **Exit**: `any`, `all`, `priority` (prioritizes stop-loss signals) -## ๐Ÿ› ๏ธ Programmatic Usage +## Project Structure -The new modular architecture enables programmatic usage: - -```python -from cycles.application import BacktestApplication - -# Simple usage -app = BacktestApplication("configs/config_bbrs.json") -app.run(debug=False) - -# Custom workflow -from cycles.utils import ConfigManager, BacktestRunner - -config_manager = ConfigManager("my_config.json") -runner = BacktestRunner() - -for timeframe in config_manager.timeframes: - config = config_manager.get_timeframe_task_config(timeframe) - results = runner.run_single_timeframe(data, timeframe, config) - # Process results... +``` +Cycles/ +โ”œโ”€โ”€ configs/ # Configuration files +โ”œโ”€โ”€ cycles/ # Core framework +โ”‚ โ”œโ”€โ”€ strategies/ # Strategy implementation +โ”‚ โ”‚ โ”œโ”€โ”€ base.py # Base strategy classes +โ”‚ โ”‚ โ”œโ”€โ”€ default_strategy.py +โ”‚ โ”‚ โ”œโ”€โ”€ bbrs_strategy.py +โ”‚ โ”‚ โ””โ”€โ”€ manager.py # Strategy manager +โ”‚ โ”œโ”€โ”€ Analysis/ # Technical analysis +โ”‚ โ”œโ”€โ”€ utils/ # Utilities +โ”‚ โ””โ”€โ”€ charts.py # Visualization +โ”œโ”€โ”€ docs/ # Documentation +โ”œโ”€โ”€ data/ # Market data +โ”œโ”€โ”€ results/ # Backtest results +โ””โ”€โ”€ main.py # Main entry point ``` -## ๐Ÿ“ˆ Available Strategies +## Documentation -### 1. Default Strategy (Meta-Trend Analysis) -- **Type**: Meta-trend analysis using multiple Supertrend indicators -- **Timeframes**: Configurable (5min, 15min, 1h, etc.) -- **Features**: Triple Supertrend confirmation, precise stop-loss +Detailed documentation is available in the `docs/` directory: -```json -{ - "name": "default", - "params": { - "timeframe": "15min", - "stop_loss_pct": 0.03 - } -} +- **[Strategy Manager](./docs/strategy_manager.md)** - Multi-strategy orchestration and signal combination +- **[Strategies](./docs/strategies.md)** - Individual strategy implementations and usage +- **[Timeframe System](./docs/timeframe_system.md)** - Advanced timeframe management and multi-timeframe strategies +- **[Analysis](./docs/analysis.md)** - Technical analysis components +- **[Storage Utils](./docs/utils_storage.md)** - Data storage and retrieval +- **[System Utils](./docs/utils_system.md)** - System utilities + +## Examples + +### Single Strategy Backtest +```bash +# Test default strategy on different timeframes +uv run .\main.py .\configs\config_default.json # 15min +uv run .\main.py .\configs\config_default_5min.json # 5min ``` -### 2. BBRS Strategy (Bollinger Bands + RSI) -- **Type**: Bollinger Bands + RSI with market regime detection -- **Features**: Adaptive parameters, volume confirmation, multi-timeframe analysis -- **Market Regimes**: Trending vs sideways market detection - -```json -{ - "name": "bbrs", - "params": { - "bb_width": 0.05, - "strategy_name": "MarketRegimeStrategy", - "stop_loss_pct": 0.05 - } -} +### Multi-Strategy Backtest +```bash +# Combine multiple strategies with different weights +uv run .\main.py .\configs\config_combined.json ``` -## ๐Ÿ“Š Output & Results - -### Generated Files -- **`YYYY_MM_DD_HH_MM_backtest.csv`**: Performance summary per timeframe -- **`YYYY_MM_DD_HH_MM_trades.csv`**: Individual trade records -- **`backtest.log`**: Detailed execution logs - -### Debug Mode -- **Interactive Charts**: Real-time plotting of strategy signals and trades -- **Sequential Execution**: Step-by-step analysis -- **Enhanced Logging**: Detailed strategy information - -### Performance Metrics -- **Trade Statistics**: Win rate, profit ratio, drawdown analysis -- **Portfolio Metrics**: Initial/final USD, total returns, fees -- **Risk Metrics**: Maximum drawdown, stop-loss frequency - -## ๐Ÿ“š Documentation - -Comprehensive documentation available in the `docs/` directory: - -- **[๐Ÿ—๏ธ Refactoring Summary](./docs/refactoring_summary.md)** - Overview of new architecture improvements -- **[๐ŸŽ›๏ธ Strategy Manager](./docs/strategy_manager.md)** - Multi-strategy orchestration -- **[๐Ÿ“ˆ Strategies](./docs/strategies.md)** - Individual strategy implementations -- **[โฑ๏ธ Timeframe System](./docs/timeframe_system.md)** - Multi-timeframe management -- **[๐Ÿ“Š Analysis](./docs/analysis.md)** - Technical analysis components -- **[๐Ÿ’พ Storage Utils](./docs/utils_storage.md)** - Data management -- **[๐Ÿ–ฅ๏ธ System Utils](./docs/utils_system.md)** - System utilities - -## ๐ŸŽฏ Migration & Compatibility - -The refactored framework is **100% backward compatible**: - -- โœ… Same command-line interface -- โœ… Same configuration file format -- โœ… Same output file format -- โœ… Same functionality - -**Zero breaking changes** while providing a much cleaner, more maintainable architecture. - -## ๐Ÿ”ง Advanced Usage - -### Custom Strategy Development -```python -from cycles.strategies.base import StrategyBase, StrategySignal - -class MyStrategy(StrategyBase): - def get_timeframes(self): - return ["1h"] - - def initialize(self, backtester): - self._resample_data(backtester.original_df) - # Setup indicators... - self.initialized = True - - def get_entry_signal(self, backtester, df_index): - # Your entry logic... - return StrategySignal("ENTRY", confidence=0.8) +### Custom Configuration +Create your own configuration file and run: +```bash +uv run .\main.py .\configs\your_config.json ``` -### Custom Results Processing -```python -from cycles.utils import BacktestMetrics, ResultsProcessor +## Output -class CustomProcessor(ResultsProcessor): - def process_backtest_results(self, results, timeframe, config, summary): - # Custom processing logic... - return custom_summary, custom_trades -``` +Backtests generate: +- **CSV Results**: Detailed performance metrics per timeframe/strategy +- **Trade Log**: Individual trade records with entry/exit details +- **Performance Charts**: Visual analysis of strategy performance (in debug mode) +- **Log Files**: Detailed execution logs -## ๐Ÿš€ Future Enhancements - -The new architecture enables easy addition of: - -- **๐ŸŒ Web API Interface**: REST API for remote backtesting -- **โšก Real-time Trading**: Live trading integration -- **๐Ÿ“Š Advanced Analytics**: Enhanced performance analysis -- **๐Ÿ”Œ Strategy Marketplace**: Plugin system for custom strategies -- **๐Ÿ“ˆ Multiple Asset Classes**: Beyond cryptocurrency support - -## ๐Ÿค Contributing - -We welcome contributions! The new modular architecture makes it easier to: -- Add new strategies -- Enhance analysis capabilities -- Improve visualization -- Extend data sources - -## ๐Ÿ“„ License +## License [Add your license information here] ---- +## Contributing -**Recent Updates**: The framework has been significantly refactored for improved modularity and reusability while maintaining full backward compatibility. See [Refactoring Summary](./docs/refactoring_summary.md) for details. +[Add contributing guidelines here] diff --git a/cycles/Analysis/__init__.py b/cycles/Analysis/__init__.py index 65fb73e..e69de29 100644 --- a/cycles/Analysis/__init__.py +++ b/cycles/Analysis/__init__.py @@ -1,13 +0,0 @@ -""" -This module contains the analysis classes for the cycles project. -""" - -from .boillinger_band import BollingerBands -from .rsi import RSI -from .bb_rsi import BollingerBandsStrategy - - -__all__ = ["BollingerBands", "RSI", "BollingerBandsStrategy"] - -__version__ = "0.1.0" -__author__ = 'TCP Cycles Team' \ No newline at end of file diff --git a/cycles/application.py b/cycles/application.py deleted file mode 100644 index 386dba9..0000000 --- a/cycles/application.py +++ /dev/null @@ -1,214 +0,0 @@ -""" -Backtesting Application - -This module provides the main application class that orchestrates the entire -backtesting workflow. It coordinates configuration management, data loading, -backtest execution, and result output. -""" - -import logging -import datetime -import concurrent.futures -from pathlib import Path -from typing import Optional, List, Dict, Any - -from cycles.utils.storage import Storage -from cycles.utils.system import SystemUtils -from cycles.utils.config_manager import ConfigManager -from cycles.utils.results_processor import ResultsProcessor -from cycles.utils.backtest_runner import create_timeframe_tasks - - -class BacktestApplication: - """ - Main application class for coordinating backtesting workflow. - - Orchestrates configuration management, data loading, backtest execution, - and result output in a clean, modular way. - """ - - def __init__(self, config_path: Optional[str] = None): - """ - Initialize the backtesting application. - - Args: - config_path: Optional path to configuration file - """ - self.config_manager = ConfigManager(config_path) - self.storage = Storage(logging=logging) - self.system_utils = SystemUtils(logging=logging) - self.results_processor = ResultsProcessor() - - self.logger = logging.getLogger(__name__) - - def load_data(self): - """Load market data based on configuration.""" - self.logger.info("Loading market data...") - - data_1min = self.storage.load_data( - 'btcusd_1-min_data.csv', - self.config_manager.start_date, - self.config_manager.stop_date - ) - - self.logger.info(f"Loaded {len(data_1min)} rows of 1-minute data") - return data_1min - - def create_tasks(self, data_1min) -> List: - """Create backtest tasks from configuration.""" - self.logger.info("Creating backtest tasks...") - - tasks = create_timeframe_tasks( - self.config_manager.timeframes, - data_1min, - self.config_manager - ) - - self.logger.info(f"Created {len(tasks)} backtest tasks") - return tasks - - def execute_tasks(self, tasks: List, debug: bool = False) -> tuple: - """ - Execute backtest tasks. - - Args: - tasks: List of TimeframeTask objects - debug: Whether to run in debug mode (sequential with plotting) - - Returns: - Tuple of (results_rows, trade_rows) - """ - if debug: - return self._execute_tasks_debug(tasks) - else: - return self._execute_tasks_parallel(tasks) - - def _execute_tasks_debug(self, tasks: List) -> tuple: - """Execute tasks in debug mode (sequential).""" - self.logger.info("Executing tasks in debug mode (sequential)") - - all_results_rows = [] - all_trade_rows = [] - - for task in tasks: - self.logger.info(f"Processing timeframe: {task.timeframe}") - results, trades = task.execute(debug=True) - - if results: - all_results_rows.append(results) - if trades: - all_trade_rows.extend(trades) - - return all_results_rows, all_trade_rows - - def _execute_tasks_parallel(self, tasks: List) -> tuple: - """Execute tasks in parallel.""" - workers = self.system_utils.get_optimal_workers() - self.logger.info(f"Executing tasks in parallel with {workers} workers") - - all_results_rows = [] - all_trade_rows = [] - - with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor: - # Submit all tasks - futures = { - executor.submit(task.execute, False): task - for task in tasks - } - - # Collect results - for future in concurrent.futures.as_completed(futures): - task = futures[future] - try: - results, trades = future.result() - - if results: - all_results_rows.append(results) - if trades: - all_trade_rows.extend(trades) - - self.logger.info(f"Completed timeframe: {task.timeframe}") - - except Exception as e: - self.logger.error(f"Task failed for timeframe {task.timeframe}: {e}") - - return all_results_rows, all_trade_rows - - def save_results(self, results_rows: List[Dict[str, Any]], trade_rows: List[Dict[str, Any]], - data_1min) -> None: - """ - Save backtest results to files. - - Args: - results_rows: List of result summary rows - trade_rows: List of individual trade rows - data_1min: Original 1-minute data for metadata - """ - timestamp = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M") - - # Create metadata - metadata_lines = self.results_processor.create_metadata_lines( - self.config_manager, data_1min - ) - - # Save backtest results - backtest_filename = f"{timestamp}_backtest.csv" - backtest_fieldnames = [ - "timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate", - "max_drawdown", "avg_trade", "profit_ratio", "final_usd", "total_fees_usd" - ] - - self.storage.write_backtest_results( - backtest_filename, backtest_fieldnames, results_rows, metadata_lines - ) - - # Save trade details - trades_fieldnames = [ - "entry_time", "exit_time", "entry_price", "exit_price", - "profit_pct", "type", "fee_usd" - ] - - self.storage.write_trades(trade_rows, trades_fieldnames) - - self.logger.info(f"Results saved to {backtest_filename}") - - def run(self, debug: bool = False) -> None: - """ - Run the complete backtesting workflow. - - Args: - debug: Whether to run in debug mode - """ - try: - self.logger.info("Starting backtesting workflow") - self.logger.info(f"Configuration: {self.config_manager}") - - # Load data - data_1min = self.load_data() - - # Create and execute tasks - tasks = self.create_tasks(data_1min) - results_rows, trade_rows = self.execute_tasks(tasks, debug) - - # Save results - if results_rows or trade_rows: - self.save_results(results_rows, trade_rows, data_1min) - self.logger.info("Backtesting workflow completed successfully") - else: - self.logger.warning("No results generated") - - except Exception as e: - self.logger.error(f"Backtesting workflow failed: {e}") - raise - - -def setup_logging() -> None: - """Setup application logging configuration.""" - logging.basicConfig( - level=logging.INFO, - format="%(asctime)s [%(levelname)s] %(message)s", - handlers=[ - logging.FileHandler("backtest.log"), - logging.StreamHandler() - ] - ) \ No newline at end of file diff --git a/cycles/utils/__init__.py b/cycles/utils/__init__.py index bbbed92..e69de29 100644 --- a/cycles/utils/__init__.py +++ b/cycles/utils/__init__.py @@ -1,23 +0,0 @@ -""" -Utilities Module - -This module provides utility classes and functions for the backtesting framework. -""" - -from .storage import Storage -from .system import SystemUtils -from .data_utils import * -from .config_manager import ConfigManager -from .results_processor import ResultsProcessor, BacktestMetrics -from .backtest_runner import BacktestRunner, TimeframeTask, create_timeframe_tasks - -__all__ = [ - 'Storage', - 'SystemUtils', - 'ConfigManager', - 'ResultsProcessor', - 'BacktestMetrics', - 'BacktestRunner', - 'TimeframeTask', - 'create_timeframe_tasks' -] diff --git a/cycles/utils/backtest_runner.py b/cycles/utils/backtest_runner.py deleted file mode 100644 index 01150e5..0000000 --- a/cycles/utils/backtest_runner.py +++ /dev/null @@ -1,224 +0,0 @@ -""" -Backtest Runner - -This module provides a high-level interface for running backtests with strategy -management. It encapsulates the backtesting workflow and provides a clean -interface for executing tests across different configurations. -""" - -import pandas as pd -import logging -from typing import Dict, List, Tuple, Any, Optional - -from cycles.backtest import Backtest -from cycles.charts import BacktestCharts -from cycles.strategies import create_strategy_manager -from .results_processor import ResultsProcessor - - -class BacktestRunner: - """ - High-level backtest execution interface. - - Encapsulates the backtesting workflow, strategy management, and result - processing into a clean, reusable interface. - """ - - def __init__(self, results_processor: Optional[ResultsProcessor] = None): - """ - Initialize backtest runner. - - Args: - results_processor: Optional results processor instance - """ - self.logger = logging.getLogger(__name__) - self.results_processor = results_processor or ResultsProcessor() - - def run_single_timeframe(self, data_1min: pd.DataFrame, timeframe: str, - config: Dict[str, Any], debug: bool = False) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: - """ - Run backtest for a single timeframe configuration. - - Args: - data_1min: 1-minute OHLCV data - timeframe: Timeframe identifier - config: Configuration dictionary - debug: Whether to enable debug mode - - Returns: - Tuple[Dict, List]: (summary_row, trade_rows) - """ - try: - # Create and initialize strategy manager - strategy_manager = self._create_strategy_manager(config) - - # Setup backtester with appropriate data - backtester = self._setup_backtester(data_1min, strategy_manager, config) - - # Run backtest - results = self._execute_backtest(backtester, debug) - - # Process results - strategy_summary = strategy_manager.get_strategy_summary() - summary_row, trade_rows = self.results_processor.process_backtest_results( - results, timeframe, config, strategy_summary - ) - - # Handle debug plotting - if debug: - self._handle_debug_plotting(backtester, results) - - return summary_row, trade_rows - - except Exception as e: - self.logger.error(f"Backtest failed for timeframe {timeframe}: {e}") - raise - - def _create_strategy_manager(self, config: Dict[str, Any]): - """Create and validate strategy manager from configuration.""" - strategy_config = { - "strategies": config['strategies'], - "combination_rules": config['combination_rules'] - } - - if not strategy_config['strategies']: - raise ValueError("No strategy configuration provided") - - return create_strategy_manager(strategy_config) - - def _setup_backtester(self, data_1min: pd.DataFrame, strategy_manager, config: Dict[str, Any]) -> Backtest: - """Setup backtester with appropriate data and strategy manager.""" - # Get primary strategy for backtester setup - primary_strategy = strategy_manager.strategies[0] - - # Determine working dataframe based on strategy type - if primary_strategy.name == "bbrs": - # BBRS strategy processes 1-minute data and handles internal resampling - working_df = data_1min.copy() - else: - # Other strategies specify their preferred timeframe - primary_strategy._resample_data(data_1min) - working_df = primary_strategy.get_primary_timeframe_data() - - # Prepare working dataframe for backtester - working_df_for_backtest = working_df.copy().reset_index() - if 'index' in working_df_for_backtest.columns: - working_df_for_backtest = working_df_for_backtest.rename(columns={'index': 'timestamp'}) - - # Initialize backtest - backtester = Backtest( - config['initial_usd'], - working_df_for_backtest, - working_df_for_backtest, - self._strategy_manager_init - ) - - # Store original data and attach strategy manager - backtester.original_df = data_1min - backtester.strategy_manager = strategy_manager - - # Initialize strategy manager - strategy_manager.initialize(backtester) - - return backtester - - def _execute_backtest(self, backtester: Backtest, debug: bool = False) -> Dict[str, Any]: - """Execute the backtest using strategy manager functions.""" - return backtester.run( - self._strategy_manager_entry, - self._strategy_manager_exit, - debug - ) - - def _handle_debug_plotting(self, backtester: Backtest, results: Dict[str, Any]) -> None: - """Handle debug plotting if enabled.""" - try: - # Check if any strategy has processed_data for universal plotting - processed_data = None - for strategy in backtester.strategy_manager.strategies: - if hasattr(backtester, 'processed_data') and backtester.processed_data is not None: - processed_data = backtester.processed_data - break - - if processed_data is not None and not processed_data.empty: - # Format strategy data with actual executed trades for universal plotting - formatted_data = BacktestCharts.format_strategy_data_with_trades(processed_data, results) - # Plot using universal function - BacktestCharts.plot_data(formatted_data) - else: - # Fallback to meta_trend plot if available - if "meta_trend" in backtester.strategies: - meta_trend = backtester.strategies["meta_trend"] - working_df = backtester.df.set_index('timestamp') - BacktestCharts.plot(working_df, meta_trend) - else: - self.logger.info("No plotting data available") - except Exception as e: - self.logger.warning(f"Plotting failed: {e}") - - # Strategy manager interface functions - @staticmethod - def _strategy_manager_init(backtester: Backtest): - """Strategy Manager initialization function.""" - # Actual initialization happens in strategy_manager.initialize() - pass - - @staticmethod - def _strategy_manager_entry(backtester: Backtest, df_index: int) -> bool: - """Strategy Manager entry function.""" - return backtester.strategy_manager.get_entry_signal(backtester, df_index) - - @staticmethod - def _strategy_manager_exit(backtester: Backtest, df_index: int) -> Tuple[Optional[str], Optional[float]]: - """Strategy Manager exit function.""" - return backtester.strategy_manager.get_exit_signal(backtester, df_index) - - -class TimeframeTask: - """Encapsulates a single timeframe backtest task.""" - - def __init__(self, timeframe: str, data_1min: pd.DataFrame, config: Dict[str, Any]): - """ - Initialize timeframe task. - - Args: - timeframe: Timeframe identifier - data_1min: 1-minute OHLCV data - config: Configuration for this task - """ - self.timeframe = timeframe - self.data_1min = data_1min - self.config = config - - def execute(self, debug: bool = False) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: - """ - Execute the timeframe task. - - Args: - debug: Whether to enable debug mode - - Returns: - Tuple[Dict, List]: (summary_row, trade_rows) - """ - runner = BacktestRunner() - return runner.run_single_timeframe(self.data_1min, self.timeframe, self.config, debug) - - -def create_timeframe_tasks(timeframes: List[str], data_1min: pd.DataFrame, - config_manager) -> List[TimeframeTask]: - """ - Create timeframe tasks from configuration. - - Args: - timeframes: List of timeframes to test - data_1min: 1-minute OHLCV data - config_manager: Configuration manager instance - - Returns: - List[TimeframeTask]: List of timeframe tasks - """ - tasks = [] - for timeframe in timeframes: - task_config = config_manager.get_timeframe_task_config(timeframe) - tasks.append(TimeframeTask(timeframe, data_1min, task_config)) - return tasks \ No newline at end of file diff --git a/cycles/utils/config_manager.py b/cycles/utils/config_manager.py deleted file mode 100644 index 02d5e2d..0000000 --- a/cycles/utils/config_manager.py +++ /dev/null @@ -1,129 +0,0 @@ -""" -Configuration Manager - -This module provides centralized configuration handling for the backtesting system. -It handles loading, validation, and provides a clean interface for accessing -configuration data. -""" - -import json -import datetime -import logging -from typing import Dict, List, Optional, Any -from pathlib import Path - - -class ConfigManager: - """ - Manages configuration loading, validation, and access. - - Provides a centralized way to handle configuration files with validation - and convenient access methods. - """ - - def __init__(self, config_path: Optional[str] = None): - """ - Initialize configuration manager. - - Args: - config_path: Path to configuration file. If None, uses default. - """ - self.config_path = config_path or "configs/config_default.json" - self.config = self._load_config() - self._validate_config() - - def _load_config(self) -> Dict[str, Any]: - """Load configuration from file.""" - try: - with open(self.config_path, 'r') as f: - config = json.load(f) - logging.info(f"Loaded configuration from: {self.config_path}") - return config - except FileNotFoundError: - available_configs = list(Path("configs").glob("*.json")) - raise FileNotFoundError( - f"Config file '{self.config_path}' not found. " - f"Available configs: {[str(c) for c in available_configs]}" - ) - except json.JSONDecodeError as e: - raise ValueError(f"Invalid JSON in config file '{self.config_path}': {e}") - - def _validate_config(self) -> None: - """Validate configuration structure and values.""" - required_fields = ['start_date', 'initial_usd', 'timeframes', 'strategies'] - - for field in required_fields: - if field not in self.config: - raise ValueError(f"Missing required field '{field}' in configuration") - - # Validate strategies - if not self.config['strategies']: - raise ValueError("At least one strategy must be specified") - - for strategy in self.config['strategies']: - if 'name' not in strategy: - raise ValueError("Strategy must have a 'name' field") - - # Validate timeframes - if not self.config['timeframes']: - raise ValueError("At least one timeframe must be specified") - - logging.info("Configuration validation successful") - - @property - def start_date(self) -> str: - """Get start date.""" - return self.config['start_date'] - - @property - def stop_date(self) -> str: - """Get stop date, defaulting to current date if None.""" - stop_date = self.config.get('stop_date') - if stop_date is None: - return datetime.datetime.now().strftime("%Y-%m-%d") - return stop_date - - @property - def initial_usd(self) -> float: - """Get initial USD amount.""" - return self.config['initial_usd'] - - @property - def timeframes(self) -> List[str]: - """Get list of timeframes to test.""" - return self.config['timeframes'] - - @property - def strategies_config(self) -> List[Dict[str, Any]]: - """Get strategies configuration.""" - return self.config['strategies'] - - @property - def combination_rules(self) -> Dict[str, Any]: - """Get combination rules for strategy manager.""" - return self.config.get('combination_rules', { - "entry": "any", - "exit": "any", - "min_confidence": 0.5 - }) - - def get_strategy_manager_config(self) -> Dict[str, Any]: - """Get configuration for strategy manager.""" - return { - "strategies": self.strategies_config, - "combination_rules": self.combination_rules - } - - def get_timeframe_task_config(self, timeframe: str) -> Dict[str, Any]: - """Get configuration for a specific timeframe task.""" - return { - "initial_usd": self.initial_usd, - "strategies": self.strategies_config, - "combination_rules": self.combination_rules - } - - def __repr__(self) -> str: - """String representation of configuration.""" - return (f"ConfigManager(config_path='{self.config_path}', " - f"strategies={len(self.strategies_config)}, " - f"timeframes={len(self.timeframes)})") \ No newline at end of file diff --git a/cycles/utils/results_processor.py b/cycles/utils/results_processor.py deleted file mode 100644 index 955617a..0000000 --- a/cycles/utils/results_processor.py +++ /dev/null @@ -1,239 +0,0 @@ -""" -Results Processor - -This module handles processing, aggregation, and analysis of backtest results. -It provides utilities for calculating metrics, aggregating results across -timeframes, and formatting output data. -""" - -import pandas as pd -import numpy as np -import logging -from typing import Dict, List, Tuple, Any, Optional -from collections import defaultdict - - -class BacktestMetrics: - """Container for backtest metrics calculation.""" - - @staticmethod - def calculate_trade_metrics(trades: List[Dict[str, Any]]) -> Dict[str, float]: - """Calculate trade-level metrics.""" - if not trades: - return { - "n_trades": 0, - "n_winning_trades": 0, - "win_rate": 0.0, - "total_profit": 0.0, - "total_loss": 0.0, - "avg_trade": 0.0, - "profit_ratio": 0.0, - "max_drawdown": 0.0 - } - - n_trades = len(trades) - wins = [t for t in trades if t.get('exit') and t['exit'] > t['entry']] - n_winning_trades = len(wins) - win_rate = n_winning_trades / n_trades if n_trades > 0 else 0 - - total_profit = sum(trade['profit_pct'] for trade in trades) - total_loss = sum(-trade['profit_pct'] for trade in trades if trade['profit_pct'] < 0) - avg_trade = total_profit / n_trades if n_trades > 0 else 0 - profit_ratio = total_profit / total_loss if total_loss > 0 else float('inf') - - # Calculate max drawdown - cumulative_profit = 0 - max_drawdown = 0 - peak = 0 - - for trade in trades: - cumulative_profit += trade['profit_pct'] - if cumulative_profit > peak: - peak = cumulative_profit - drawdown = peak - cumulative_profit - if drawdown > max_drawdown: - max_drawdown = drawdown - - return { - "n_trades": n_trades, - "n_winning_trades": n_winning_trades, - "win_rate": win_rate, - "total_profit": total_profit, - "total_loss": total_loss, - "avg_trade": avg_trade, - "profit_ratio": profit_ratio, - "max_drawdown": max_drawdown - } - - @staticmethod - def calculate_portfolio_metrics(trades: List[Dict[str, Any]], initial_usd: float) -> Dict[str, float]: - """Calculate portfolio-level metrics.""" - final_usd = initial_usd - for trade in trades: - final_usd *= (1 + trade['profit_pct']) - - total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades) - - return { - "initial_usd": initial_usd, - "final_usd": final_usd, - "total_fees_usd": total_fees_usd, - "total_return": (final_usd - initial_usd) / initial_usd - } - - -class ResultsProcessor: - """ - Processes and aggregates backtest results. - - Handles result processing, metric calculation, and aggregation across - multiple timeframes and configurations. - """ - - def __init__(self): - """Initialize results processor.""" - self.logger = logging.getLogger(__name__) - - def process_backtest_results(self, results: Dict[str, Any], timeframe: str, - config: Dict[str, Any], strategy_summary: Dict[str, Any]) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: - """ - Process results from a single backtest run. - - Args: - results: Raw backtest results - timeframe: Timeframe identifier - config: Configuration used for the test - strategy_summary: Summary of strategies used - - Returns: - Tuple[Dict, List]: (summary_row, trade_rows) - """ - trades = results.get('trades', []) - initial_usd = config['initial_usd'] - - # Calculate metrics - trade_metrics = BacktestMetrics.calculate_trade_metrics(trades) - portfolio_metrics = BacktestMetrics.calculate_portfolio_metrics(trades, initial_usd) - - # Get primary strategy info for reporting - primary_strategy = strategy_summary['strategies'][0] if strategy_summary['strategies'] else {} - primary_timeframe = primary_strategy.get('timeframes', ['unknown'])[0] - stop_loss_pct = primary_strategy.get('params', {}).get('stop_loss_pct', 'N/A') - - # Create summary row - summary_row = { - "timeframe": f"{timeframe}({primary_timeframe})", - "stop_loss_pct": stop_loss_pct, - "n_stop_loss": sum(1 for trade in trades if trade.get('type') == 'STOP_LOSS'), - **trade_metrics, - **portfolio_metrics - } - - # Create trade rows - trade_rows = [] - for trade in trades: - trade_rows.append({ - "timeframe": f"{timeframe}({primary_timeframe})", - "stop_loss_pct": stop_loss_pct, - "entry_time": trade.get("entry_time"), - "exit_time": trade.get("exit_time"), - "entry_price": trade.get("entry"), - "exit_price": trade.get("exit"), - "profit_pct": trade.get("profit_pct"), - "type": trade.get("type"), - "fee_usd": trade.get("fee_usd"), - }) - - # Log results - strategy_names = [s['name'] for s in strategy_summary['strategies']] - self.logger.info( - f"Timeframe: {timeframe}({primary_timeframe}), Stop Loss: {stop_loss_pct}, " - f"Trades: {trade_metrics['n_trades']}, Strategies: {strategy_names}" - ) - - return summary_row, trade_rows - - def aggregate_results(self, all_rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - """ - Aggregate results per timeframe and stop_loss_pct. - - Args: - all_rows: List of result rows to aggregate - - Returns: - List[Dict]: Aggregated summary rows - """ - grouped = defaultdict(list) - for row in all_rows: - key = (row['timeframe'], row['stop_loss_pct']) - grouped[key].append(row) - - summary_rows = [] - for (timeframe, stop_loss_pct), rows in grouped.items(): - if not rows: - continue - - # Aggregate metrics - total_trades = sum(r['n_trades'] for r in rows) - total_stop_loss = sum(r['n_stop_loss'] for r in rows) - - # Average metrics across runs - avg_win_rate = np.mean([r['win_rate'] for r in rows]) - avg_max_drawdown = np.mean([r['max_drawdown'] for r in rows]) - avg_avg_trade = np.mean([r['avg_trade'] for r in rows]) - avg_profit_ratio = np.mean([r['profit_ratio'] for r in rows if r['profit_ratio'] != float('inf')]) - - # Portfolio metrics - initial_usd = rows[0]['initial_usd'] # Should be same for all - avg_final_usd = np.mean([r['final_usd'] for r in rows]) - avg_total_fees_usd = np.mean([r['total_fees_usd'] for r in rows]) - - summary_rows.append({ - "timeframe": timeframe, - "stop_loss_pct": stop_loss_pct, - "n_trades": total_trades, - "n_stop_loss": total_stop_loss, - "win_rate": avg_win_rate, - "max_drawdown": avg_max_drawdown, - "avg_trade": avg_avg_trade, - "profit_ratio": avg_profit_ratio, - "initial_usd": initial_usd, - "final_usd": avg_final_usd, - "total_fees_usd": avg_total_fees_usd, - }) - - return summary_rows - - def create_metadata_lines(self, config_manager, data_1min: pd.DataFrame) -> List[str]: - """ - Create metadata lines for result files. - - Args: - config_manager: Configuration manager instance - data_1min: 1-minute data for price lookups - - Returns: - List[str]: Metadata lines - """ - start_date = config_manager.start_date - stop_date = config_manager.stop_date - initial_usd = config_manager.initial_usd - - def get_nearest_price(df: pd.DataFrame, target_date: str) -> Tuple[Optional[str], Optional[float]]: - """Get nearest price for a given date.""" - if len(df) == 0: - return None, None - target_ts = pd.to_datetime(target_date) - nearest_idx = df.index.get_indexer([target_ts], method='nearest')[0] - nearest_time = df.index[nearest_idx] - price = df.iloc[nearest_idx]['close'] - return str(nearest_time), price - - nearest_start_time, start_price = get_nearest_price(data_1min, start_date) - nearest_stop_time, stop_price = get_nearest_price(data_1min, stop_date) - - return [ - f"Start date\t{start_date}\tPrice\t{start_price}", - f"Stop date\t{stop_date}\tPrice\t{stop_price}", - f"Initial USD\t{initial_usd}" - ] \ No newline at end of file diff --git a/docs/new_architecture.md b/docs/new_architecture.md deleted file mode 100644 index b950d61..0000000 --- a/docs/new_architecture.md +++ /dev/null @@ -1,452 +0,0 @@ -# Architecture Components Documentation - -## Overview - -The Cycles framework has been refactored into a modular architecture with specialized components for different aspects of the backtesting workflow. This document provides detailed information about the new architectural components and how to use them. - -## ๐Ÿ—๏ธ Component Architecture - -``` -New Components: -โ”œโ”€โ”€ ๐ŸŽฏ BacktestApplication # Main workflow orchestration -โ”œโ”€โ”€ โš™๏ธ ConfigManager # Configuration management -โ”œโ”€โ”€ ๐Ÿ“Š ResultsProcessor # Results processing & metrics -โ”œโ”€โ”€ ๐Ÿš€ BacktestRunner # Backtest execution logic -โ””โ”€โ”€ ๐Ÿ“ฆ TimeframeTask # Individual task encapsulation -``` - ---- - -## โš™๏ธ ConfigManager - -**Purpose**: Centralized configuration loading, validation, and access - -### Features -- **Automatic Validation**: Validates configuration structure and required fields -- **Type-Safe Access**: Property-based access to configuration values -- **Smart Defaults**: Automatic fallbacks (e.g., current date for stop_date) -- **Reusable Configs**: Generate configurations for different components - -### Basic Usage - -```python -from cycles.utils import ConfigManager - -# Initialize with config file -config_manager = ConfigManager("configs/config_bbrs.json") - -# Access configuration properties -start_date = config_manager.start_date -initial_usd = config_manager.initial_usd -timeframes = config_manager.timeframes - -# Get specialized configurations -strategy_config = config_manager.get_strategy_manager_config() -task_config = config_manager.get_timeframe_task_config("15min") -``` - -### Configuration Properties - -| Property | Type | Description | -|----------|------|-------------| -| `start_date` | `str` | Backtest start date | -| `stop_date` | `str` | Backtest end date (auto-defaults to current) | -| `initial_usd` | `float` | Initial portfolio value | -| `timeframes` | `List[str]` | List of timeframes to test | -| `strategies_config` | `List[Dict]` | Strategy configurations | -| `combination_rules` | `Dict` | Signal combination rules | - -### Configuration Methods - -```python -# Get strategy manager configuration -strategy_config = config_manager.get_strategy_manager_config() -# Returns: {"strategies": [...], "combination_rules": {...}} - -# Get timeframe-specific task configuration -task_config = config_manager.get_timeframe_task_config("15min") -# Returns: {"initial_usd": 10000, "strategies": [...], "combination_rules": {...}} -``` - -### Error Handling - -```python -try: - config_manager = ConfigManager("invalid_config.json") -except FileNotFoundError as e: - print(f"Config file not found: {e}") -except ValueError as e: - print(f"Invalid configuration: {e}") -``` - ---- - -## ๐Ÿ“Š ResultsProcessor & BacktestMetrics - -**Purpose**: Unified processing, aggregation, and analysis of backtest results - -### BacktestMetrics (Static Utilities) - -```python -from cycles.utils import BacktestMetrics - -# Calculate trade-level metrics -trades = [{"profit_pct": 0.05}, {"profit_pct": -0.02}] -trade_metrics = BacktestMetrics.calculate_trade_metrics(trades) -# Returns: {n_trades, win_rate, max_drawdown, avg_trade, ...} - -# Calculate portfolio-level metrics -portfolio_metrics = BacktestMetrics.calculate_portfolio_metrics(trades, 10000) -# Returns: {initial_usd, final_usd, total_fees_usd, total_return} -``` - -### ResultsProcessor (Instance-Based) - -```python -from cycles.utils import ResultsProcessor - -processor = ResultsProcessor() - -# Process single backtest results -summary_row, trade_rows = processor.process_backtest_results( - results=backtest_results, - timeframe="15min", - config=task_config, - strategy_summary=strategy_summary -) - -# Aggregate multiple results -aggregated = processor.aggregate_results(all_result_rows) - -# Create metadata for output files -metadata_lines = processor.create_metadata_lines(config_manager, data_1min) -``` - -### Available Metrics - -#### Trade Metrics -- `n_trades`: Total number of trades -- `n_winning_trades`: Number of profitable trades -- `win_rate`: Percentage of winning trades -- `total_profit`: Sum of all profitable trades -- `total_loss`: Sum of all losing trades -- `avg_trade`: Average trade return -- `profit_ratio`: Ratio of total profit to total loss -- `max_drawdown`: Maximum portfolio drawdown - -#### Portfolio Metrics -- `initial_usd`: Starting portfolio value -- `final_usd`: Ending portfolio value -- `total_fees_usd`: Total trading fees -- `total_return`: Overall portfolio return percentage - ---- - -## ๐Ÿš€ BacktestRunner & TimeframeTask - -**Purpose**: Modular backtest execution with clean separation of concerns - -### BacktestRunner - -```python -from cycles.utils import BacktestRunner - -runner = BacktestRunner() - -# Run single timeframe backtest -summary_row, trade_rows = runner.run_single_timeframe( - data_1min=market_data, - timeframe="15min", - config=task_config, - debug=False -) -``` - -#### BacktestRunner Methods - -| Method | Purpose | Returns | -|--------|---------|---------| -| `run_single_timeframe()` | Execute backtest for one timeframe | `(summary_row, trade_rows)` | -| `_create_strategy_manager()` | Create strategy manager from config | `StrategyManager` | -| `_setup_backtester()` | Setup backtester with data and strategies | `Backtest` | -| `_execute_backtest()` | Run the backtest | `Dict[results]` | -| `_handle_debug_plotting()` | Handle debug mode plotting | `None` | - -### TimeframeTask - -**Purpose**: Encapsulates a single timeframe backtest task for easy execution and parallelization - -```python -from cycles.utils import TimeframeTask, create_timeframe_tasks - -# Create individual task -task = TimeframeTask( - timeframe="15min", - data_1min=market_data, - config=task_config -) - -# Execute task -summary_row, trade_rows = task.execute(debug=False) - -# Create multiple tasks from configuration -tasks = create_timeframe_tasks( - timeframes=["5min", "15min", "1h"], - data_1min=market_data, - config_manager=config_manager -) - -# Execute all tasks -for task in tasks: - results = task.execute() -``` - -### Parallel Execution - -```python -import concurrent.futures -from cycles.utils import create_timeframe_tasks - -# Create tasks -tasks = create_timeframe_tasks(timeframes, data_1min, config_manager) - -# Execute in parallel -with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: - futures = {executor.submit(task.execute, False): task for task in tasks} - - for future in concurrent.futures.as_completed(futures): - task = futures[future] - try: - summary_row, trade_rows = future.result() - print(f"Completed: {task.timeframe}") - except Exception as e: - print(f"Failed: {task.timeframe} - {e}") -``` - ---- - -## ๐ŸŽฏ BacktestApplication - -**Purpose**: Main application orchestration class that coordinates the entire workflow - -### Complete Workflow - -```python -from cycles.application import BacktestApplication - -# Simple usage -app = BacktestApplication("configs/config_combined.json") -app.run(debug=False) -``` - -### Workflow Steps - -```python -class BacktestApplication: - def run(self, debug=False): - # 1. Load data - data_1min = self.load_data() - - # 2. Create tasks - tasks = self.create_tasks(data_1min) - - # 3. Execute tasks (parallel or debug mode) - results_rows, trade_rows = self.execute_tasks(tasks, debug) - - # 4. Save results - self.save_results(results_rows, trade_rows, data_1min) -``` - -### Custom Application - -```python -from cycles.application import BacktestApplication - -class CustomBacktestApplication(BacktestApplication): - def execute_tasks(self, tasks, debug=False): - # Custom execution logic - # Maybe with custom progress tracking - results = [] - for i, task in enumerate(tasks): - print(f"Processing {i+1}/{len(tasks)}: {task.timeframe}") - result = task.execute(debug) - results.append(result) - return results - - def save_results(self, results_rows, trade_rows, data_1min): - # Custom result saving - super().save_results(results_rows, trade_rows, data_1min) - # Additional custom processing - self.send_email_notification(results_rows) - -# Usage -app = CustomBacktestApplication("config.json") -app.run() -``` - ---- - -## ๐Ÿ”ง Component Integration Examples - -### Simple Integration - -```python -# All-in-one approach -from cycles.application import BacktestApplication - -app = BacktestApplication("config.json") -app.run(debug=False) -``` - -### Modular Integration - -```python -# Step-by-step approach using individual components -from cycles.utils import ConfigManager, BacktestRunner, ResultsProcessor - -# 1. Configuration -config_manager = ConfigManager("config.json") - -# 2. Data loading (using existing storage utilities) -from cycles.utils import Storage -storage = Storage() -data_1min = storage.load_data('btcusd_1-min_data.csv', - config_manager.start_date, - config_manager.stop_date) - -# 3. Execution -runner = BacktestRunner() -all_results = [] - -for timeframe in config_manager.timeframes: - task_config = config_manager.get_timeframe_task_config(timeframe) - summary_row, trade_rows = runner.run_single_timeframe( - data_1min, timeframe, task_config - ) - all_results.extend(trade_rows) - -# 4. Processing -processor = ResultsProcessor() -final_results = processor.aggregate_results(all_results) -``` - -### Custom Workflow - -```python -# Custom workflow for specific needs -from cycles.utils import ConfigManager, BacktestRunner - -config_manager = ConfigManager("config.json") -runner = BacktestRunner() - -# Custom data preparation -custom_data = prepare_custom_data(config_manager.start_date) - -# Custom configuration modification -for strategy in config_manager.strategies_config: - if strategy['name'] == 'default': - strategy['params']['stop_loss_pct'] = 0.02 # Custom stop loss - -# Custom execution with monitoring -for timeframe in config_manager.timeframes: - print(f"Starting backtest for {timeframe}") - config = config_manager.get_timeframe_task_config(timeframe) - - try: - results = runner.run_single_timeframe(custom_data, timeframe, config) - process_custom_results(results, timeframe) - except Exception as e: - handle_custom_error(e, timeframe) -``` - ---- - -## ๐ŸŽจ Extension Points - -### Custom Configuration Manager - -```python -from cycles.utils import ConfigManager - -class CustomConfigManager(ConfigManager): - def _validate_config(self): - super()._validate_config() - # Additional custom validation - if self.config.get('custom_field') is None: - raise ValueError("Custom field is required") - - @property - def custom_setting(self): - return self.config.get('custom_field', 'default_value') -``` - -### Custom Results Processor - -```python -from cycles.utils import ResultsProcessor - -class CustomResultsProcessor(ResultsProcessor): - def process_backtest_results(self, results, timeframe, config, strategy_summary): - # Call parent method - summary_row, trade_rows = super().process_backtest_results( - results, timeframe, config, strategy_summary - ) - - # Add custom metrics - summary_row['custom_metric'] = self.calculate_custom_metric(trade_rows) - - return summary_row, trade_rows - - def calculate_custom_metric(self, trades): - # Custom metric calculation - return sum(t['profit_pct'] for t in trades if t['profit_pct'] > 0.05) -``` - ---- - -## ๐Ÿš€ Migration Guide - -### From Old main.py - -**Before (Old main.py)**: -```python -# Scattered configuration -config_file = args.config or "configs/config_default.json" -with open(config_file, 'r') as f: - config = json.load(f) - -# Complex processing function -results = process_timeframe_data(data_1min, timeframe, config, debug) - -# Manual result aggregation -all_results = [] -for task in tasks: - results = process(task, debug) - all_results.extend(results) -``` - -**After (New Architecture)**: -```python -# Clean application approach -from cycles.application import BacktestApplication -app = BacktestApplication(config_file) -app.run(debug=debug) - -# Or modular approach -from cycles.utils import ConfigManager, BacktestRunner -config_manager = ConfigManager(config_file) -runner = BacktestRunner() -results = runner.run_single_timeframe(data, timeframe, config) -``` - -### Benefits of Migration - -1. **๐Ÿงน Cleaner Code**: Reduced complexity and better organization -2. **๐Ÿ”„ Reusability**: Components can be used independently -3. **๐Ÿงช Testability**: Each component can be tested in isolation -4. **๐Ÿ› ๏ธ Extensibility**: Easy to extend and customize components -5. **๐Ÿ“ˆ Maintainability**: Clear separation of concerns - ---- - -**Note**: All new components maintain full backward compatibility with existing configuration files and output formats. \ No newline at end of file diff --git a/docs/strategy_manager.md b/docs/strategy_manager.md index 0f3d7b3..8a326d8 100644 --- a/docs/strategy_manager.md +++ b/docs/strategy_manager.md @@ -4,48 +4,6 @@ The Strategy Manager is a sophisticated orchestration system that enables the combination of multiple trading strategies with configurable signal aggregation rules. It supports multi-timeframe analysis, weighted consensus voting, and flexible signal combination methods. -> **๐Ÿ—๏ธ New Architecture**: The Strategy Manager has been enhanced as part of the framework's modular refactoring. It now integrates seamlessly with the new `BacktestRunner`, `ConfigManager`, and `ResultsProcessor` components while maintaining full backward compatibility. - -## New Framework Integration - -### Modular Architecture Benefits - -The refactored framework provides several advantages for strategy management: - -- **๐Ÿ”ง Simplified Configuration**: Unified configuration through `ConfigManager` -- **โšก Enhanced Execution**: Streamlined execution via `BacktestRunner` -- **๐Ÿ“Š Better Processing**: Integrated results processing with `ResultsProcessor` -- **๐Ÿ”„ Improved Reusability**: Strategy manager can be used independently - -### Usage in New Framework - -```python -# Direct usage with new components -from cycles.utils import ConfigManager, BacktestRunner - -config_manager = ConfigManager("config.json") -runner = BacktestRunner() - -# Configuration is automatically prepared -strategy_config = config_manager.get_strategy_manager_config() -task_config = config_manager.get_timeframe_task_config("15min") - -# Execution is handled cleanly -results = runner.run_single_timeframe(data, "15min", task_config) -``` - -### Integration with BacktestApplication - -The Strategy Manager seamlessly integrates with the new `BacktestApplication`: - -```python -from cycles.application import BacktestApplication - -# Strategy manager is automatically created and managed -app = BacktestApplication("config.json") -app.run(debug=False) # Strategy manager handles all strategy coordination -``` - ## Architecture ### Core Components diff --git a/main.py b/main.py index 53eb089..3482029 100644 --- a/main.py +++ b/main.py @@ -1,99 +1,337 @@ -""" -Main entry point for the backtesting application. - -This module provides a clean command-line interface for running backtests -using the modular backtesting framework. -""" - +import pandas as pd +import numpy as np +import logging +import concurrent.futures +import os +import datetime import argparse -from pathlib import Path +import json -from cycles.application import BacktestApplication, setup_logging +from cycles.utils.storage import Storage +from cycles.utils.system import SystemUtils +from cycles.backtest import Backtest +from cycles.charts import BacktestCharts +from cycles.strategies import create_strategy_manager +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[ + logging.FileHandler("backtest.log"), + logging.StreamHandler() + ] +) -def parse_arguments(): - """Parse command line arguments.""" - parser = argparse.ArgumentParser( - description="Run cryptocurrency backtests with configurable strategies.", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - python main.py # Use default config - python main.py config_bbrs.json # Use specific config - python main.py --debug config_combined.json # Debug mode with plotting +def strategy_manager_init(backtester: Backtest): + """Strategy Manager initialization function""" + # This will be called by Backtest.__init__, but actual initialization + # happens in strategy_manager.initialize() + pass -Available configs: - - config_default.json: Default meta-trend strategy - - config_bbrs.json: BBRS strategy - - config_combined.json: Multi-strategy combination - """ +def strategy_manager_entry(backtester: Backtest, df_index: int): + """Strategy Manager entry function""" + return backtester.strategy_manager.get_entry_signal(backtester, df_index) + +def strategy_manager_exit(backtester: Backtest, df_index: int): + """Strategy Manager exit function""" + return backtester.strategy_manager.get_exit_signal(backtester, df_index) + +def process_timeframe_data(data_1min, timeframe, config, debug=False): + """Process a timeframe using Strategy Manager with configuration""" + + results_rows = [] + trade_rows = [] + + # Extract values from config + initial_usd = config['initial_usd'] + strategy_config = { + "strategies": config['strategies'], + "combination_rules": config['combination_rules'] + } + + # Create and initialize strategy manager + if not strategy_config: + logging.error("No strategy configuration provided") + return results_rows, trade_rows + + strategy_manager = create_strategy_manager(strategy_config) + + # Get the primary timeframe from the first strategy for backtester setup + primary_strategy = strategy_manager.strategies[0] + primary_timeframe = primary_strategy.get_timeframes()[0] + + # For BBRS strategy, it works with 1-minute data directly and handles internal resampling + # For other strategies, use their preferred timeframe + if primary_strategy.name == "bbrs": + # BBRS strategy processes 1-minute data and outputs signals on its internal timeframes + # Use 1-minute data for backtester working dataframe + working_df = data_1min.copy() + else: + # Other strategies specify their preferred timeframe + # Let the primary strategy resample the data to get the working dataframe + primary_strategy._resample_data(data_1min) + working_df = primary_strategy.get_primary_timeframe_data() + + # Prepare working dataframe for backtester (ensure timestamp column) + working_df_for_backtest = working_df.copy().reset_index() + if 'index' in working_df_for_backtest.columns: + working_df_for_backtest = working_df_for_backtest.rename(columns={'index': 'timestamp'}) + + # Initialize backtest with strategy manager initialization + backtester = Backtest(initial_usd, working_df_for_backtest, working_df_for_backtest, strategy_manager_init) + + # Store original min1_df for strategy processing + backtester.original_df = data_1min + + # Attach strategy manager to backtester and initialize + backtester.strategy_manager = strategy_manager + strategy_manager.initialize(backtester) + + # Run backtest with strategy manager functions + results = backtester.run( + strategy_manager_entry, + strategy_manager_exit, + debug ) + + n_trades = results["n_trades"] + trades = results.get('trades', []) + wins = [1 for t in trades if t['exit'] is not None and t['exit'] > t['entry']] + n_winning_trades = len(wins) + total_profit = sum(trade['profit_pct'] for trade in trades) + total_loss = sum(-trade['profit_pct'] for trade in trades if trade['profit_pct'] < 0) + win_rate = n_winning_trades / n_trades if n_trades > 0 else 0 + avg_trade = total_profit / n_trades if n_trades > 0 else 0 + profit_ratio = total_profit / total_loss if total_loss > 0 else float('inf') + cumulative_profit = 0 + max_drawdown = 0 + peak = 0 + + for trade in trades: + cumulative_profit += trade['profit_pct'] + + if cumulative_profit > peak: + peak = cumulative_profit + drawdown = peak - cumulative_profit + + if drawdown > max_drawdown: + max_drawdown = drawdown + + final_usd = initial_usd + + for trade in trades: + final_usd *= (1 + trade['profit_pct']) + + total_fees_usd = sum(trade.get('fee_usd', 0.0) for trade in trades) + + # Get stop_loss_pct from the first strategy for reporting + # In multi-strategy setups, strategies can have different stop_loss_pct values + stop_loss_pct = primary_strategy.params.get("stop_loss_pct", "N/A") + + # Update row to include timeframe information + row = { + "timeframe": f"{timeframe}({primary_timeframe})", # Show actual timeframe used + "stop_loss_pct": stop_loss_pct, + "n_trades": n_trades, + "n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP_LOSS'), + "win_rate": win_rate, + "max_drawdown": max_drawdown, + "avg_trade": avg_trade, + "total_profit": total_profit, + "total_loss": total_loss, + "profit_ratio": profit_ratio, + "initial_usd": initial_usd, + "final_usd": final_usd, + "total_fees_usd": total_fees_usd, + } + results_rows.append(row) + + for trade in trades: + trade_rows.append({ + "timeframe": f"{timeframe}({primary_timeframe})", + "stop_loss_pct": stop_loss_pct, + "entry_time": trade.get("entry_time"), + "exit_time": trade.get("exit_time"), + "entry_price": trade.get("entry"), + "exit_price": trade.get("exit"), + "profit_pct": trade.get("profit_pct"), + "type": trade.get("type"), + "fee_usd": trade.get("fee_usd"), + }) - parser.add_argument( - "config", - type=str, - nargs="?", - help="Path to config JSON file (default: configs/config_default.json)" + # Log strategy summary + strategy_summary = strategy_manager.get_strategy_summary() + logging.info(f"Timeframe: {timeframe}({primary_timeframe}), Stop Loss: {stop_loss_pct}, " + f"Trades: {n_trades}, Strategies: {[s['name'] for s in strategy_summary['strategies']]}") + + if debug: + # Plot after each backtest run + try: + # Check if any strategy has processed_data for universal plotting + processed_data = None + for strategy in strategy_manager.strategies: + if hasattr(backtester, 'processed_data') and backtester.processed_data is not None: + processed_data = backtester.processed_data + break + + if processed_data is not None and not processed_data.empty: + # Format strategy data with actual executed trades for universal plotting + formatted_data = BacktestCharts.format_strategy_data_with_trades(processed_data, results) + # Plot using universal function + BacktestCharts.plot_data(formatted_data) + else: + # Fallback to meta_trend plot if available + if "meta_trend" in backtester.strategies: + meta_trend = backtester.strategies["meta_trend"] + # Use the working dataframe for plotting + BacktestCharts.plot(working_df, meta_trend) + else: + print("No plotting data available") + except Exception as e: + print(f"Plotting failed: {e}") + + return results_rows, trade_rows + +def process(timeframe_info, debug=False): + """Process a single timeframe with strategy config""" + timeframe, data_1min, config = timeframe_info + + # Pass the essential data and full config + results_rows, all_trade_rows = process_timeframe_data( + data_1min, timeframe, config, debug=debug ) - - parser.add_argument( - "--debug", - action="store_true", - help="Enable debug mode (sequential execution with plotting)" - ) - - return parser.parse_args() + return results_rows, all_trade_rows +def aggregate_results(all_rows): + """Aggregate results per stop_loss_pct and per rule (timeframe)""" + from collections import defaultdict -def validate_config_path(config_path: str) -> str: - """Validate and resolve configuration path.""" - if config_path is None: - return "configs/config_default.json" - - # If just filename provided, look in configs directory - if not "/" in config_path and not "\\" in config_path: - config_path = f"configs/{config_path}" - - # Validate file exists - if not Path(config_path).exists(): - available_configs = list(Path("configs").glob("*.json")) - available_names = [c.name for c in available_configs] - - raise FileNotFoundError( - f"Config file '{config_path}' not found.\n" - f"Available configs: {', '.join(available_names)}" - ) - - return config_path + grouped = defaultdict(list) + for row in all_rows: + key = (row['timeframe'], row['stop_loss_pct']) + grouped[key].append(row) + summary_rows = [] + for (rule, stop_loss_pct), rows in grouped.items(): + n_months = len(rows) + total_trades = sum(r['n_trades'] for r in rows) + total_stop_loss = sum(r['n_stop_loss'] for r in rows) + avg_win_rate = np.mean([r['win_rate'] for r in rows]) + avg_max_drawdown = np.mean([r['max_drawdown'] for r in rows]) + avg_avg_trade = np.mean([r['avg_trade'] for r in rows]) + avg_profit_ratio = np.mean([r['profit_ratio'] for r in rows]) -def main(): - """Main application entry point.""" - # Setup logging - setup_logging() - - # Parse arguments - args = parse_arguments() - - try: - # Validate configuration path - config_path = validate_config_path(args.config) - - # Create and run application - app = BacktestApplication(config_path) - app.run(debug=args.debug) - - except FileNotFoundError as e: - print(f"Error: {e}") - return 1 - except Exception as e: - print(f"Application failed: {e}") - return 1 - - return 0 + # Calculate final USD + final_usd = np.mean([r.get('final_usd', initial_usd) for r in rows]) + total_fees_usd = np.mean([r.get('total_fees_usd') for r in rows]) + summary_rows.append({ + "timeframe": rule, + "stop_loss_pct": stop_loss_pct, + "n_trades": total_trades, + "n_stop_loss": total_stop_loss, + "win_rate": avg_win_rate, + "max_drawdown": avg_max_drawdown, + "avg_trade": avg_avg_trade, + "profit_ratio": avg_profit_ratio, + "initial_usd": initial_usd, + "final_usd": final_usd, + "total_fees_usd": total_fees_usd, + }) + return summary_rows + +def get_nearest_price(df, target_date): + if len(df) == 0: + return None, None + target_ts = pd.to_datetime(target_date) + nearest_idx = df.index.get_indexer([target_ts], method='nearest')[0] + nearest_time = df.index[nearest_idx] + price = df.iloc[nearest_idx]['close'] + return nearest_time, price if __name__ == "__main__": - exit(main()) + debug = True + + parser = argparse.ArgumentParser(description="Run backtest with config file.") + parser.add_argument("config", type=str, nargs="?", help="Path to config JSON file.") + args = parser.parse_args() + + # Use config_default.json as fallback if no config provided + config_file = args.config or "configs/config_default.json" + + try: + with open(config_file, 'r') as f: + config = json.load(f) + print(f"Using config: {config_file}") + except FileNotFoundError: + print(f"Error: Config file '{config_file}' not found.") + print("Available configs: configs/config_default.json, configs/config_bbrs.json, configs/config_combined.json") + exit(1) + except json.JSONDecodeError as e: + print(f"Error: Invalid JSON in config file '{config_file}': {e}") + exit(1) + + start_date = config['start_date'] + if config['stop_date'] is None: + stop_date = datetime.datetime.now().strftime("%Y-%m-%d") + else: + stop_date = config['stop_date'] + initial_usd = config['initial_usd'] + timeframes = config['timeframes'] + + timestamp = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M") + + storage = Storage(logging=logging) + system_utils = SystemUtils(logging=logging) + + data_1min = storage.load_data('btcusd_1-min_data.csv', start_date, stop_date) + + nearest_start_time, start_price = get_nearest_price(data_1min, start_date) + nearest_stop_time, stop_price = get_nearest_price(data_1min, stop_date) + + metadata_lines = [ + f"Start date\t{start_date}\tPrice\t{start_price}", + f"Stop date\t{stop_date}\tPrice\t{stop_price}", + f"Initial USD\t{initial_usd}" + ] + + # Create tasks for each timeframe + tasks = [ + (name, data_1min, config) + for name in timeframes + ] + + if debug: + all_results_rows = [] + all_trade_rows = [] + for task in tasks: + results, trades = process(task, debug) + if results or trades: + all_results_rows.extend(results) + all_trade_rows.extend(trades) + else: + workers = system_utils.get_optimal_workers() + + with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor: + futures = {executor.submit(process, task, debug): task for task in tasks} + all_results_rows = [] + all_trade_rows = [] + + for future in concurrent.futures.as_completed(futures): + results, trades = future.result() + + if results or trades: + all_results_rows.extend(results) + all_trade_rows.extend(trades) + + backtest_filename = os.path.join(f"{timestamp}_backtest.csv") + backtest_fieldnames = [ + "timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate", + "max_drawdown", "avg_trade", "profit_ratio", "final_usd", "total_fees_usd" + ] + storage.write_backtest_results(backtest_filename, backtest_fieldnames, all_results_rows, metadata_lines) + + trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"] + storage.write_trades(all_trade_rows, trades_fieldnames) \ No newline at end of file diff --git a/test_bbrsi.py b/test_bbrsi.py index fc1f50b..7c43dba 100644 --- a/test_bbrsi.py +++ b/test_bbrsi.py @@ -5,7 +5,7 @@ import pandas as pd import datetime from cycles.utils.storage import Storage -from cycles.Analysis.bb_rsi import BollingerBandsStrategy +from cycles.Analysis.strategies import Strategy logging.basicConfig( level=logging.INFO, @@ -47,7 +47,7 @@ if __name__ == "__main__": data = storage.load_data(config["data_file"], config["start_date"], config["stop_date"]) # Run strategy - strategy = BollingerBandsStrategy(config=config_strategy, logging=logging) + strategy = Strategy(config=config_strategy, logging=logging) processed_data = strategy.run(data.copy(), config_strategy["strategy_name"]) # Get buy and sell signals