Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 7469824

Browse files
authored
Fix serialization errors when rotating notifications (#13118)
1 parent f114556 commit 7469824

File tree

5 files changed

+202
-83
lines changed

5 files changed

+202
-83
lines changed

changelog.d/13118.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Reduce DB usage of `/sync` when a large number of unread messages have recently been sent in a room.

synapse/storage/databases/main/event_push_actions.py

Lines changed: 135 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -233,14 +233,30 @@ def _get_unread_counts_by_pos_txn(
233233

234234
counts = NotifCounts()
235235

236-
# First we pull the counts from the summary table
236+
# First we pull the counts from the summary table.
237+
#
238+
# We check that `last_receipt_stream_ordering` matches the stream
239+
# ordering given. If it doesn't match then a new read receipt has arrived and
240+
# we haven't yet updated the counts in `event_push_summary` to reflect
241+
# that; in that case we simply ignore `event_push_summary` counts
242+
# and do a manual count of all of the rows in the `event_push_actions` table
243+
# for this user/room.
244+
#
245+
# If `last_receipt_stream_ordering` is null then that means it's up to
246+
# date (as the row was written by an older version of Synapse that
247+
# updated `event_push_summary` synchronously when persisting a new read
248+
# receipt).
237249
txn.execute(
238250
"""
239251
SELECT stream_ordering, notif_count, COALESCE(unread_count, 0)
240252
FROM event_push_summary
241-
WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
253+
WHERE room_id = ? AND user_id = ?
254+
AND (
255+
(last_receipt_stream_ordering IS NULL AND stream_ordering > ?)
256+
OR last_receipt_stream_ordering = ?
257+
)
242258
""",
243-
(room_id, user_id, stream_ordering),
259+
(room_id, user_id, stream_ordering, stream_ordering),
244260
)
245261
row = txn.fetchone()
246262

@@ -263,9 +279,9 @@ def _get_unread_counts_by_pos_txn(
263279
if row:
264280
counts.highlight_count += row[0]
265281

266-
# Finally we need to count push actions that haven't been summarized
267-
# yet.
268-
# We only want to pull out push actions that we haven't summarized yet.
282+
# Finally we need to count push actions that aren't included in the
283+
# summary returned above, e.g. recent events that haven't been
284+
# summarized yet, or the summary is empty due to a recent read receipt.
269285
stream_ordering = max(stream_ordering, summary_stream_ordering)
270286
notify_count, unread_count = self._get_notif_unread_count_for_user_room(
271287
txn, room_id, user_id, stream_ordering
@@ -800,6 +816,19 @@ async def _rotate_notifs(self) -> None:
800816
self._doing_notif_rotation = True
801817

802818
try:
819+
# First we recalculate push summaries and delete stale push actions
820+
# for rooms/users with new receipts.
821+
while True:
822+
logger.debug("Handling new receipts")
823+
824+
caught_up = await self.db_pool.runInteraction(
825+
"_handle_new_receipts_for_notifs_txn",
826+
self._handle_new_receipts_for_notifs_txn,
827+
)
828+
if caught_up:
829+
break
830+
831+
# Then we update the event push summaries for any new events
803832
while True:
804833
logger.info("Rotating notifications")
805834

@@ -810,10 +839,110 @@ async def _rotate_notifs(self) -> None:
810839
break
811840
await self.hs.get_clock().sleep(self._rotate_delay)
812841

842+
# Finally we clear out old event push actions.
813843
await self._remove_old_push_actions_that_have_rotated()
814844
finally:
815845
self._doing_notif_rotation = False
816846

847+
def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
848+
"""Check for new read receipts and delete from event push actions.
849+
850+
Any push actions which predate the user's most recent read receipt are
851+
now redundant, so we can remove them from `event_push_actions` and
852+
update `event_push_summary`.
853+
"""
854+
855+
limit = 100
856+
857+
min_stream_id = self.db_pool.simple_select_one_onecol_txn(
858+
txn,
859+
table="event_push_summary_last_receipt_stream_id",
860+
keyvalues={},
861+
retcol="stream_id",
862+
)
863+
864+
sql = """
865+
SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering
866+
FROM receipts_linearized AS r
867+
INNER JOIN events AS e USING (event_id)
868+
WHERE r.stream_id > ? AND user_id LIKE ?
869+
ORDER BY r.stream_id ASC
870+
LIMIT ?
871+
"""
872+
873+
# We only want local users, so we add a dodgy filter to the above query
874+
# and recheck it below.
875+
user_filter = "%:" + self.hs.hostname
876+
877+
txn.execute(
878+
sql,
879+
(
880+
min_stream_id,
881+
user_filter,
882+
limit,
883+
),
884+
)
885+
rows = txn.fetchall()
886+
887+
# For each new read receipt we delete push actions from before it and
888+
# recalculate the summary.
889+
for _, room_id, user_id, stream_ordering in rows:
890+
# Only handle our own read receipts.
891+
if not self.hs.is_mine_id(user_id):
892+
continue
893+
894+
txn.execute(
895+
"""
896+
DELETE FROM event_push_actions
897+
WHERE room_id = ?
898+
AND user_id = ?
899+
AND stream_ordering <= ?
900+
AND highlight = 0
901+
""",
902+
(room_id, user_id, stream_ordering),
903+
)
904+
905+
old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
906+
txn,
907+
table="event_push_summary_stream_ordering",
908+
keyvalues={},
909+
retcol="stream_ordering",
910+
)
911+
912+
notif_count, unread_count = self._get_notif_unread_count_for_user_room(
913+
txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
914+
)
915+
916+
self.db_pool.simple_upsert_txn(
917+
txn,
918+
table="event_push_summary",
919+
keyvalues={"room_id": room_id, "user_id": user_id},
920+
values={
921+
"notif_count": notif_count,
922+
"unread_count": unread_count,
923+
"stream_ordering": old_rotate_stream_ordering,
924+
"last_receipt_stream_ordering": stream_ordering,
925+
},
926+
)
927+
928+
# We always update `event_push_summary_last_receipt_stream_id` to
929+
# ensure that we don't rescan the same receipts for remote users.
930+
#
931+
# This requires repeatable read to be safe, as we need the
932+
# `MAX(stream_id)` to not include any new rows that have been committed
933+
# since the start of the transaction (since those rows won't have been
934+
# returned by the query above). Alternatively we could query the max
935+
# stream ID at the start of the transaction and bound everything by
936+
# that.
937+
txn.execute(
938+
"""
939+
UPDATE event_push_summary_last_receipt_stream_id
940+
SET stream_id = (SELECT COALESCE(MAX(stream_id), 0) FROM receipts_linearized)
941+
"""
942+
)
943+
944+
return len(rows) < limit
945+
817946
def _rotate_notifs_txn(self, txn: LoggingTransaction) -> bool:
818947
"""Archives older notifications into event_push_summary. Returns whether
819948
the archiving process has caught up or not.
@@ -1033,66 +1162,6 @@ def remove_old_push_actions_that_have_rotated_txn(
10331162
if done:
10341163
break
10351164

1036-
def _remove_old_push_actions_before_txn(
1037-
self, txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int
1038-
) -> None:
1039-
"""
1040-
Purges old push actions for a user and room before a given
1041-
stream_ordering.
1042-
1043-
We however keep a months worth of highlighted notifications, so that
1044-
users can still get a list of recent highlights.
1045-
1046-
Args:
1047-
txn: The transaction
1048-
room_id: Room ID to delete from
1049-
user_id: user ID to delete for
1050-
stream_ordering: The lowest stream ordering which will
1051-
not be deleted.
1052-
"""
1053-
txn.call_after(
1054-
self.get_unread_event_push_actions_by_room_for_user.invalidate,
1055-
(room_id, user_id),
1056-
)
1057-
1058-
# We need to join on the events table to get the received_ts for
1059-
# event_push_actions and sqlite won't let us use a join in a delete so
1060-
# we can't just delete where received_ts < x. Furthermore we can
1061-
# only identify event_push_actions by a tuple of room_id, event_id
1062-
# we we can't use a subquery.
1063-
# Instead, we look up the stream ordering for the last event in that
1064-
# room received before the threshold time and delete event_push_actions
1065-
# in the room with a stream_odering before that.
1066-
txn.execute(
1067-
"DELETE FROM event_push_actions "
1068-
" WHERE user_id = ? AND room_id = ? AND "
1069-
" stream_ordering <= ?"
1070-
" AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
1071-
(user_id, room_id, stream_ordering, self.stream_ordering_month_ago),
1072-
)
1073-
1074-
old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
1075-
txn,
1076-
table="event_push_summary_stream_ordering",
1077-
keyvalues={},
1078-
retcol="stream_ordering",
1079-
)
1080-
1081-
notif_count, unread_count = self._get_notif_unread_count_for_user_room(
1082-
txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
1083-
)
1084-
1085-
self.db_pool.simple_upsert_txn(
1086-
txn,
1087-
table="event_push_summary",
1088-
keyvalues={"room_id": room_id, "user_id": user_id},
1089-
values={
1090-
"notif_count": notif_count,
1091-
"unread_count": unread_count,
1092-
"stream_ordering": old_rotate_stream_ordering,
1093-
},
1094-
)
1095-
10961165

10971166
class EventPushActionsStore(EventPushActionsWorkerStore):
10981167
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"

synapse/storage/databases/main/receipts.py

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
cast,
2727
)
2828

29-
from synapse.api.constants import EduTypes, ReceiptTypes
29+
from synapse.api.constants import EduTypes
3030
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
3131
from synapse.replication.tcp.streams import ReceiptsStream
3232
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -682,17 +682,6 @@ def _insert_linearized_receipt_txn(
682682
lock=False,
683683
)
684684

685-
# When updating a local users read receipt, remove any push actions
686-
# which resulted from the receipt's event and all earlier events.
687-
if (
688-
self.hs.is_mine_id(user_id)
689-
and receipt_type in (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE)
690-
and stream_ordering is not None
691-
):
692-
self._remove_old_push_actions_before_txn( # type: ignore[attr-defined]
693-
txn, room_id=room_id, user_id=user_id, stream_ordering=stream_ordering
694-
)
695-
696685
return rx_ts
697686

698687
def _graph_to_linear(
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/* Copyright 2022 The Matrix.org Foundation C.I.C
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
-- Add a column that records the position of the read receipt for the user at
17+
-- the time we summarised the push actions. This is used to check if the counts
18+
-- are up to date after a new read receipt has been sent.
19+
--
20+
-- Null means that we can skip that check, as the row was written by an older
21+
-- version of Synapse that updated `event_push_summary` synchronously when
22+
-- persisting a new read receipt
23+
ALTER TABLE event_push_summary ADD COLUMN last_receipt_stream_ordering BIGINT;
24+
25+
26+
-- Tracks which new receipts we've handled
27+
CREATE TABLE event_push_summary_last_receipt_stream_id (
28+
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
29+
stream_id BIGINT NOT NULL,
30+
CHECK (Lock='X')
31+
);
32+
33+
INSERT INTO event_push_summary_last_receipt_stream_id (stream_id)
34+
SELECT COALESCE(MAX(stream_id), 0)
35+
FROM receipts_linearized;

tests/storage/test_event_push_actions.py

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def test_get_unread_push_actions_for_user_in_range_for_email(self) -> None:
5555

5656
def test_count_aggregation(self) -> None:
5757
room_id = "!foo:example.com"
58-
user_id = "@user1235:example.com"
58+
user_id = "@user1235:test"
5959

6060
last_read_stream_ordering = [0]
6161

@@ -81,11 +81,26 @@ def _assert_counts(noitf_count: int, highlight_count: int) -> None:
8181
def _inject_actions(stream: int, action: list) -> None:
8282
event = Mock()
8383
event.room_id = room_id
84-
event.event_id = "$test:example.com"
84+
event.event_id = f"$test{stream}:example.com"
8585
event.internal_metadata.stream_ordering = stream
8686
event.internal_metadata.is_outlier.return_value = False
8787
event.depth = stream
8888

89+
self.get_success(
90+
self.store.db_pool.simple_insert(
91+
table="events",
92+
values={
93+
"stream_ordering": stream,
94+
"topological_ordering": stream,
95+
"type": "m.room.message",
96+
"room_id": room_id,
97+
"processed": True,
98+
"outlier": False,
99+
"event_id": event.event_id,
100+
},
101+
)
102+
)
103+
89104
self.get_success(
90105
self.store.add_push_actions_to_staging(
91106
event.event_id,
@@ -105,18 +120,28 @@ def _inject_actions(stream: int, action: list) -> None:
105120
def _rotate(stream: int) -> None:
106121
self.get_success(
107122
self.store.db_pool.runInteraction(
108-
"", self.store._rotate_notifs_before_txn, stream
123+
"rotate-receipts", self.store._handle_new_receipts_for_notifs_txn
124+
)
125+
)
126+
127+
self.get_success(
128+
self.store.db_pool.runInteraction(
129+
"rotate-notifs", self.store._rotate_notifs_before_txn, stream
109130
)
110131
)
111132

112133
def _mark_read(stream: int, depth: int) -> None:
113134
last_read_stream_ordering[0] = stream
135+
114136
self.get_success(
115137
self.store.db_pool.runInteraction(
116138
"",
117-
self.store._remove_old_push_actions_before_txn,
139+
self.store._insert_linearized_receipt_txn,
118140
room_id,
141+
"m.read",
119142
user_id,
143+
f"$test{stream}:example.com",
144+
{},
120145
stream,
121146
)
122147
)
@@ -150,7 +175,7 @@ def _mark_read(stream: int, depth: int) -> None:
150175

151176
_assert_counts(1, 0)
152177

153-
_mark_read(7, 7)
178+
_mark_read(6, 6)
154179
_assert_counts(0, 0)
155180

156181
_inject_actions(8, HIGHLIGHT)

0 commit comments

Comments
 (0)