import json
import logging
import sys
import threading
from datetime import datetime, timezone
from pathlib import Path

import typer
from PySide6.QtCore import QTimer
from PySide6.QtWidgets import QApplication

from db_interpreter import DBInterpreter
from desktop_app import MainWindow
from ohlc_processor import OHLCProcessor
from strategy import Strategy


def _select_databases(db_paths, start, end):
    """Return the paths from *db_paths* whose filename date is in ``[start, end)``.

    The date is read from dash-separated fields 2..4 of the file stem
    (text before the first ``.``), joined and parsed as ``%y%m%d`` in UTC.
    NOTE(review): this presumes names shaped like ``BTC-USDT-25-07-01.db``,
    i.e. an instrument with exactly one dash -- confirm against the files
    actually present in the data directory.

    Files whose name has fewer than five dash-separated fields, or whose
    date fields do not parse, are skipped with a warning instead of
    aborting the run.

    Args:
        db_paths: Iterable of ``pathlib.Path`` objects; input order is kept.
        start: Timezone-aware inclusive lower bound.
        end: Timezone-aware exclusive upper bound.

    Returns:
        A list of the paths that fall inside the requested range.
    """
    selected = []
    for db_path in db_paths:
        name_parts = db_path.name.split(".")[0].split("-")
        if len(name_parts) < 5:
            logging.warning("Unexpected filename format: %s", db_path.name)
            continue
        try:
            db_date = datetime.strptime(
                "".join(name_parts[2:5]), "%y%m%d"
            ).replace(tzinfo=timezone.utc)
        except ValueError:
            # Same policy as the too-few-fields case: one stray file must
            # not abort the whole backtest.
            logging.warning("Unparseable date in filename: %s", db_path.name)
            continue
        if start <= db_date < end:
            selected.append(db_path)
    return selected


def main(
    instrument: str = typer.Argument(..., help="Instrument to backtest, e.g. BTC-USDT"),
    start_date: str = typer.Argument(..., help="Start date, e.g. 2025-07-01"),
    end_date: str = typer.Argument(..., help="End date, e.g. 2025-08-01"),
    timeframe_minutes: int = typer.Option(15, "--timeframe-minutes", help="Timeframe in minutes"),
    ui: bool = typer.Option(False, "--ui/--no-ui", help="Enable UI"),
):
    """Backtest the strategy on recorded OKX data for one instrument and date range."""
    # The docstring above is kept to one line on purpose: typer shows it as
    # the command's --help text.
    #
    # start_date is inclusive and end_date is exclusive; both are parsed as
    # YYYY-MM-DD and treated as UTC midnight. With --ui the data is processed
    # on a daemon thread while a Qt window polls the processor once a second;
    # without it the processing runs synchronously. Diagnostics go to
    # strategy.log; progress and the final stats JSON go to stdout.
    logging.basicConfig(
        filename="strategy.log",
        level=logging.DEBUG,
        format="%(asctime)s %(levelname)s %(message)s",
    )

    # Keep the parsed bounds in their own names rather than rebinding the
    # str-typed CLI parameters.
    range_start = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    range_end = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)

    databases_path = Path("../data/OKX")
    if not databases_path.exists():
        logging.error("Database path does not exist: %s", databases_path)
        return

    db_paths = sorted(databases_path.glob(f"{instrument}*.db"))
    if not db_paths:
        logging.error(
            "No database files found for instrument %s in %s",
            instrument,
            databases_path,
        )
        return

    logging.info(
        "Found %s database files: %s",
        len(db_paths),
        [p.name for p in db_paths],
    )

    processor = OHLCProcessor(aggregate_window_seconds=60 * timeframe_minutes)

    strategy = Strategy(
        lookback=30,
        min_volume_factor=1.0,
        confirm_break_signal_high=True,  # or False to disable
        debug=True,
        debug_level=2,  # 1=key events, 2=per-bar detail
        debug_every_n_bars=100,
    )

    def process_data():
        """Stream every selected database through the processor and strategy."""
        db_to_process = _select_databases(db_paths, range_start, range_end)

        for i, db_path in enumerate(db_to_process):
            print(f"{i}/{len(db_to_process)}")

            db_interpreter = DBInterpreter(db_path)
            for orderbook_update, trades in db_interpreter.stream():
                processor.update_orderbook(orderbook_update)
                processor.process_trades(trades)
                strategy.process(processor)

        # Flush the processor and give the strategy one more pass over it.
        processor.flush()
        strategy.process(processor)

        # on_finish is an optional hook. Call it best-effort, but record any
        # failure instead of discarding it silently; stats are still printed.
        on_finish = getattr(strategy, "on_finish", None)
        if callable(on_finish):
            try:
                on_finish(processor)
            except Exception:
                logging.exception("strategy.on_finish failed; reporting stats without it")

        print(json.dumps(strategy.get_stats(), indent=2))

    if not ui:
        process_data()
        return

    # Daemon thread: closing the window ends the process even when the
    # backtest has not finished.
    data_thread = threading.Thread(target=process_data, daemon=True)
    data_thread.start()

    app = QApplication(sys.argv)
    desktop_app = MainWindow()
    desktop_app.show()

    # The local reference keeps the timer alive for as long as app.exec() runs.
    timer = QTimer()
    timer.timeout.connect(lambda: desktop_app.update_data(processor, 30))
    timer.start(1000)

    app.exec()


if __name__ == "__main__":
    typer.run(main)