Skip to content
This repository was archived by the owner on Apr 12, 2024. It is now read-only.

Commit 8942e23

Browse files
authored
Always update AS last_pos, even on no events (#10107)
Fixes #1834. `get_new_events_for_appservice` internally calls `get_events_as_list`, which will filter out any rejected events. If all returned events are filtered out, `_notify_interested_services` will return without updating the last handled stream position. If there are 100 consecutive such events, processing will halt altogether. Breaking the loop is now done by checking whether we're up-to-date with `current_max` in the loop condition, instead of relying on an empty `events` list. Signed-off-by: Willem Mulder <[email protected]>
1 parent d558292 commit 8942e23

File tree

3 files changed

+15
-17
lines changed

3 files changed

+15
-17
lines changed

changelog.d/10107.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed a bug that could cause Synapse to stop notifying application services. Contributed by Willem Mulder.

synapse/handlers/appservice.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,15 @@ async def _notify_interested_services(self, max_token: RoomStreamToken):
8787
self.is_processing = True
8888
try:
8989
limit = 100
90-
while True:
90+
upper_bound = -1
91+
while upper_bound < self.current_max:
9192
(
9293
upper_bound,
9394
events,
9495
) = await self.store.get_new_events_for_appservice(
9596
self.current_max, limit
9697
)
9798

98-
if not events:
99-
break
100-
10199
events_by_room = {} # type: Dict[str, List[EventBase]]
102100
for event in events:
103101
events_by_room.setdefault(event.room_id, []).append(event)
@@ -153,9 +151,6 @@ async def handle_room_events(events):
153151

154152
await self.store.set_appservice_last_pos(upper_bound)
155153

156-
now = self.clock.time_msec()
157-
ts = await self.store.get_received_ts(events[-1].event_id)
158-
159154
synapse.metrics.event_processing_positions.labels(
160155
"appservice_sender"
161156
).set(upper_bound)
@@ -168,12 +163,16 @@ async def handle_room_events(events):
168163

169164
event_processing_loop_counter.labels("appservice_sender").inc()
170165

171-
synapse.metrics.event_processing_lag.labels(
172-
"appservice_sender"
173-
).set(now - ts)
174-
synapse.metrics.event_processing_last_ts.labels(
175-
"appservice_sender"
176-
).set(ts)
166+
if events:
167+
now = self.clock.time_msec()
168+
ts = await self.store.get_received_ts(events[-1].event_id)
169+
170+
synapse.metrics.event_processing_lag.labels(
171+
"appservice_sender"
172+
).set(now - ts)
173+
synapse.metrics.event_processing_last_ts.labels(
174+
"appservice_sender"
175+
).set(ts)
177176
finally:
178177
self.is_processing = False
179178

tests/handlers/test_appservice.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ def test_notify_interested_services(self):
5757
sender="@someone:anywhere", type="m.room.message", room_id="!foo:bar"
5858
)
5959
self.mock_store.get_new_events_for_appservice.side_effect = [
60-
make_awaitable((0, [event])),
6160
make_awaitable((0, [])),
61+
make_awaitable((1, [event])),
6262
]
63-
self.handler.notify_interested_services(RoomStreamToken(None, 0))
63+
self.handler.notify_interested_services(RoomStreamToken(None, 1))
6464

6565
self.mock_scheduler.submit_event_for_as.assert_called_once_with(
6666
interested_service, event
@@ -77,7 +77,6 @@ def test_query_user_exists_unknown_user(self):
7777
self.mock_as_api.query_user.return_value = make_awaitable(True)
7878
self.mock_store.get_new_events_for_appservice.side_effect = [
7979
make_awaitable((0, [event])),
80-
make_awaitable((0, [])),
8180
]
8281

8382
self.handler.notify_interested_services(RoomStreamToken(None, 0))
@@ -95,7 +94,6 @@ def test_query_user_exists_known_user(self):
9594
self.mock_as_api.query_user.return_value = make_awaitable(True)
9695
self.mock_store.get_new_events_for_appservice.side_effect = [
9796
make_awaitable((0, [event])),
98-
make_awaitable((0, [])),
9997
]
10098

10199
self.handler.notify_interested_services(RoomStreamToken(None, 0))

0 commit comments

Comments
 (0)