Skip to content
This repository was archived by the owner on Apr 12, 2024. It is now read-only.

Commit 43f2b67

Browse files
authored
Intelligently select extremities used in backfill. (#8349)
Instead of just using the most recent extremities, let's pick the ones that will give us results that the pagination request cares about, i.e. pick extremities only if they have a smaller depth than the pagination token. This is useful when we fail to backfill an extremity, as we no longer get stuck requesting that same extremity repeatedly.
1 parent 9db4c1b commit 43f2b67

File tree

4 files changed

+67
-20
lines changed

4 files changed

+67
-20
lines changed

changelog.d/8349.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix a longstanding bug where back pagination over federation could get stuck if it failed to handle a received event.

synapse/handlers/federation.py

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -943,15 +943,26 @@ async def backfill(self, dest, room_id, limit, extremities):
943943

944944
return events
945945

946-
async def maybe_backfill(self, room_id, current_depth):
946+
async def maybe_backfill(
947+
self, room_id: str, current_depth: int, limit: int
948+
) -> bool:
947949
"""Checks the database to see if we should backfill before paginating,
948950
and if so do.
951+
952+
Args:
953+
room_id
954+
current_depth: The depth from which we're paginating from. This is
955+
used to decide if we should backfill and what extremities to
956+
use.
957+
limit: The number of events that the pagination request will
958+
return. This is used as part of the heuristic to decide if we
959+
should back paginate.
949960
"""
950961
extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
951962

952963
if not extremities:
953964
logger.debug("Not backfilling as no extremeties found.")
954-
return
965+
return False
955966

956967
# We only want to paginate if we can actually see the events we'll get,
957968
# as otherwise we'll just spend a lot of resources to get redacted
@@ -1004,16 +1015,54 @@ async def maybe_backfill(self, room_id, current_depth):
10041015
sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1]))
10051016
max_depth = sorted_extremeties_tuple[0][1]
10061017

1018+
# If we're approaching an extremity we trigger a backfill, otherwise we
1019+
# no-op.
1020+
#
1021+
# We chose twice the limit here as then clients paginating backwards
1022+
# will send pagination requests that trigger backfill at least twice
1023+
# using the most recent extremity before it gets removed (see below). We
1024+
# chose more than one times the limit in case of failure, but choosing a
1025+
# much larger factor will result in triggering a backfill request much
1026+
# earlier than necessary.
1027+
if current_depth - 2 * limit > max_depth:
1028+
logger.debug(
1029+
"Not backfilling as we don't need to. %d < %d - 2 * %d",
1030+
max_depth,
1031+
current_depth,
1032+
limit,
1033+
)
1034+
return False
1035+
1036+
logger.debug(
1037+
"room_id: %s, backfill: current_depth: %s, max_depth: %s, extrems: %s",
1038+
room_id,
1039+
current_depth,
1040+
max_depth,
1041+
sorted_extremeties_tuple,
1042+
)
1043+
1044+
# We ignore extremities that have a greater depth than our current depth
1045+
# as:
1046+
# 1. we don't really care about getting events that have happened
1047+
# before our current position; and
1048+
# 2. we have likely previously tried and failed to backfill from that
1049+
# extremity, so to avoid getting "stuck" requesting the same
1050+
# backfill repeatedly we drop those extremities.
1051+
filtered_sorted_extremeties_tuple = [
1052+
t for t in sorted_extremeties_tuple if int(t[1]) <= current_depth
1053+
]
1054+
1055+
# However, we need to check that the filtered extremities are non-empty.
1056+
# If they are empty then we can either a) bail or b) still attempt to
1057+
# backfill. We opt to try backfilling anyway just in case we do get
1058+
# relevant events.
1059+
if filtered_sorted_extremeties_tuple:
1060+
sorted_extremeties_tuple = filtered_sorted_extremeties_tuple
1061+
10071062
# We don't want to specify too many extremities as it causes the backfill
10081063
# request URI to be too long.
10091064
extremities = dict(sorted_extremeties_tuple[:5])
10101065

1011-
if current_depth > max_depth:
1012-
logger.debug(
1013-
"Not backfilling as we don't need to. %d < %d", max_depth, current_depth
1014-
)
1015-
return
1016-
10171066
# Now we need to decide which hosts to hit first.
10181067

10191068
# First we try hosts that are already in the room

synapse/handlers/pagination.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -362,9 +362,9 @@ async def get_messages(
362362
# if we're going backwards, we might need to backfill. This
363363
# requires that we have a topo token.
364364
if room_token.topological:
365-
max_topo = room_token.topological
365+
curr_topo = room_token.topological
366366
else:
367-
max_topo = await self.store.get_max_topological_token(
367+
curr_topo = await self.store.get_current_topological_token(
368368
room_id, room_token.stream
369369
)
370370

@@ -380,11 +380,11 @@ async def get_messages(
380380
leave_token = await self.store.get_topological_token_for_event(
381381
member_event_id
382382
)
383-
if RoomStreamToken.parse(leave_token).topological < max_topo:
383+
if RoomStreamToken.parse(leave_token).topological < curr_topo:
384384
source_config.from_key = str(leave_token)
385385

386386
await self.hs.get_handlers().federation_handler.maybe_backfill(
387-
room_id, max_topo
387+
room_id, curr_topo, limit=source_config.limit,
388388
)
389389

390390
events, next_key = await self.store.paginate_room_events(

synapse/storage/databases/main/stream.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -648,23 +648,20 @@ async def get_topological_token_for_event(self, event_id: str) -> str:
648648
)
649649
return "t%d-%d" % (row["topological_ordering"], row["stream_ordering"])
650650

651-
async def get_max_topological_token(self, room_id: str, stream_key: int) -> int:
652-
"""Get the max topological token in a room before the given stream
651+
async def get_current_topological_token(self, room_id: str, stream_key: int) -> int:
652+
"""Gets the topological token in a room after or at the given stream
653653
ordering.
654654
655655
Args:
656656
room_id
657657
stream_key
658-
659-
Returns:
660-
The maximum topological token.
661658
"""
662659
sql = (
663-
"SELECT coalesce(max(topological_ordering), 0) FROM events"
664-
" WHERE room_id = ? AND stream_ordering < ?"
660+
"SELECT coalesce(MIN(topological_ordering), 0) FROM events"
661+
" WHERE room_id = ? AND stream_ordering >= ?"
665662
)
666663
row = await self.db_pool.execute(
667-
"get_max_topological_token", None, sql, room_id, stream_key
664+
"get_current_topological_token", None, sql, room_id, stream_key
668665
)
669666
return row[0][0] if row else 0
670667

0 commit comments

Comments
 (0)