86 lines
2.3 KiB
Python
86 lines
2.3 KiB
Python
"""Level parsing utilities for orderbook data."""
|
|
|
|
import json
|
|
import ast
|
|
import logging
|
|
from typing import List, Any, Tuple
|
|
|
|
|
|
def normalize_levels(levels: Any) -> List[List[float]]:
|
|
"""
|
|
Convert string-encoded levels into [[price, size], ...] floats.
|
|
|
|
Filters out zero/negative sizes. Supports JSON and Python literal formats.
|
|
"""
|
|
if not levels or levels == '[]':
|
|
return []
|
|
|
|
parsed = _parse_string_to_list(levels)
|
|
if not parsed:
|
|
return []
|
|
|
|
pairs: List[List[float]] = []
|
|
for item in parsed:
|
|
price, size = _extract_price_size(item)
|
|
if price is None or size is None:
|
|
continue
|
|
try:
|
|
p, s = float(price), float(size)
|
|
if s > 0:
|
|
pairs.append([p, s])
|
|
except Exception:
|
|
continue
|
|
|
|
if not pairs:
|
|
logging.debug("normalize_levels: no valid pairs parsed from input")
|
|
return pairs
|
|
|
|
|
|
def parse_levels_including_zeros(levels: Any) -> List[Tuple[float, float]]:
|
|
"""
|
|
Parse levels into (price, size) tuples including zero sizes for deletions.
|
|
|
|
Similar to normalize_levels but preserves zero sizes (for orderbook deletions).
|
|
"""
|
|
if not levels or levels == '[]':
|
|
return []
|
|
|
|
parsed = _parse_string_to_list(levels)
|
|
if not parsed:
|
|
return []
|
|
|
|
results: List[Tuple[float, float]] = []
|
|
for item in parsed:
|
|
price, size = _extract_price_size(item)
|
|
if price is None or size is None:
|
|
continue
|
|
try:
|
|
p, s = float(price), float(size)
|
|
if s >= 0:
|
|
results.append((p, s))
|
|
except Exception:
|
|
continue
|
|
|
|
return results
|
|
|
|
|
|
def _parse_string_to_list(levels: Any) -> List[Any]:
|
|
"""Parse string levels to list, trying JSON first then literal_eval."""
|
|
try:
|
|
parsed = json.loads(levels)
|
|
except Exception:
|
|
try:
|
|
parsed = ast.literal_eval(levels)
|
|
except Exception:
|
|
return []
|
|
return parsed if isinstance(parsed, list) else []
|
|
|
|
|
|
def _extract_price_size(item: Any) -> Tuple[Any, Any]:
|
|
"""Extract price and size from dict or list/tuple format."""
|
|
if isinstance(item, dict):
|
|
return item.get("price", item.get("p")), item.get("size", item.get("s"))
|
|
elif isinstance(item, (list, tuple)) and len(item) >= 2:
|
|
return item[0], item[1]
|
|
return None, None
|