# Cycles/cycles/utils/storage.py

import os
import logging
from typing import Optional, Union, Dict, Any, List

import pandas as pd

from .data_loader import DataLoader
from .data_saver import DataSaver
from .result_formatter import ResultFormatter
from .storage_utils import DataLoadingError, DataSavingError

RESULTS_DIR = "../results"
DATA_DIR = "../data"


class Storage:
    """Unified storage interface for data and results operations.

    Acts as a coordinator for the DataLoader, DataSaver, and ResultFormatter
    components, maintaining backward compatibility while providing a clean
    separation of concerns.
    """

    def __init__(self, logging=None, results_dir=RESULTS_DIR, data_dir=DATA_DIR):
        """Initialize storage with component instances.

        Args:
            logging: Optional logging instance
            results_dir: Directory for results files
            data_dir: Directory for data files
        """
        self.results_dir = results_dir
        self.data_dir = data_dir
        self.logging = logging

        # Create the target directories if they don't exist
        os.makedirs(self.results_dir, exist_ok=True)
        os.makedirs(self.data_dir, exist_ok=True)

        # Initialize component instances
        self.data_loader = DataLoader(data_dir, logging)
        self.data_saver = DataSaver(data_dir, logging)
        self.result_formatter = ResultFormatter(results_dir, logging)

    def load_data(self, file_path: str, start_date: Union[str, pd.Timestamp],
                  stop_date: Union[str, pd.Timestamp]) -> pd.DataFrame:
        """Load data with optimized dtypes and date filtering, supporting CSV and JSON input.

        Args:
            file_path: Path to the data file
            start_date: Start date (string or datetime-like)
            stop_date: Stop date (string or datetime-like)

        Returns:
            pandas DataFrame with a timestamp index

        Raises:
            DataLoadingError: If data loading fails
        """
        return self.data_loader.load_data(file_path, start_date, stop_date)
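
    # Illustrative call (hedged sketch: the file name and date range below are
    # hypothetical, and load_data simply delegates to DataLoader):
    #     df = storage.load_data("ohlcv.csv", "2021-01-01", "2021-06-30")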

    def save_data(self, data: pd.DataFrame, file_path: str) -> None:
        """Save processed data to a CSV file.

        Args:
            data: DataFrame to save
            file_path: Path to the data file, relative to data_dir

        Raises:
            DataSavingError: If saving fails
        """
        self.data_saver.save_data(data, file_path)

    def format_row(self, row: Dict[str, Any]) -> Dict[str, str]:
        """Format a row for a combined results CSV file.

        Args:
            row: Dictionary containing row data

        Returns:
            Dictionary with formatted values
        """
        return self.result_formatter.format_row(row)

    def write_results_chunk(self, filename: str, fieldnames: List[str],
                            rows: List[Dict], write_header: bool = False,
                            initial_usd: Optional[float] = None) -> None:
        """Write a chunk of results to a CSV file.

        Args:
            filename: Filename to write to
            fieldnames: List of field names
            rows: List of result rows
            write_header: Whether to write the header
            initial_usd: Initial USD value for the header comment
        """
        self.result_formatter.write_results_chunk(
            filename, fieldnames, rows, write_header, initial_usd
        )

    def write_backtest_results(self, filename: str, fieldnames: List[str],
                               rows: List[Dict],
                               metadata_lines: Optional[List[str]] = None) -> str:
        """Write combined backtest results to a CSV file.

        Args:
            filename: Filename to write to
            fieldnames: List of field names
            rows: List of result dictionaries
            metadata_lines: Optional list of strings to write as header comments

        Returns:
            Full path to the written file
        """
        return self.result_formatter.write_backtest_results(
            filename, fieldnames, rows, metadata_lines
        )
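
    # Illustrative call (hedged sketch: the field names and metadata strings are
    # hypothetical, and the header-comment formatting is handled by ResultFormatter):
    #     path = storage.write_backtest_results(
    #         "backtest_results.csv",
    #         ["timeframe", "stop_loss", "profit_usd"],
    #         rows,
    #         metadata_lines=["symbol: BTC/USD", "period: 2021-01-01..2021-06-30"],
    #     )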

    def write_trades(self, all_trade_rows: List[Dict], trades_fieldnames: List[str]) -> None:
        """Write trades to separate CSV files grouped by timeframe and stop loss.

        Args:
            all_trade_rows: List of trade dictionaries
            trades_fieldnames: List of trade field names
        """
        self.result_formatter.write_trades(all_trade_rows, trades_fieldnames)
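

if __name__ == "__main__":
    # Minimal usage sketch, not part of the module API. It assumes the delegated
    # DataLoader/DataSaver/ResultFormatter components behave as the docstrings
    # above describe and that data file paths are resolved relative to data_dir;
    # the file names, dates, and column names are hypothetical.
    storage = Storage()

    sample_file = "ohlcv.csv"  # hypothetical raw data file under data_dir
    if os.path.exists(os.path.join(storage.data_dir, sample_file)):
        # Load a date-filtered, timestamp-indexed DataFrame from CSV or JSON.
        df = storage.load_data(sample_file, "2021-01-01", "2021-06-30")

        # Persist a processed copy back under data_dir.
        storage.save_data(df, "ohlcv_processed.csv")

    # Write combined backtest results with hypothetical field names; rows could
    # first be normalized with storage.format_row if string formatting is wanted.
    result_rows = [{"timeframe": "1h", "stop_loss": 0.02, "profit_usd": 123.45}]
    results_path = storage.write_backtest_results(
        "backtest_results.csv",
        ["timeframe", "stop_loss", "profit_usd"],
        result_rows,
    )
    print(f"Results written to {results_path}")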