Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit e4ab867

Browse files
authored
Fix tight loop handling presence replication. (#9900)
Only affects workers. Introduced in #9819. Fixes #9899.
1 parent 8ba0869 commit e4ab867

File tree

3 files changed

+46
-1
lines changed

3 files changed

+46
-1
lines changed

changelog.d/9900.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix tight loop handling presence replication when using workers. Introduced in v1.33.0rc1.

synapse/handlers/presence.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2026,18 +2026,40 @@ async def get_replication_rows(
20262026
)
20272027
return result["updates"], result["upto_token"], result["limited"]
20282028

2029+
# If the from_token is the current token then there's nothing to return
2030+
# and we can trivially no-op.
2031+
if from_token == self._next_id - 1:
2032+
return [], upto_token, False
2033+
20292034
# We can find the correct position in the queue by noting that there is
20302035
# exactly one entry per stream ID, and that the last entry has an ID of
20312036
# `self._next_id - 1`, so we can count backwards from the end.
20322037
#
2038+
# Since we are returning all states in the range `from_token < stream_id
2039+
# <= upto_token` we look for the index with a `stream_id` of `from_token
2040+
# + 1`.
2041+
#
20332042
# Since the start of the queue is periodically truncated we need to
20342043
# handle the case where `from_token` stream ID has already been dropped.
2035-
start_idx = max(from_token - self._next_id, -len(self._queue))
2044+
start_idx = max(from_token + 1 - self._next_id, -len(self._queue))
20362045

20372046
to_send = [] # type: List[Tuple[int, Tuple[str, str]]]
20382047
limited = False
20392048
new_id = upto_token
20402049
for _, stream_id, destinations, user_ids in self._queue[start_idx:]:
2050+
if stream_id <= from_token:
2051+
# Paranoia check that we are actually only sending states that
2052+
# are have stream_id strictly greater than from_token. We should
2053+
# never hit this.
2054+
logger.warning(
2055+
"Tried returning presence federation stream ID: %d less than from_token: %d (next_id: %d, len: %d)",
2056+
stream_id,
2057+
from_token,
2058+
self._next_id,
2059+
len(self._queue),
2060+
)
2061+
continue
2062+
20412063
if stream_id > upto_token:
20422064
break
20432065

tests/handlers/test_presence.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,14 @@ def test_send_and_get(self):
509509

510510
self.assertCountEqual(rows, expected_rows)
511511

512+
now_token = self.queue.get_current_token(self.instance_name)
513+
rows, upto_token, limited = self.get_success(
514+
self.queue.get_replication_rows("master", upto_token, now_token, 10)
515+
)
516+
self.assertEqual(upto_token, now_token)
517+
self.assertFalse(limited)
518+
self.assertCountEqual(rows, [])
519+
512520
def test_send_and_get_split(self):
513521
state1 = UserPresenceState.default("@user1:test")
514522
state2 = UserPresenceState.default("@user2:test")
@@ -538,6 +546,20 @@ def test_send_and_get_split(self):
538546

539547
self.assertCountEqual(rows, expected_rows)
540548

549+
now_token = self.queue.get_current_token(self.instance_name)
550+
rows, upto_token, limited = self.get_success(
551+
self.queue.get_replication_rows("master", upto_token, now_token, 10)
552+
)
553+
554+
self.assertEqual(upto_token, now_token)
555+
self.assertFalse(limited)
556+
557+
expected_rows = [
558+
(2, ("dest3", "@user3:test")),
559+
]
560+
561+
self.assertCountEqual(rows, expected_rows)
562+
541563
def test_clear_queue_all(self):
542564
state1 = UserPresenceState.default("@user1:test")
543565
state2 = UserPresenceState.default("@user2:test")

0 commit comments

Comments
 (0)