"""
|
|
Backtester Utilities
|
|
|
|
This module provides utility functions for data loading, system resource management,
|
|
and result saving for the incremental backtesting framework.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import pandas as pd
|
|
import numpy as np
|
|
import psutil
|
|
from typing import Dict, List, Any, Optional
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|


class DataLoader:
    """
    Data loading utilities for backtesting.

    This class handles loading and preprocessing of market data from various
    formats, including CSV and JSON files.
    """

    def __init__(self, data_dir: str = "data"):
        """
        Initialize the data loader.

        Args:
            data_dir: Directory containing data files
        """
        self.data_dir = data_dir
        os.makedirs(self.data_dir, exist_ok=True)

    def load_data(self, file_path: str, start_date: str, end_date: str) -> pd.DataFrame:
        """
        Load data with optimized dtypes and date filtering, supporting CSV and JSON input.

        Args:
            file_path: Path to the data file (relative to data_dir)
            start_date: Start date for filtering (YYYY-MM-DD format)
            end_date: End date for filtering (YYYY-MM-DD format)

        Returns:
            pd.DataFrame: Loaded OHLCV data with a DatetimeIndex
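
        Example (illustrative; the CSV filename here is hypothetical):
            loader = DataLoader(data_dir="data")
            df = loader.load_data("BTCUSD_1m.csv", "2021-01-01", "2021-06-30")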
        """
        full_path = os.path.join(self.data_dir, file_path)

        if not os.path.exists(full_path):
            raise FileNotFoundError(f"Data file not found: {full_path}")

        # Determine the file type from its extension
        _, ext = os.path.splitext(file_path)
        ext = ext.lower()

        try:
            if ext == ".json":
                return self._load_json_data(full_path, start_date, end_date)
            else:
                return self._load_csv_data(full_path, start_date, end_date)
        except Exception as e:
            logger.error(f"Error loading data from {file_path}: {e}")
            # Return an empty DataFrame with a DatetimeIndex
            return pd.DataFrame(index=pd.to_datetime([]))

    def _load_json_data(self, file_path: str, start_date: str, end_date: str) -> pd.DataFrame:
        """Load data from a JSON file."""
|
|
with open(file_path, 'r') as f:
|
|
raw = json.load(f)
|
|
|
|
data = pd.DataFrame(raw["Data"])
|
|
|
|
# Convert columns to lowercase
|
|
data.columns = data.columns.str.lower()
|
|
|
|
# Convert timestamp to datetime
|
|
data["timestamp"] = pd.to_datetime(data["timestamp"], unit="s")
|
|
|
|
# Filter by date range
|
|
data = data[(data["timestamp"] >= start_date) & (data["timestamp"] <= end_date)]
|
|
|
|
logger.info(f"JSON data loaded: {len(data)} rows for {start_date} to {end_date}")
|
|
return data.set_index("timestamp")
|
|
|
|

    def _load_csv_data(self, file_path: str, start_date: str, end_date: str) -> pd.DataFrame:
        """Load data from a CSV file."""
        # Optimized dtypes for the expected capitalized column names
        dtypes = {
            'Open': 'float32',
            'High': 'float32',
            'Low': 'float32',
            'Close': 'float32',
            'Volume': 'float32'
        }

        # Read the file, falling back to the slower but more tolerant
        # python engine if the default C engine fails
        try:
            data = pd.read_csv(file_path, dtype=dtypes)
        except Exception as e:
            logger.warning(f"Failed to read CSV with default engine, trying python engine: {e}")
            data = pd.read_csv(file_path, dtype=dtypes, engine='python')

        # Normalize the timestamp column: prefer 'Timestamp', otherwise
        # fall back to the first column
        if 'Timestamp' not in data.columns:
            data.rename(columns={data.columns[0]: 'Timestamp'}, inplace=True)

        data['Timestamp'] = pd.to_datetime(data['Timestamp'], unit='s')

        # Filter by date range
        data = data[(data['Timestamp'] >= start_date) & (data['Timestamp'] <= end_date)]

        # Convert column names to lowercase
        data.columns = data.columns.str.lower()

        # Upcast float32 columns to float64 for downstream compatibility
        numeric_columns = ['open', 'high', 'low', 'close', 'volume']
        for col in numeric_columns:
            if col in data.columns:
                data[col] = data[col].astype(float)

        logger.info(f"CSV data loaded: {len(data)} rows for {start_date} to {end_date}")
        return data.set_index('timestamp')

    def validate_data(self, data: pd.DataFrame) -> bool:
        """
        Validate loaded data for required columns and basic integrity.

        Args:
            data: DataFrame to validate

        Returns:
            bool: True if data is valid
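
        Example (illustrative; a single well-formed OHLCV row):
            loader = DataLoader()
            df = pd.DataFrame(
                {'open': [1.0], 'high': [2.0], 'low': [0.5],
                 'close': [1.5], 'volume': [10.0]},
                index=pd.to_datetime(['2021-01-01']))
            loader.validate_data(df)  # True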
        """
        if data.empty:
            logger.error("Data is empty")
            return False

        required_columns = ['open', 'high', 'low', 'close', 'volume']
        missing_columns = [col for col in required_columns if col not in data.columns]

        if missing_columns:
            logger.error(f"Missing required columns: {missing_columns}")
            return False

        # Check for NaN values
        if data[required_columns].isnull().any().any():
            logger.warning("Data contains NaN values")

        # Check for non-positive prices
        price_columns = ['open', 'high', 'low', 'close']
        if (data[price_columns] <= 0).any().any():
            logger.warning("Data contains non-positive prices")

        # Check OHLC consistency: low must be the bar minimum, high the maximum
        if not ((data['low'] <= data['open']) &
                (data['low'] <= data['close']) &
                (data['high'] >= data['open']) &
                (data['high'] >= data['close'])).all():
            logger.warning("Data contains OHLC inconsistencies")

        return True


class SystemUtils:
    """
    System resource management utilities.

    This class provides methods for determining optimal system resource usage
    for parallel processing and performance optimization.
    """

    def get_optimal_workers(self) -> int:
        """
        Determine the optimal number of worker processes based on system resources.

        Returns:
            int: Optimal number of worker processes
        """
        cpu_count = os.cpu_count() or 4
        memory_gb = psutil.virtual_memory().total / (1024**3)

        # Heuristic: use 75% of cores, but cap based on available memory,
        # assuming each worker needs ~2GB for large datasets.
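        # Worked example (illustrative): a 16-core machine with 16GB of RAM
        # yields min(int(16 * 0.75), int(16 / 2)) = min(12, 8) = 8 workers.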
        workers_by_memory = max(1, int(memory_gb / 2))
        workers_by_cpu = max(1, int(cpu_count * 0.75))

        optimal_workers = min(workers_by_cpu, workers_by_memory)

        logger.info(f"System resources: {cpu_count} CPUs, {memory_gb:.1f}GB RAM")
        logger.info(f"Using {optimal_workers} workers for processing")

        return optimal_workers

    def get_system_info(self) -> Dict[str, Any]:
        """
        Get comprehensive system information.

        Returns:
            Dict containing system information
        """
        memory = psutil.virtual_memory()

        return {
            "cpu_count": os.cpu_count(),
            "memory_total_gb": memory.total / (1024**3),
            "memory_available_gb": memory.available / (1024**3),
            "memory_percent": memory.percent,
            "optimal_workers": self.get_optimal_workers()
        }


class ResultsSaver:
    """
    Results saving utilities for backtesting.

    This class handles saving backtest results in various formats, including
    CSV, JSON, and comprehensive reports.
    """

    def __init__(self, results_dir: str = "results"):
        """
        Initialize the results saver.

        Args:
            results_dir: Directory for saving results
        """
        self.results_dir = results_dir
        os.makedirs(self.results_dir, exist_ok=True)

    def save_results_csv(self, results: List[Dict[str, Any]], filename: str) -> None:
        """
        Save backtest results to a CSV file.

        Args:
            results: List of backtest results
            filename: Output filename
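
        Example (illustrative; a minimal result dict -- real results carry
        more fields):
            saver = ResultsSaver(results_dir="results")
            saver.save_results_csv(
                [{"success": True, "strategy_name": "sma_cross",
                  "profit_ratio": 1.05, "strategy_params": {"window": 20}}],
                "run.csv")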
        """
        try:
            # Convert results to a DataFrame for easy saving
            df_data = []
            for result in results:
                if result.get("success", True):
                    row = {
                        "strategy_name": result.get("strategy_name", ""),
                        "profit_ratio": result.get("profit_ratio", 0),
                        "final_usd": result.get("final_usd", 0),
                        "n_trades": result.get("n_trades", 0),
                        "win_rate": result.get("win_rate", 0),
                        "max_drawdown": result.get("max_drawdown", 0),
                        "avg_trade": result.get("avg_trade", 0),
                        "total_fees_usd": result.get("total_fees_usd", 0),
                        "backtest_duration_seconds": result.get("backtest_duration_seconds", 0),
                        "data_points_processed": result.get("data_points_processed", 0)
                    }

                    # Add strategy parameters as prefixed columns
                    strategy_params = result.get("strategy_params", {})
                    for key, value in strategy_params.items():
                        row[f"strategy_{key}"] = value

                    # Add trader parameters as prefixed columns
                    trader_params = result.get("trader_params", {})
                    for key, value in trader_params.items():
                        row[f"trader_{key}"] = value

                    df_data.append(row)

            # Save to CSV
            df = pd.DataFrame(df_data)
            full_path = os.path.join(self.results_dir, filename)
            df.to_csv(full_path, index=False)

            logger.info(f"Results saved to {full_path}: {len(df_data)} rows")

        except Exception as e:
            logger.error(f"Error saving results to {filename}: {e}")
            raise

    def save_comprehensive_results(self, results: List[Dict[str, Any]],
                                   base_filename: str,
                                   summary: Optional[Dict[str, Any]] = None,
                                   action_log: Optional[List[Dict[str, Any]]] = None,
                                   session_start_time: Optional[datetime] = None) -> None:
        """
        Save comprehensive backtest results, including a summary, individual
        results, and logs.

        Args:
            results: List of backtest results
            base_filename: Base filename (without extension)
            summary: Optional summary statistics
            action_log: Optional action log
            session_start_time: Optional session start time
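
        Example (illustrative; "btc_sweep" is a hypothetical base filename):
            saver.save_comprehensive_results(results, "btc_sweep")
            # -> results/btc_sweep_summary_<timestamp>.json,
            #    results/btc_sweep_detailed_<timestamp>.csv, ...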
        """
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            session_start = session_start_time or datetime.now()

            # 1. Save summary report
            if summary is None:
                summary = self._calculate_summary_statistics(results)

            summary_data = {
                "session_info": {
                    "timestamp": timestamp,
                    "session_start": session_start.isoformat(),
                    "session_duration_seconds": (datetime.now() - session_start).total_seconds()
                },
                "summary_statistics": summary,
                "action_log_summary": {
                    "total_actions": len(action_log) if action_log else 0,
                    "action_types": list(set(action["action_type"] for action in action_log)) if action_log else []
                }
            }

            summary_filename = f"{base_filename}_summary_{timestamp}.json"
            self._save_json(summary_data, summary_filename)

            # 2. Save detailed results CSV
            self.save_results_csv(results, f"{base_filename}_detailed_{timestamp}.csv")

            # 3. Save individual strategy results
            valid_results = [r for r in results if r.get("success", True)]
            for i, result in enumerate(valid_results):
                strategy_filename = f"{base_filename}_strategy_{i+1}_{result['strategy_name']}_{timestamp}.json"
                strategy_data = self._format_strategy_result(result)
                self._save_json(strategy_data, strategy_filename)

            # 4. Save the action log if provided
            if action_log:
                action_log_filename = f"{base_filename}_actions_{timestamp}.json"
                action_log_data = {
                    "session_info": {
                        "timestamp": timestamp,
                        "session_start": session_start.isoformat(),
                        "total_actions": len(action_log)
                    },
                    "actions": action_log
                }
                self._save_json(action_log_data, action_log_filename)

            # 5. Create the master index file
            index_filename = f"{base_filename}_index_{timestamp}.json"
            index_data = self._create_index_file(base_filename, timestamp, valid_results, summary)
            self._save_json(index_data, index_filename)

            # Print a summary of everything written
            print("\n📊 Comprehensive results saved:")
            print(f"   📋 Summary: {self.results_dir}/{summary_filename}")
            print(f"   📈 Detailed CSV: {self.results_dir}/{base_filename}_detailed_{timestamp}.csv")
            if action_log:
                print(f"   📝 Action Log: {self.results_dir}/{action_log_filename}")
            print(f"   📁 Individual Strategies: {len(valid_results)} files")
            print(f"   🗂️ Master Index: {self.results_dir}/{index_filename}")

        except Exception as e:
            logger.error(f"Error saving comprehensive results: {e}")
            raise

    def _save_json(self, data: Dict[str, Any], filename: str) -> None:
        """Save data to a JSON file."""
        full_path = os.path.join(self.results_dir, filename)
        with open(full_path, 'w') as f:
            # default=str stringifies values json can't serialize natively,
            # such as datetimes and numpy scalars
            json.dump(data, f, indent=2, default=str)
        logger.info(f"JSON saved: {full_path}")

    def _calculate_summary_statistics(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Calculate summary statistics from results."""
        valid_results = [r for r in results if r.get("success", True)]

        if not valid_results:
            return {
                "total_runs": len(results),
                "successful_runs": 0,
                "failed_runs": len(results),
                "error": "No valid results to summarize"
            }

        def describe(values: List[float]) -> Dict[str, Any]:
            """Descriptive statistics for one metric across all valid runs."""
            return {
                "mean": np.mean(values),
                "std": np.std(values),
                "min": np.min(values),
                "max": np.max(values),
                "median": np.median(values)
            }

        return {
            "total_runs": len(results),
            "successful_runs": len(valid_results),
            "failed_runs": len(results) - len(valid_results),
            "profit_ratio": describe([r["profit_ratio"] for r in valid_results]),
            "final_usd": describe([r["final_usd"] for r in valid_results]),
            "n_trades": describe([r["n_trades"] for r in valid_results]),
            "win_rate": describe([r["win_rate"] for r in valid_results]),
            "max_drawdown": describe([r["max_drawdown"] for r in valid_results]),
            "best_run": max(valid_results, key=lambda x: x["profit_ratio"]),
            "worst_run": min(valid_results, key=lambda x: x["profit_ratio"])
        }

    def _format_strategy_result(self, result: Dict[str, Any]) -> Dict[str, Any]:
        """Format an individual strategy result for saving."""
        return {
            "strategy_info": {
                "name": result['strategy_name'],
                "params": result.get('strategy_params', {}),
                "trader_params": result.get('trader_params', {})
            },
            "performance": {
                "initial_usd": result['initial_usd'],
                "final_usd": result['final_usd'],
                "profit_ratio": result['profit_ratio'],
                "n_trades": result['n_trades'],
                "win_rate": result['win_rate'],
                "max_drawdown": result['max_drawdown'],
                "avg_trade": result['avg_trade'],
                "total_fees_usd": result['total_fees_usd']
            },
            "execution": {
                "backtest_duration_seconds": result.get('backtest_duration_seconds', 0),
                "data_points_processed": result.get('data_points_processed', 0),
                "warmup_complete": result.get('warmup_complete', False)
            },
            "trades": result.get('trades', [])
        }

    def _create_index_file(self, base_filename: str, timestamp: str,
                           valid_results: List[Dict[str, Any]],
                           summary: Dict[str, Any]) -> Dict[str, Any]:
        """Create the master index file."""
        return {
            "session_info": {
                "timestamp": timestamp,
                "base_filename": base_filename,
                "total_strategies": len(valid_results)
            },
            "files": {
                "summary": f"{base_filename}_summary_{timestamp}.json",
                "detailed_csv": f"{base_filename}_detailed_{timestamp}.csv",
                "individual_strategies": [
                    f"{base_filename}_strategy_{i+1}_{result['strategy_name']}_{timestamp}.json"
                    for i, result in enumerate(valid_results)
                ]
            },
            "quick_stats": {
                "best_profit": summary.get("profit_ratio", {}).get("max", 0) if summary.get("profit_ratio") else 0,
                "worst_profit": summary.get("profit_ratio", {}).get("min", 0) if summary.get("profit_ratio") else 0,
                "avg_profit": summary.get("profit_ratio", {}).get("mean", 0) if summary.get("profit_ratio") else 0,
                "total_successful_runs": summary.get("successful_runs", 0),
                "total_failed_runs": summary.get("failed_runs", 0)
            }
        }
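

# Minimal end-to-end sketch (illustrative; the sample filename and the result
# dict below are hypothetical -- real results come from the backtesting
# framework itself, not this module):
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    loader = DataLoader(data_dir="data")
    sample_file = "BTCUSD_1m.csv"  # hypothetical data file
    if os.path.exists(os.path.join(loader.data_dir, sample_file)):
        df = loader.load_data(sample_file, "2021-01-01", "2021-06-30")
        if loader.validate_data(df):
            workers = SystemUtils().get_optimal_workers()
            logger.info(f"Data valid; would fan out across {workers} workers")

    saver = ResultsSaver(results_dir="results")
    saver.save_results_csv(
        [{"success": True, "strategy_name": "sma_cross", "profit_ratio": 1.05}],
        "example_run.csv")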