Merge branch 'main' of ssh://dep.sokaris.link:2222/Simon/Cycles
This commit is contained in:
0
cycles/Analysis/__init__.py
Normal file
0
cycles/Analysis/__init__.py
Normal file
50
cycles/Analysis/boillinger_band.py
Normal file
50
cycles/Analysis/boillinger_band.py
Normal file
@@ -0,0 +1,50 @@
|
||||
import pandas as pd
|
||||
|
||||
class BollingerBands:
    """Computes Bollinger Bands (a moving average with a standard-deviation envelope)."""

    def __init__(self, period: int = 20, std_dev_multiplier: float = 2.0):
        """
        Initializes the BollingerBands calculator.

        Args:
            period (int): Rolling window length used for both the moving
                average and the standard deviation. Must be > 0.
            std_dev_multiplier (float): Width of the bands, in standard
                deviations. Must be > 0.

        Raises:
            ValueError: If either argument is not strictly positive.
        """
        if period <= 0:
            raise ValueError("Period must be a positive integer.")
        if std_dev_multiplier <= 0:
            raise ValueError("Standard deviation multiplier must be positive.")

        self.period = period
        self.std_dev_multiplier = std_dev_multiplier

    def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame:
        """Add 'SMA', 'UpperBand' and 'LowerBand' columns to `data_df`.

        Note: the input DataFrame is modified in place and also returned,
        matching the original contract of this method.

        Args:
            data_df (pd.DataFrame): Price data; must contain `price_column`.
            price_column (str): Name of the column holding prices.

        Returns:
            pd.DataFrame: `data_df` with the three band columns added. The
                first `period - 1` rows of each new column are NaN.

        Raises:
            ValueError: If `price_column` is missing from `data_df`.
        """
        if price_column not in data_df.columns:
            raise ValueError(f"Price column '{price_column}' not found in DataFrame.")

        # One rolling window drives both the mean and the deviation.
        rolling_prices = data_df[price_column].rolling(window=self.period)
        data_df['SMA'] = rolling_prices.mean()

        # The band offset is the rolling standard deviation scaled by the multiplier.
        band_offset = self.std_dev_multiplier * rolling_prices.std()
        data_df['UpperBand'] = data_df['SMA'] + band_offset
        data_df['LowerBand'] = data_df['SMA'] - band_offset

        return data_df
|
||||
109
cycles/Analysis/rsi.py
Normal file
109
cycles/Analysis/rsi.py
Normal file
@@ -0,0 +1,109 @@
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
class RSI:
    """
    A class to calculate the Relative Strength Index (RSI).

    Uses an SMA of gains/losses for the first `period` bars, then Wilder's
    recursive smoothing for all subsequent bars.
    """
    def __init__(self, period: int = 14):
        """
        Initializes the RSI calculator.

        Args:
            period (int): The period for RSI calculation. Default is 14.
                Must be a positive integer.

        Raises:
            ValueError: If period is not a positive integer.
        """
        if not isinstance(period, int) or period <= 0:
            raise ValueError("Period must be a positive integer.")
        self.period = period

    def calculate(self, data_df: pd.DataFrame, price_column: str = 'close') -> pd.DataFrame:
        """
        Calculates the RSI and adds it as a column to the input DataFrame.

        Args:
            data_df (pd.DataFrame): DataFrame with historical price data.
                Must contain the 'price_column'.
            price_column (str): The name of the column containing price data.
                Default is 'close'.

        Returns:
            pd.DataFrame: A copy of the input DataFrame with added 'RSI',
                'avg_gain' and 'avg_loss' columns (the latter two are kept
                for debugging). Returns a plain copy with no 'RSI' column
                if the period is larger than the number of data points.

        Raises:
            ValueError: If price_column is missing from data_df.
        """
        if price_column not in data_df.columns:
            raise ValueError(f"Price column '{price_column}' not found in DataFrame.")

        # Not enough rows to form even one full window: warn and bail out.
        # NOTE(review): uses print() rather than the logging module.
        if len(data_df) < self.period:
            print(f"Warning: Data length ({len(data_df)}) is less than RSI period ({self.period}). RSI will not be calculated.")
            return data_df.copy()

        # Work on a copy so the caller's DataFrame is untouched.
        df = data_df.copy()
        delta = df[price_column].diff(1)

        # Split the bar-to-bar change into positive and negative components.
        gain = delta.where(delta > 0, 0)
        loss = -delta.where(delta < 0, 0) # Ensure loss is positive

        # Calculate initial average gain and loss (SMA): a one-element slice
        # holding the first complete rolling mean (at index period-1).
        avg_gain = gain.rolling(window=self.period, min_periods=self.period).mean().iloc[self.period -1:self.period]
        avg_loss = loss.rolling(window=self.period, min_periods=self.period).mean().iloc[self.period -1:self.period]

        # Calculate subsequent average gains and losses (EMA-like,
        # Wilder smoothing: prev*(n-1)/n + current/n).
        # Pre-allocate lists for gains and losses to avoid repeated appending to Series
        gains = [0.0] * len(df)
        losses = [0.0] * len(df)

        # Seed the recursion with the initial SMA values.
        if not avg_gain.empty:
            gains[self.period -1] = avg_gain.iloc[0]
        if not avg_loss.empty:
            losses[self.period -1] = avg_loss.iloc[0]

        for i in range(self.period, len(df)):
            gains[i] = ((gains[i-1] * (self.period - 1)) + gain.iloc[i]) / self.period
            losses[i] = ((losses[i-1] * (self.period - 1)) + loss.iloc[i]) / self.period

        df['avg_gain'] = pd.Series(gains, index=df.index)
        df['avg_loss'] = pd.Series(losses, index=df.index)

        # Calculate RS
        # Handle division by zero: if avg_loss is 0, RS is undefined or infinite.
        # If avg_loss is 0 and avg_gain is also 0, RSI is conventionally 50.
        # If avg_loss is 0 and avg_gain > 0, RSI is conventionally 100.
        # NOTE(review): `rs` is computed here but never used — the per-row
        # loop below recomputes the ratio with explicit zero handling.
        rs = df['avg_gain'] / df['avg_loss']

        # Calculate RSI
        # RSI = 100 - (100 / (1 + RS))
        # If avg_loss is 0:
        #   If avg_gain > 0, RS -> inf, RSI -> 100
        #   If avg_gain == 0, RS -> NaN (0/0), RSI -> 50 (conventionally, or could be 0 or 100 depending on interpretation)
        # We will use a common convention where RSI is 100 if avg_loss is 0 and avg_gain > 0,
        # and RSI is 0 if avg_loss is 0 and avg_gain is 0 (or 50, let's use 0 to indicate no strength if both are 0).
        # However, to avoid NaN from 0/0, it's better to calculate RSI directly with conditions.

        rsi_values = []
        for i in range(len(df)):
            avg_g = df['avg_gain'].iloc[i]
            avg_l = df['avg_loss'].iloc[i]

            if i < self.period -1 : # Not enough data for initial SMA
                rsi_values.append(np.nan)
                continue

            if avg_l == 0:
                if avg_g == 0:
                    rsi_values.append(50) # Or 0, or np.nan depending on how you want to treat this. 50 implies neutrality.
                else:
                    rsi_values.append(100) # Max strength
            else:
                rs_val = avg_g / avg_l
                rsi_values.append(100 - (100 / (1 + rs_val)))

        df['RSI'] = pd.Series(rsi_values, index=df.index)

        # Remove intermediate columns if desired, or keep them for debugging
        # df.drop(columns=['avg_gain', 'avg_loss'], inplace=True)

        return df
|
||||
0
cycles/__init__.py
Normal file
0
cycles/__init__.py
Normal file
86
cycles/charts.py
Normal file
86
cycles/charts.py
Normal file
@@ -0,0 +1,86 @@
|
||||
import os
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
class BacktestCharts:
    """Renders backtest result charts as PNG files.

    Each public plotting method groups result rows by timeframe and draws
    one line per timeframe, with the stop-loss percentage on the x-axis.
    The two public methods previously duplicated the entire plotting
    routine; it now lives in a single private helper.
    """

    def __init__(self, charts_dir="charts"):
        """
        Args:
            charts_dir (str): Directory where chart PNGs are written.
                Created on construction if it does not exist.
        """
        self.charts_dir = charts_dir
        os.makedirs(self.charts_dir, exist_ok=True)

    def _plot_metric_vs_stop_loss(self, results, value_key, ylabel, title,
                                  filename, skip_missing=False):
        """Shared implementation for the metric-vs-stop-loss line charts.

        Args:
            results: list of dicts, each with keys 'timeframe',
                'stop_loss_pct' and `value_key`.
            value_key (str): Key of the metric plotted on the y-axis.
            ylabel (str): Y-axis label.
            title (str): Chart title.
            filename (str): Output filename (saved inside charts_dir).
            skip_missing (bool): If True, rows lacking `value_key` are
                skipped; if False a missing key raises KeyError (matching
                the original behavior of the profit-ratio chart).
        """
        from collections import defaultdict
        data = defaultdict(lambda: {"stop_loss_pct": [], value_key: []})
        for row in results:
            if skip_missing and value_key not in row:
                continue  # Skip rows without the metric
            tf = row["timeframe"]
            data[tf]["stop_loss_pct"].append(row["stop_loss_pct"])
            data[tf][value_key].append(row[value_key])

        plt.figure(figsize=(10, 6))
        for tf, vals in data.items():
            # Sort by stop_loss_pct so each line is drawn left-to-right
            sorted_pairs = sorted(zip(vals["stop_loss_pct"], vals[value_key]))
            stop_loss, metric = zip(*sorted_pairs)
            plt.plot(
                [s * 100 for s in stop_loss],  # Convert fractions to percent
                metric,
                marker="o",
                label=tf
            )

        plt.xlabel("Stop Loss (%)")
        plt.ylabel(ylabel)
        plt.title(title)
        plt.legend(title="Timeframe")
        plt.grid(True, linestyle="--", alpha=0.5)
        plt.tight_layout()

        output_path = os.path.join(self.charts_dir, filename)
        plt.savefig(output_path)
        plt.close()

    def plot_profit_ratio_vs_stop_loss(self, results, filename="profit_ratio_vs_stop_loss.png"):
        """
        Plots profit ratio vs stop loss percentage for each timeframe.

        Parameters:
        - results: list of dicts, each with keys: 'timeframe', 'stop_loss_pct', 'profit_ratio'
        - filename: output filename (will be saved in charts_dir)
        """
        self._plot_metric_vs_stop_loss(
            results,
            value_key="profit_ratio",
            ylabel="Profit Ratio",
            title="Profit Ratio vs Stop Loss (%) per Timeframe",
            filename=filename,
        )

    def plot_average_trade_vs_stop_loss(self, results, filename="average_trade_vs_stop_loss.png"):
        """
        Plots average trade vs stop loss percentage for each timeframe.

        Parameters:
        - results: list of dicts, each with keys: 'timeframe', 'stop_loss_pct', 'average_trade'
        - filename: output filename (will be saved in charts_dir)
        """
        self._plot_metric_vs_stop_loss(
            results,
            value_key="average_trade",
            ylabel="Average Trade",
            title="Average Trade vs Stop Loss (%) per Timeframe",
            filename=filename,
            skip_missing=True,  # original method tolerated missing 'average_trade'
        )
|
||||
197
cycles/main_debug.py
Normal file
197
cycles/main_debug.py
Normal file
@@ -0,0 +1,197 @@
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from trend_detector_simple import TrendDetectorSimple
|
||||
import os
|
||||
import datetime
|
||||
import csv
|
||||
|
||||
def load_data(file_path, start_date, stop_date):
    """Load a price CSV, keep rows within [start_date, stop_date], and
    return it indexed by timestamp with lowercase column names.

    Args:
        file_path: Path to a CSV containing a Unix-seconds 'Timestamp' column.
        start_date: Inclusive lower bound of the date range.
        stop_date: Inclusive upper bound of the date range.

    Returns:
        pd.DataFrame indexed by 'timestamp'.
    """
    frame = pd.read_csv(file_path)
    frame['Timestamp'] = pd.to_datetime(frame['Timestamp'], unit='s')
    in_range = (frame['Timestamp'] >= start_date) & (frame['Timestamp'] <= stop_date)
    frame = frame[in_range]
    frame.columns = frame.columns.str.lower()
    return frame.set_index('timestamp')
|
||||
|
||||
def process_month_timeframe(min1_df, month_df, stop_loss_pcts, rule_name, initial_usd):
    """Process a single month for a given timeframe with all stop loss values.

    Runs trend detection once on the month's candles, then backtests the
    meta-supertrend strategy once per stop-loss value, producing a summary
    row and the individual trade rows for each run.

    Args:
        min1_df: 1-minute candle DataFrame passed to the backtester.
        month_df: Candle DataFrame for one month at the target timeframe;
            must contain a 'timestamp' column.
        stop_loss_pcts: Iterable of stop-loss fractions to backtest.
        rule_name: Timeframe label (e.g. "6h") recorded in the output rows.
        initial_usd: Starting capital used for the final_usd computation.

    Returns:
        (results_rows, trade_rows): one summary dict per stop-loss value,
        and one dict per executed trade.
    """
    month_df = month_df.copy().reset_index(drop=True)
    trend_detector = TrendDetectorSimple(month_df, verbose=False)
    # NOTE(review): `signal_df` is never used below; detect_trends() is
    # presumably called for its side effects on the detector — confirm
    # against TrendDetectorSimple before removing.
    analysis_results = trend_detector.detect_trends()
    signal_df = analysis_results.get('signal_df')

    results_rows = []
    trade_rows = []
    for stop_loss_pct in stop_loss_pcts:
        results = trend_detector.backtest_meta_supertrend(
            min1_df,
            initial_usd=initial_usd,
            stop_loss_pct=stop_loss_pct
        )
        trades = results.get('trades', [])
        n_trades = results["n_trades"]
        # Aggregate per-trade P&L into summary statistics.
        n_winning_trades = sum(1 for trade in trades if trade['profit_pct'] > 0)
        total_profit = sum(trade['profit_pct'] for trade in trades)
        total_loss = sum(-trade['profit_pct'] for trade in trades if trade['profit_pct'] < 0)
        win_rate = n_winning_trades / n_trades if n_trades > 0 else 0
        avg_trade = total_profit / n_trades if n_trades > 0 else 0
        # No losing trades means an undefined ratio; reported as +inf.
        profit_ratio = total_profit / total_loss if total_loss > 0 else float('inf')

        # Max drawdown: largest peak-to-trough drop of the cumulative
        # (additive) profit curve.
        cumulative_profit = 0
        max_drawdown = 0
        peak = 0
        for trade in trades:
            cumulative_profit += trade['profit_pct']
            if cumulative_profit > peak:
                peak = cumulative_profit
            drawdown = peak - cumulative_profit
            if drawdown > max_drawdown:
                max_drawdown = drawdown

        # Final USD: compound each trade's return onto the starting capital.
        # NOTE(review): assumes 'profit_pct' is a fraction (0.05 = 5%), not
        # a percentage — confirm against the backtester's output.
        final_usd = initial_usd
        for trade in trades:
            final_usd *= (1 + trade['profit_pct'])

        row = {
            "timeframe": rule_name,
            "month": str(month_df['timestamp'].iloc[0].to_period('M')),
            "stop_loss_pct": stop_loss_pct,
            "n_trades": n_trades,
            # Count only exits explicitly tagged as stop-loss hits.
            "n_stop_loss": sum(1 for trade in trades if 'type' in trade and trade['type'] == 'STOP'),
            "win_rate": win_rate,
            "max_drawdown": max_drawdown,
            "avg_trade": avg_trade,
            "profit_ratio": profit_ratio,
            "initial_usd": initial_usd,
            "final_usd": final_usd,
        }
        results_rows.append(row)

        # One flat row per trade, for the detailed trades CSV.
        for trade in trades:
            trade_rows.append({
                "timeframe": rule_name,
                "month": str(month_df['timestamp'].iloc[0].to_period('M')),
                "stop_loss_pct": stop_loss_pct,
                "entry_time": trade.get("entry_time"),
                "exit_time": trade.get("exit_time"),
                "entry_price": trade.get("entry_price"),
                "exit_price": trade.get("exit_price"),
                "profit_pct": trade.get("profit_pct"),
                "type": trade.get("type", ""),
            })

    return results_rows, trade_rows
|
||||
|
||||
def process_timeframe(rule, data_1min, stop_loss_pcts, initial_usd):
    """Backtest one timeframe over every month of the input data.

    Resamples the 1-minute data to `rule` (unless rule is "1T"), splits it
    into calendar months, and delegates each month to
    process_month_timeframe.

    Args:
        rule: pandas resample rule string (e.g. "6h", "1D", or "1T").
        data_1min: 1-minute candle DataFrame with a DatetimeIndex.
        stop_loss_pcts: Iterable of stop-loss fractions to test.
        initial_usd: Starting capital for each backtest.

    Returns:
        (results_rows, all_trade_rows) accumulated across all months.
    """
    if rule == "1T":
        # 1-minute timeframe needs no resampling.
        resampled = data_1min.copy()
    else:
        ohlcv_rules = {
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last',
            'volume': 'sum'
        }
        resampled = data_1min.resample(rule).agg(ohlcv_rules).dropna()

    resampled = resampled.reset_index()
    resampled['month'] = resampled['timestamp'].dt.to_period('M')

    results_rows = []
    all_trade_rows = []

    for month, month_df in resampled.groupby('month'):
        # Skip months with too few candles to analyse meaningfully.
        if len(month_df) < 10:
            continue
        month_results, month_trades = process_month_timeframe(
            data_1min, month_df, stop_loss_pcts, rule, initial_usd
        )
        results_rows.extend(month_results)
        all_trade_rows.extend(month_trades)

    return results_rows, all_trade_rows
|
||||
|
||||
def aggregate_results(all_rows, initial_usd):
    """Collapse per-month result rows into one summary row per
    (timeframe, stop_loss_pct) combination.

    Trade counts are summed across months; rate/ratio metrics are averaged.

    Args:
        all_rows: List of per-month result dicts as produced by
            process_month_timeframe.
        initial_usd: Starting capital, echoed into each summary row and
            used as the fallback when a row lacks 'final_usd'.

    Returns:
        List of summary dicts, one per (timeframe, stop_loss_pct) pair.
    """
    from collections import defaultdict

    by_combo = defaultdict(list)
    for row in all_rows:
        by_combo[(row['timeframe'], row['stop_loss_pct'])].append(row)

    summary_rows = []
    for (rule, stop_loss_pct), rows in by_combo.items():
        def mean_of(field):
            # Average a metric over this combination's monthly rows.
            return np.mean([r[field] for r in rows])

        summary_rows.append({
            "timeframe": rule,
            "stop_loss_pct": stop_loss_pct,
            "n_trades": sum(r['n_trades'] for r in rows),
            "n_stop_loss": sum(r['n_stop_loss'] for r in rows),
            "win_rate": mean_of('win_rate'),
            "max_drawdown": mean_of('max_drawdown'),
            "avg_trade": mean_of('avg_trade'),
            "profit_ratio": mean_of('profit_ratio'),
            "initial_usd": initial_usd,
            "final_usd": np.mean([r.get('final_usd', initial_usd) for r in rows]),
        })

    return summary_rows
|
||||
|
||||
def write_results(filename, fieldnames, rows):
    """Write dict rows to `filename` as CSV with a header row.

    Args:
        filename: Destination path; overwritten if it exists.
        fieldnames: Column order for the CSV header.
        rows: Iterable of dicts keyed by the fieldnames.
    """
    with open(filename, 'w', newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
|
||||
|
||||
if __name__ == "__main__":
    # Config: date range and starting capital for every backtest run.
    start_date = '2020-01-01'
    stop_date = '2025-05-15'
    initial_usd = 10000

    # All output CSVs go under results/, stamped with the run time.
    results_dir = "results"
    os.makedirs(results_dir, exist_ok=True)
    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M")

    # Timeframes and stop-loss fractions to sweep.
    timeframes = ["6h", "1D"]
    stop_loss_pcts = [0.01, 0.02, 0.03, 0.05, 0.07, 0.10]

    data_1min = load_data('./data/btcusd_1-min_data.csv', start_date, stop_date)
    print(f"1min rows: {len(data_1min)}")

    # Summary CSV path (writing is currently disabled below).
    filename = os.path.join(
        results_dir,
        f"{timestamp}_backtest_results_{start_date}_{stop_date}_multi_timeframe_stoploss.csv"
    )
    fieldnames = ["timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate", "max_drawdown", "avg_trade", "profit_ratio", "initial_usd", "final_usd"]

    all_results = []
    all_trades = []

    # Run the full stop-loss sweep for each timeframe sequentially.
    for name in timeframes:
        print(f"Processing timeframe: {name}")
        results, trades = process_timeframe(name, data_1min, stop_loss_pcts, initial_usd)
        all_results.extend(results)
        all_trades.extend(trades)

    # Aggregate monthly rows into one summary per (timeframe, stop-loss).
    summary_rows = aggregate_results(all_results, initial_usd)
    # write_results(filename, fieldnames, summary_rows)

    # Per-trade CSV path (writing is currently disabled below).
    trades_filename = os.path.join(
        results_dir,
        f"{timestamp}_backtest_trades.csv"
    )
    trades_fieldnames = [
        "timeframe", "month", "stop_loss_pct", "entry_time", "exit_time",
        "entry_price", "exit_price", "profit_pct", "type"
    ]
    # write_results(trades_filename, trades_fieldnames, all_trades)
|
||||
25
cycles/taxes.py
Normal file
25
cycles/taxes.py
Normal file
@@ -0,0 +1,25 @@
|
||||
import pandas as pd
|
||||
|
||||
class Taxes:
    """Applies a flat tax on positive profits in backtest result CSVs."""

    def __init__(self, tax_rate=0.20):
        """
        tax_rate: flat tax rate on positive profits (e.g., 0.20 for 20%)
        """
        self.tax_rate = tax_rate

    def add_taxes_to_results_csv(self, input_csv, output_csv=None, profit_col='final_usd'):
        """
        Reads a backtest results CSV, adds tax columns, and writes to a new CSV.
        - input_csv: path to the input CSV file
        - output_csv: path to the output CSV file (if None, overwrite input)
        - profit_col: column name for profit (default: 'final_usd')

        Returns the path the taxed CSV was written to.
        """
        results = pd.read_csv(input_csv, delimiter=None)

        def tax_for(profit):
            # Tax is only charged on positive profits; losses pay nothing.
            return self.tax_rate * profit if profit > 0 else 0

        results['tax_paid'] = results[profit_col].apply(tax_for)
        results['net_profit_after_tax'] = results[profit_col] - results['tax_paid']
        results['cumulative_tax_paid'] = results['tax_paid'].cumsum()

        # Fall back to overwriting the input when no output path is given.
        destination = output_csv if output_csv else input_csv
        results.to_csv(destination, index=False)
        return destination
|
||||
0
cycles/trend_detector_simple.py
Normal file
0
cycles/trend_detector_simple.py
Normal file
0
cycles/utils/__init__.py
Normal file
0
cycles/utils/__init__.py
Normal file
23
cycles/utils/apply_taxes_to_file.py
Normal file
23
cycles/utils/apply_taxes_to_file.py
Normal file
@@ -0,0 +1,23 @@
|
||||
import sys
|
||||
import os
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
from taxes import Taxes
|
||||
|
||||
if __name__ == "__main__":
    # CLI: apply_taxes_to_file.py <input_csv> [profit_col]
    if len(sys.argv) < 2:
        print("Usage: python apply_taxes_to_file.py <input_csv> [profit_col]")
        sys.exit(1)

    input_csv = sys.argv[1]
    # Optional second argument selects the profit column (default 'final_usd').
    profit_col = sys.argv[2] if len(sys.argv) > 2 else 'final_usd'

    if not os.path.isfile(input_csv):
        print(f"File not found: {input_csv}")
        sys.exit(1)

    # Output is written next to the input as <name>_taxed.csv.
    # NOTE(review): `ext` is unpacked but unused.
    base, ext = os.path.splitext(input_csv)
    output_csv = f"{base}_taxed.csv"

    taxes = Taxes()  # Default 20% tax rate
    taxes.add_taxes_to_results_csv(input_csv, output_csv, profit_col=profit_col)
    print(f"Taxed file saved as: {output_csv}")
|
||||
60
cycles/utils/data_utils.py
Normal file
60
cycles/utils/data_utils.py
Normal file
@@ -0,0 +1,60 @@
|
||||
import pandas as pd
|
||||
|
||||
def aggregate_to_daily(data_df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate intraday OHLCV data to one row per calendar day.

    For each day: 'open' takes the first value, 'close' the last, 'high'
    the maximum, 'low' the minimum, and 'volume' (if present) the sum.
    Only the recognised lowercase OHLCV columns are aggregated.

    Args:
        data_df (pd.DataFrame): DataFrame with a DatetimeIndex and lowercase
            columns such as 'open', 'high', 'low', 'close', 'volume'.

    Returns:
        pd.DataFrame: Daily rows, indexed at noon (12:00:00) of each day.
            Days where every aggregated value is NaN are dropped. Returns
            an empty DataFrame (with a DatetimeIndex) when none of the
            recognised columns are present.

    Raises:
        ValueError: If the input DataFrame does not have a DatetimeIndex.
    """
    if not isinstance(data_df.index, pd.DatetimeIndex):
        raise ValueError("Input DataFrame must have a DatetimeIndex.")

    # Map each recognised column to its daily aggregation function; keep
    # only the ones actually present, preserving the canonical order.
    rule_for = {
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'volume': 'sum',
    }
    agg_rules = {col: how for col, how in rule_for.items() if col in data_df.columns}

    if not agg_rules:
        # Nothing we know how to aggregate — warn and return an empty frame.
        print("Warning: No standard OHLCV columns (open, high, low, close, volume) found for daily aggregation.")
        return pd.DataFrame(index=pd.to_datetime([]))

    daily_data = data_df.resample('D').agg(agg_rules)

    # Shift each day's timestamp from midnight to noon.
    if not daily_data.empty and isinstance(daily_data.index, pd.DatetimeIndex):
        daily_data.index = daily_data.index + pd.Timedelta(hours=12)

    # Days with no trades in the original data aggregate to all-NaN rows.
    daily_data.dropna(how='all', inplace=True)

    return daily_data
|
||||
128
cycles/utils/gsheets.py
Normal file
128
cycles/utils/gsheets.py
Normal file
@@ -0,0 +1,128 @@
|
||||
import threading
|
||||
import time
|
||||
import queue
|
||||
from google.oauth2.service_account import Credentials
|
||||
import gspread
|
||||
import math
|
||||
import numpy as np
|
||||
from collections import defaultdict
|
||||
|
||||
|
||||
class GSheetBatchPusher(threading.Thread):
    """Daemon thread that periodically drains a queue of (results, trades)
    batches and pushes them to a Google Sheets spreadsheet via gspread."""

    def __init__(self, queue, timestamp, spreadsheet_name, interval=60, logging=None):
        """
        Args:
            queue: queue.Queue of (results_rows, trade_rows) tuples.
            timestamp: Run identifier passed through to the push routine.
            spreadsheet_name: Name of the Google spreadsheet to open.
            interval: Seconds to sleep between push cycles.
            logging: Optional logger; warnings are silently dropped if None.
        """
        super().__init__(daemon=True)
        self.queue = queue
        self.timestamp = timestamp
        self.spreadsheet_name = spreadsheet_name
        self.interval = interval
        self._stop_event = threading.Event()
        self.logging = logging

    def run(self):
        """Push queued batches every `interval` seconds until stopped.

        NOTE(review): stop() only takes effect after the current sleep
        finishes, so shutdown can lag by up to `interval` seconds.
        """
        while not self._stop_event.is_set():
            self.push_all()
            time.sleep(self.interval)
        # Final push on stop
        self.push_all()

    def stop(self):
        """Signal the thread to stop after its current cycle."""
        self._stop_event.set()

    def push_all(self):
        """Drain the queue completely and push everything in one batch."""
        batch_results = []
        batch_trades = []
        while True:
            try:
                results, trades = self.queue.get_nowait()
                batch_results.extend(results)
                batch_trades.extend(trades)
            except queue.Empty:
                break

        if batch_results or batch_trades:
            self.write_results_per_combination_gsheet(batch_results, batch_trades, self.timestamp, self.spreadsheet_name)

    def write_results_per_combination_gsheet(self, results_rows, trade_rows, timestamp, spreadsheet_name="GlimBit Backtest Results"):
        """Write summary rows to a 'Results' worksheet and trade rows to one
        worksheet per (timeframe, stop-loss) combination.

        On a 429 quota error the whole batch is re-queued for the next cycle.

        NOTE(review): `timestamp` is accepted but never used in this method.
        """
        scopes = [
            "https://www.googleapis.com/auth/spreadsheets",
            "https://www.googleapis.com/auth/drive"
        ]
        creds = Credentials.from_service_account_file('credentials/service_account.json', scopes=scopes)
        gc = gspread.authorize(creds)
        sh = gc.open(spreadsheet_name)

        try:
            worksheet = sh.worksheet("Results")
        except gspread.exceptions.WorksheetNotFound:
            worksheet = sh.add_worksheet(title="Results", rows="1000", cols="20")

        # Clear the worksheet before writing new results
        worksheet.clear()

        # Updated fieldnames to match your data rows
        fieldnames = [
            "timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate",
            "max_drawdown", "avg_trade", "profit_ratio", "initial_usd", "final_usd"
        ]

        def to_native(val):
            # Convert numpy scalars/arrays, datetimes and non-finite floats
            # into values the Sheets API accepts.
            if isinstance(val, (np.generic, np.ndarray)):
                val = val.item()
            if hasattr(val, 'isoformat'):
                return val.isoformat()
            # Handle inf, -inf, nan
            if isinstance(val, float):
                if math.isinf(val):
                    return "∞" if val > 0 else "-∞"
                if math.isnan(val):
                    return ""
            return val

        # Write header if sheet is empty
        # NOTE(review): always true right after clear() above, so the header
        # is written unconditionally.
        if len(worksheet.get_all_values()) == 0:
            worksheet.append_row(fieldnames)

        # NOTE(review): one append_row call per row burns API quota quickly;
        # consider a single batch update if 429s become frequent.
        for row in results_rows:
            values = [to_native(row.get(field, "")) for field in fieldnames]
            worksheet.append_row(values)

        trades_fieldnames = [
            "entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type"
        ]
        trades_by_combo = defaultdict(list)

        # Group trades so each (timeframe, stop-loss) pair gets its own sheet.
        for trade in trade_rows:
            tf = trade.get("timeframe")
            sl = trade.get("stop_loss_pct")
            trades_by_combo[(tf, sl)].append(trade)

        for (tf, sl), trades in trades_by_combo.items():
            sl_percent = int(round(sl * 100))
            sheet_name = f"Trades_{tf}_ST{sl_percent}%"

            try:
                trades_ws = sh.worksheet(sheet_name)
            except gspread.exceptions.WorksheetNotFound:
                trades_ws = sh.add_worksheet(title=sheet_name, rows="1000", cols="20")

            # Clear the trades worksheet before writing new trades
            trades_ws.clear()

            if len(trades_ws.get_all_values()) == 0:
                trades_ws.append_row(trades_fieldnames)

            for trade in trades:
                trade_row = [to_native(trade.get(field, "")) for field in trades_fieldnames]
                try:
                    trades_ws.append_row(trade_row)
                except gspread.exceptions.APIError as e:
                    if '429' in str(e):
                        if self.logging is not None:
                            self.logging.warning(f"Google Sheets API quota exceeded (429). Please wait one minute. Will retry on next batch push. Sheet: {sheet_name}")
                        # Re-queue the failed batch for retry
                        self.queue.put((results_rows, trade_rows))
                        return # Stop pushing for this batch, will retry next interval
                    else:
                        raise
|
||||
210
cycles/utils/storage.py
Normal file
210
cycles/utils/storage.py
Normal file
@@ -0,0 +1,210 @@
|
||||
import os
|
||||
import json
|
||||
import pandas as pd
|
||||
import csv
|
||||
from collections import defaultdict
|
||||
|
||||
RESULTS_DIR = "results"
|
||||
DATA_DIR = "data"
|
||||
|
||||
class Storage:
|
||||
|
||||
"""Storage class for storing and loading results and data"""
|
||||
def __init__(self, logging=None, results_dir=RESULTS_DIR, data_dir=DATA_DIR):
    """Initialize the storage helper.

    Args:
        logging: Optional logger; when None, info/error messages are dropped.
        results_dir: Directory for result files (default: "results").
        data_dir: Directory for input data files (default: "data").
    """
    self.results_dir = results_dir
    self.data_dir = data_dir
    self.logging = logging

    # Create directories if they don't exist
    os.makedirs(self.results_dir, exist_ok=True)
    os.makedirs(self.data_dir, exist_ok=True)
|
||||
|
||||
def load_data(self, file_path, start_date, stop_date):
    """Load data with optimized dtypes and filtering, supporting CSV and JSON input

    JSON files are expected to contain a top-level "Data" list of records;
    CSV files are read with float32 OHLCV dtypes. In both cases the result
    is filtered to [start_date, stop_date], columns are lowercased, and the
    frame is indexed by 'timestamp'. Any error yields an empty DataFrame.

    Args:
        file_path: path to the data file, relative to self.data_dir
        start_date: start date (inclusive)
        stop_date: stop date (inclusive)
    Returns:
        pandas DataFrame indexed by 'timestamp' (empty on failure)
    """
    # Determine file type
    _, ext = os.path.splitext(file_path)
    ext = ext.lower()
    try:
        if ext == ".json":
            with open(os.path.join(self.data_dir, file_path), 'r') as f:
                raw = json.load(f)
            # assumes the JSON payload has a "Data" key holding row records
            # — TODO confirm against the data provider's format.
            data = pd.DataFrame(raw["Data"])
            # Convert columns to lowercase
            data.columns = data.columns.str.lower()
            # Convert timestamp to datetime (Unix seconds)
            data["timestamp"] = pd.to_datetime(data["timestamp"], unit="s")
            # Filter by date range
            data = data[(data["timestamp"] >= start_date) & (data["timestamp"] <= stop_date)]
            if self.logging is not None:
                self.logging.info(f"Data loaded from {file_path} for date range {start_date} to {stop_date}")
            return data.set_index("timestamp")
        else:
            # Define optimized dtypes (float32 halves memory vs float64)
            dtypes = {
                'Open': 'float32',
                'High': 'float32',
                'Low': 'float32',
                'Close': 'float32',
                'Volume': 'float32'
            }
            # Read data with original capitalized column names
            data = pd.read_csv(os.path.join(self.data_dir, file_path), dtype=dtypes)

            # Convert timestamp to datetime
            if 'Timestamp' in data.columns:
                data['Timestamp'] = pd.to_datetime(data['Timestamp'], unit='s')
                # Filter by date range
                data = data[(data['Timestamp'] >= start_date) & (data['Timestamp'] <= stop_date)]
                # Now convert column names to lowercase
                data.columns = data.columns.str.lower()
                if self.logging is not None:
                    self.logging.info(f"Data loaded from {file_path} for date range {start_date} to {stop_date}")
                return data.set_index('timestamp')
            else: # Attempt to use the first column if 'Timestamp' is not present
                # NOTE(review): blindly treats column 0 as Unix seconds —
                # verify this assumption holds for all CSV inputs.
                data.rename(columns={data.columns[0]: 'timestamp'}, inplace=True)
                data['timestamp'] = pd.to_datetime(data['timestamp'], unit='s')
                data = data[(data['timestamp'] >= start_date) & (data['timestamp'] <= stop_date)]
                data.columns = data.columns.str.lower() # Ensure all other columns are lower
                if self.logging is not None:
                    self.logging.info(f"Data loaded from {file_path} (using first column as timestamp) for date range {start_date} to {stop_date}")
                return data.set_index('timestamp')
    except Exception as e:
        # Broad catch: any parse/IO failure degrades to an empty frame
        # rather than crashing the caller.
        if self.logging is not None:
            self.logging.error(f"Error loading data from {file_path}: {e}")
        # Return an empty DataFrame with a DatetimeIndex
        return pd.DataFrame(index=pd.to_datetime([]))
|
||||
|
||||
def save_data(self, data: pd.DataFrame, file_path: str):
    """Save processed data to a CSV file.

    If the DataFrame has a DatetimeIndex, it is converted to float Unix
    timestamps (seconds since epoch) and saved as a leading 'timestamp'
    column.  A numeric index (e.g. float Unix timestamps from a previous
    save/load cycle) is likewise promoted to a 'timestamp' column.  Any
    other index type is preserved by letting to_csv write the index.

    Args:
        data (pd.DataFrame): data to save.
        file_path (str): path to the data file relative to the data_dir.
    """

    def _promote_index_to_timestamp(df: pd.DataFrame, values) -> pd.DataFrame:
        # Turn the index into a column named 'timestamp' and move it to
        # the front so it is the first column in the saved CSV.
        df['timestamp'] = values
        df.reset_index(drop=True, inplace=True)
        if 'timestamp' in df.columns and len(df.columns) > 1:
            cols = ['timestamp'] + [c for c in df.columns if c != 'timestamp']
            df = df[cols]
        return df

    data_to_save = data.copy()
    # The two handled branches below move the index into a column, so the
    # active (reset) index must not be written; the fallback branch keeps
    # the original index and must write it.
    write_index = False

    if isinstance(data_to_save.index, pd.DatetimeIndex):
        # Convert DatetimeIndex to Unix timestamp (float seconds since epoch).
        unix_seconds = data_to_save.index.astype('int64') / 1e9
        data_to_save = _promote_index_to_timestamp(data_to_save, unix_seconds)
    elif pd.api.types.is_numeric_dtype(data_to_save.index.dtype):
        # Index is already numeric (e.g. from a previous save/load cycle):
        # keep its values as the 'timestamp' column.
        data_to_save = _promote_index_to_timestamp(data_to_save, data_to_save.index)
    else:
        # Fix: the original passed index=False unconditionally, silently
        # dropping the index here despite the comment saying to_csv would
        # write it.  Preserve unknown index types by writing the index.
        write_index = True

    full_path = os.path.join(self.data_dir, file_path)
    data_to_save.to_csv(full_path, index=write_index)
    if self.logging is not None:
        self.logging.info(f"Data saved to {full_path} with Unix timestamp column.")
|
||||
|
||||
|
||||
def format_row(self, row):
    """Format a row for a combined results CSV file.

    Percentage fields are rendered as '12.34%' strings, the final USD
    balance is rendered with two decimal places, and trade counts pass
    through unchanged.

    Args:
        row: raw result row (mapping).

    Returns:
        dict: the formatted row.
    """
    def as_pct(value):
        # Render a fraction (e.g. 0.1234) as a percent string '12.34%'.
        return f"{value*100:.2f}%"

    formatted = {
        "timeframe": row["timeframe"],
        "stop_loss_pct": as_pct(row["stop_loss_pct"]),
        "n_trades": row["n_trades"],
        "n_stop_loss": row["n_stop_loss"],
        "win_rate": as_pct(row["win_rate"]),
        "max_drawdown": as_pct(row["max_drawdown"]),
        "avg_trade": as_pct(row["avg_trade"]),
        "profit_ratio": as_pct(row["profit_ratio"]),
        "final_usd": f"{row['final_usd']:.2f}",
    }
    return formatted
|
||||
|
||||
def write_results_chunk(self, filename, fieldnames, rows, write_header=False, initial_usd=None):
    """Write a chunk of result rows to a CSV file.

    When write_header is True the file is (re)created with a comment line
    recording the initial USD balance followed by the CSV header;
    otherwise rows are appended to the existing file.

    Args:
        filename: filename to write to
        fieldnames: list of fieldnames
        rows: list of rows (mappings; keys not in fieldnames are dropped)
        write_header: whether to truncate the file and write the header
        initial_usd: initial USD balance recorded in the header comment
    """
    if write_header:
        mode = 'w'
    else:
        mode = 'a'

    allowed = set(fieldnames)
    with open(filename, mode, newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if write_header:
            csvfile.write(f"# initial_usd: {initial_usd}\n")
            writer.writeheader()
        for row in rows:
            # Drop any keys DictWriter does not know about.
            writer.writerow({key: value for key, value in row.items() if key in allowed})
|
||||
|
||||
def write_results_combined(self, filename, fieldnames, rows):
    """Write combined results to a tab-separated CSV file.

    Each raw row is passed through self.format_row before writing.

    Args:
        filename: filename to write to, relative to results_dir
        fieldnames: list of fieldnames
        rows: list of raw result rows
    """
    fname = os.path.join(self.results_dir, filename)
    with open(fname, "w", newline="") as out:
        table = csv.DictWriter(out, fieldnames=fieldnames, delimiter='\t')
        table.writeheader()
        table.writerows(self.format_row(raw_row) for raw_row in rows)
    if self.logging is not None:
        self.logging.info(f"Combined results written to {fname}")
|
||||
|
||||
def write_trades(self, all_trade_rows, trades_fieldnames):
    """Write trades to one CSV file per (timeframe, stop-loss) combination.

    Each output file is named 'trades_{timeframe}_ST{stop_loss}pct.csv' and
    placed in results_dir.  Fields missing from a trade are written as
    empty strings.

    Args:
        all_trade_rows: list of trade rows (mappings)
        trades_fieldnames: list of trade fieldnames
    """
    # Group trades by their (timeframe, stop_loss_pct) combination.
    grouped = defaultdict(list)
    for trade in all_trade_rows:
        key = (trade.get("timeframe"), trade.get("stop_loss_pct"))
        grouped[key].append(trade)

    for (tf, sl), combo_trades in grouped.items():
        sl_percent = int(round(sl * 100))
        trades_filename = os.path.join(self.results_dir, f"trades_{tf}_ST{sl_percent}pct.csv")
        with open(trades_filename, "w", newline="") as out:
            writer = csv.DictWriter(out, fieldnames=trades_fieldnames)
            writer.writeheader()
            writer.writerows(
                {field: t.get(field, "") for field in trades_fieldnames}
                for t in combo_trades
            )
        if self.logging is not None:
            self.logging.info(f"Trades written to {trades_filename}")
|
||||
19
cycles/utils/system.py
Normal file
19
cycles/utils/system.py
Normal file
@@ -0,0 +1,19 @@
|
||||
import os
|
||||
import psutil
|
||||
|
||||
class SystemUtils:
    """Helpers for inspecting host resources (CPU cores, memory)."""

    def __init__(self, logging=None):
        # Optional logger; when None, methods stay silent.
        self.logging = logging

    def get_optimal_workers(self):
        """Determine optimal number of worker processes based on system resources.

        Heuristic: use 75% of the CPU cores, capped by available memory
        on the assumption that each worker needs ~2 GB for large datasets.

        Returns:
            int: number of workers (always >= 1).
        """
        cpu_count = os.cpu_count() or 4  # fall back to 4 when undetectable
        memory_gb = psutil.virtual_memory().total / (1024**3)
        workers_by_memory = max(1, int(memory_gb / 2))
        workers_by_cpu = max(1, int(cpu_count * 0.75))
        # Compute the result once so the logged value always matches the
        # returned value (previously min(...) was evaluated twice).
        workers = min(workers_by_cpu, workers_by_memory)
        if self.logging is not None:
            self.logging.info(f"Using {workers} workers for processing")
        return workers
|
||||
Reference in New Issue
Block a user