Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 7b7478e

Browse files
authored
Batch up notifications after event persistence (#14033)
1 parent 51436c8 commit 7b7478e

File tree

5 files changed

+66
-58
lines changed

5 files changed

+66
-58
lines changed

changelog.d/14033.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Don't repeatedly wake up the same users for batched events.

synapse/handlers/federation_event.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2240,8 +2240,8 @@ async def _notify_persisted_event(
22402240
event_pos = PersistedEventPosition(
22412241
self._instance_name, event.internal_metadata.stream_ordering
22422242
)
2243-
await self._notifier.on_new_room_event(
2244-
event, event_pos, max_stream_token, extra_users=extra_users
2243+
await self._notifier.on_new_room_events(
2244+
[(event, event_pos)], max_stream_token, extra_users=extra_users
22452245
)
22462246

22472247
if event.type == EventTypes.Member and event.membership == Membership.JOIN:

synapse/handlers/message.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1872,6 +1872,7 @@ async def persist_and_notify_client_events(
18721872
events_and_context, backfilled=backfilled
18731873
)
18741874

1875+
events_and_pos = []
18751876
for event in persisted_events:
18761877
if self._ephemeral_events_enabled:
18771878
# If there's an expiry timestamp on the event, schedule its expiry.
@@ -1880,25 +1881,23 @@ async def persist_and_notify_client_events(
18801881
stream_ordering = event.internal_metadata.stream_ordering
18811882
assert stream_ordering is not None
18821883
pos = PersistedEventPosition(self._instance_name, stream_ordering)
1883-
1884-
async def _notify() -> None:
1885-
try:
1886-
await self.notifier.on_new_room_event(
1887-
event, pos, max_stream_token, extra_users=extra_users
1888-
)
1889-
except Exception:
1890-
logger.exception(
1891-
"Error notifying about new room event %s",
1892-
event.event_id,
1893-
)
1894-
1895-
run_in_background(_notify)
1884+
events_and_pos.append((event, pos))
18961885

18971886
if event.type == EventTypes.Message:
18981887
# We don't want to block sending messages on any presence code. This
18991888
# matters as sometimes presence code can take a while.
19001889
run_in_background(self._bump_active_time, requester.user)
19011890

1891+
async def _notify() -> None:
1892+
try:
1893+
await self.notifier.on_new_room_events(
1894+
events_and_pos, max_stream_token, extra_users=extra_users
1895+
)
1896+
except Exception:
1897+
logger.exception("Error notifying about new room events")
1898+
1899+
run_in_background(_notify)
1900+
19021901
return persisted_events[-1]
19031902

19041903
async def _maybe_kick_guest_users(

synapse/notifier.py

Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -294,35 +294,31 @@ def add_new_join_in_room_callback(self, cb: Callable[[str, str], None]) -> None:
294294
"""
295295
self._new_join_in_room_callbacks.append(cb)
296296

297-
async def on_new_room_event(
297+
async def on_new_room_events(
298298
self,
299-
event: EventBase,
300-
event_pos: PersistedEventPosition,
299+
events_and_pos: List[Tuple[EventBase, PersistedEventPosition]],
301300
max_room_stream_token: RoomStreamToken,
302301
extra_users: Optional[Collection[UserID]] = None,
303302
) -> None:
304-
"""Unwraps event and calls `on_new_room_event_args`."""
305-
await self.on_new_room_event_args(
306-
event_pos=event_pos,
307-
room_id=event.room_id,
308-
event_id=event.event_id,
309-
event_type=event.type,
310-
state_key=event.get("state_key"),
311-
membership=event.content.get("membership"),
312-
max_room_stream_token=max_room_stream_token,
313-
extra_users=extra_users or [],
314-
)
303+
"""Creates a _PendingRoomEventEntry for each of the listed events and calls
304+
notify_new_room_events with the results."""
305+
event_entries = []
306+
for event, pos in events_and_pos:
307+
entry = self.create_pending_room_event_entry(
308+
pos,
309+
extra_users,
310+
event.room_id,
311+
event.type,
312+
event.get("state_key"),
313+
event.content.get("membership"),
314+
)
315+
event_entries.append((entry, event.event_id))
316+
await self.notify_new_room_events(event_entries, max_room_stream_token)
315317

316-
async def on_new_room_event_args(
318+
async def notify_new_room_events(
317319
self,
318-
room_id: str,
319-
event_id: str,
320-
event_type: str,
321-
state_key: Optional[str],
322-
membership: Optional[str],
323-
event_pos: PersistedEventPosition,
320+
event_entries: List[Tuple[_PendingRoomEventEntry, str]],
324321
max_room_stream_token: RoomStreamToken,
325-
extra_users: Optional[Collection[UserID]] = None,
326322
) -> None:
327323
"""Used by handlers to inform the notifier something has happened
328324
in the room, room event wise.
@@ -338,22 +334,33 @@ async def on_new_room_event_args(
338334
until all previous events have been persisted before notifying
339335
the client streams.
340336
"""
341-
self.pending_new_room_events.append(
342-
_PendingRoomEventEntry(
343-
event_pos=event_pos,
344-
extra_users=extra_users or [],
345-
room_id=room_id,
346-
type=event_type,
347-
state_key=state_key,
348-
membership=membership,
349-
)
350-
)
351-
self._notify_pending_new_room_events(max_room_stream_token)
337+
for event_entry, event_id in event_entries:
338+
self.pending_new_room_events.append(event_entry)
339+
await self._third_party_rules.on_new_event(event_id)
352340

353-
await self._third_party_rules.on_new_event(event_id)
341+
self._notify_pending_new_room_events(max_room_stream_token)
354342

355343
self.notify_replication()
356344

345+
def create_pending_room_event_entry(
346+
self,
347+
event_pos: PersistedEventPosition,
348+
extra_users: Optional[Collection[UserID]],
349+
room_id: str,
350+
event_type: str,
351+
state_key: Optional[str],
352+
membership: Optional[str],
353+
) -> _PendingRoomEventEntry:
354+
"""Creates and returns a _PendingRoomEventEntry"""
355+
return _PendingRoomEventEntry(
356+
event_pos=event_pos,
357+
extra_users=extra_users or [],
358+
room_id=room_id,
359+
type=event_type,
360+
state_key=state_key,
361+
membership=membership,
362+
)
363+
357364
def _notify_pending_new_room_events(
358365
self, max_room_stream_token: RoomStreamToken
359366
) -> None:

synapse/replication/tcp/client.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -210,15 +210,16 @@ async def on_rdata(
210210

211211
max_token = self.store.get_room_max_token()
212212
event_pos = PersistedEventPosition(instance_name, token)
213-
await self.notifier.on_new_room_event_args(
214-
event_pos=event_pos,
215-
max_room_stream_token=max_token,
216-
extra_users=extra_users,
217-
room_id=row.data.room_id,
218-
event_id=row.data.event_id,
219-
event_type=row.data.type,
220-
state_key=row.data.state_key,
221-
membership=row.data.membership,
213+
event_entry = self.notifier.create_pending_room_event_entry(
214+
event_pos,
215+
extra_users,
216+
row.data.room_id,
217+
row.data.type,
218+
row.data.state_key,
219+
row.data.membership,
220+
)
221+
await self.notifier.notify_new_room_events(
222+
[(event_entry, row.data.event_id)], max_token
222223
)
223224

224225
# If this event is a join, make a note of it so we have an accurate

0 commit comments

Comments
 (0)