Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 8025602

Browse files
committed
Merge remote-tracking branch 'origin/release-v1.33.0' into develop
2 parents 10a08ab + e4ab867 commit 8025602

File tree

3 files changed

+46
-1
lines changed

3 files changed

+46
-1
lines changed

changelog.d/9900.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix tight loop handling presence replication when using workers. Introduced in v1.33.0rc1.

synapse/handlers/presence.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2045,18 +2045,40 @@ async def get_replication_rows(
20452045
)
20462046
return result["updates"], result["upto_token"], result["limited"]
20472047

2048+
# If the from_token is the current token then there's nothing to return
2049+
# and we can trivially no-op.
2050+
if from_token == self._next_id - 1:
2051+
return [], upto_token, False
2052+
20482053
# We can find the correct position in the queue by noting that there is
20492054
# exactly one entry per stream ID, and that the last entry has an ID of
20502055
# `self._next_id - 1`, so we can count backwards from the end.
20512056
#
2057+
# Since we are returning all states in the range `from_token < stream_id
2058+
# <= upto_token` we look for the index with a `stream_id` of `from_token
2059+
# + 1`.
2060+
#
20522061
# Since the start of the queue is periodically truncated we need to
20532062
# handle the case where `from_token` stream ID has already been dropped.
2054-
start_idx = max(from_token - self._next_id, -len(self._queue))
2063+
start_idx = max(from_token + 1 - self._next_id, -len(self._queue))
20552064

20562065
to_send = [] # type: List[Tuple[int, Tuple[str, str]]]
20572066
limited = False
20582067
new_id = upto_token
20592068
for _, stream_id, destinations, user_ids in self._queue[start_idx:]:
2069+
if stream_id <= from_token:
2070+
# Paranoia check that we are actually only sending states that
2071+
# are have stream_id strictly greater than from_token. We should
2072+
# never hit this.
2073+
logger.warning(
2074+
"Tried returning presence federation stream ID: %d less than from_token: %d (next_id: %d, len: %d)",
2075+
stream_id,
2076+
from_token,
2077+
self._next_id,
2078+
len(self._queue),
2079+
)
2080+
continue
2081+
20602082
if stream_id > upto_token:
20612083
break
20622084

tests/handlers/test_presence.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,14 @@ def test_send_and_get(self):
509509

510510
self.assertCountEqual(rows, expected_rows)
511511

512+
now_token = self.queue.get_current_token(self.instance_name)
513+
rows, upto_token, limited = self.get_success(
514+
self.queue.get_replication_rows("master", upto_token, now_token, 10)
515+
)
516+
self.assertEqual(upto_token, now_token)
517+
self.assertFalse(limited)
518+
self.assertCountEqual(rows, [])
519+
512520
def test_send_and_get_split(self):
513521
state1 = UserPresenceState.default("@user1:test")
514522
state2 = UserPresenceState.default("@user2:test")
@@ -538,6 +546,20 @@ def test_send_and_get_split(self):
538546

539547
self.assertCountEqual(rows, expected_rows)
540548

549+
now_token = self.queue.get_current_token(self.instance_name)
550+
rows, upto_token, limited = self.get_success(
551+
self.queue.get_replication_rows("master", upto_token, now_token, 10)
552+
)
553+
554+
self.assertEqual(upto_token, now_token)
555+
self.assertFalse(limited)
556+
557+
expected_rows = [
558+
(2, ("dest3", "@user3:test")),
559+
]
560+
561+
self.assertCountEqual(rows, expected_rows)
562+
541563
def test_clear_queue_all(self):
542564
state1 = UserPresenceState.default("@user1:test")
543565
state2 = UserPresenceState.default("@user2:test")

0 commit comments

Comments
 (0)