Commit cf6995e

enh: parallelize fetching data from Sentry
1 parent 4fdcaec commit cf6995e

File tree

src/api.py
src/run.py

2 files changed: +202 -74 lines

src/api.py

Lines changed: 135 additions & 63 deletions
@@ -21,13 +21,16 @@
 # https://www.nipreps.org/community/licensing/
 #
 """Fetching fMRIPrep statistics from Sentry."""
+
 import os
 import sys
 from time import sleep
 import requests
 import datetime
 from pymongo import MongoClient
+from concurrent.futures import ThreadPoolExecutor, as_completed

+DEFAULT_MAX_ERRORS = 5
 ISSUES = {
     "success": "758615130",
     "started": "540334560",
@@ -36,92 +39,161 @@
     "sigkill": "854282951",
 }

-epoch = datetime.datetime.utcfromtimestamp(0)
-

 def filter_new(event, collection):
     cached = collection.count_documents(filter={"id": event["id"]})

     if not cached:
-        event.update({
-            f"{tag['key'].replace('.', '_')}": tag["value"]
-            for tag in event.pop("tags")
-        })
+        event.update(
+            {
+                f"{tag['key'].replace('.', '_')}": tag["value"]
+                for tag in event.pop("tags")
+            }
+        )
         event.pop("environment", None)
         return event


-def get_events(event_name, token=None, limit=None, max_errors=10, cached_limit=10):
-    """Retrieve events."""
+def _to_sentry_time(dt):
+    # drop microseconds, force UTC 'Z' suffix
+    return dt.replace(microsecond=0).strftime("%Y-%m-%dT%H:%M:%SZ")

-    token = token or os.getenv("SENTRY_TOKEN", None)

-    if token is None:
-        raise RuntimeError("Token must be provided")
-
-    issue_id = ISSUES[event_name]
+def fetch_window(
+    event_name, token, window_start, window_end, max_errors=None, cached_limit=None
+):
+    """Fetch one time-window's worth of pages, sequentially paging until done."""

-    # Initiate session
-    db_client = MongoClient()
-    db = db_client.fmriprep_stats
-    url = f"https://sentry.io/api/0/issues/{issue_id}/events/?query="
-    counter = 0
-    errors = []
+    max_errors = max_errors or DEFAULT_MAX_ERRORS

+    issue_id = ISSUES[event_name]
+    start_iso = _to_sentry_time(window_start)
+    end_iso = _to_sentry_time(window_end)
+
+    # build the base URL with your date filters
+    base_url = (
+        f"https://sentry.io/api/0/issues/{issue_id}/events/"
+        f"?start={start_iso}&end={end_iso}"
+    )
+
+    db = MongoClient().fmriprep_stats[event_name]
+    cursor = None
+    errors = 0
     consecutive_cached = 0
-    while limit is None or counter < limit:
-        r = requests.get(url, headers={"Authorization": "Bearer %s" % token})
-
-        if not r.ok:
-            print("E", end="", flush=True)
-            errors.append(f"{r.status_code}")
-            if len(errors) >= max_errors:
-                print(f"Too many errors: {', '.join(errors)}", file=sys.stderr)
-                exit(1)
-
-            sleep(len(errors) + 1)
+    url = base_url
+
+    new_records = 0
+    total_records = 0
+
+    headers = {"Authorization": f"Bearer {token}"}
+
+    while True:
+        if cursor:
+            url = base_url + f"&cursor={cursor}"
+
+        r = requests.get(url, headers=headers)
+        if r.status_code == 429:
+            # parse Retry-After or use exponential backoff
+            wait = int(r.headers.get("Retry-After", 2**errors))
+            sleep(wait + 0.1)
+            errors += 1
+            if errors > max_errors:
+                print("")
+                print(f"[{event_name}][{window_start}] too many 429s; abort")
+                break
+            continue
+        elif not r.ok:
+            errors += 1
+            if errors > max_errors:
+                print("")
+                print(f"[{event_name}][{window_start}] errors: {r.status_code}; abort")
+                break
+            sleep(errors)  # simple backoff
             continue

-        events_json = [
-            event for event in r.json()
-            if {'key': 'environment', 'value': 'prod'} in event["tags"]
+        errors = 0
+        events = [
+            e for e in r.json() if {"key": "environment", "value": "prod"} in e["tags"]
         ]
+        new_docs = [filter_new(e, db) for e in events]
+        new_docs = [d for d in new_docs if d]

-        new_documents = [
-            document
-            for event in events_json
-            if (document := filter_new(event, db[event_name])) is not None
+        new_records += len(new_docs)
+        total_records += len(events)

-        ]
-
-        if new_documents:
-            print(".", end="", flush=True)
-            db[event_name].insert_many(new_documents)
+        if new_docs:
+            db.insert_many(new_docs)
             consecutive_cached = 0
         else:
-            print("c", end="", flush=True)
             consecutive_cached += 1
+            if cached_limit and consecutive_cached >= cached_limit:
+                break

-        if consecutive_cached >= cached_limit:
+        # look at Link header for next cursor or end
+        link = r.headers.get("Link", "")
+        if 'results="false"' in link:
             break
-
-        cursor = (
-            r.headers["Link"].split(",")[1].split(";")[3].split("=")[1].replace('"', "")
-        )
-        results_str = (
-            r.headers["Link"].split(",")[1].split(";")[2].split("=")[1].replace('"', "")
-        )
-
-        if results_str.strip() != "true":
+        # naive parse of cursor—tweak to your needs
+        try:
+            cursor = link.split("cursor=")[-1].split('"')[1]
+        except Exception:
             break

-        new_url = (
-            f"https://sentry.io/api/0/issues/{issue_id}/events/?cursor={cursor}&query="
-        )
-        url = new_url
-        counter += 1
-
-    print("")
-
-    if errors:
-        print(f"Encountered {len(errors)} error(s): {', '.join(errors)}.")
+    return new_records, total_records
+
+
+def parallel_fetch(
+    event_name,
+    token,
+    since,
+    until,
+    max_workers=None,
+    days_per_chunk=1,
+    cached_limit=None,
+    max_errors=None,
+):
+    """Scatter a series of single-day windows in parallel."""
+
+    from tqdm import tqdm
+
+    if not max_workers or max_workers < 1:
+        max_workers = os.cpu_count()
+
+    windows = []
+    cur = since
+    while cur < until:
+        nxt = min(cur + datetime.timedelta(days=days_per_chunk), until)
+        windows.append((cur, nxt))
+        cur = nxt
+
+    kwargs = {
+        "cached_limit": cached_limit,
+        "max_errors": max_errors,
+    }
+
+    total_chunks = len(windows)
+    sum_new = 0
+    sum_total = 0
+    with ThreadPoolExecutor(max_workers=max_workers) as exe:
+        futures = [
+            exe.submit(fetch_window, event_name, token, w0, w1, **kwargs)
+            for (w0, w1) in windows
+        ]
+        for fut in tqdm(
+            as_completed(futures),
+            total=total_chunks,
+            desc=f"Fetching {event_name}",
+            unit=f"{days_per_chunk} day(s)",
+        ):
+            try:
+                new_rec, tot_rec = fut.result()
+                sum_new += new_rec
+                sum_total += tot_rec
+            except Exception:
+                print("chunk raised", sys.exc_info()[0])
+
+    # final summary
+    print(
+        f"[{event_name}] Finished {total_chunks} chunk(s): "
+        f"{sum_new} new records inserted, {sum_total} total events fetched."
+    )
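For context, the new parallel_fetch entry point can also be driven directly from Python. A minimal sketch, assuming a MongoDB instance on localhost and a valid SENTRY_TOKEN in the environment; the date range and worker count below are illustrative, not part of the commit:

import os
import datetime

from api import parallel_fetch

# Fetch the "success" issue over three single-day windows with four
# worker threads; each window pages through its own slice of events
# independently, so a rate limit in one chunk does not stall the rest.
parallel_fetch(
    event_name="success",
    token=os.getenv("SENTRY_TOKEN"),
    since=datetime.datetime(2025, 1, 1),  # illustrative dates
    until=datetime.datetime(2025, 1, 4),
    days_per_chunk=1,
    max_workers=4,
)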

src/run.py

Lines changed: 67 additions & 11 deletions
@@ -21,10 +21,16 @@
 # https://www.nipreps.org/community/licensing/
 #
 """CLI."""
-from datetime import datetime
+
+import os
+import sys
+from datetime import datetime, timezone, timedelta

 import click
-from api import get_events, ISSUES
+from api import parallel_fetch, ISSUES, DEFAULT_MAX_ERRORS
+
+DEFAULT_DAYS_WINDOW = 90
+DEFAULT_CHUNK_DAYS = 1


 @click.group()
@@ -34,24 +40,74 @@ def cli():


 @cli.command()
-@click.option("-l", "--limit", type=click.IntRange(min=1), default=None)
-@click.option("-L", "--cached-limit", type=click.IntRange(min=1), default=1000)
 @click.option(
     "-m",
     "--event",
     type=click.Choice(ISSUES.keys(), case_sensitive=False),
     multiple=True,
     default=("started", "success", "failed"),
+    help="Which Sentry issues to fetch",
+)
+@click.option(
+    "-s",
+    "--since",
+    type=click.DateTime(formats=["%Y-%m-%d"]),
+    default=None,
+    help=f"Start date (inclusive) in YYYY-MM-DD; defaults to {DEFAULT_DAYS_WINDOW} days ago",
+)
+@click.option(
+    "-u",
+    "--until",
+    type=click.DateTime(formats=["%Y-%m-%d"]),
+    default=None,
+    help="End date (exclusive) in YYYY-MM-DD; defaults to today",
+)
+@click.option(
+    "-c",
+    "--chunk-days",
+    type=click.IntRange(min=1),
+    default=DEFAULT_CHUNK_DAYS,
+    help="Number of days per parallel chunk",
 )
-def get(limit, cached_limit, event):
-    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S [Started]"))
+@click.option(
+    "-J",
+    "--jobs",
+    type=click.IntRange(min=1),
+    default=None,
+    help="Max number of parallel worker threads",
+)
+@click.option(
+    "-M", "--max-errors", type=click.IntRange(min=1), default=DEFAULT_MAX_ERRORS
+)
+@click.option("-L", "--cached-limit", type=click.IntRange(min=1), default=None)
+def get(event, since, until, chunk_days, jobs, max_errors, cached_limit):
+    """Fetch events in parallel using time-window chunking."""
+
+    token = os.getenv("SENTRY_TOKEN")
+    if not token:
+        click.echo("ERROR: SENTRY_TOKEN environment variable not set", err=True)
+        sys.exit(1)
+
+    now = datetime.now(timezone.utc)
+
+    since = since or (now - timedelta(days=DEFAULT_DAYS_WINDOW))
+    until = until or now
+
+    click.echo(f"{now:%Y-%m-%d %H:%M:%S} [Started]")

     # Get events
-    for _ev in event:
-        print(f"{_ev}:", end="", flush=True)
-        get_events(_ev, limit=limit, cached_limit=cached_limit)
-
-    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S [Stopped]"))
+    for ev in event:
+        parallel_fetch(
+            event_name=ev,
+            token=token,
+            since=since,
+            until=until,
+            days_per_chunk=chunk_days,
+            max_workers=jobs,
+            cached_limit=cached_limit,
+            max_errors=max_errors,
+        )
+    click.echo(f"{datetime.now(timezone.utc):%Y-%m-%d %H:%M:%S} [Finished]")


 if __name__ == "__main__":
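With the reworked CLI, a typical invocation would look like the following; the token value and date range are placeholders:

SENTRY_TOKEN=<token> python src/run.py get -m success -m failed -s 2025-01-01 -u 2025-02-01 -c 1 -J 8

Omitting -s and -u falls back to the last DEFAULT_DAYS_WINDOW (90) days ending today, and omitting -J lets parallel_fetch size the thread pool from os.cpu_count().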
