# Cycles/result_processor.py
#
# 446 lines
# 18 KiB
# Python

import pandas as pd
import numpy as np
import os
import csv
import logging
from typing import List, Dict, Any, Optional, Tuple
from collections import defaultdict
from cycles.utils.storage import Storage
class ResultProcessor:
    """Handles processing, aggregation, and saving of backtest results."""

    def __init__(self, storage: "Storage", logging_instance: Optional[logging.Logger] = None):
        """
        Initialize result processor

        Args:
            storage: Storage instance for file operations
            logging_instance: Optional logging instance; when None, nothing is logged
        """
        self.storage = storage
        self.logging = logging_instance

    def process_timeframe_results(
        self,
        min1_df: pd.DataFrame,
        df: pd.DataFrame,
        stop_loss_pcts: List[float],
        timeframe_name: str,
        initial_usd: float,
        progress_callback=None
    ) -> Tuple[List[Dict], List[Dict]]:
        """
        Process results for a single timeframe with multiple stop loss values

        Args:
            min1_df: 1-minute data DataFrame
            df: Resampled timeframe DataFrame
            stop_loss_pcts: List of stop loss percentages to test
            timeframe_name: Name of the timeframe (e.g., '1D', '6h')
            initial_usd: Initial USD amount
            progress_callback: Optional progress callback function

        Returns:
            Tuple of (results_rows, trade_rows)

        Raises:
            RuntimeError: If the backtest or the processing of its output fails
                for any stop loss value; the original exception is chained.
        """
        # Imported at call time rather than at module level
        # (presumably to avoid a circular import -- TODO confirm).
        from cycles.backtest import Backtest

        # Work on a copy with a fresh positional index so the caller's frame is untouched.
        df = df.copy().reset_index(drop=True)
        results_rows: List[Dict] = []
        trade_rows: List[Dict] = []

        for stop_loss_pct in stop_loss_pcts:
            try:
                results = Backtest.run(
                    min1_df,
                    df,
                    initial_usd=initial_usd,
                    stop_loss_pct=stop_loss_pct,
                    progress_callback=progress_callback,
                    verbose=False  # Default to False for production runs
                )

                # _calculate_metrics validates the result dict (including the
                # presence of 'trades'), so no separate check is needed here.
                metrics = self._calculate_metrics(results, initial_usd, stop_loss_pct, timeframe_name)
                results_rows.append(metrics)

                trades = self._process_trades(results['trades'], timeframe_name, stop_loss_pct)
                trade_rows.extend(trades)

                if self.logging:
                    self.logging.info(f"Timeframe: {timeframe_name}, Stop Loss: {stop_loss_pct}, Trades: {results['n_trades']}")
            except Exception as e:
                error_msg = f"Error processing {timeframe_name} with stop loss {stop_loss_pct}: {e}"
                if self.logging:
                    self.logging.error(error_msg)
                raise RuntimeError(error_msg) from e

        return results_rows, trade_rows

    def _calculate_metrics(
        self,
        results: Dict[str, Any],
        initial_usd: float,
        stop_loss_pct: float,
        timeframe_name: str
    ) -> Dict[str, Any]:
        """
        Calculate performance metrics from backtest results

        'win_rate' and 'avg_trade' must be present in ``results`` but are
        recomputed here from the trade list; the remaining values are taken
        from ``results`` unchanged.

        Args:
            results: Backtest result dictionary
            initial_usd: Initial USD amount, copied into the output row
            stop_loss_pct: Stop loss value this run was made with
            timeframe_name: Name of the timeframe

        Returns:
            Dictionary with one row of metrics for this timeframe / stop loss

        Raises:
            ValueError: If a required result or trade field is missing, or a
                required result value is None
        """
        if 'trades' not in results:
            raise ValueError(f"Backtest results missing 'trades' field for {timeframe_name} with {stop_loss_pct} stop loss")

        # Validate that all required fields are present BEFORE reading any of
        # them, so a missing key surfaces as ValueError rather than KeyError.
        required_fields = ['final_usd', 'max_drawdown', 'total_fees_usd', 'n_trades', 'n_stop_loss', 'win_rate', 'avg_trade']
        missing_fields = [field for field in required_fields if field not in results]
        if missing_fields:
            raise ValueError(f"Backtest results missing required fields: {missing_fields}")

        trades = results['trades']
        n_trades = results["n_trades"]

        # Calculate win metrics - validate trade fields
        n_winning_trades = 0
        for t in trades:
            if 'exit' not in t:
                raise ValueError(f"Trade missing 'exit' field: {t}")
            if 'entry' not in t:
                raise ValueError(f"Trade missing 'entry' field: {t}")
            if 'profit_pct' not in t:
                raise ValueError(f"Trade missing 'profit_pct' field: {t}")
            # A trade whose exit is None is never counted as a win.
            if t['exit'] is not None and t['exit'] > t['entry']:
                n_winning_trades += 1
        win_rate = n_winning_trades / n_trades if n_trades > 0 else 0

        # Calculate profit metrics
        profits = [t['profit_pct'] for t in trades]
        total_profit = sum(p for p in profits if p > 0)
        total_loss = abs(sum(p for p in profits if p < 0))
        avg_trade = sum(profits) / n_trades if n_trades > 0 else 0
        # No losses at all: ratio is infinite if there was any profit, else 0.
        profit_ratio = total_profit / total_loss if total_loss > 0 else (float('inf') if total_profit > 0 else 0)

        # Get values directly from backtest results (no defaults)
        for field in ('max_drawdown', 'final_usd', 'total_fees_usd', 'n_stop_loss'):
            if results[field] is None:
                raise ValueError(f"{field} is None for {timeframe_name} with {stop_loss_pct} stop loss")

        return {
            "timeframe": timeframe_name,
            "stop_loss_pct": stop_loss_pct,
            "n_trades": n_trades,
            "n_stop_loss": results['n_stop_loss'],
            "win_rate": win_rate,
            "max_drawdown": results['max_drawdown'],
            "avg_trade": avg_trade,
            "total_profit": total_profit,
            "total_loss": total_loss,
            "profit_ratio": profit_ratio,
            "initial_usd": initial_usd,
            "final_usd": results['final_usd'],
            "total_fees_usd": results['total_fees_usd'],
        }

    def _calculate_max_drawdown(self, trades: List[Dict]) -> float:
        """
        Calculate maximum drawdown from trade sequence

        The drawdown is measured on the running sum of 'profit_pct' values:
        the largest drop from any earlier peak of that sum (peak starts at 0).

        Args:
            trades: Trades in execution order, each with a 'profit_pct' value

        Returns:
            Largest peak-to-trough drop of the cumulative sum (0 if none)
        """
        cumulative_profit = 0
        max_drawdown = 0
        peak = 0
        for trade in trades:
            cumulative_profit += trade['profit_pct']
            peak = max(peak, cumulative_profit)
            max_drawdown = max(max_drawdown, peak - cumulative_profit)
        return max_drawdown

    def _process_trades(
        self,
        trades: List[Dict],
        timeframe_name: str,
        stop_loss_pct: float
    ) -> List[Dict]:
        """
        Process individual trades with metadata

        Each trade is copied into a new dict tagged with the timeframe and
        stop loss; 'entry' / 'exit' are renamed to 'entry_price' / 'exit_price'.

        Args:
            trades: Raw trade dictionaries from the backtest
            timeframe_name: Name of the timeframe
            stop_loss_pct: Stop loss value this run was made with

        Returns:
            List of processed trade dictionaries

        Raises:
            ValueError: If a trade lacks any required field
        """
        required_fields = ["entry_time", "exit_time", "entry", "exit", "profit_pct", "type", "fee_usd"]
        processed_trades = []
        for trade in trades:
            # Validate all required trade fields
            missing_fields = [field for field in required_fields if field not in trade]
            if missing_fields:
                raise ValueError(f"Trade missing required fields: {missing_fields} in trade: {trade}")
            processed_trades.append({
                "timeframe": timeframe_name,
                "stop_loss_pct": stop_loss_pct,
                "entry_time": trade["entry_time"],
                "exit_time": trade["exit_time"],
                "entry_price": trade["entry"],
                "exit_price": trade["exit"],
                "profit_pct": trade["profit_pct"],
                "type": trade["type"],
                "fee_usd": trade["fee_usd"],
            })
        return processed_trades

    def _debug_output(self, results: Dict[str, Any]) -> None:
        """
        Output debug information for backtest results

        Prints trades whose 'type' is 'STOP' and trades whose 'profit_pct'
        is below -0.09 to stdout.

        Args:
            results: Backtest result dictionary containing 'trades'

        Raises:
            ValueError: If 'trades' is missing, or a trade lacks 'type' or
                'profit_pct'
        """
        if 'trades' not in results:
            raise ValueError("Backtest results missing 'trades' field for debug output")
        trades = results['trades']

        # Print stop loss trades
        stop_loss_trades = []
        for t in trades:
            if 'type' not in t:
                raise ValueError(f"Trade missing 'type' field: {t}")
            if t['type'] == 'STOP':
                stop_loss_trades.append(t)
        if stop_loss_trades:
            print("Stop Loss Trades:")
            for trade in stop_loss_trades:
                print(trade)

        # Print large loss trades
        large_loss_trades = []
        for t in trades:
            if 'profit_pct' not in t:
                raise ValueError(f"Trade missing 'profit_pct' field: {t}")
            if t['profit_pct'] < -0.09:
                large_loss_trades.append(t)
        if large_loss_trades:
            print("Large Loss Trades:")
            for trade in large_loss_trades:
                print("Large loss trade:", trade)

    def aggregate_results(self, all_results: List[Dict]) -> List[Dict]:
        """
        Aggregate results per stop_loss_pct and timeframe

        Args:
            all_results: List of result dictionaries from all timeframes

        Returns:
            List of aggregated summary rows, one per (timeframe, stop_loss_pct)
            pair, in order of first appearance
        """
        grouped = defaultdict(list)
        for row in all_results:
            grouped[(row['timeframe'], row['stop_loss_pct'])].append(row)

        return [
            self._aggregate_group(rows, timeframe, stop_loss_pct)
            for (timeframe, stop_loss_pct), rows in grouped.items()
        ]

    def _aggregate_group(self, rows: List[Dict], timeframe: str, stop_loss_pct: float) -> Dict:
        """
        Aggregate a group of rows with the same timeframe and stop loss

        Trade and stop-loss counts are summed; win rate, max drawdown, average
        trade, final USD and fees are averaged across the rows. Infinite
        profit ratios are left out of the profit-ratio average.

        Args:
            rows: Result rows sharing the same timeframe and stop loss
            timeframe: Name of the timeframe
            stop_loss_pct: Stop loss value of the group

        Returns:
            One aggregated summary row

        Raises:
            ValueError: If ``rows`` is empty or a row lacks a required field
        """
        if not rows:
            raise ValueError(f"No rows to aggregate for {timeframe} with {stop_loss_pct} stop loss")

        # Validate all rows have required fields
        required_fields = ['n_trades', 'n_stop_loss', 'win_rate', 'max_drawdown', 'avg_trade', 'profit_ratio', 'final_usd', 'total_fees_usd', 'initial_usd']
        for i, row in enumerate(rows):
            missing_fields = [field for field in required_fields if field not in row]
            if missing_fields:
                raise ValueError(f"Row {i} missing required fields: {missing_fields}")

        def mean_of(field: str) -> float:
            # Convert numpy scalars to plain floats so downstream consumers
            # get builtin types.
            return float(np.mean([r[field] for r in rows]))

        # Handle infinite profit ratios properly: average only the finite ones.
        finite_profit_ratios = [r['profit_ratio'] for r in rows if not np.isinf(r['profit_ratio'])]
        avg_profit_ratio = float(np.mean(finite_profit_ratios)) if finite_profit_ratios else 0

        return {
            "timeframe": timeframe,
            "stop_loss_pct": stop_loss_pct,
            "n_trades": sum(r['n_trades'] for r in rows),
            "n_stop_loss": sum(r['n_stop_loss'] for r in rows),
            "win_rate": mean_of('win_rate'),
            "max_drawdown": mean_of('max_drawdown'),
            "avg_trade": mean_of('avg_trade'),
            "profit_ratio": avg_profit_ratio,
            # initial_usd is taken from the first row of the group.
            "initial_usd": rows[0]['initial_usd'],
            "final_usd": mean_of('final_usd'),
            "total_fees_usd": mean_of('total_fees_usd'),
        }

    def save_trade_file(self, trades: List[Dict], timeframe: str, stop_loss_pct: float) -> None:
        """
        Save individual trade file with summary header

        The file starts with two tab-separated summary lines, followed by a
        regular CSV section (header row plus one row per trade). Nothing is
        written when ``trades`` is empty.

        Args:
            trades: List of trades for this combination
            timeframe: Timeframe name
            stop_loss_pct: Stop loss percentage

        Raises:
            RuntimeError: If validation or writing fails; the original
                exception is chained.
        """
        if not trades:
            return

        try:
            # Generate filename
            sl_percent = int(round(stop_loss_pct * 100))
            trades_filename = os.path.join(self.storage.results_dir, f"trades_{timeframe}_ST{sl_percent}pct.csv")

            # The win rate is not computed here; the header carries a placeholder.
            summary_fields = ["timeframe", "stop_loss_pct", "n_trades", "win_rate"]
            summary_values = [timeframe, stop_loss_pct, len(trades), "calculated_elsewhere"]

            trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"]

            # Validate every trade BEFORE opening the file so that a malformed
            # trade cannot leave a partially written file behind.
            for trade in trades:
                missing_fields = [k for k in trades_fieldnames if k not in trade]
                if missing_fields:
                    raise ValueError(f"Trade missing required fields for CSV: {missing_fields} in trade: {trade}")

            # Write file with header and trades
            with open(trades_filename, "w", newline="", encoding="utf-8") as f:
                # Write summary header
                f.write("\t".join(summary_fields) + "\n")
                f.write("\t".join(str(v) for v in summary_values) + "\n")

                # Write trades
                writer = csv.DictWriter(f, fieldnames=trades_fieldnames)
                writer.writeheader()
                writer.writerows({k: trade[k] for k in trades_fieldnames} for trade in trades)

            if self.logging:
                self.logging.info(f"Trades saved to {trades_filename}")
        except Exception as e:
            error_msg = f"Failed to save trades file for {timeframe}_ST{int(round(stop_loss_pct * 100))}pct: {e}"
            if self.logging:
                self.logging.error(error_msg)
            raise RuntimeError(error_msg) from e

    def save_backtest_results(
        self,
        results: List[Dict],
        metadata_lines: List[str],
        timestamp: str
    ) -> str:
        """
        Save aggregated backtest results to CSV file

        The actual writing is delegated to ``storage.write_backtest_results``.

        Args:
            results: List of aggregated result dictionaries
            metadata_lines: List of metadata strings
            timestamp: Timestamp for filename

        Returns:
            Path to saved file

        Raises:
            RuntimeError: If the storage call fails; the original exception is
                chained.
        """
        try:
            filename = f"{timestamp}_backtest.csv"
            fieldnames = [
                "timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate",
                "max_drawdown", "avg_trade", "profit_ratio", "final_usd", "total_fees_usd"
            ]
            filepath = self.storage.write_backtest_results(filename, fieldnames, results, metadata_lines)
            if self.logging:
                self.logging.info(f"Backtest results saved to {filepath}")
            return filepath
        except Exception as e:
            error_msg = f"Failed to save backtest results: {e}"
            if self.logging:
                self.logging.error(error_msg)
            raise RuntimeError(error_msg) from e

    def get_price_info(self, data_df: pd.DataFrame, date: str) -> Tuple[Optional[str], Optional[float]]:
        """
        Get nearest price information for a given date

        This is a best-effort lookup: any failure is logged as a warning and
        reported as (None, None) instead of raising.

        Args:
            data_df: DataFrame with price data; the index is searched for the
                nearest entry and the 'close' column supplies the price
            date: Target date string

        Returns:
            Tuple of (nearest_time, price) or (None, None) if no data
        """
        try:
            if len(data_df) == 0:
                return None, None
            target_ts = pd.to_datetime(date)
            nearest_idx = data_df.index.get_indexer([target_ts], method='nearest')[0]
            nearest_time = data_df.index[nearest_idx]
            price = data_df.iloc[nearest_idx]['close']
            return str(nearest_time), float(price)
        except Exception as e:
            if self.logging:
                self.logging.warning(f"Could not get price info for {date}: {e}")
            return None, None

    def save_all_trade_files(self, all_trades: List[Dict]) -> None:
        """
        Save all trade files in batch after parallel execution completes

        Trades are grouped by (timeframe, stop_loss_pct) and each group is
        written through ``save_trade_file``. Trades without a timeframe or
        stop loss are skipped (a warning is logged with the skipped count).

        Args:
            all_trades: List of all trades from all tasks

        Raises:
            RuntimeError: If grouping or saving fails; the original exception
                is chained.
        """
        if not all_trades:
            return

        try:
            # Group trades by timeframe and stop loss
            trade_groups = defaultdict(list)
            skipped = 0
            for trade in all_trades:
                timeframe = trade.get('timeframe')
                stop_loss_pct = trade.get('stop_loss_pct')
                if timeframe and stop_loss_pct is not None:
                    trade_groups[(timeframe, stop_loss_pct)].append(trade)
                else:
                    skipped += 1

            if skipped and self.logging:
                self.logging.warning(f"Skipped {skipped} trades without timeframe or stop_loss_pct")

            # Save each group
            for (timeframe, stop_loss_pct), trades in trade_groups.items():
                self.save_trade_file(trades, timeframe, stop_loss_pct)

            if self.logging:
                self.logging.info(f"Saved {len(trade_groups)} trade files in batch")
        except Exception as e:
            error_msg = f"Failed to save trade files in batch: {e}"
            if self.logging:
                self.logging.error(error_msg)
            raise RuntimeError(error_msg) from e