File tree Expand file tree Collapse file tree 1 file changed +17
-6
lines changed Expand file tree Collapse file tree 1 file changed +17
-6
lines changed Original file line number Diff line number Diff line change 4141}
4242
4343
44- def filter_new (event , collection ):
45- cached = collection . count_documents ( filter = { "id" : event [ "id" ]})
44+ def filter_new (events , collection ):
45+ """Return the subset of *events* not already cached in * collection*."""
4646
47- if not cached :
47+ ids = [e ["id" ] for e in events ]
48+ if not ids :
49+ return []
50+
51+ cached_ids = set (collection .distinct ("id" , {"id" : {"$in" : ids }}))
52+
53+ new_docs = []
54+ for event in events :
55+ if event ["id" ] in cached_ids :
56+ continue
4857 event .update (
4958 {
5059 f"{ tag ['key' ].replace ('.' , '_' )} " : tag ["value" ]
5160 for tag in event .pop ("tags" )
5261 }
5362 )
5463 event .pop ("environment" , None )
55- return event
64+ new_docs .append (event )
65+
66+ return new_docs
5667
5768
5869def _to_sentry_time (dt ):
@@ -78,6 +89,7 @@ def fetch_window(
7889 )
7990
8091 db = MongoClient ().fmriprep_stats [event_name ]
92+ db .create_index ("id" , unique = True )
8193 cursor = None
8294 errors = 0
8395 consecutive_cached = 0
@@ -120,8 +132,7 @@ def fetch_window(
120132 for e in req .json ()
121133 if {"key" : "environment" , "value" : "prod" } in e ["tags" ]
122134 ]
123- new_docs = [filter_new (e , db ) for e in events ]
124- new_docs = [d for d in new_docs if d ]
135+ new_docs = filter_new (events , db )
125136
126137 new_records += len (new_docs )
127138 total_records += len (events )
You can’t perform that action at this time.
0 commit comments