diff --git a/main.py b/main.py index 35fa49b..81ef837 100644 --- a/main.py +++ b/main.py @@ -1,16 +1,19 @@ import pandas as pd import numpy as np -from trend_detector_macd import TrendDetectorMACD from trend_detector_simple import TrendDetectorSimple -from cycle_detector import CycleDetector import csv import logging import concurrent.futures import os import psutil import datetime -from charts import BacktestCharts -from collections import Counter +import gspread +from google.oauth2.service_account import Credentials +from collections import defaultdict +import threading +import queue +import time +import math # Set up logging logging.basicConfig( @@ -22,6 +25,43 @@ logging.basicConfig( ] ) +# Global queue for batching Google Sheets updates +results_queue = queue.Queue() + +# Background thread function to push updates every minute +class GSheetBatchPusher(threading.Thread): + def __init__(self, queue, timestamp, spreadsheet_name, interval=60): + super().__init__(daemon=True) + self.queue = queue + self.timestamp = timestamp + self.spreadsheet_name = spreadsheet_name + self.interval = interval + self._stop_event = threading.Event() + + def run(self): + while not self._stop_event.is_set(): + self.push_all() + time.sleep(self.interval) + # Final push on stop + self.push_all() + + def stop(self): + self._stop_event.set() + + def push_all(self): + batch_results = [] + batch_trades = [] + while True: + try: + results, trades = self.queue.get_nowait() + batch_results.extend(results) + batch_trades.extend(trades) + except queue.Empty: + break + + if batch_results or batch_trades: + write_results_per_combination_gsheet(batch_results, batch_trades, self.timestamp, self.spreadsheet_name) + def get_optimal_workers(): """Determine optimal number of worker processes based on system resources""" cpu_count = os.cpu_count() or 4 @@ -127,8 +167,8 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd, return results_rows, trade_rows def process_timeframe(timeframe_info, debug=False): - """Process an entire timeframe (no monthly split)""" - rule, data_1min, stop_loss_pcts, initial_usd = timeframe_info + """Process a single (timeframe, stop_loss_pct) combination (no monthly split)""" + rule, data_1min, stop_loss_pct, initial_usd = timeframe_info if rule == "1T": df = data_1min.copy() else: @@ -140,13 +180,8 @@ def process_timeframe(timeframe_info, debug=False): 'volume': 'sum' }).dropna() df = df.reset_index() - - # --- Add this block to check alignment --- - print("1-min data range:", data_1min.index.min(), "to", data_1min.index.max()) - print(f"{rule} data range:", df['timestamp'].min(), "to", df['timestamp'].max()) - # ----------------------------------------- - - results_rows, all_trade_rows = process_timeframe_data(data_1min, df, stop_loss_pcts, rule, initial_usd, debug=debug) + # Only process one stop loss + results_rows, all_trade_rows = process_timeframe_data(data_1min, df, [stop_loss_pct], rule, initial_usd, debug=debug) return results_rows, all_trade_rows def write_results_chunk(filename, fieldnames, rows, write_header=False): @@ -200,49 +235,101 @@ def aggregate_results(all_rows): }) return summary_rows -def write_results_per_combination(results_rows, trade_rows, timestamp): - results_dir = "results" - os.makedirs(results_dir, exist_ok=True) +def write_results_per_combination_gsheet(results_rows, trade_rows, timestamp, spreadsheet_name="GlimBit Backtest Results"): + scopes = [ + "https://www.googleapis.com/auth/spreadsheets", + "https://www.googleapis.com/auth/drive" + ] + creds = Credentials.from_service_account_file('credentials/service_account.json', scopes=scopes) + gc = gspread.authorize(creds) + sh = gc.open(spreadsheet_name) + + try: + worksheet = sh.worksheet("Results") + except gspread.exceptions.WorksheetNotFound: + worksheet = sh.add_worksheet(title="Results", rows="1000", cols="20") + + # Clear the worksheet before writing new results + worksheet.clear() + + # Updated fieldnames to match your data rows + fieldnames = [ + "timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate", + "max_drawdown", "avg_trade", "profit_ratio", "initial_usd", "final_usd" + ] + + def to_native(val): + if isinstance(val, (np.generic, np.ndarray)): + val = val.item() + if hasattr(val, 'isoformat'): + return val.isoformat() + # Handle inf, -inf, nan + if isinstance(val, float): + if math.isinf(val): + return "∞" if val > 0 else "-∞" + if math.isnan(val): + return "" + return val + + # Write header if sheet is empty + if len(worksheet.get_all_values()) == 0: + worksheet.append_row(fieldnames) + for row in results_rows: - timeframe = row["timeframe"] - stop_loss_pct = row["stop_loss_pct"] - filename = os.path.join( - results_dir, - f"{timestamp}_backtest_{timeframe}_{stop_loss_pct}.csv" - ) - fieldnames = ["timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate", "max_drawdown", "avg_trade", "profit_ratio", "initial_usd", "final_usd"] - write_results_chunk(filename, fieldnames, [row], write_header=not os.path.exists(filename)) + values = [to_native(row.get(field, "")) for field in fieldnames] + worksheet.append_row(values) + + trades_fieldnames = [ + "entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type" + ] + trades_by_combo = defaultdict(list) + for trade in trade_rows: - timeframe = trade["timeframe"] - stop_loss_pct = trade["stop_loss_pct"] - trades_filename = os.path.join( - results_dir, - f"{timestamp}_trades_{timeframe}_{stop_loss_pct}.csv" - ) - trades_fieldnames = [ - "timeframe", "stop_loss_pct", "entry_time", "exit_time", - "entry_price", "exit_price", "profit_pct", "type" - ] - write_results_chunk(trades_filename, trades_fieldnames, [trade], write_header=not os.path.exists(trades_filename)) + tf = trade.get("timeframe") + sl = trade.get("stop_loss_pct") + trades_by_combo[(tf, sl)].append(trade) + + for (tf, sl), trades in trades_by_combo.items(): + sl_percent = int(round(sl * 100)) + sheet_name = f"Trades_{tf}_ST{sl_percent}%" + + try: + trades_ws = sh.worksheet(sheet_name) + except gspread.exceptions.WorksheetNotFound: + trades_ws = sh.add_worksheet(title=sheet_name, rows="1000", cols="20") + + # Clear the trades worksheet before writing new trades + trades_ws.clear() + + if len(trades_ws.get_all_values()) == 0: + trades_ws.append_row(trades_fieldnames) + + for trade in trades: + trade_row = [to_native(trade.get(field, "")) for field in trades_fieldnames] + try: + trades_ws.append_row(trade_row) + except gspread.exceptions.APIError as e: + if '429' in str(e): + logging.warning(f"Google Sheets API quota exceeded (429). Please wait one minute. Will retry on next batch push. Sheet: {sheet_name}") + # Re-queue the failed batch for retry + results_queue.put((results_rows, trade_rows)) + return # Stop pushing for this batch, will retry next interval + else: + raise if __name__ == "__main__": # Configuration start_date = '2020-01-01' stop_date = '2025-05-15' initial_usd = 10000 - debug = False # Set to True to enable debug prints - # --- NEW: Prepare results folder and timestamp --- + debug = False + results_dir = "results" os.makedirs(results_dir, exist_ok=True) timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M") - # --- END NEW --- - # Replace the dictionary with a list of timeframe names - timeframes = ["15min", "1h", "6h", "1D"] - # timeframes = ["6h"] - - stop_loss_pcts = [0.01, 0.02, 0.03, 0.05, 0.07, 0.10] - # stop_loss_pcts = [0.01] + timeframes = ["15min", "30min", "1h", "6h", "1D"] + stop_loss_pcts = [0.02, 0.03, 0.05] # Load data once data_1min = load_data('./data/btcusd_1-min_data.csv', start_date, stop_date) @@ -250,26 +337,39 @@ if __name__ == "__main__": # Prepare tasks tasks = [ - (name, data_1min, stop_loss_pcts, initial_usd) + (name, data_1min, stop_loss_pct, initial_usd) for name in timeframes + for stop_loss_pct in stop_loss_pcts ] # Determine optimal worker count workers = get_optimal_workers() logging.info(f"Using {workers} workers for processing") + + # Start the background batch pusher + spreadsheet_name = "GlimBit Backtest Results" + batch_pusher = GSheetBatchPusher(results_queue, timestamp, spreadsheet_name, interval=65) + batch_pusher.start() # Process tasks with optimized concurrency with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor: - futures = {executor.submit(process_timeframe, task, debug): task[1] for task in tasks} + futures = {executor.submit(process_timeframe, task, debug): task for task in tasks} all_results_rows = [] for future in concurrent.futures.as_completed(futures): - #try: results, trades = future.result() if results or trades: all_results_rows.extend(results) - write_results_per_combination(results, trades, timestamp) - #except Exception as exc: - # logging.error(f"generated an exception: {exc}") + results_queue.put((results, trades)) # Enqueue for batch update + + # After all tasks, flush any remaining updates + batch_pusher.stop() + batch_pusher.join() + + # Ensure all batches are pushed, even after 429 errors + while not results_queue.empty(): + logging.info("Waiting for Google Sheets quota to reset. Retrying batch push in 60 seconds...") + time.sleep(65) + batch_pusher.push_all() # Write all results to a single CSV file combined_filename = os.path.join(results_dir, f"{timestamp}_backtest_combined.csv") diff --git a/requirements.txt b/requirements.txt index 35ec483..b7f6690 100644 Binary files a/requirements.txt and b/requirements.txt differ