From fc7e8e9f8a72e32e0be68ba1d82091d672f6a534 Mon Sep 17 00:00:00 2001
From: Ajasra
Date: Thu, 29 May 2025 14:45:11 +0800
Subject: [PATCH] plot optimisation to reduce points

---
 test/backtest/strategy_run.py | 155 +++++++++++-----------------------
 1 file changed, 49 insertions(+), 106 deletions(-)

diff --git a/test/backtest/strategy_run.py b/test/backtest/strategy_run.py
index 21fd992..9a7f12c 100644
--- a/test/backtest/strategy_run.py
+++ b/test/backtest/strategy_run.py
@@ -259,6 +259,41 @@ class StrategyRunner:
             logger.error(f"Error loading market data: {e}")
             return pd.DataFrame()
 
+    def aggregate_market_data_for_plotting(self, df: pd.DataFrame, max_points: int = 2000) -> pd.DataFrame:
+        """
+        Aggregate market data to reduce the number of points for plotting.
+
+        Args:
+            df: Full market data DataFrame
+            max_points: Maximum number of points to keep for plotting
+
+        Returns:
+            Aggregated DataFrame suitable for plotting
+        """
+        if df.empty or len(df) <= max_points:
+            return df
+
+        try:
+            # Calculate step size to get approximately max_points
+            step = len(df) // max_points
+
+            # Sample every nth row to reduce data points
+            aggregated_df = df.iloc[::step].copy()
+
+            # Always include the first and last points
+            if len(aggregated_df) > 0:
+                if aggregated_df.index[0] != df.index[0]:
+                    aggregated_df = pd.concat([df.iloc[[0]], aggregated_df])
+                if aggregated_df.index[-1] != df.index[-1]:
+                    aggregated_df = pd.concat([aggregated_df, df.iloc[[-1]]])
+
+            logger.info(f"Market data aggregated: {len(df)} → {len(aggregated_df)} points for plotting")
+            return aggregated_df.sort_values('timestamp')
+
+        except Exception as e:
+            logger.warning(f"Error aggregating market data: {e}, using original data")
+            return df
+
     def create_strategy_plot(self, result: Dict[str, Any], save_path: str) -> None:
         """
         Create and save a comprehensive plot for a strategy's performance.
@@ -486,8 +521,11 @@ Period: {result['backtest_period']}
 
         # 2. Full Market Price Chart with Entry/Exit Points
         if self.market_data is not None and not self.market_data.empty:
+            # Aggregate market data for plotting performance
+            plot_market_data = self.aggregate_market_data_for_plotting(self.market_data)
+
             # Plot full market price data
-            ax2.plot(self.market_data['timestamp'], self.market_data['close'],
+            ax2.plot(plot_market_data['timestamp'], plot_market_data['close'],
                      linewidth=1.5, color='black', alpha=0.7, label='Market Price')
 
             # Add entry points (green circles)
@@ -505,7 +543,7 @@ Period: {result['backtest_period']}
             ax2.grid(True, alpha=0.3)
             ax2.legend()
 
-            if len(self.market_data) > 100:
+            if len(plot_market_data) > 100:
                 ax2.tick_params(axis='x', rotation=45)
         else:
             # Fallback to signal-only price data
@@ -541,12 +579,15 @@ Period: {result['backtest_period']}
 
         # 3. Combined View: Price and Portfolio Performance
        if self.market_data is not None and not self.market_data.empty and portfolio_times:
+            # Use the same aggregated data for consistency
+            plot_market_data = self.aggregate_market_data_for_plotting(self.market_data)
+
             # Create dual y-axis plot
             ax3_price = ax3
             ax3_portfolio = ax3.twinx()
 
             # Plot price on left axis
-            line1 = ax3_price.plot(self.market_data['timestamp'], self.market_data['close'],
+            line1 = ax3_price.plot(plot_market_data['timestamp'], plot_market_data['close'],
                                    linewidth=1.5, color='black', alpha=0.7, label='Market Price')
             ax3_price.set_ylabel('Market Price ($)', color='black')
             ax3_price.tick_params(axis='y', labelcolor='black')
@@ -574,7 +615,7 @@ Period: {result['backtest_period']}
             lines2, labels2 = ax3_portfolio.get_legend_handles_labels()
             ax3_price.legend(lines1 + lines2, labels1 + labels2, loc='upper left')
 
-            if len(self.market_data) > 100:
+            if len(plot_market_data) > 100:
                 ax3_price.tick_params(axis='x', rotation=45)
         else:
             ax3.text(0.5, 0.5, 'No data available for combined view',
@@ -611,50 +652,30 @@ Period: {result['backtest_period']}
 
             # Show progress for file saving if tqdm is available
             if TQDM_AVAILABLE:
-                file_ops = ["JSON", "Plot", "Detailed Plot", "Trades CSV", "Signals CSV"]
-                save_progress = tqdm(file_ops, desc=f"💾 Saving {strategy_name[:15]}",
-                                     leave=False, ncols=80, position=1)
-            else:
-                save_progress = None
+                logger.info(f"💾 Saving files for {strategy_name}...")
 
             # Save JSON result
-            if save_progress:
-                save_progress.set_description(f"💾 Saving JSON")
             json_path = os.path.join(self.results_dir, f"{base_filename}.json")
             with open(json_path, 'w') as f:
                 json.dump(result, f, indent=2, default=str)
             logger.info(f"📄 Individual strategy result saved: {json_path}")
-            if save_progress:
-                save_progress.update(1)
 
             # Save plot if strategy was successful
             if result['success'] and PLOTTING_AVAILABLE:
-                if save_progress:
-                    save_progress.set_description(f"💾 Saving plot")
                 plot_path = os.path.join(self.results_dir, f"{base_filename}_plot.png")
                 self.create_strategy_plot(result, plot_path)
-                if save_progress:
-                    save_progress.update(1)
 
             # Save detailed plot with portfolio and signals
             if result['success'] and PLOTTING_AVAILABLE:
-                if save_progress:
-                    save_progress.set_description(f"💾 Saving detailed plot")
                 detailed_plot_path = os.path.join(self.results_dir, f"{base_filename}_detailed_plot.png")
                 self.create_detailed_strategy_plot(result, detailed_plot_path)
-                if save_progress:
-                    save_progress.update(1)
 
             # Save trades CSV if available
             if result['success'] and result.get('trades'):
-                if save_progress:
-                    save_progress.set_description(f"💾 Saving trades CSV")
                 trades_df = pd.DataFrame(result['trades'])
                 trades_csv_path = os.path.join(self.results_dir, f"{base_filename}_trades.csv")
                 trades_df.to_csv(trades_csv_path, index=False)
                 logger.info(f"📊 Trades data saved: {trades_csv_path}")
-                if save_progress:
-                    save_progress.update(1)
 
             # Save signals data
             signals_data = []
@@ -686,18 +707,10 @@ Period: {result['backtest_period']}
                 })
 
             if signals_data:
-                if save_progress:
-                    save_progress.set_description(f"💾 Saving signals CSV")
                 signals_df = pd.DataFrame(signals_data)
                 signals_csv_path = os.path.join(self.results_dir, f"{base_filename}_signals.csv")
                 signals_df.to_csv(signals_csv_path, index=False)
                 logger.info(f"📡 Signals data saved: {signals_csv_path}")
-                if save_progress:
-                    save_progress.update(1)
-
-            # Close progress bar
-            if save_progress:
-                save_progress.close()
 
         except Exception as e:
             logger.error(f"Error saving individual strategy results for {result['strategy_name']}: {e}")
@@ -855,79 +868,9 @@ Period: {result['backtest_period']}
 
             # Create a custom backtester wrapper with progress tracking
             if TQDM_AVAILABLE:
-                # Get estimated data points for progress tracking
-                try:
-                    # Load a small sample to estimate total rows
-                    sample_path = os.path.join(data_dir, data_file)
-                    total_lines = sum(1 for _ in open(sample_path)) - 1  # Subtract header
-
-                    # Estimate rows for the date range
-                    from datetime import datetime
-                    start_dt = datetime.strptime(start_date, "%Y-%m-%d")
-                    end_dt = datetime.strptime(end_date, "%Y-%m-%d")
-                    days_in_range = (end_dt - start_dt).days + 1
-
-                    # Rough estimate: assume 1440 minutes per day for 1-minute data
-                    estimated_rows = days_in_range * 1440
-                    estimated_rows = min(estimated_rows, total_lines)  # Cap at actual file size
-
-                    strategy_progress = tqdm(total=estimated_rows,
-                                             desc=f"⚡ Strategy {strategy_index}/{total_strategies}: {strategy_name[:25]}",
-                                             leave=False, ncols=120, position=1,
-                                             unit="rows", unit_scale=True,
-                                             bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]")
-
-                    # Since we can't directly hook into backtester progress, we'll simulate based on time
-                    import threading
-                    import time as time_module
-
-                    backtest_complete = threading.Event()
-                    results_container = {}
-
-                    def run_backtest_thread():
-                        try:
-                            results_container['results'] = backtester.run_single_strategy(strategy, trader_params)
-                            results_container['success'] = True
-                        except Exception as e:
-                            results_container['error'] = e
-                            results_container['success'] = False
-                        finally:
-                            backtest_complete.set()
-
-                    # Start backtest
-                    backtest_thread = threading.Thread(target=run_backtest_thread)
-                    backtest_thread.start()
-
-                    # Update progress based on time (rough estimation)
-                    rows_processed = 0
-                    update_interval = max(1, estimated_rows // 100)  # Update every 1% of data
-
-                    while not backtest_complete.is_set():
-                        time_module.sleep(0.5)
-                        if rows_processed < estimated_rows * 0.95:  # Don't go past 95% until done
-                            rows_processed += update_interval
-                            strategy_progress.update(update_interval)
-
-                    # Complete the progress bar
-                    backtest_thread.join()
-                    remaining = estimated_rows - rows_processed
-                    if remaining > 0:
-                        strategy_progress.update(remaining)
-
-                    strategy_progress.close()
-
-                    # Check results
-                    if not results_container.get('success', False):
-                        raise results_container.get('error', Exception("Backtest failed"))
-
-                    results = results_container['results']
-
-                except Exception as e:
-                    if 'strategy_progress' in locals():
-                        strategy_progress.close()
-                    # Fall back to running without progress
-                    logger.warning(f"Progress tracking failed, running without progress bar: {e}")
-                    results = backtester.run_single_strategy(strategy, trader_params)
+                # Simple progress indication without threading
+                logger.info(f"⚡ Running Strategy {strategy_index}/{total_strategies}: {strategy_name}")
+                results = backtester.run_single_strategy(strategy, trader_params)
             else:
                 # Run without progress tracking
                 results = backtester.run_single_strategy(strategy, trader_params)
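
Note on the sampling approach: the sketch below is a condensed, standalone mirror of the downsampling that aggregate_market_data_for_plotting introduces, so the behaviour can be sanity-checked outside the runner. The helper name downsample_for_plot and the synthetic frame are illustrative only and not part of the codebase; like the patch, it assumes a 'timestamp' column and a default cap of 2000 points.

    # Minimal sketch of step-based downsampling for plotting (illustrative, not part of the patch).
    import numpy as np
    import pandas as pd

    def downsample_for_plot(df: pd.DataFrame, max_points: int = 2000) -> pd.DataFrame:
        """Keep roughly max_points rows: take every nth row, then re-attach the last row."""
        if df.empty or len(df) <= max_points:
            return df
        step = len(df) // max_points
        sampled = df.iloc[::step].copy()        # striding always keeps the first row
        if sampled.index[-1] != df.index[-1]:   # re-attach the final row if it was skipped
            sampled = pd.concat([sampled, df.iloc[[-1]]])
        return sampled.sort_values('timestamp')

    if __name__ == "__main__":
        ts = pd.date_range("2025-01-01", periods=10_000, freq="min")
        frame = pd.DataFrame({"timestamp": ts, "close": np.random.randn(10_000).cumsum() + 100})
        reduced = downsample_for_plot(frame)
        print(len(frame), "->", len(reduced))   # e.g. 10000 -> 2001

With 1-minute candles over roughly a month (~43,200 rows at 1440 rows per day), the computed step is 21 and about 2,050 points remain, so the plotted line looks the same while matplotlib has an order of magnitude fewer points to draw.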