@@ -864,18 +864,20 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
864864
865865 limit = 100
866866
867- min_stream_id = self .db_pool .simple_select_one_onecol_txn (
867+ min_receipts_stream_id = self .db_pool .simple_select_one_onecol_txn (
868868 txn ,
869869 table = "event_push_summary_last_receipt_stream_id" ,
870870 keyvalues = {},
871871 retcol = "stream_id" ,
872872 )
873873
874+ max_receipts_stream_id = self ._receipts_id_gen .get_current_token ()
875+
874876 sql = """
875877 SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering
876878 FROM receipts_linearized AS r
877879 INNER JOIN events AS e USING (event_id)
878- WHERE r.stream_id > ? AND user_id LIKE ?
880+ WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ?
879881 ORDER BY r.stream_id ASC
880882 LIMIT ?
881883 """
@@ -887,13 +889,21 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
887889 txn .execute (
888890 sql ,
889891 (
890- min_stream_id ,
892+ min_receipts_stream_id ,
893+ max_receipts_stream_id ,
891894 user_filter ,
892895 limit ,
893896 ),
894897 )
895898 rows = txn .fetchall ()
896899
900+ old_rotate_stream_ordering = self .db_pool .simple_select_one_onecol_txn (
901+ txn ,
902+ table = "event_push_summary_stream_ordering" ,
903+ keyvalues = {},
904+ retcol = "stream_ordering" ,
905+ )
906+
897907 # For each new read receipt we delete push actions from before it and
898908 # recalculate the summary.
899909 for _ , room_id , user_id , stream_ordering in rows :
@@ -912,13 +922,6 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
912922 (room_id , user_id , stream_ordering ),
913923 )
914924
915- old_rotate_stream_ordering = self .db_pool .simple_select_one_onecol_txn (
916- txn ,
917- table = "event_push_summary_stream_ordering" ,
918- keyvalues = {},
919- retcol = "stream_ordering" ,
920- )
921-
922925 notif_count , unread_count = self ._get_notif_unread_count_for_user_room (
923926 txn , room_id , user_id , stream_ordering , old_rotate_stream_ordering
924927 )
@@ -937,18 +940,19 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
937940
938941 # We always update `event_push_summary_last_receipt_stream_id` to
939942 # ensure that we don't rescan the same receipts for remote users.
940- #
941- # This requires repeatable read to be safe, as we need the
942- # `MAX(stream_id)` to not include any new rows that have been committed
943- # since the start of the transaction (since those rows won't have been
944- # returned by the query above). Alternatively we could query the max
945- # stream ID at the start of the transaction and bound everything by
946- # that.
947- txn .execute (
948- """
949- UPDATE event_push_summary_last_receipt_stream_id
950- SET stream_id = (SELECT COALESCE(MAX(stream_id), 0) FROM receipts_linearized)
951- """
943+
944+ upper_limit = max_receipts_stream_id
945+ if len (rows ) >= limit :
946+ # If we hit the query limit we may not have processed all
947+ # pending receipts, so only advance the position to the last
948+ # receipt we processed; the rest are handled next iteration.
949+ upper_limit = rows [- 1 ][0 ]
950+
951+ self .db_pool .simple_update_txn (
952+ txn ,
953+ table = "event_push_summary_last_receipt_stream_id" ,
954+ keyvalues = {},
955+ updatevalues = {"stream_id" : upper_limit },
952956 )
953957
954958 return len (rows ) < limit
@@ -1199,6 +1203,16 @@ def __init__(
11991203 where_clause = "highlight=1" ,
12001204 )
12011205
1206+ # Add index to make deleting old push actions faster.
1207+ self .db_pool .updates .register_background_index_update (
1208+ "event_push_actions_stream_highlight_index" ,
1209+ index_name = "event_push_actions_stream_highlight_index" ,
1210+ table = "event_push_actions" ,
1211+ columns = ["highlight" , "stream_ordering" ],
1212+ where_clause = "highlight=0" ,
1213+ psql_only = True ,
1214+ )
1215+
12021216 async def get_push_actions_for_user (
12031217 self ,
12041218 user_id : str ,