446 lines
18 KiB
Python
446 lines
18 KiB
Python
import pandas as pd
|
|
import numpy as np
|
|
import os
|
|
import csv
|
|
import logging
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
from collections import defaultdict
|
|
|
|
from cycles.utils.storage import Storage
|
|
|
|
|
|
class ResultProcessor:
    """Handles processing, aggregation, and saving of backtest results"""

    def __init__(self, storage: "Storage", logging_instance: Optional[logging.Logger] = None):
        """
        Initialize result processor

        Args:
            storage: Storage instance for file operations
            logging_instance: Optional logging instance; when None nothing is logged
        """
        self.storage = storage
        self.logging = logging_instance

    def process_timeframe_results(
        self,
        min1_df: pd.DataFrame,
        df: pd.DataFrame,
        stop_loss_pcts: List[float],
        timeframe_name: str,
        initial_usd: float,
        progress_callback=None
    ) -> Tuple[List[Dict], List[Dict]]:
        """
        Process results for a single timeframe with multiple stop loss values

        Args:
            min1_df: 1-minute data DataFrame
            df: Resampled timeframe DataFrame (copied, the caller's frame is not modified)
            stop_loss_pcts: List of stop loss percentages to test
            timeframe_name: Name of the timeframe (e.g., '1D', '6h')
            initial_usd: Initial USD amount
            progress_callback: Optional progress callback function

        Returns:
            Tuple of (results_rows, trade_rows)

        Raises:
            RuntimeError: If the backtest or the post-processing fails for any
                stop loss value; the original exception is chained as the cause.
        """
        # Imported at call time rather than at module import time
        # (presumably to avoid a circular import - TODO confirm).
        from cycles.backtest import Backtest

        df = df.copy().reset_index(drop=True)
        results_rows = []
        trade_rows = []

        for stop_loss_pct in stop_loss_pcts:
            try:
                results = Backtest.run(
                    min1_df,
                    df,
                    initial_usd=initial_usd,
                    stop_loss_pct=stop_loss_pct,
                    progress_callback=progress_callback,
                    verbose=False  # Default to False for production runs
                )

                # _calculate_metrics validates the result payload (including
                # the presence of 'trades'), so no second check is needed here.
                metrics = self._calculate_metrics(results, initial_usd, stop_loss_pct, timeframe_name)
                results_rows.append(metrics)

                trade_rows.extend(self._process_trades(results['trades'], timeframe_name, stop_loss_pct))

                if self.logging:
                    self.logging.info(f"Timeframe: {timeframe_name}, Stop Loss: {stop_loss_pct}, Trades: {results['n_trades']}")

            except Exception as e:
                error_msg = f"Error processing {timeframe_name} with stop loss {stop_loss_pct}: {e}"
                if self.logging:
                    self.logging.error(error_msg)
                raise RuntimeError(error_msg) from e

        return results_rows, trade_rows

    def _calculate_metrics(
        self,
        results: Dict[str, Any],
        initial_usd: float,
        stop_loss_pct: float,
        timeframe_name: str
    ) -> Dict[str, Any]:
        """
        Calculate performance metrics from backtest results

        win_rate, avg_trade, total_profit, total_loss and profit_ratio are
        recomputed from the trade list; n_trades, n_stop_loss, max_drawdown,
        final_usd and total_fees_usd are taken from ``results`` as-is.

        Args:
            results: Backtest result dictionary
            initial_usd: Initial USD amount (copied into the output row)
            stop_loss_pct: Stop loss percentage used for this run
            timeframe_name: Name of the timeframe

        Returns:
            Dictionary with one row of metrics

        Raises:
            ValueError: If a required result field or trade field is missing,
                or if a value taken directly from ``results`` is None.
        """
        if 'trades' not in results:
            raise ValueError(f"Backtest results missing 'trades' field for {timeframe_name} with {stop_loss_pct} stop loss")

        # Validate that all required fields are present *before* reading any
        # of them, so a missing key surfaces as ValueError, not KeyError.
        required_fields = ['final_usd', 'max_drawdown', 'total_fees_usd', 'n_trades', 'n_stop_loss', 'win_rate', 'avg_trade']
        missing_fields = [field for field in required_fields if field not in results]
        if missing_fields:
            raise ValueError(f"Backtest results missing required fields: {missing_fields}")

        trades = results['trades']
        n_trades = results['n_trades']

        # Calculate win metrics - validate trade fields
        n_winning_trades = 0
        for t in trades:
            if 'exit' not in t:
                raise ValueError(f"Trade missing 'exit' field: {t}")
            if 'entry' not in t:
                raise ValueError(f"Trade missing 'entry' field: {t}")
            if 'profit_pct' not in t:
                raise ValueError(f"Trade missing 'profit_pct' field: {t}")
            # A trade with exit None can never count as a win.
            if t['exit'] is not None and t['exit'] > t['entry']:
                n_winning_trades += 1
        win_rate = n_winning_trades / n_trades if n_trades > 0 else 0

        # Calculate profit metrics
        profits = [t['profit_pct'] for t in trades]
        total_profit = sum(p for p in profits if p > 0)
        total_loss = abs(sum(p for p in profits if p < 0))
        avg_trade = sum(profits) / n_trades if n_trades > 0 else 0
        if total_loss > 0:
            profit_ratio = total_profit / total_loss
        else:
            # No losses: infinite ratio when there was any profit, else 0.
            profit_ratio = float('inf') if total_profit > 0 else 0

        # Values taken directly from backtest results (no defaults): None is an error
        for field in ('max_drawdown', 'final_usd', 'total_fees_usd', 'n_stop_loss'):
            if results[field] is None:
                raise ValueError(f"{field} is None for {timeframe_name} with {stop_loss_pct} stop loss")

        return {
            "timeframe": timeframe_name,
            "stop_loss_pct": stop_loss_pct,
            "n_trades": n_trades,
            "n_stop_loss": results['n_stop_loss'],
            "win_rate": win_rate,
            "max_drawdown": results['max_drawdown'],
            "avg_trade": avg_trade,
            "total_profit": total_profit,
            "total_loss": total_loss,
            "profit_ratio": profit_ratio,
            "initial_usd": initial_usd,
            "final_usd": results['final_usd'],
            "total_fees_usd": results['total_fees_usd'],
        }

    def _calculate_max_drawdown(self, trades: List[Dict]) -> float:
        """
        Calculate maximum drawdown from trade sequence

        The drawdown is measured on the running sum of ``profit_pct`` as the
        largest drop from any earlier peak of that sum (the peak starts at 0).

        Args:
            trades: Trades in execution order, each with a 'profit_pct' value

        Returns:
            Largest peak-to-trough drop; 0 for an empty sequence
        """
        cumulative_profit = 0
        max_drawdown = 0
        peak = 0

        for trade in trades:
            cumulative_profit += trade['profit_pct']
            peak = max(peak, cumulative_profit)
            max_drawdown = max(max_drawdown, peak - cumulative_profit)

        return max_drawdown

    def _process_trades(
        self,
        trades: List[Dict],
        timeframe_name: str,
        stop_loss_pct: float
    ) -> List[Dict]:
        """
        Process individual trades with metadata

        Each trade is copied into a new dict tagged with the timeframe and
        stop loss; 'entry' / 'exit' are renamed to 'entry_price' / 'exit_price'.

        Args:
            trades: Raw trades from the backtest
            timeframe_name: Name of the timeframe
            stop_loss_pct: Stop loss percentage used for this run

        Returns:
            List of processed trade dictionaries

        Raises:
            ValueError: If a trade lacks one of the required fields.
        """
        required_fields = ["entry_time", "exit_time", "entry", "exit", "profit_pct", "type", "fee_usd"]
        processed_trades = []

        for trade in trades:
            # Validate all required trade fields
            missing_fields = [field for field in required_fields if field not in trade]
            if missing_fields:
                raise ValueError(f"Trade missing required fields: {missing_fields} in trade: {trade}")

            processed_trades.append({
                "timeframe": timeframe_name,
                "stop_loss_pct": stop_loss_pct,
                "entry_time": trade["entry_time"],
                "exit_time": trade["exit_time"],
                "entry_price": trade["entry"],
                "exit_price": trade["exit"],
                "profit_pct": trade["profit_pct"],
                "type": trade["type"],
                "fee_usd": trade["fee_usd"],
            })

        return processed_trades

    def _debug_output(self, results: Dict[str, Any]) -> None:
        """
        Output debug information for backtest results

        Prints every trade of type 'STOP' and every trade whose profit_pct is
        below -0.09 to stdout.

        Args:
            results: Backtest result dictionary containing a 'trades' list

        Raises:
            ValueError: If 'trades' is missing or a trade lacks 'type' or
                'profit_pct'.
        """
        if 'trades' not in results:
            raise ValueError("Backtest results missing 'trades' field for debug output")
        trades = results['trades']

        # Print stop loss trades
        stop_loss_trades = []
        for t in trades:
            if 'type' not in t:
                raise ValueError(f"Trade missing 'type' field: {t}")
            if t['type'] == 'STOP':
                stop_loss_trades.append(t)

        if stop_loss_trades:
            print("Stop Loss Trades:")
            for trade in stop_loss_trades:
                print(trade)

        # Print large loss trades
        large_loss_trades = []
        for t in trades:
            if 'profit_pct' not in t:
                raise ValueError(f"Trade missing 'profit_pct' field: {t}")
            if t['profit_pct'] < -0.09:
                large_loss_trades.append(t)

        if large_loss_trades:
            print("Large Loss Trades:")
            for trade in large_loss_trades:
                print("Large loss trade:", trade)

    def aggregate_results(self, all_results: List[Dict]) -> List[Dict]:
        """
        Aggregate results per stop_loss_pct and timeframe

        Args:
            all_results: List of result dictionaries from all timeframes

        Returns:
            List of aggregated summary rows, one per (timeframe, stop_loss_pct)
            pair in order of first appearance
        """
        grouped = defaultdict(list)
        for row in all_results:
            grouped[(row['timeframe'], row['stop_loss_pct'])].append(row)

        return [
            self._aggregate_group(rows, timeframe, stop_loss_pct)
            for (timeframe, stop_loss_pct), rows in grouped.items()
        ]

    def _aggregate_group(self, rows: List[Dict], timeframe: str, stop_loss_pct: float) -> Dict:
        """
        Aggregate a group of rows with the same timeframe and stop loss

        n_trades and n_stop_loss are summed; win_rate, max_drawdown,
        avg_trade, final_usd and total_fees_usd are averaged over the rows;
        profit_ratio is averaged over the finite values only (0 when none is
        finite); initial_usd is taken from the first row.

        Args:
            rows: Metric rows belonging to one group
            timeframe: Timeframe name of the group
            stop_loss_pct: Stop loss percentage of the group

        Returns:
            One aggregated summary row

        Raises:
            ValueError: If ``rows`` is empty or a row lacks a required field.
        """
        if not rows:
            raise ValueError(f"No rows to aggregate for {timeframe} with {stop_loss_pct} stop loss")

        # Validate all rows have required fields
        required_fields = ['n_trades', 'n_stop_loss', 'win_rate', 'max_drawdown', 'avg_trade', 'profit_ratio', 'final_usd', 'total_fees_usd', 'initial_usd']
        for i, row in enumerate(rows):
            missing_fields = [field for field in required_fields if field not in row]
            if missing_fields:
                raise ValueError(f"Row {i} missing required fields: {missing_fields}")

        # Infinite profit ratios (runs without losses) are left out of the mean
        finite_profit_ratios = [r['profit_ratio'] for r in rows if not np.isinf(r['profit_ratio'])]

        return {
            "timeframe": timeframe,
            "stop_loss_pct": stop_loss_pct,
            "n_trades": sum(r['n_trades'] for r in rows),
            "n_stop_loss": sum(r['n_stop_loss'] for r in rows),
            "win_rate": np.mean([r['win_rate'] for r in rows]),
            "max_drawdown": np.mean([r['max_drawdown'] for r in rows]),
            "avg_trade": np.mean([r['avg_trade'] for r in rows]),
            "profit_ratio": np.mean(finite_profit_ratios) if finite_profit_ratios else 0,
            "initial_usd": rows[0]['initial_usd'],
            "final_usd": np.mean([r['final_usd'] for r in rows]),
            "total_fees_usd": np.mean([r['total_fees_usd'] for r in rows]),
        }

    def save_trade_file(self, trades: List[Dict], timeframe: str, stop_loss_pct: float) -> None:
        """
        Save individual trade file with summary header

        The file starts with two tab-separated summary lines (field names and
        values) followed by a regular CSV section with one row per trade.
        Nothing is written when ``trades`` is empty.

        Args:
            trades: List of processed trades for this combination
            timeframe: Timeframe name
            stop_loss_pct: Stop loss percentage

        Raises:
            RuntimeError: If a trade lacks a CSV field or the file cannot be
                written; the original exception is chained as the cause.
        """
        if not trades:
            return

        try:
            # Generate filename
            # NOTE(review): the stop loss is rounded to a whole percent, so
            # fractional values (e.g. 0.015 and 0.025) map to the same file.
            sl_percent = int(round(stop_loss_pct * 100))
            trades_filename = os.path.join(self.storage.results_dir, f"trades_{timeframe}_ST{sl_percent}pct.csv")

            summary_fields = ["timeframe", "stop_loss_pct", "n_trades", "win_rate"]
            summary_values = [timeframe, stop_loss_pct, len(trades), "calculated_elsewhere"]
            trades_fieldnames = ["entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type", "fee_usd"]

            # Validate every trade before touching the file so that a
            # malformed trade cannot leave a truncated CSV behind.
            for trade in trades:
                missing_fields = [k for k in trades_fieldnames if k not in trade]
                if missing_fields:
                    raise ValueError(f"Trade missing required fields for CSV: {missing_fields} in trade: {trade}")

            with open(trades_filename, "w", newline="") as f:
                # Write summary header
                f.write("\t".join(summary_fields) + "\n")
                f.write("\t".join(str(v) for v in summary_values) + "\n")

                # Write trades
                writer = csv.DictWriter(f, fieldnames=trades_fieldnames)
                writer.writeheader()
                writer.writerows({k: trade[k] for k in trades_fieldnames} for trade in trades)

            if self.logging:
                self.logging.info(f"Trades saved to {trades_filename}")

        except Exception as e:
            error_msg = f"Failed to save trades file for {timeframe}_ST{int(round(stop_loss_pct * 100))}pct: {e}"
            if self.logging:
                self.logging.error(error_msg)
            raise RuntimeError(error_msg) from e

    def save_backtest_results(
        self,
        results: List[Dict],
        metadata_lines: List[str],
        timestamp: str
    ) -> str:
        """
        Save aggregated backtest results to CSV file

        The actual writing is delegated to ``storage.write_backtest_results``.

        Args:
            results: List of aggregated result dictionaries
            metadata_lines: List of metadata strings
            timestamp: Timestamp for filename

        Returns:
            Path to saved file

        Raises:
            RuntimeError: If the storage call fails; the original exception is
                chained as the cause.
        """
        try:
            filename = f"{timestamp}_backtest.csv"
            fieldnames = [
                "timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate",
                "max_drawdown", "avg_trade", "profit_ratio", "final_usd", "total_fees_usd"
            ]

            filepath = self.storage.write_backtest_results(filename, fieldnames, results, metadata_lines)

            if self.logging:
                self.logging.info(f"Backtest results saved to {filepath}")

            return filepath

        except Exception as e:
            error_msg = f"Failed to save backtest results: {e}"
            if self.logging:
                self.logging.error(error_msg)
            raise RuntimeError(error_msg) from e

    def get_price_info(self, data_df: pd.DataFrame, date: str) -> Tuple[Optional[str], Optional[float]]:
        """
        Get nearest price information for a given date

        This is a best-effort lookup: any failure is logged as a warning and
        reported as (None, None) instead of raising.

        Args:
            data_df: DataFrame with price data; the index is searched for the
                nearest entry and the 'close' column supplies the price
            date: Target date string

        Returns:
            Tuple of (nearest_time, price) or (None, None) if no data
        """
        try:
            if len(data_df) == 0:
                return None, None

            target_ts = pd.to_datetime(date)
            nearest_idx = data_df.index.get_indexer([target_ts], method='nearest')[0]
            nearest_time = data_df.index[nearest_idx]
            price = data_df.iloc[nearest_idx]['close']

            return str(nearest_time), float(price)

        except Exception as e:
            if self.logging:
                self.logging.warning(f"Could not get price info for {date}: {e}")
            return None, None

    def save_all_trade_files(self, all_trades: List[Dict]) -> None:
        """
        Save all trade files in batch after parallel execution completes

        Trades are grouped by (timeframe, stop_loss_pct) and each group is
        written with ``save_trade_file``. Trades without a timeframe or
        without a stop_loss_pct are skipped (a warning is logged).

        Args:
            all_trades: List of all trades from all tasks

        Raises:
            RuntimeError: If grouping or saving fails; the original exception
                is chained as the cause.
        """
        if not all_trades:
            return

        try:
            # Group trades by timeframe and stop loss
            trade_groups = defaultdict(list)
            skipped = 0
            for trade in all_trades:
                timeframe = trade.get('timeframe')
                stop_loss_pct = trade.get('stop_loss_pct')
                if timeframe and stop_loss_pct is not None:
                    trade_groups[(timeframe, stop_loss_pct)].append(trade)
                else:
                    skipped += 1

            if skipped and self.logging:
                self.logging.warning(f"Skipped {skipped} trades without timeframe/stop_loss_pct while grouping trade files")

            # Save each group
            for (timeframe, stop_loss_pct), trades in trade_groups.items():
                self.save_trade_file(trades, timeframe, stop_loss_pct)

            if self.logging:
                self.logging.info(f"Saved {len(trade_groups)} trade files in batch")

        except Exception as e:
            error_msg = f"Failed to save trade files in batch: {e}"
            if self.logging:
                self.logging.error(error_msg)
            raise RuntimeError(error_msg) from e