Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 886071b

Browse files
committed
Fix backfill not picking up batch events connected to non-base insertion events
Previously, we would only look for a batch event if the insertion event was connected to something else by prev_event. This is only the case for the base insertion event. Instead, we need to look for a batch event whenever we come across an insertion event.
1 parent 260ca06 commit 886071b

File tree

4 files changed

+34
-21
lines changed

4 files changed

+34
-21
lines changed

scripts-dev/complement.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then
6565
fi
6666

6767
# Run the tests!
68-
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
68+
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/main_test.go ./tests/msc2716_test.go

synapse/handlers/federation.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,8 @@ async def _maybe_backfill_inner(
270270
# request URI to be too long.
271271
extremities = dict(sorted_extremeties_tuple[:5])
272272

273+
logger.info("backfill extremities=%s", extremities)
274+
273275
# Now we need to decide which hosts to hit first.
274276

275277
# First we try hosts that are already in the room

synapse/handlers/federation_event.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1276,7 +1276,7 @@ def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]:
12761276
await self.persist_events_and_notify(
12771277
room_id,
12781278
tuple(events_to_persist),
1279-
# TODO: Maybe this to get fetched missing events during backfill as backfill also :/
1279+
# TODO: Maybe this to get fetched missing events during backfill as backfilled also :/
12801280
backfilled=True,
12811281
)
12821282

synapse/storage/databases/main/event_federation.py

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
from prometheus_client import Counter, Gauge
2020

21-
from synapse.api.constants import MAX_DEPTH
21+
from synapse.api.constants import MAX_DEPTH, EventTypes
2222
from synapse.api.errors import StoreError
2323
from synapse.api.room_versions import EventFormatVersions, RoomVersion
2424
from synapse.events import EventBase, make_event_from_dict
@@ -1013,8 +1013,8 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
10131013
# search.
10141014

10151015
# Look for the prev_event_id connected to the given event_id
1016-
query = """
1017-
SELECT depth, stream_ordering, prev_event_id FROM event_edges
1016+
connected_prev_event_query = """
1017+
SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges
10181018
/* Get the depth and stream_ordering of the prev_event_id from the events table */
10191019
INNER JOIN events
10201020
ON prev_event_id = events.event_id
@@ -1029,7 +1029,7 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
10291029

10301030
# Look for the "insertion" events connected to the given event_id
10311031
connected_insertion_event_query = """
1032-
SELECT e.depth, e.stream_ordering, i.event_id FROM insertion_event_edges AS i
1032+
SELECT e.depth, e.stream_ordering, i.event_id, e.type FROM insertion_event_edges AS i
10331033
/* Get the depth of the insertion event from the events table */
10341034
INNER JOIN events AS e USING (event_id)
10351035
/* Find an insertion event which points via prev_events to the given event_id */
@@ -1039,7 +1039,7 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
10391039

10401040
# Find any batch connections of a given insertion event
10411041
batch_connection_query = """
1042-
SELECT e.depth, e.stream_ordering, c.event_id FROM insertion_events AS i
1042+
SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
10431043
/* Find the batch that connects to the given insertion event */
10441044
INNER JOIN batch_events AS c
10451045
ON i.next_batch_id = c.batch_id
@@ -1063,6 +1063,7 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
10631063
table="events",
10641064
keyvalues={"event_id": event_id, "room_id": room_id},
10651065
retcols=(
1066+
"type",
10661067
"depth",
10671068
"stream_ordering",
10681069
),
@@ -1075,12 +1076,13 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
10751076
-event_lookup_result["depth"],
10761077
-event_lookup_result["stream_ordering"],
10771078
event_id,
1079+
event_lookup_result["type"],
10781080
)
10791081
)
10801082

10811083
while not queue.empty() and len(event_results) < limit:
10821084
try:
1083-
_, _, event_id = queue.get_nowait()
1085+
_, _, event_id, event_type = queue.get_nowait()
10841086
except Empty:
10851087
break
10861088

@@ -1125,46 +1127,55 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
11251127
# Try and find any potential historical batches of message history.
11261128
#
11271129
# First we look for an insertion event connected to the current
1128-
# event (by prev_event). If we find any, we need to go and try to
1129-
# find any batch events connected to the insertion event (by
1130-
# batch_id). If we find any, we'll add them to the queue and
1131-
# navigate up the DAG like normal in the next iteration of the loop.
1130+
# event (by prev_event). If we find any, we'll add them to the queue
1131+
# and navigate up the DAG like normal in the next iteration of the
1132+
# loop.
11321133
txn.execute(
11331134
connected_insertion_event_query, (event_id, limit - len(event_results))
11341135
)
11351136
connected_insertion_event_id_results = txn.fetchall()
1136-
logger.debug(
1137+
logger.info(
11371138
"_get_backfill_events: connected_insertion_event_query %s",
11381139
connected_insertion_event_id_results,
11391140
)
11401141
for row in connected_insertion_event_id_results:
11411142
connected_insertion_event_depth = row[0]
11421143
connected_insertion_event_stream_ordering = row[1]
1143-
connected_insertion_event = row[2]
1144-
if connected_insertion_event not in event_results:
1144+
connected_insertion_event_id = row[2]
1145+
connected_insertion_event_type = row[3]
1146+
if connected_insertion_event_id not in event_results:
11451147
queue.put(
11461148
(
11471149
-connected_insertion_event_depth,
11481150
-connected_insertion_event_stream_ordering,
1149-
connected_insertion_event,
1151+
connected_insertion_event_id,
1152+
connected_insertion_event_type,
11501153
)
11511154
)
11521155

1156+
# Second, we need to go and try to find any batch events connected
1157+
# to a given insertion event (by batch_id). If we find any, we'll
1158+
# add them to the queue and navigate up the DAG like normal in the
1159+
# next iteration of the loop.
1160+
if event_type == EventTypes.MSC2716_INSERTION:
11531161
# Find any batch connections for the given insertion event
11541162
txn.execute(
11551163
batch_connection_query,
1156-
(connected_insertion_event, limit - len(event_results)),
1164+
(event_id, limit - len(event_results)),
11571165
)
11581166
batch_start_event_id_results = txn.fetchall()
1159-
logger.debug(
1167+
logger.info(
11601168
"_get_backfill_events: batch_start_event_id_results %s",
11611169
batch_start_event_id_results,
11621170
)
11631171
for row in batch_start_event_id_results:
11641172
if row[2] not in event_results:
1165-
queue.put((-row[0], -row[1], row[2]))
1173+
queue.put((-row[0], -row[1], row[2], row[3]))
11661174

1167-
txn.execute(query, (event_id, False, limit - len(event_results)))
1175+
txn.execute(
1176+
connected_prev_event_query,
1177+
(event_id, False, limit - len(event_results)),
1178+
)
11681179
prev_event_id_results = txn.fetchall()
11691180
logger.info(
11701181
"_get_backfill_events: prev_event_ids %s", prev_event_id_results
@@ -1177,7 +1188,7 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
11771188

11781189
for row in prev_event_id_results:
11791190
if row[2] not in event_results:
1180-
queue.put((-row[0], -row[1], row[2]))
1191+
queue.put((-row[0], -row[1], row[2], row[3]))
11811192

11821193
return event_results.values()
11831194

0 commit comments

Comments
 (0)