# Cycles/cycles/utils/result_formatter.py
#
# 179 lines
# 7.0 KiB
# Python

import os
import csv
from typing import Dict, List, Optional, Any
from collections import defaultdict
import logging
from .storage_utils import DataSavingError
class ResultFormatter:
    """Format backtest results and write them to delimited text files.

    The formatter keeps a results directory and an optional logger. Every
    public ``write_*`` method converts any failure into ``DataSavingError``
    (imported from ``.storage_utils``), logging the message first when a
    logger was supplied.
    """

    def __init__(self, results_dir: str, logging_instance: Optional[logging.Logger] = None):
        """Initialize the result formatter.

        Args:
            results_dir: Directory used by ``write_backtest_results`` and
                ``write_trades`` as the parent of the files they create.
            logging_instance: Optional logger; when ``None`` nothing is logged.
        """
        self.results_dir = results_dir
        self.logging = logging_instance

    def _log_info(self, message: str) -> None:
        """Log *message* at info level if a logger was supplied."""
        if self.logging is not None:
            self.logging.info(message)

    def _log_error(self, message: str) -> None:
        """Log *message* at error level if a logger was supplied."""
        if self.logging is not None:
            self.logging.error(message)

    @staticmethod
    def _as_percent(fraction: Any) -> str:
        """Render a fraction (0.5) as a percentage string with two decimals ("50.00%")."""
        return f"{fraction * 100:.2f}%"

    @staticmethod
    def _stop_loss_label(stop_loss_pct: float, precise: bool = False) -> str:
        """Return the stop-loss part of a trades filename.

        Args:
            stop_loss_pct: Stop loss as a fraction (0.02 means 2%).
            precise: When False, round to a whole percent ("2"), which is the
                historical naming. When True, keep up to six decimals of the
                percentage with trailing zeros removed ("2.4").
        """
        percent = stop_loss_pct * 100
        if precise:
            return f"{percent:.6f}".rstrip("0").rstrip(".")
        return str(int(round(percent)))

    def format_row(self, row: Dict[str, Any]) -> Dict[str, Any]:
        """Format a row for the combined results file.

        Fraction-valued metrics are rendered as percentages with two decimals,
        USD amounts as plain numbers with two decimals; ``timeframe``,
        ``n_trades`` and ``n_stop_loss`` are passed through unchanged.

        Args:
            row: Dictionary containing every key read below.

        Returns:
            New dictionary with the formatted values.

        Raises:
            KeyError: If ``row`` lacks one of the expected keys.
        """
        pct = self._as_percent
        return {
            "timeframe": row["timeframe"],
            "stop_loss_pct": pct(row["stop_loss_pct"]),
            "n_trades": row["n_trades"],
            "n_stop_loss": row["n_stop_loss"],
            "win_rate": pct(row["win_rate"]),
            "max_drawdown": pct(row["max_drawdown"]),
            "avg_trade": pct(row["avg_trade"]),
            "profit_ratio": pct(row["profit_ratio"]),
            "final_usd": f"{row['final_usd']:.2f}",
            "total_fees_usd": f"{row['total_fees_usd']:.2f}",
        }

    def write_results_chunk(self, filename: str, fieldnames: List[str],
                            rows: List[Dict], write_header: bool = False,
                            initial_usd: Optional[float] = None) -> None:
        """Write a chunk of results to a CSV file.

        With ``write_header=True`` the file is truncated and a header is
        written (preceded by a ``# initial_usd: ...`` comment line when
        ``initial_usd`` is given); otherwise rows are appended. ``filename``
        is opened exactly as given; it is not joined with ``results_dir``.

        Args:
            filename: Path of the file to write to.
            fieldnames: Column names, in output order.
            rows: Row dictionaries; keys not in ``fieldnames`` are dropped and
                missing keys are written as empty fields.
            write_header: Whether to start a new file and write the header.
            initial_usd: Initial USD value for the header comment.

        Raises:
            DataSavingError: If writing fails.
        """
        try:
            mode = 'w' if write_header else 'a'
            with open(filename, mode, newline="") as csvfile:
                # extrasaction="ignore" drops keys outside fieldnames, which is
                # what the per-row filtering used to do by hand.
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames, extrasaction="ignore")
                if write_header:
                    if initial_usd is not None:
                        csvfile.write(f"# initial_usd: {initial_usd}\n")
                    writer.writeheader()
                writer.writerows(rows)
        except Exception as e:
            error_msg = f"Failed to write results chunk to {filename}: {e}"
            self._log_error(error_msg)
            raise DataSavingError(error_msg) from e

    def write_backtest_results(self, filename: str, fieldnames: List[str],
                               rows: List[Dict], metadata_lines: Optional[List[str]] = None) -> str:
        """Write combined backtest results to a file inside ``results_dir``.

        Note that the table is written tab-delimited, and every row goes
        through ``format_row`` first.

        Args:
            filename: File name, joined with ``results_dir``.
            fieldnames: Column names, in output order.
            rows: Result dictionaries accepted by ``format_row``.
            metadata_lines: Optional lines written verbatim (one per line)
                before the header.

        Returns:
            Full path to the written file.

        Raises:
            DataSavingError: If formatting or writing fails.
        """
        try:
            fname = os.path.join(self.results_dir, filename)
            with open(fname, "w", newline="") as csvfile:
                if metadata_lines:
                    csvfile.writelines(f"{line}\n" for line in metadata_lines)
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames, delimiter='\t')
                writer.writeheader()
                for row in rows:
                    writer.writerow(self.format_row(row))
            self._log_info(f"Combined results written to {fname}")
            return fname
        except Exception as e:
            error_msg = f"Failed to write backtest results to {filename}: {e}"
            self._log_error(error_msg)
            raise DataSavingError(error_msg) from e

    def write_trades(self, all_trade_rows: List[Dict], trades_fieldnames: List[str]) -> None:
        """Write trades to separate CSV files grouped by timeframe and stop loss.

        Files are named ``trades_<timeframe>_ST<percent>pct.csv`` with the
        stop loss rounded to a whole percent. When several stop-loss values of
        the same timeframe round to the same whole percent, those groups get a
        fractional label instead (e.g. ``ST2pct`` and ``ST2.4pct``) so that
        one group does not overwrite another.

        Args:
            all_trade_rows: Trade dictionaries; each is grouped by its
                ``timeframe`` and ``stop_loss_pct`` values.
            trades_fieldnames: Column names for the trade files.

        Raises:
            DataSavingError: If grouping, naming or writing fails (for
                example when a row has no numeric ``stop_loss_pct``).
        """
        try:
            trades_by_combo = self._group_trades_by_combination(all_trade_rows)

            # Count how many groups would share each whole-percent file name.
            label_uses: Dict[Any, int] = defaultdict(int)
            for tf, sl in trades_by_combo:
                label_uses[(tf, self._stop_loss_label(sl))] += 1

            for (tf, sl), trades in trades_by_combo.items():
                # Only colliding groups switch to the fractional label, so all
                # unambiguous file names stay exactly as before.
                collides = label_uses[(tf, self._stop_loss_label(sl))] > 1
                label = self._stop_loss_label(sl, precise=collides)
                self._write_single_trade_file(tf, sl, trades, trades_fieldnames, sl_label=label)
        except Exception as e:
            error_msg = f"Failed to write trades: {e}"
            self._log_error(error_msg)
            raise DataSavingError(error_msg) from e

    def _group_trades_by_combination(self, all_trade_rows: List[Dict]) -> Dict:
        """Group trades by timeframe and stop loss combination.

        Args:
            all_trade_rows: List of trade dictionaries.

        Returns:
            Plain dictionary mapping ``(timeframe, stop_loss_pct)`` tuples to
            the list of trades with those values, in first-seen order. A
            missing key in a trade contributes ``None`` to the tuple.
        """
        grouped = defaultdict(list)
        for trade in all_trade_rows:
            combo = (trade.get("timeframe"), trade.get("stop_loss_pct"))
            grouped[combo].append(trade)
        # Hand back a plain dict so a lookup of an unknown combination raises
        # KeyError instead of silently inserting an empty group.
        return dict(grouped)

    def _write_single_trade_file(self, timeframe: str, stop_loss_pct: float,
                                 trades: List[Dict], trades_fieldnames: List[str],
                                 sl_label: Optional[str] = None) -> None:
        """Write trades for a single timeframe/stop-loss combination.

        Args:
            timeframe: Timeframe identifier used in the file name.
            stop_loss_pct: Stop loss as a fraction (0.02 means 2%).
            trades: List of trades for this combination; keys outside
                ``trades_fieldnames`` are dropped, missing keys become "".
            trades_fieldnames: List of field names for trades.
            sl_label: Optional stop-loss label for the file name; defaults to
                the stop loss rounded to a whole percent.
        """
        if sl_label is None:
            sl_label = self._stop_loss_label(stop_loss_pct)
        trades_filename = os.path.join(self.results_dir, f"trades_{timeframe}_ST{sl_label}pct.csv")
        with open(trades_filename, "w", newline="") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=trades_fieldnames, extrasaction="ignore")
            writer.writeheader()
            writer.writerows(trades)
        self._log_info(f"Trades written to {trades_filename}")