Implement Google Sheets integration in main.py for batch updates of backtest results

- Added a GSheetBatchPusher class that pushes queued results to Google Sheets from a background thread.
- Refactored write_results_per_combination into write_results_per_combination_gsheet, which writes results and trades directly to Google Sheets worksheets instead of per-combination CSV files.
- Updated process_timeframe to handle a single stop-loss percentage per task (one task per timeframe/stop-loss combination).
- Introduced a global results_queue for batching results and trades into periodic updates; a usage sketch follows the file summary below.
- Added error handling for Google Sheets API quota (429) errors: a failed batch is re-queued and retried on the next push.
- Adjusted the main execution flow to start the batch pusher, enqueue each task's output, and flush all remaining batches after processing.
Simon Moisy, 2025-05-19 02:02:03 +08:00
commit 170751db0e (parent f7f0fc6dd5)
2 changed files with 150 additions and 50 deletions

main.py (200 changed lines)

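For reference, a minimal usage sketch of the new batching flow. GSheetBatchPusher and the queue semantics come from the diff below; the import path, the dummy result row, and the availability of credentials/service_account.json are assumptions for illustration only.

    import queue
    import datetime
    from main import GSheetBatchPusher  # assumes main.py is importable as a module

    results_queue = queue.Queue()
    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M")

    # Daemon thread: every `interval` seconds it drains the queue and writes the
    # accumulated batch to Google Sheets via write_results_per_combination_gsheet.
    pusher = GSheetBatchPusher(results_queue, timestamp, "GlimBit Backtest Results", interval=65)
    pusher.start()

    # Each finished backtest task enqueues a (results_rows, trade_rows) pair.
    dummy_row = {"timeframe": "1h", "stop_loss_pct": 0.03, "n_trades": 0}  # dummy values
    results_queue.put(([dummy_row], []))

    # Shutdown: stop() sets the event; run() exits its loop and performs one final push_all().
    pusher.stop()
    pusher.join()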
@@ -1,16 +1,19 @@
 import pandas as pd
 import numpy as np
-from trend_detector_macd import TrendDetectorMACD
 from trend_detector_simple import TrendDetectorSimple
-from cycle_detector import CycleDetector
 import csv
 import logging
 import concurrent.futures
 import os
 import psutil
 import datetime
-from charts import BacktestCharts
-from collections import Counter
+import gspread
+from google.oauth2.service_account import Credentials
+from collections import defaultdict
+import threading
+import queue
+import time
+import math
 
 # Set up logging
 logging.basicConfig(
@@ -22,6 +25,43 @@ logging.basicConfig(
     ]
 )
 
+# Global queue for batching Google Sheets updates
+results_queue = queue.Queue()
+
+# Background thread function to push updates every minute
+class GSheetBatchPusher(threading.Thread):
+    def __init__(self, queue, timestamp, spreadsheet_name, interval=60):
+        super().__init__(daemon=True)
+        self.queue = queue
+        self.timestamp = timestamp
+        self.spreadsheet_name = spreadsheet_name
+        self.interval = interval
+        self._stop_event = threading.Event()
+
+    def run(self):
+        while not self._stop_event.is_set():
+            self.push_all()
+            time.sleep(self.interval)
+        # Final push on stop
+        self.push_all()
+
+    def stop(self):
+        self._stop_event.set()
+
+    def push_all(self):
+        batch_results = []
+        batch_trades = []
+        while True:
+            try:
+                results, trades = self.queue.get_nowait()
+                batch_results.extend(results)
+                batch_trades.extend(trades)
+            except queue.Empty:
+                break
+        if batch_results or batch_trades:
+            write_results_per_combination_gsheet(batch_results, batch_trades, self.timestamp, self.spreadsheet_name)
+
 def get_optimal_workers():
     """Determine optimal number of worker processes based on system resources"""
     cpu_count = os.cpu_count() or 4
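The push_all() method above drains the queue with the standard non-blocking pattern (get_nowait until queue.Empty). A self-contained illustration of just that drain step:

    import queue

    q = queue.Queue()
    for i in range(3):
        q.put(([f"result-{i}"], [f"trade-{i}"]))   # same (results, trades) tuple shape as main.py

    batch_results, batch_trades = [], []
    while True:
        try:
            results, trades = q.get_nowait()       # raises queue.Empty once the queue is drained
        except queue.Empty:
            break
        batch_results.extend(results)
        batch_trades.extend(trades)

    print(batch_results)  # ['result-0', 'result-1', 'result-2']
    print(batch_trades)   # ['trade-0', 'trade-1', 'trade-2']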
@@ -127,8 +167,8 @@ def process_timeframe_data(min1_df, df, stop_loss_pcts, rule_name, initial_usd,
     return results_rows, trade_rows
 
 def process_timeframe(timeframe_info, debug=False):
-    """Process an entire timeframe (no monthly split)"""
-    rule, data_1min, stop_loss_pcts, initial_usd = timeframe_info
+    """Process a single (timeframe, stop_loss_pct) combination (no monthly split)"""
+    rule, data_1min, stop_loss_pct, initial_usd = timeframe_info
     if rule == "1T":
         df = data_1min.copy()
     else:
@@ -140,13 +180,8 @@ def process_timeframe(timeframe_info, debug=False):
             'volume': 'sum'
         }).dropna()
         df = df.reset_index()
 
-    # --- Add this block to check alignment ---
-    print("1-min data range:", data_1min.index.min(), "to", data_1min.index.max())
-    print(f"{rule} data range:", df['timestamp'].min(), "to", df['timestamp'].max())
-    # -----------------------------------------
-
-    results_rows, all_trade_rows = process_timeframe_data(data_1min, df, stop_loss_pcts, rule, initial_usd, debug=debug)
+    # Only process one stop loss
+    results_rows, all_trade_rows = process_timeframe_data(data_1min, df, [stop_loss_pct], rule, initial_usd, debug=debug)
     return results_rows, all_trade_rows
 
 def write_results_chunk(filename, fieldnames, rows, write_header=False):
@@ -200,49 +235,101 @@ def aggregate_results(all_rows):
         })
     return summary_rows
 
-def write_results_per_combination(results_rows, trade_rows, timestamp):
-    results_dir = "results"
-    os.makedirs(results_dir, exist_ok=True)
-    for row in results_rows:
-        timeframe = row["timeframe"]
-        stop_loss_pct = row["stop_loss_pct"]
-        filename = os.path.join(
-            results_dir,
-            f"{timestamp}_backtest_{timeframe}_{stop_loss_pct}.csv"
-        )
-        fieldnames = ["timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate", "max_drawdown", "avg_trade", "profit_ratio", "initial_usd", "final_usd"]
-        write_results_chunk(filename, fieldnames, [row], write_header=not os.path.exists(filename))
-    for trade in trade_rows:
-        timeframe = trade["timeframe"]
-        stop_loss_pct = trade["stop_loss_pct"]
-        trades_filename = os.path.join(
-            results_dir,
-            f"{timestamp}_trades_{timeframe}_{stop_loss_pct}.csv"
-        )
-        trades_fieldnames = [
-            "timeframe", "stop_loss_pct", "entry_time", "exit_time",
-            "entry_price", "exit_price", "profit_pct", "type"
-        ]
-        write_results_chunk(trades_filename, trades_fieldnames, [trade], write_header=not os.path.exists(trades_filename))
+def write_results_per_combination_gsheet(results_rows, trade_rows, timestamp, spreadsheet_name="GlimBit Backtest Results"):
+    scopes = [
+        "https://www.googleapis.com/auth/spreadsheets",
+        "https://www.googleapis.com/auth/drive"
+    ]
+    creds = Credentials.from_service_account_file('credentials/service_account.json', scopes=scopes)
+    gc = gspread.authorize(creds)
+    sh = gc.open(spreadsheet_name)
+
+    try:
+        worksheet = sh.worksheet("Results")
+    except gspread.exceptions.WorksheetNotFound:
+        worksheet = sh.add_worksheet(title="Results", rows="1000", cols="20")
+
+    # Clear the worksheet before writing new results
+    worksheet.clear()
+
+    # Updated fieldnames to match your data rows
+    fieldnames = [
+        "timeframe", "stop_loss_pct", "n_trades", "n_stop_loss", "win_rate",
+        "max_drawdown", "avg_trade", "profit_ratio", "initial_usd", "final_usd"
+    ]
+
+    def to_native(val):
+        if isinstance(val, (np.generic, np.ndarray)):
+            val = val.item()
+        if hasattr(val, 'isoformat'):
+            return val.isoformat()
+        # Handle inf, -inf, nan
+        if isinstance(val, float):
+            if math.isinf(val):
+                return "∞" if val > 0 else "-∞"
+            if math.isnan(val):
+                return ""
+        return val
+
+    # Write header if sheet is empty
+    if len(worksheet.get_all_values()) == 0:
+        worksheet.append_row(fieldnames)
+
+    for row in results_rows:
+        values = [to_native(row.get(field, "")) for field in fieldnames]
+        worksheet.append_row(values)
+
+    trades_fieldnames = [
+        "entry_time", "exit_time", "entry_price", "exit_price", "profit_pct", "type"
+    ]
+    trades_by_combo = defaultdict(list)
+    for trade in trade_rows:
+        tf = trade.get("timeframe")
+        sl = trade.get("stop_loss_pct")
+        trades_by_combo[(tf, sl)].append(trade)
+
+    for (tf, sl), trades in trades_by_combo.items():
+        sl_percent = int(round(sl * 100))
+        sheet_name = f"Trades_{tf}_ST{sl_percent}%"
+        try:
+            trades_ws = sh.worksheet(sheet_name)
+        except gspread.exceptions.WorksheetNotFound:
+            trades_ws = sh.add_worksheet(title=sheet_name, rows="1000", cols="20")
+        # Clear the trades worksheet before writing new trades
+        trades_ws.clear()
+        if len(trades_ws.get_all_values()) == 0:
+            trades_ws.append_row(trades_fieldnames)
+        for trade in trades:
+            trade_row = [to_native(trade.get(field, "")) for field in trades_fieldnames]
+            try:
+                trades_ws.append_row(trade_row)
+            except gspread.exceptions.APIError as e:
+                if '429' in str(e):
+                    logging.warning(f"Google Sheets API quota exceeded (429). Please wait one minute. Will retry on next batch push. Sheet: {sheet_name}")
+                    # Re-queue the failed batch for retry
+                    results_queue.put((results_rows, trade_rows))
+                    return  # Stop pushing for this batch, will retry next interval
+                else:
+                    raise
 
 if __name__ == "__main__":
     # Configuration
     start_date = '2020-01-01'
     stop_date = '2025-05-15'
     initial_usd = 10000
-    debug = False  # Set to True to enable debug prints
+    debug = False
 
-    # --- NEW: Prepare results folder and timestamp ---
     results_dir = "results"
     os.makedirs(results_dir, exist_ok=True)
     timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M")
-    # --- END NEW ---
 
-    # Replace the dictionary with a list of timeframe names
-    timeframes = ["15min", "1h", "6h", "1D"]
-    # timeframes = ["6h"]
-    stop_loss_pcts = [0.01, 0.02, 0.03, 0.05, 0.07, 0.10]
-    # stop_loss_pcts = [0.01]
+    timeframes = ["15min", "30min", "1h", "6h", "1D"]
+    stop_loss_pcts = [0.02, 0.03, 0.05]
 
     # Load data once
     data_1min = load_data('./data/btcusd_1-min_data.csv', start_date, stop_date)
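Note that the new function issues one append_row call per result and per trade, so every row costs one write request against the Sheets quota. gspread also provides Worksheet.append_rows for sending many rows in a single request; a hedged sketch of that alternative (not what this commit does; the spreadsheet name and credential path are the ones used above, and the row values are illustrative only):

    import gspread
    from google.oauth2.service_account import Credentials

    scopes = [
        "https://www.googleapis.com/auth/spreadsheets",
        "https://www.googleapis.com/auth/drive",
    ]
    creds = Credentials.from_service_account_file('credentials/service_account.json', scopes=scopes)
    sh = gspread.authorize(creds).open("GlimBit Backtest Results")
    ws = sh.worksheet("Results")

    # Illustrative rows only: one API call appends them all at once.
    rows = [
        ["1h", 0.03, 12, 2, 0.58, -0.11, 0.004, 1.27, 10000, 12700],
        ["6h", 0.05, 8, 1, 0.62, -0.09, 0.006, 1.31, 10000, 13100],
    ]
    ws.append_rows(rows, value_input_option="USER_ENTERED")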
@@ -250,26 +337,39 @@ if __name__ == "__main__":
     # Prepare tasks
     tasks = [
-        (name, data_1min, stop_loss_pcts, initial_usd)
+        (name, data_1min, stop_loss_pct, initial_usd)
         for name in timeframes
+        for stop_loss_pct in stop_loss_pcts
     ]
 
     # Determine optimal worker count
     workers = get_optimal_workers()
     logging.info(f"Using {workers} workers for processing")
 
+    # Start the background batch pusher
+    spreadsheet_name = "GlimBit Backtest Results"
+    batch_pusher = GSheetBatchPusher(results_queue, timestamp, spreadsheet_name, interval=65)
+    batch_pusher.start()
+
     # Process tasks with optimized concurrency
     with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
-        futures = {executor.submit(process_timeframe, task, debug): task[1] for task in tasks}
+        futures = {executor.submit(process_timeframe, task, debug): task for task in tasks}
         all_results_rows = []
         for future in concurrent.futures.as_completed(futures):
-            #try:
             results, trades = future.result()
             if results or trades:
                 all_results_rows.extend(results)
-                write_results_per_combination(results, trades, timestamp)
-            #except Exception as exc:
-            #    logging.error(f"generated an exception: {exc}")
+                results_queue.put((results, trades))  # Enqueue for batch update
+
+    # After all tasks, flush any remaining updates
+    batch_pusher.stop()
+    batch_pusher.join()
+
+    # Ensure all batches are pushed, even after 429 errors
+    while not results_queue.empty():
+        logging.info("Waiting for Google Sheets quota to reset. Retrying batch push in 60 seconds...")
+        time.sleep(65)
+        batch_pusher.push_all()
 
     # Write all results to a single CSV file
     combined_filename = os.path.join(results_dir, f"{timestamp}_backtest_combined.csv")
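The shutdown loop above retries every 65 seconds for as long as batches remain queued. A hypothetical helper with capped exponential backoff (retry_push is not part of this commit, just a sketch of an alternative built on the same push_all re-queue behaviour):

    import time
    import logging

    def retry_push(pusher, pending_queue, max_wait=300):
        """Flush remaining batches, backing off while 429 errors keep re-queueing them."""
        wait = 65
        while not pending_queue.empty():
            logging.info(f"Retrying Google Sheets batch push in {wait} seconds...")
            time.sleep(wait)
            pusher.push_all()               # on a 429, push_all re-queues the batch itself
            wait = min(wait * 2, max_wait)  # cap the backoff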

Binary file not shown.