-
-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathutils.py
More file actions
128 lines (107 loc) · 5.17 KB
/
utils.py
File metadata and controls
128 lines (107 loc) · 5.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
from __future__ import annotations
import json
import math
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
import requests
def retry_request(func, *args, retries: int = 3, backoff: float = 2.0, **kwargs):
    """Call func(*args, **kwargs), retrying with exponential backoff on request errors and 5xx/429 responses."""
    attempts = max(1, retries)  # Always make at least one call
    failure = None
    for index in range(attempts):
        try:
            result = func(*args, **kwargs)
            code = result.status_code
            # Anything other than a server error (5xx) or rate limit (429) goes straight back to the caller
            if code < 500 and code != 429:
                return result
            failure = requests.RequestException(f"HTTP {code}: {result.text[:200]}")
            reason = f"HTTP {code}"
        except (requests.RequestException, requests.Timeout) as exc:
            failure = exc
            reason = exc
        # Sleep only when another attempt follows; the delay doubles each round
        if index < attempts - 1:
            delay = backoff * (2**index)
            print(f"Retry {index + 1}/{attempts} after {delay}s: {reason}")
            time.sleep(delay)
    raise failure
def fetch_json(url: str, headers: dict | None = None, timeout: int = 60) -> dict:
    """GET a URL and return its decoded JSON body, exiting the process on any non-200 status."""
    response = requests.get(url, headers=headers, timeout=timeout)
    if response.status_code == 200:
        return response.json()
    # Any other status is fatal: exit with the status and the start of the body
    sys.exit(f"HTTP {response.status_code}: {response.text[:200]}")
def post_json(url: str, headers: dict, payload: dict, timeout: int = 60, retries: int = 3) -> dict:
    """POST a JSON payload and return the decoded JSON response, retrying transient failures.

    Args:
        url: Endpoint to POST to.
        headers: HTTP headers sent with every attempt.
        payload: Object sent as the JSON request body.
        timeout: Per-attempt timeout in seconds passed to requests.post.
        retries: Maximum number of attempts; values below 1 are treated as 1.

    Returns:
        The parsed JSON body of the first HTTP 200 response.

    Raises:
        SystemExit: Immediately on a non-200 response below 500, or after every attempt has failed with a 5xx
            response or a requests exception.
    """
    retries = max(1, retries)  # Ensure at least one attempt, as retry_request does
    last_error = None
    for attempt in range(retries):
        try:
            r = requests.post(url, headers=headers, json=payload, timeout=timeout)
            if r.status_code == 200:
                return r.json()
            # Retry on server errors (5xx), fail immediately on client errors (4xx)
            if r.status_code < 500:
                sys.exit(f"HTTP {r.status_code}: {r.text[:200]}")
            last_error = f"HTTP {r.status_code}: {r.text[:200]}"
        except requests.RequestException as e:  # requests.Timeout is a RequestException subclass
            last_error = str(e)
        # Shared backoff for both failure kinds; skipped after the final attempt
        if attempt < retries - 1:
            wait = 2.0 * (2**attempt)
            print(f"Retry {attempt + 1}/{retries} after {wait}s: {last_error}")
            time.sleep(wait)
    sys.exit(f"Failed after {retries} retries: {last_error}")
def read_json(path: Path) -> dict:
    """Read a JSON file, returning an empty dict if it is missing, unreadable, or not valid JSON.

    Args:
        path: Location of the JSON file, read as UTF-8.

    Returns:
        The decoded JSON content, or {} when the file cannot be read or parsed.
    """
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError covers a missing or unreadable file. ValueError covers json.JSONDecodeError and the
        # UnicodeDecodeError raised when the file is not valid UTF-8.
        return {}
def _sanitize_floats(obj):
"""Recursively replace NaN/Inf floats with 0 to ensure valid JSON."""
if isinstance(obj, float) and not math.isfinite(obj):
return 0
if isinstance(obj, dict):
return {k: _sanitize_floats(v) for k, v in obj.items()}
if isinstance(obj, list):
return [_sanitize_floats(v) for v in obj]
return obj
def write_json(path: Path, data: dict) -> None:
    """Serialize data as compact JSON plus a trailing newline, zeroing NaN/Inf values if any are present."""
    dump_options = {"ensure_ascii": False, "separators": (",", ":"), "allow_nan": False}
    path.parent.mkdir(parents=True, exist_ok=True)
    try:
        serialized = json.dumps(data, **dump_options)
    except ValueError:
        # allow_nan=False makes json.dumps reject NaN/Inf; sanitize a copy and serialize again
        print(f"Warning: Sanitizing NaN/Inf values before writing {path.name}")
        cleaned = _sanitize_floats(data)
        serialized = json.dumps(cleaned, **dump_options)
    path.write_text(serialized + "\n", encoding="utf-8")
def is_valid(value, allow_zero: bool = True) -> bool:
    """Return True only for a finite, non-negative int/float (bools excluded); zero counts when allow_zero is set."""
    is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
    if not is_number:
        return False
    # Only floats can be NaN or infinite; ints skip this check
    if isinstance(value, float) and not math.isfinite(value):
        return False
    if allow_zero:
        return value >= 0
    return value > 0
def safe_merge(new_data: dict, old_data: dict, keys: tuple | list, label: str = "", allow_zero: bool = True) -> None:
"""Merge numeric fields in-place: keep new value if valid, fall back to old, default to 0."""
for key in keys:
if not is_valid(new_data.get(key), allow_zero):
old_val = old_data.get(key)
prefix = f"{label}." if label else ""
if is_valid(old_val, allow_zero):
print(f"Warning: Keeping existing {prefix}{key}: {old_val} (new: {new_data.get(key)})")
new_data[key] = old_val
else:
new_data[key] = 0
def get_timestamp() -> str:
    """Return the current UTC time as an ISO 8601 string with a Z suffix in place of +00:00."""
    utc_now = datetime.now(timezone.utc)
    iso_text = utc_now.isoformat()
    return iso_text.replace("+00:00", "Z")