-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Fix historical messages backfilling in random order on remote homeservers (MSC2716) #11114
Changes from 45 commits
f30302d
438e222
4983739
a64bb2e
260ca06
886071b
477c15d
4191f56
f39c1da
7da8012
69dfa16
83474d9
1263c7e
ee47878
5bfde7b
2fbe3f1
1d3f417
4a12304
9a6d8fa
3e09d49
5afc264
6ea263b
3d387f9
fb8e281
c772b35
e0ff66d
76d454f
3529449
321f9ea
15c3282
5db717a
e96fd5c
f3b7b3e
7f2105a
246278e
ec35be5
bc0ba8c
363aed6
d771fbd
b559e23
6b64184
1d00043
b071426
ec33a40
b99efa8
3810ae1
df2a152
cc4eb72
47590bb
a38befa
033360a
3f22e42
e5670ff
023bd3e
b3fcffb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Fix [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical messages backfilling in random order on remote homeservers. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -167,9 +167,14 @@ async def _maybe_backfill_inner( | |
| oldest_events_with_depth = ( | ||
| await self.store.get_oldest_event_ids_with_depth_in_room(room_id) | ||
| ) | ||
| insertion_events_to_be_backfilled = ( | ||
| await self.store.get_insertion_event_backwards_extremities_in_room(room_id) | ||
| ) | ||
|
|
||
| insertion_events_to_be_backfilled: Dict[str, int] = {} | ||
| if self.hs.config.experimental.msc2716_enabled: | ||
| insertion_events_to_be_backfilled = ( | ||
| await self.store.get_insertion_event_backward_extremities_in_room( | ||
| room_id | ||
| ) | ||
| ) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just putting this behind the experimental feature flag so people can easily turn off the brokenness if it occurs |
||
| logger.debug( | ||
| "_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s", | ||
| oldest_events_with_depth, | ||
|
|
@@ -272,11 +277,12 @@ async def _maybe_backfill_inner( | |
| ] | ||
|
|
||
| logger.debug( | ||
| "room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems: %s filtered_sorted_extremeties_tuple: %s", | ||
| "room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems (%d): %s filtered_sorted_extremeties_tuple: %s", | ||
| room_id, | ||
| current_depth, | ||
| limit, | ||
| max_depth, | ||
| len(sorted_extremeties_tuple), | ||
| sorted_extremeties_tuple, | ||
| filtered_sorted_extremeties_tuple, | ||
| ) | ||
|
|
@@ -1051,6 +1057,19 @@ async def on_backfill_request( | |
| limit = min(limit, 100) | ||
|
|
||
| events = await self.store.get_backfill_events(room_id, pdu_list, limit) | ||
| logger.debug( | ||
| "on_backfill_request: backfill events=%s", | ||
| [ | ||
| "event_id=%s,depth=%d,body=%s,prevs=%s\n" | ||
| % ( | ||
| event.event_id, | ||
| event.depth, | ||
| event.content.get("body", event.type), | ||
| event.prev_event_ids(), | ||
| ) | ||
| for event in events | ||
| ], | ||
| ) | ||
|
|
||
| events = await filter_events_for_server(self.storage, origin, events) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -514,7 +514,11 @@ async def backfill( | |
| f"room {ev.room_id}, when we were backfilling in {room_id}" | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| ) | ||
|
|
||
| await self._process_pulled_events(dest, events, backfilled=True) | ||
| await self._process_pulled_events( | ||
| dest, | ||
| events, | ||
| backfilled=True, | ||
| ) | ||
|
|
||
| async def _get_missing_events_for_pdu( | ||
| self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int | ||
|
|
@@ -632,11 +636,24 @@ async def _process_pulled_events( | |
| backfilled: True if this is part of a historical batch of events (inhibits | ||
| notification to clients, and validation of device keys.) | ||
| """ | ||
| logger.debug( | ||
| "processing pulled backfilled=%s events=%s", | ||
| backfilled, | ||
| [ | ||
| "event_id=%s,depth=%d,body=%s,prevs=%s\n" | ||
| % ( | ||
| event.event_id, | ||
| event.depth, | ||
| event.content.get("body", event.type), | ||
| event.prev_event_ids(), | ||
| ) | ||
| for event in events | ||
| ], | ||
| ) | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| # We want to sort these by depth so we process them and | ||
| # tell clients about them in order. | ||
| sorted_events = sorted(events, key=lambda x: x.depth) | ||
|
|
||
| for ev in sorted_events: | ||
| with nested_logging_context(ev.event_id): | ||
| await self._process_pulled_event(origin, ev, backfilled=backfilled) | ||
|
|
@@ -996,6 +1013,8 @@ async def _process_received_pdu( | |
|
|
||
| await self._run_push_actions_and_persist_event(event, context, backfilled) | ||
|
|
||
| await self._handle_marker_event(origin, event) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved this above the early-return when
Related to #11114 (comment) |
||
|
|
||
| if backfilled or context.rejected: | ||
| return | ||
|
|
||
|
|
@@ -1075,8 +1094,6 @@ async def _process_received_pdu( | |
| event.sender, | ||
| ) | ||
|
|
||
| await self._handle_marker_event(origin, event) | ||
|
|
||
| async def _resync_device(self, sender: str) -> None: | ||
| """We have detected that the device list for the given user may be out | ||
| of sync, so we try and resync them. | ||
|
|
@@ -1323,7 +1340,14 @@ def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]: | |
| return event, context | ||
|
|
||
| events_to_persist = (x for x in (prep(event) for event in fetched_events) if x) | ||
| await self.persist_events_and_notify(room_id, tuple(events_to_persist)) | ||
| await self.persist_events_and_notify( | ||
| room_id, | ||
| tuple(events_to_persist), | ||
| # Mark these events backfilled as they're historic events that will | ||
| # eventually be backfilled. For example, missing events we fetch | ||
| # during backfill should be marked as backfilled as well. | ||
| backfilled=True, | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| ) | ||
|
|
||
| async def _check_event_auth( | ||
| self, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -490,12 +490,12 @@ async def create_event( | |
| requester: Requester, | ||
| event_dict: dict, | ||
| txn_id: Optional[str] = None, | ||
| allow_no_prev_events: bool = False, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did we move this up? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To group it next to |
||
| prev_event_ids: Optional[List[str]] = None, | ||
| auth_event_ids: Optional[List[str]] = None, | ||
| require_consent: bool = True, | ||
| outlier: bool = False, | ||
| historical: bool = False, | ||
| allow_no_prev_events: bool = False, | ||
| depth: Optional[int] = None, | ||
| ) -> Tuple[EventBase, EventContext]: | ||
| """ | ||
|
|
@@ -510,6 +510,10 @@ async def create_event( | |
| requester | ||
| event_dict: An entire event | ||
| txn_id | ||
| allow_no_prev_events: Whether to allow this event to be created an empty | ||
| list of prev_events. Normally this is prohibited just because most | ||
| events should have a prev_event and we should only use this in special | ||
| cases like MSC2716. | ||
| prev_event_ids: | ||
| the forward extremities to use as the prev_events for the | ||
| new event. | ||
|
|
@@ -604,10 +608,10 @@ async def create_event( | |
| event, context = await self.create_new_client_event( | ||
| builder=builder, | ||
| requester=requester, | ||
| allow_no_prev_events=allow_no_prev_events, | ||
| prev_event_ids=prev_event_ids, | ||
| auth_event_ids=auth_event_ids, | ||
| depth=depth, | ||
| allow_no_prev_events=allow_no_prev_events, | ||
| ) | ||
|
|
||
| # In an ideal world we wouldn't need the second part of this condition. However, | ||
|
|
@@ -764,6 +768,7 @@ async def create_and_send_nonmember_event( | |
| self, | ||
| requester: Requester, | ||
| event_dict: dict, | ||
| allow_no_prev_events: bool = False, | ||
| prev_event_ids: Optional[List[str]] = None, | ||
| auth_event_ids: Optional[List[str]] = None, | ||
| ratelimit: bool = True, | ||
|
|
@@ -781,6 +786,10 @@ async def create_and_send_nonmember_event( | |
| Args: | ||
| requester: The requester sending the event. | ||
| event_dict: An entire event. | ||
| allow_no_prev_events: Whether to allow this event to be created an empty | ||
| list of prev_events. Normally this is prohibited just because most | ||
| events should have a prev_event and we should only use this in special | ||
| cases like MSC2716. | ||
| prev_event_ids: | ||
| The event IDs to use as the prev events. | ||
| Should normally be left as None to automatically request them | ||
|
|
@@ -880,16 +889,20 @@ async def create_new_client_event( | |
| self, | ||
| builder: EventBuilder, | ||
| requester: Optional[Requester] = None, | ||
| allow_no_prev_events: bool = False, | ||
| prev_event_ids: Optional[List[str]] = None, | ||
| auth_event_ids: Optional[List[str]] = None, | ||
| depth: Optional[int] = None, | ||
| allow_no_prev_events: bool = False, | ||
| ) -> Tuple[EventBase, EventContext]: | ||
| """Create a new event for a local client | ||
|
|
||
| Args: | ||
| builder: | ||
| requester: | ||
| allow_no_prev_events: Whether to allow this event to be created an empty | ||
| list of prev_events. Normally this is prohibited just because most | ||
| events should have a prev_event and we should only use this in special | ||
| cases like MSC2716. | ||
| prev_event_ids: | ||
| the forward extremities to use as the prev_events for the | ||
| new event. | ||
|
|
@@ -908,7 +921,6 @@ async def create_new_client_event( | |
| Returns: | ||
| Tuple of created event, context | ||
| """ | ||
|
|
||
| # Strip down the auth_event_ids to only what we need to auth the event. | ||
| # For example, we don't need extra m.room.member that don't match event.sender | ||
| full_state_ids_at_event = None | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,10 +13,6 @@ | |
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| def generate_fake_event_id() -> str: | ||
| return "$fake_" + random_string(43) | ||
|
|
||
|
|
||
| class RoomBatchHandler: | ||
| def __init__(self, hs: "HomeServer"): | ||
| self.hs = hs | ||
|
|
@@ -184,9 +180,9 @@ async def persist_state_events_at_start( | |
|
|
||
| # Make the state events float off on their own so we don't have a | ||
| # bunch of `@mxid joined the room` noise between each batch | ||
| prev_event_id_for_state_chain = generate_fake_event_id() | ||
| prev_event_ids_for_state_chain: List[str] = [] | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| for state_event in state_events_at_start: | ||
| for index, state_event in enumerate(state_events_at_start): | ||
| assert_params_in_dict( | ||
| state_event, ["type", "origin_server_ts", "content", "sender"] | ||
| ) | ||
|
|
@@ -222,7 +218,10 @@ async def persist_state_events_at_start( | |
| content=event_dict["content"], | ||
| outlier=True, | ||
| historical=True, | ||
| prev_event_ids=[prev_event_id_for_state_chain], | ||
| # Only the first event in the chain should be floating. | ||
| # The rest should hang off each other in a chain. | ||
| allow_no_prev_events=index == 0, | ||
| prev_event_ids=prev_event_ids_for_state_chain, | ||
| # Make sure to use a copy of this list because we modify it | ||
| # later in the loop here. Otherwise it will be the same | ||
| # reference and also update in the event when we append later. | ||
|
|
@@ -242,7 +241,10 @@ async def persist_state_events_at_start( | |
| event_dict, | ||
| outlier=True, | ||
| historical=True, | ||
| prev_event_ids=[prev_event_id_for_state_chain], | ||
| # Only the first event in the chain should be floating. | ||
| # The rest should hang off each other in a chain. | ||
| allow_no_prev_events=index == 0, | ||
| prev_event_ids=prev_event_ids_for_state_chain, | ||
| # Make sure to use a copy of this list because we modify it | ||
| # later in the loop here. Otherwise it will be the same | ||
| # reference and also update in the event when we append later. | ||
|
|
@@ -253,15 +255,14 @@ async def persist_state_events_at_start( | |
| state_event_ids_at_start.append(event_id) | ||
| auth_event_ids.append(event_id) | ||
| # Connect all the state in a floating chain | ||
| prev_event_id_for_state_chain = event_id | ||
| prev_event_ids_for_state_chain = [event_id] | ||
|
|
||
| return state_event_ids_at_start | ||
|
|
||
| async def persist_historical_events( | ||
| self, | ||
| events_to_create: List[JsonDict], | ||
| room_id: str, | ||
| initial_prev_event_ids: List[str], | ||
| inherited_depth: int, | ||
| auth_event_ids: List[str], | ||
| app_service_requester: Requester, | ||
|
|
@@ -277,9 +278,6 @@ async def persist_historical_events( | |
| events_to_create: List of historical events to create in JSON | ||
| dictionary format. | ||
| room_id: Room where you want the events persisted in. | ||
| initial_prev_event_ids: These will be the prev_events for the first | ||
| event created. Each event created afterwards will point to the | ||
| previous event created. | ||
| inherited_depth: The depth to create the events at (you will | ||
| probably by calling inherit_depth_from_prev_ids(...)). | ||
| auth_event_ids: Define which events allow you to create the given | ||
|
|
@@ -291,11 +289,13 @@ async def persist_historical_events( | |
| """ | ||
| assert app_service_requester.app_service | ||
|
|
||
| prev_event_ids = initial_prev_event_ids.copy() | ||
| # Make the historical event chain float off on its own which causes the | ||
| # HS to ask for the state at the start of the batch later. | ||
| prev_event_ids: List[str] = [] | ||
|
|
||
| event_ids = [] | ||
| events_to_persist = [] | ||
| for ev in events_to_create: | ||
| for index, ev in enumerate(events_to_create): | ||
| assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"]) | ||
|
|
||
| assert self.hs.is_mine_id(ev["sender"]), "User must be our own: %s" % ( | ||
|
|
@@ -319,6 +319,9 @@ async def persist_historical_events( | |
| ev["sender"], app_service_requester.app_service | ||
| ), | ||
| event_dict, | ||
| # Only the first event in the chain should be floating. | ||
| # The rest should hang off each other in a chain. | ||
| allow_no_prev_events=index == 0, | ||
| prev_event_ids=event_dict.get("prev_events"), | ||
| auth_event_ids=auth_event_ids, | ||
| historical=True, | ||
|
|
@@ -370,7 +373,6 @@ async def handle_batch_of_events( | |
| events_to_create: List[JsonDict], | ||
| room_id: str, | ||
| batch_id_to_connect_to: str, | ||
| initial_prev_event_ids: List[str], | ||
| inherited_depth: int, | ||
| auth_event_ids: List[str], | ||
| app_service_requester: Requester, | ||
|
|
@@ -385,9 +387,6 @@ async def handle_batch_of_events( | |
| room_id: Room where you want the events created in. | ||
| batch_id_to_connect_to: The batch_id from the insertion event you | ||
| want this batch to connect to. | ||
| initial_prev_event_ids: These will be the prev_events for the first | ||
| event created. Each event created afterwards will point to the | ||
| previous event created. | ||
| inherited_depth: The depth to create the events at (you will | ||
| probably by calling inherit_depth_from_prev_ids(...)). | ||
| auth_event_ids: Define which events allow you to create the given | ||
|
|
@@ -436,7 +435,6 @@ async def handle_batch_of_events( | |
| event_ids = await self.persist_historical_events( | ||
| events_to_create=events_to_create, | ||
| room_id=room_id, | ||
| initial_prev_event_ids=initial_prev_event_ids, | ||
| inherited_depth=inherited_depth, | ||
| auth_event_ids=auth_event_ids, | ||
| app_service_requester=app_service_requester, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.