Skip to content

Commit a8c7cdc

Browse files
committed
Improve event filtering
1 parent a975169 commit a8c7cdc

File tree

1 file changed: +17 −6 lines changed

src/api.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,18 +41,29 @@
4141
}
4242

4343

def filter_new(events, collection):
    """Return the subset of *events* not already cached in *collection*.

    Each event that is kept is modified in place: its ``"tags"`` list of
    ``{"key": ..., "value": ...}`` entries is flattened into top-level
    fields (dots in tag keys are replaced by underscores), and any
    resulting ``"environment"`` field is removed.

    Parameters
    ----------
    events
        Re-iterable sequence of event dicts; every event must have an
        ``"id"`` key.
    collection
        Object exposing ``distinct(key, filter)`` (e.g. a pymongo
        collection), used to look up which ids are already stored.

    Returns
    -------
    list
        The new events, in their original order, with at most one event
        per ``"id"``. Empty when *events* is empty.
    """
    # De-duplicate the ids (order-preserving) so the ``$in`` query stays as
    # small as possible.
    ids = list(dict.fromkeys(event["id"] for event in events))
    if not ids:
        return []

    # A single query for the whole batch instead of one lookup per event.
    seen_ids = set(collection.distinct("id", {"id": {"$in": ids}}))

    new_docs = []
    for event in events:
        event_id = event["id"]
        if event_id in seen_ids:
            continue
        # Also remember ids first seen in this batch: returning the same id
        # twice would presumably clash with a unique index on "id" when the
        # result is inserted (the insert is not visible here -- confirm).
        seen_ids.add(event_id)

        # Flatten tags into top-level fields; dots are replaced because
        # they are not safe in MongoDB field names.
        event.update(
            {
                tag["key"].replace(".", "_"): tag["value"]
                for tag in event.pop("tags", ())
            }
        )
        event.pop("environment", None)
        new_docs.append(event)

    return new_docs
5667

5768

5869
def _to_sentry_time(dt):
@@ -78,6 +89,7 @@ def fetch_window(
7889
)
7990

8091
db = MongoClient().fmriprep_stats[event_name]
92+
db.create_index("id", unique=True)
8193
cursor = None
8294
errors = 0
8395
consecutive_cached = 0
@@ -120,8 +132,7 @@ def fetch_window(
120132
for e in req.json()
121133
if {"key": "environment", "value": "prod"} in e["tags"]
122134
]
123-
new_docs = [filter_new(e, db) for e in events]
124-
new_docs = [d for d in new_docs if d]
135+
new_docs = filter_new(events, db)
125136

126137
new_records += len(new_docs)
127138
total_records += len(events)

0 commit comments

Comments
 (0)