@@ -269,11 +269,11 @@ async def _background_backfill_thread_id(
269269 event_push_actions_done = progress .get ("event_push_actions_done" , False )
270270
271271 def add_thread_id_txn (
272- txn : LoggingTransaction , table_name : str , start_stream_ordering : int
272+ txn : LoggingTransaction , start_stream_ordering : int
273273 ) -> int :
274- sql = f """
274+ sql = """
275275 SELECT stream_ordering
276- FROM { table_name }
276+ FROM event_push_actions
277277 WHERE
278278 thread_id IS NULL
279279 AND stream_ordering > ?
@@ -285,7 +285,7 @@ def add_thread_id_txn(
285285 # No more rows to process.
286286 rows = txn .fetchall ()
287287 if not rows :
288- progress [f" { table_name } _done " ] = True
288+ progress ["event_push_actions_done " ] = True
289289 self .db_pool .updates ._background_update_progress_txn (
290290 txn , "event_push_backfill_thread_id" , progress
291291 )
@@ -294,8 +294,8 @@ def add_thread_id_txn(
294294 # Update the thread ID for any of those rows.
295295 max_stream_ordering = rows [- 1 ][0 ]
296296
297- sql = f """
298- UPDATE { table_name }
297+ sql = """
298+ UPDATE event_push_actions
299299 SET thread_id = 'main'
300300 WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
301301 """
@@ -309,7 +309,50 @@ def add_thread_id_txn(
309309
310310 # Update progress.
311311 processed_rows = txn .rowcount
312- progress [f"max_{ table_name } _stream_ordering" ] = max_stream_ordering
312+ progress ["max_event_push_actions_stream_ordering" ] = max_stream_ordering
313+ self .db_pool .updates ._background_update_progress_txn (
314+ txn , "event_push_backfill_thread_id" , progress
315+ )
316+
317+ return processed_rows
318+
319+ def add_thread_id_summary_txn (txn : LoggingTransaction ) -> int :
320+ min_user_id = progress .get ("max_summary_user_id" , "" )
321+ min_room_id = progress .get ("max_summary_room_id" , "" )
322+
323+ # Slightly overcomplicated query for getting the Nth user ID / room
324+ # ID tuple, or the last if there are less than N remaining.
325+ sql = """
326+ SELECT user_id, room_id FROM (
327+ SELECT user_id, room_id FROM event_push_summary
328+ WHERE (user_id, room_id) > (?, ?)
329+ AND thread_id IS NULL
330+ ORDER BY user_id, room_id
331+ LIMIT ?
332+ ) AS e
333+ ORDER BY user_id DESC, room_id DESC
334+ LIMIT 1
335+ """
336+
337+ txn .execute (sql , (min_user_id , min_room_id , batch_size ))
338+ row = txn .fetchone ()
339+ if not row :
340+ return 0
341+
342+ max_user_id , max_room_id = row
343+
344+ sql = """
345+ UPDATE event_push_summary
346+ SET thread_id = 'main'
347+ WHERE
348+ (?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?)
349+ AND thread_id IS NULL
350+ """
351+ txn .execute (sql , (min_user_id , min_room_id , max_user_id , max_room_id ))
352+ processed_rows = txn .rowcount
353+
354+ progress ["max_summary_user_id" ] = max_user_id
355+ progress ["max_summary_room_id" ] = max_room_id
313356 self .db_pool .updates ._background_update_progress_txn (
314357 txn , "event_push_backfill_thread_id" , progress
315358 )
@@ -325,15 +368,12 @@ def add_thread_id_txn(
325368 result = await self .db_pool .runInteraction (
326369 "event_push_backfill_thread_id" ,
327370 add_thread_id_txn ,
328- "event_push_actions" ,
329371 progress .get ("max_event_push_actions_stream_ordering" , 0 ),
330372 )
331373 else :
332374 result = await self .db_pool .runInteraction (
333375 "event_push_backfill_thread_id" ,
334- add_thread_id_txn ,
335- "event_push_summary" ,
336- progress .get ("max_event_push_summary_stream_ordering" , 0 ),
376+ add_thread_id_summary_txn ,
337377 )
338378
339379 # Only done after the event_push_summary table is done.
0 commit comments