Cycles/cycles/utils/storage.py

211 lines
9.8 KiB
Python
Raw Normal View History

2025-05-20 16:59:17 +08:00
import os
import json
import pandas as pd
import csv
from collections import defaultdict
# Default directory names used by Storage when the caller does not override them.
RESULTS_DIR = "results"
DATA_DIR = "data"


class Storage:
    """Storage class for storing and loading results and data.

    Data files are read from / written to ``data_dir``; combined results and
    per-combination trade files are written to ``results_dir``. Both
    directories are created when the object is constructed.
    """

    # Column dtypes requested from ``read_csv``. Keys use the capitalised
    # header spelling; they only take effect for columns spelled this way.
    _CSV_DTYPES = {
        "Open": "float32",
        "High": "float32",
        "Low": "float32",
        "Close": "float32",
        "Volume": "float32",
    }

    def __init__(self, logging=None, results_dir=RESULTS_DIR, data_dir=DATA_DIR):
        """Remember the target directories and create them if missing.

        Args:
            logging: optional logger-like object. Only ``info`` and ``error``
                are called on it; ``None`` disables logging.
            results_dir: directory for result files.
            data_dir: directory for data files.
        """
        self.results_dir = results_dir
        self.data_dir = data_dir
        self.logging = logging
        # Create directories if they don't exist
        os.makedirs(self.results_dir, exist_ok=True)
        os.makedirs(self.data_dir, exist_ok=True)

    def _log_info(self, message):
        """Send ``message`` to the configured logger, if there is one."""
        if self.logging is not None:
            self.logging.info(message)

    @staticmethod
    def _filter_and_index(data, start_date, stop_date):
        """Parse the 'timestamp' column, keep rows in range, index by it."""
        data["timestamp"] = pd.to_datetime(data["timestamp"], unit="s")
        in_range = (data["timestamp"] >= start_date) & (data["timestamp"] <= stop_date)
        return data[in_range].set_index("timestamp")

    def load_data(self, file_path, start_date, stop_date):
        """Load data from a CSV or JSON file and filter it by date range.

        JSON files must hold the records under a top-level ``"Data"`` key.
        Every other extension is read as CSV. Column names are lower-cased and
        the ``timestamp`` column (Unix seconds) becomes a DatetimeIndex. For
        CSV input the column is matched case-insensitively; if none is found
        the first column is used as the timestamp.

        Args:
            file_path: path to the data file, relative to ``data_dir``
            start_date: start date (inclusive)
            stop_date: stop date (inclusive)
        Returns:
            pandas DataFrame indexed by timestamp. On any error the problem is
            logged and an empty DataFrame with an empty DatetimeIndex is
            returned instead of raising.
        """
        full_path = os.path.join(self.data_dir, file_path)
        _, ext = os.path.splitext(file_path)
        used_first_column = False
        try:
            if ext.lower() == ".json":
                with open(full_path, "r", encoding="utf-8") as f:
                    raw = json.load(f)
                data = pd.DataFrame(raw["Data"])
                data.columns = data.columns.str.lower()
            else:
                data = pd.read_csv(full_path, dtype=self._CSV_DTYPES)
                # Lower-case first so 'Timestamp', 'timestamp', ... all match,
                # wherever the column sits in the file.
                data.columns = data.columns.str.lower()
                if "timestamp" not in data.columns:
                    # Attempt to use the first column if no timestamp column is present
                    data = data.rename(columns={data.columns[0]: "timestamp"})
                    used_first_column = True
            result = self._filter_and_index(data, start_date, stop_date)
        except Exception as e:
            # Deliberate best-effort: callers get an empty frame, not an exception.
            if self.logging is not None:
                self.logging.error(f"Error loading data from {file_path}: {e}")
            # Return an empty DataFrame with a DatetimeIndex
            return pd.DataFrame(index=pd.to_datetime([]))

        note = " (using first column as timestamp)" if used_first_column else ""
        self._log_info(
            f"Data loaded from {file_path}{note} for date range {start_date} to {stop_date}"
        )
        return result

    def save_data(self, data: pd.DataFrame, file_path: str):
        """Save processed data to a CSV file.

        Index handling:
        * DatetimeIndex: converted to float Unix timestamps (seconds since
          epoch, UTC for tz-aware indexes; NaT becomes NaN) and written as the
          first column, named 'timestamp'.
        * numeric index: written as the first column named 'timestamp', unless
          the frame already has a 'timestamp' column, in which case that column
          is kept and the index is dropped.
        * any other index: written by ``to_csv`` as a regular index column.

        The input DataFrame is not modified.

        Args:
            data (pd.DataFrame): data to save.
            file_path (str): path to the data file relative to the data_dir.
        """
        frame = data.copy()
        index = frame.index
        write_index = False

        if isinstance(index, pd.DatetimeIndex):
            if index.tz is not None:
                # Epoch arithmetic below needs naive UTC timestamps.
                index = index.tz_convert("UTC").tz_localize(None)
            # Resolution-independent conversion (works for ns, us, ms and s units).
            seconds = (index - pd.Timestamp("1970-01-01")) / pd.Timedelta(seconds=1)
            frame = frame.reset_index(drop=True)
            frame["timestamp"] = seconds.to_numpy()
        elif pd.api.types.is_numeric_dtype(index.dtype):
            frame = frame.reset_index(drop=True)
            if "timestamp" not in frame.columns:
                # e.g. float Unix timestamps from a previous save/load cycle
                frame["timestamp"] = index.to_numpy()
            # Otherwise keep the existing column: row labels must not clobber
            # real timestamps.
        else:
            # Unknown index type: let to_csv write it rather than dropping it.
            write_index = True

        if not write_index:
            # Ensure 'timestamp' is the first column
            ordered = ["timestamp"] + [col for col in frame.columns if col != "timestamp"]
            frame = frame[ordered]

        full_path = os.path.join(self.data_dir, file_path)
        frame.to_csv(full_path, index=write_index)
        self._log_info(f"Data saved to {full_path} with Unix timestamp column.")

    def format_row(self, row):
        """Format a row for a combined results CSV file

        Ratio fields are rendered as percentages with two decimals and USD
        amounts with two decimals; counts and the timeframe pass through.

        Args:
            row: row to format (mapping with the keys read below)
        Returns:
            formatted row
        """
        return {
            "timeframe": row["timeframe"],
            "stop_loss_pct": f"{row['stop_loss_pct']*100:.2f}%",
            "n_trades": row["n_trades"],
            "n_stop_loss": row["n_stop_loss"],
            "win_rate": f"{row['win_rate']*100:.2f}%",
            "max_drawdown": f"{row['max_drawdown']*100:.2f}%",
            "avg_trade": f"{row['avg_trade']*100:.2f}%",
            "profit_ratio": f"{row['profit_ratio']*100:.2f}%",
            "final_usd": f"{row['final_usd']:.2f}",
            "total_fees_usd": f"{row['total_fees_usd']:.2f}",
        }

    def write_results_chunk(self, filename, fieldnames, rows, write_header=False, initial_usd=None):
        """Write a chunk of results to a CSV file

        With ``write_header`` the file is truncated and starts with a
        ``# initial_usd: ...`` comment line followed by the CSV header;
        otherwise rows are appended. ``filename`` is used as given (it is not
        joined with ``results_dir``).

        Args:
            filename: filename to write to
            fieldnames: list of fieldnames
            rows: list of rows (dicts); keys not in fieldnames are ignored
            write_header: whether to write the header
            initial_usd: initial USD
        """
        mode = "w" if write_header else "a"
        with open(filename, mode, newline="") as csvfile:
            # extrasaction="ignore" drops keys that are not in fieldnames.
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames, extrasaction="ignore")
            if write_header:
                csvfile.write(f"# initial_usd: {initial_usd}\n")
                writer.writeheader()
            writer.writerows(rows)

    def write_results_combined(self, filename, fieldnames, rows):
        """Write a combined results to a tab-separated file in results_dir

        Each row is passed through ``format_row`` before being written.

        Args:
            filename: filename to write to, relative to results_dir
            fieldnames: list of fieldnames
            rows: list of rows
        """
        fname = os.path.join(self.results_dir, filename)
        with open(fname, "w", newline="") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames, delimiter="\t")
            writer.writeheader()
            for row in rows:
                writer.writerow(self.format_row(row))
        self._log_info(f"Combined results written to {fname}")

    def write_trades(self, all_trade_rows, trades_fieldnames):
        """Write trades to CSV files, one per (timeframe, stop-loss) combination

        Files are named ``trades_{timeframe}_ST{pct}pct.csv`` inside
        results_dir, where ``pct`` is the stop loss rounded to a whole percent.
        Combinations whose stop loss rounds to the same percent share a file;
        their trades are written together instead of overwriting each other.

        Args:
            all_trade_rows: list of trade rows (dicts with 'timeframe' and
                'stop_loss_pct' keys)
            trades_fieldnames: list of trade fieldnames; missing values are
                written as empty strings
        """
        # Group by target file so that no group can overwrite another.
        trades_by_file = defaultdict(list)
        for trade in all_trade_rows:
            tf = trade.get("timeframe")
            sl = trade.get("stop_loss_pct")
            sl_percent = int(round(sl * 100))
            trades_filename = os.path.join(
                self.results_dir, f"trades_{tf}_ST{sl_percent}pct.csv"
            )
            trades_by_file[trades_filename].append(trade)

        for trades_filename, trades in trades_by_file.items():
            with open(trades_filename, "w", newline="") as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=trades_fieldnames)
                writer.writeheader()
                for trade in trades:
                    writer.writerow({k: trade.get(k, "") for k in trades_fieldnames})
            self._log_info(f"Trades written to {trades_filename}")