Optimize backfill receiving to have less missing prev_event thrashing (scratch) #13864

Conversation
Fix #13856

`_invalidate_caches_for_event` doesn't run in monolith mode, which means we never even tried to clear the `have_seen_event` and other caches. And even in worker mode, it only runs on the workers, not the master (AFAICT). Additionally, there is a bug with the key being wrong, so `_invalidate_caches_for_event` never invalidates the `have_seen_event` cache even when it does run.

Wrong:

```py
self.have_seen_event.invalidate((room_id, event_id))
```

Correct:

```py
self.have_seen_event.invalidate(((room_id, event_id),))
```
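To see why the nesting matters: the cache key is the full tuple of arguments passed to the cached function, and `have_seen_event` takes a single argument that is itself a `(room_id, event_id)` pair. A minimal sketch with a toy cache (not Synapse's actual cache class) to illustrate:

```py
from typing import Dict, Tuple

class ToyCache:
    """Toy stand-in for a cache keyed on the cached function's argument tuple."""

    def __init__(self) -> None:
        self._data: Dict[Tuple, bool] = {}

    def set(self, key_args: Tuple, value: bool) -> None:
        self._data[key_args] = value

    def invalidate(self, key_args: Tuple) -> None:
        # `key_args` must be the *tuple of arguments*, not the inner
        # tuple's contents spread out.
        self._data.pop(key_args, None)

cache = ToyCache()
# The cached function takes ONE argument: a (room_id, event_id) pair.
cache.set((("!room:test", "$event"),), True)

# Wrong: this key never existed, so nothing is invalidated.
cache.invalidate(("!room:test", "$event"))
assert (("!room:test", "$event"),) in cache._data

# Correct: the single-argument pair is nested inside the key tuple.
cache.invalidate((("!room:test", "$event"),))
assert (("!room:test", "$event"),) not in cache._data
```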
…rder) and persist in the oldest -> newest to get the least missing prev_event fetch thrashing
      …not-being-invalidated
Copying what #13796 is doing
As mentioned by @erikjohnston, #13865 (comment)
…lidated' into maddlittlemods/msc2716-many-batches-optimization Conflicts: tests/storage/databases/main/test_events_worker.py
…into maddlittlemods/msc2716-many-batches-optimization Conflicts: synapse/handlers/federation.py synapse/storage/databases/main/cache.py synapse/storage/databases/main/event_federation.py
…sertion event rejected
…in so everything is valid. We are going to lose the benefit of keeping the join noise out of the timeline, and will probably have to hide "historical" state on the client.
```py
def test_process_pulled_events_asdf(self) -> None:
    main_store = self.hs.get_datastores().main
    state_storage_controller = self.hs.get_storage_controllers().state

    def _debug_event_string(event: EventBase) -> str:
        debug_body = event.content.get("body", event.type)
        maybe_state_key = getattr(event, "state_key", None)
        return f"event_id={event.event_id},depth={event.depth},body={debug_body}({maybe_state_key}),prevs={event.prev_event_ids()}"

    known_event_dict: Dict[str, Tuple[EventBase, List[EventBase]]] = {}

    def _add_to_known_event_list(
        event: EventBase, state_events: Optional[List[EventBase]] = None
    ) -> None:
        if state_events is None:
            state_map = self.get_success(
                state_storage_controller.get_state_for_event(event.event_id)
            )
            state_events = list(state_map.values())

        known_event_dict[event.event_id] = (event, state_events)

    async def get_room_state_ids(
        destination: str, room_id: str, event_id: str
    ) -> JsonDict:
        self.assertEqual(destination, self.OTHER_SERVER_NAME)
        known_event_info = known_event_dict.get(event_id)
        if known_event_info is None:
            self.fail(
                f"stubbed get_room_state_ids: Event ({event_id}) not part of our known events list"
            )

        known_event, known_event_state_list = known_event_info
        logger.info(
            "stubbed get_room_state_ids: destination=%s event_id=%s auth_event_ids=%s",
            destination,
            event_id,
            known_event.auth_event_ids(),
        )

        # self.assertEqual(event_id, missing_event.event_id)
        return {
            "pdu_ids": [
                state_event.event_id for state_event in known_event_state_list
            ],
            "auth_chain_ids": known_event.auth_event_ids(),
        }

    async def get_room_state(
        room_version: RoomVersion, destination: str, room_id: str, event_id: str
    ) -> StateRequestResponse:
        self.assertEqual(destination, self.OTHER_SERVER_NAME)
        known_event_info = known_event_dict.get(event_id)
        if known_event_info is None:
            self.fail(
                f"stubbed get_room_state: Event ({event_id}) not part of our known events list"
            )

        known_event, known_event_state_list = known_event_info
        logger.info(
            "stubbed get_room_state: destination=%s event_id=%s auth_event_ids=%s",
            destination,
            event_id,
            known_event.auth_event_ids(),
        )

        auth_event_ids = known_event.auth_event_ids()
        auth_events = []
        for auth_event_id in auth_event_ids:
            # look up the auth event (not the pulled event) in our known events
            known_event_info = known_event_dict.get(auth_event_id)
            if known_event_info is None:
                self.fail(
                    f"stubbed get_room_state: Auth event ({auth_event_id}) is not part of our known events list"
                )
            known_auth_event, _ = known_event_info
            auth_events.append(known_auth_event)

        return StateRequestResponse(
            state=known_event_state_list,
            auth_events=auth_events,
        )

    async def get_event(destination: str, event_id: str, timeout=None):
        self.assertEqual(destination, self.OTHER_SERVER_NAME)
        known_event_info = known_event_dict.get(event_id)
        if known_event_info is None:
            self.fail(
                f"stubbed get_event: Event ({event_id}) not part of our known events list"
            )

        known_event, _ = known_event_info
        return {"pdus": [known_event.get_pdu_json()]}

    self.mock_federation_transport_client.get_room_state_ids.side_effect = (
        get_room_state_ids
    )
    self.mock_federation_transport_client.get_room_state.side_effect = (
        get_room_state
    )
    self.mock_federation_transport_client.get_event.side_effect = get_event

    # create the room
    room_creator = self.appservice.sender
    room_id = self.helper.create_room_as(
        room_creator=self.appservice.sender, tok=self.appservice.token
    )
    room_version = self.get_success(main_store.get_room_version(room_id))

    event_before = self.get_success(
        inject_event(
            self.hs,
            room_id=room_id,
            sender=room_creator,
            type=EventTypes.Message,
            content={"body": "eventBefore0", "msgtype": "m.text"},
        )
    )
    _add_to_known_event_list(event_before)

    event_after = self.get_success(
        inject_event(
            self.hs,
            room_id=room_id,
            sender=room_creator,
            type=EventTypes.Message,
            content={"body": "eventAfter0", "msgtype": "m.text"},
        )
    )
    _add_to_known_event_list(event_after)

    state_map = self.get_success(
        state_storage_controller.get_state_for_event(event_before.event_id)
    )

    room_create_event = state_map.get((EventTypes.Create, ""))
    pl_event = state_map.get((EventTypes.PowerLevels, ""))
    as_membership_event = state_map.get((EventTypes.Member, room_creator))
    assert room_create_event is not None
    assert pl_event is not None
    assert as_membership_event is not None

    for state_event in state_map.values():
        _add_to_known_event_list(state_event)

    # This should be the successor of the event we want to insert next to
    # (the successor of event_before is event_after).
    inherited_depth = event_after.depth

    historical_base_auth_event_ids = [
        room_create_event.event_id,
        pl_event.event_id,
    ]
    historical_state_events = list(state_map.values())
    historical_state_event_ids = [
        state_event.event_id for state_event in historical_state_events
    ]

    maria_mxid = "@maria:test"
    maria_membership_event, _ = self.get_success(
        create_event(
            self.hs,
            room_id=room_id,
            sender=maria_mxid,
            state_key=maria_mxid,
            type=EventTypes.Member,
            content={
                "membership": "join",
            },
            # It all works when I add a prev_event for the floating
            # insertion event but the event no longer floats.
            # It's able to resolve state at the prev_events though.
            prev_event_ids=[event_before.event_id],
            # allow_no_prev_events=True,
            # prev_event_ids=[],
            auth_event_ids=historical_base_auth_event_ids,
            state_event_ids=historical_state_event_ids,
            depth=inherited_depth,
        )
    )
    _add_to_known_event_list(maria_membership_event, historical_state_events)

    historical_state_events.append(maria_membership_event)
    historical_state_event_ids.append(maria_membership_event.event_id)

    batch_id = random_string(8)
    next_batch_id = random_string(8)
    insertion_event, _ = self.get_success(
        create_event(
            self.hs,
            room_id=room_id,
            sender=room_creator,
            type=EventTypes.MSC2716_INSERTION,
            content={
                EventContentFields.MSC2716_NEXT_BATCH_ID: next_batch_id,
                EventContentFields.MSC2716_HISTORICAL: True,
            },
            # The difference from the actual room /batch_send is that this is normally
            # floating as well. But seems to work once we connect it to the
            # floating historical state chain.
            prev_event_ids=[maria_membership_event.event_id],
            # allow_no_prev_events=True,
            # prev_event_ids=[],
            auth_event_ids=[
                *historical_base_auth_event_ids,
                as_membership_event.event_id,
            ],
            state_event_ids=historical_state_event_ids,
            depth=inherited_depth,
        )
    )
    _add_to_known_event_list(insertion_event, historical_state_events)

    historical_message_event, _ = self.get_success(
        create_event(
            self.hs,
            room_id=room_id,
            sender=maria_mxid,
            type=EventTypes.Message,
            content={"body": "Historical message", "msgtype": "m.text"},
            prev_event_ids=[insertion_event.event_id],
            auth_event_ids=[
                *historical_base_auth_event_ids,
                maria_membership_event.event_id,
            ],
            depth=inherited_depth,
        )
    )
    _add_to_known_event_list(historical_message_event, historical_state_events)

    batch_event, _ = self.get_success(
        create_event(
            self.hs,
            room_id=room_id,
            sender=room_creator,
            type=EventTypes.MSC2716_BATCH,
            content={
                EventContentFields.MSC2716_BATCH_ID: batch_id,
                EventContentFields.MSC2716_HISTORICAL: True,
            },
            prev_event_ids=[historical_message_event.event_id],
            auth_event_ids=[
                *historical_base_auth_event_ids,
                as_membership_event.event_id,
            ],
            depth=inherited_depth,
        )
    )
    _add_to_known_event_list(batch_event, historical_state_events)

    base_insertion_event, base_insertion_event_context = self.get_success(
        create_event(
            self.hs,
            room_id=room_id,
            sender=room_creator,
            type=EventTypes.MSC2716_INSERTION,
            content={
                EventContentFields.MSC2716_NEXT_BATCH_ID: batch_id,
                EventContentFields.MSC2716_HISTORICAL: True,
            },
            prev_event_ids=[event_before.event_id],
            auth_event_ids=[
                *historical_base_auth_event_ids,
                as_membership_event.event_id,
            ],
            state_event_ids=historical_state_event_ids,
            depth=inherited_depth,
        )
    )
    _add_to_known_event_list(base_insertion_event, historical_state_events)

    # Chronological
    pulled_events: List[EventBase] = [
        # Beginning of room (oldest messages)
        # *list(state_map.values()),
        room_create_event,
        pl_event,
        as_membership_event,
        state_map.get((EventTypes.JoinRules, "")),
        state_map.get((EventTypes.RoomHistoryVisibility, "")),
        event_before,
        # HISTORICAL MESSAGE END
        insertion_event,
        historical_message_event,
        batch_event,
        base_insertion_event,
        # HISTORICAL MESSAGE START
        event_after,
        # Latest in the room (newest messages)
    ]

    # The order that we get after passing reverse chronological events in;
    # that mostly passes. Only the insertion event is rejected but the
    # historical messages appear in /messages scrollback.
    # pulled_events: List[EventBase] = [
    #     # Beginning of room (oldest messages)
    #     # *list(state_map.values()),
    #     room_create_event,
    #     pl_event,
    #     as_membership_event,
    #     state_map.get((EventTypes.JoinRules, "")),
    #     state_map.get((EventTypes.RoomHistoryVisibility, "")),
    #     event_before,
    #     event_after,
    #     base_insertion_event,
    #     batch_event,
    #     historical_message_event,
    #     insertion_event,
    #     # Latest in the room (newest messages)
    # ]

    import logging

    logger = logging.getLogger(__name__)
    logger.info(
        "pulled_events=%s",
        json.dumps(
            [_debug_event_string(event) for event in pulled_events],
            indent=4,
        ),
    )

    for event, _ in known_event_dict.values():
        if event.internal_metadata.outlier:
            self.fail("Our pristine events should not be marked as an outlier")

    self.get_success(
        self.hs.get_federation_event_handler()._process_pulled_events(
            self.OTHER_SERVER_NAME,
            [
                # Make copies of events since Synapse modifies the
                # internal_metadata in place and we want to keep our
                # pristine copies
                make_event_from_dict(pulled_event.get_pdu_json(), room_version)
                for pulled_event in pulled_events
            ],
            backfilled=True,
        )
    )

    from_token = self.get_success(
        self.hs.get_event_sources().get_current_token_for_pagination(room_id)
    )
    actual_events_in_room_reverse_chronological, _ = self.get_success(
        main_store.paginate_room_events(
            room_id, from_key=from_token.room_key, limit=100, direction="b"
        )
    )

    # We have to reverse the list to make it chronological.
    actual_events_in_room_chronological = list(
        reversed(actual_events_in_room_reverse_chronological)
    )

    expected_event_order = [
        # Beginning of room (oldest messages)
        # *list(state_map.values()),
        room_create_event,
        as_membership_event,
        pl_event,
        state_map.get((EventTypes.JoinRules, "")),
        state_map.get((EventTypes.RoomHistoryVisibility, "")),
        event_before,
        # HISTORICAL MESSAGE END
        insertion_event,
        historical_message_event,
        batch_event,
        base_insertion_event,
        # HISTORICAL MESSAGE START
        event_after,
        # Latest in the room (newest messages)
    ]

    event_id_diff = {event.event_id for event in expected_event_order} - {
        event.event_id for event in actual_events_in_room_chronological
    }
    event_diff_ordered = [
        event for event in expected_event_order if event.event_id in event_id_diff
    ]
    event_id_extra = {
        event.event_id for event in actual_events_in_room_chronological
    } - {event.event_id for event in expected_event_order}
    event_extra_ordered = [
        event
        for event in actual_events_in_room_chronological
        if event.event_id in event_id_extra
    ]
    assertion_message = (
        "Debug info:\nActual events missing from expected list: %s\nActual events contain %d additional events compared to expected: %s\nExpected event order: %s\nActual event order: %s"
        % (
            json.dumps(
                [_debug_event_string(event) for event in event_diff_ordered],
                indent=4,
            ),
            len(event_extra_ordered),
            json.dumps(
                [_debug_event_string(event) for event in event_extra_ordered],
                indent=4,
            ),
            json.dumps(
                [_debug_event_string(event) for event in expected_event_order],
                indent=4,
            ),
            json.dumps(
                [
                    _debug_event_string(event)
                    for event in actual_events_in_room_chronological
                ],
                indent=4,
            ),
        )
    )

    # assert (
    #     actual_events_in_room_chronological == expected_event_order
    # ), assertion_message

    self.assertEqual(
        [event.event_id for event in actual_events_in_room_chronological],
        [event.event_id for event in expected_event_order],
        assertion_message,
    )
```
This is a nice test for figuring out the mess with historical events from MSC2716 being rejected.
It eliminates all the federation variables when trying to do the same thing in Complement, and it's so much faster to iterate on: seconds vs. minutes.
```py
# It all works when I add a prev_event for the floating
# insertion event but the event no longer floats.
# It's able to resolve state at the prev_events though.
prev_event_ids=[event_before.event_id],
```
This can work without connecting it to `event_before`, but then we rely on `maria_membership_event` not being gossiped about during backfill, because if it is, it will be rejected and we can't use a rejected event to auth the following historical events.
Call chain: `_process_pulled_event` → `_compute_event_context_with_maybe_missing_prevs` → `compute_event_context`

The reason it works when it's not gossiped about is that `_compute_event_context_with_maybe_missing_prevs` fills in the `state_ids_before_event` and resolves the state magically for us without rejecting.
`synapse/handlers/federation_event.py` lines 971 to 1037 in `5f659d4`:
```py
logger.info(
    "Event %s is missing prev_events %s: calculating state for a "
    "backwards extremity",
    event_id,
    shortstr(missing_prevs),
)

# Calculate the state after each of the previous events, and
# resolve them to find the correct state at the current event.
try:
    # Determine whether we may be about to retrieve partial state
    # Events may be un-partial stated right after we compute the partial state
    # flag, but that's okay, as long as the flag errs on the conservative side.
    partial_state_flags = await self._store.get_partial_state_events(seen)
    partial_state = any(partial_state_flags.values())

    # Get the state of the events we know about
    ours = await self._state_storage_controller.get_state_groups_ids(
        room_id, seen, await_full_state=False
    )

    # state_maps is a list of mappings from (type, state_key) to event_id
    state_maps: List[StateMap[str]] = list(ours.values())

    # we don't need this any more, let's delete it.
    del ours

    # Ask the remote server for the states we don't
    # know about
    for p in missing_prevs:
        logger.info("Requesting state after missing prev_event %s", p)

        with nested_logging_context(p):
            # note that if any of the missing prevs share missing state or
            # auth events, the requests to fetch those events are deduped
            # by the get_pdu_cache in federation_client.
            remote_state_map = (
                await self._get_state_ids_after_missing_prev_event(
                    dest, room_id, p
                )
            )

            state_maps.append(remote_state_map)

    room_version = await self._store.get_room_version_id(room_id)
    state_map = await self._state_resolution_handler.resolve_events_with_store(
        room_id,
        room_version,
        state_maps,
        event_map={event_id: event},
        state_res_store=StateResolutionStore(self._store),
    )
except Exception:
    logger.warning(
        "Error attempting to resolve state at missing prev_events",
        exc_info=True,
    )
    raise FederationError(
        "ERROR",
        403,
        "We can't get valid state history.",
        affected=event_id,
    )

return await self._state_handler.compute_event_context(
    event, state_ids_before_event=state_map, partial_state=partial_state
)
```
I have a suspicion that we will also stop doing this sort of thing, since we also removed auth event resolving in #12943.
```py
events_to_create=events_to_create,
room_id=room_id,
inherited_depth=inherited_depth,
state_chain_event_id_to_connect_to=state_chain_event_id_to_connect_to,
```
These changes make this go from before -> after, where we now connect the historical batch to the historical state chain, and connect the state chain to the prev_event so everything has valid prev_events and auth_events to resolve organically from the DAG.

We do lose the benefit of removing the "@mxid joined the room" noise between each batch, but we might have to solve this on the client by hiding historical state.
Before
```mermaid
flowchart BT
    A --- annotation1>"Note: older events are at the top"]
    subgraph live timeline
        marker1>m.room.marker] ----> B -----------------> A
    end

    subgraph batch0
        batch0-batch[[m.room.batch]] --> batch0-2(("2")) --> batch0-1((1)) --> batch0-0((0)) --> batch0-insertion[/m.room.insertion\]
    end
    subgraph batch1
        batch1-batch[[m.room.batch]] --> batch1-2(("2")) --> batch1-1((1)) --> batch1-0((0)) --> batch1-insertion[/m.room.insertion\]
    end

    subgraph batch2
        batch2-batch[[m.room.batch]] --> batch2-2(("2")) --> batch2-1((1)) --> batch2-0((0)) --> batch2-insertion[/m.room.insertion\]
    end

    batch0-insertion -.-> memberBob0(["m.room.member (bob)"]) --> memberAlice0(["m.room.member (alice)"])
    batch1-insertion -.-> memberBob1(["m.room.member (bob)"]) --> memberAlice1(["m.room.member (alice)"])
    batch2-insertion -.-> memberBob2(["m.room.member (bob)"]) --> memberAlice2(["m.room.member (alice)"])

    marker1 -.-> batch0-insertionBase
    batch0-insertionBase[/m.room.insertion\] ---------------> A
    batch0-batch -.-> batch0-insertionBase
    batch1-batch -.-> batch0-insertion
    batch2-batch -.-> batch1-insertion

    %% make the annotation links invisible
    linkStyle 0 stroke-width:2px,fill:none,stroke:none;
```
After
```mermaid
flowchart BT
    A --- annotation1>"Note: older events are at the top"]
    subgraph live timeline
        marker1>m.room.marker] ----> B -----------------> A
    end

    subgraph batch0
        batch0-batch[[m.room.batch]] --> batch0-2(("2")) --> batch0-1((1)) --> batch0-0((0)) --> batch0-insertion[/m.room.insertion\]
    end
    subgraph batch1
        batch1-batch[[m.room.batch]] --> batch1-2(("2")) --> batch1-1((1)) --> batch1-0((0)) --> batch1-insertion[/m.room.insertion\]
    end

    subgraph batch2
        batch2-batch[[m.room.batch]] --> batch2-2(("2")) --> batch2-1((1)) --> batch2-0((0)) --> batch2-insertion[/m.room.insertion\]
    end

    batch0-insertion --> memberBob0(["m.room.member (bob)"]) --> memberAlice0(["m.room.member (alice)"]) --> A
    batch1-insertion --> memberBob1(["m.room.member (bob)"]) --> memberAlice1(["m.room.member (alice)"]) --> A
    batch2-insertion --> memberBob2(["m.room.member (bob)"]) --> memberAlice2(["m.room.member (alice)"]) --> A

    marker1 -.-> batch0-insertionBase
    batch0-insertionBase[/m.room.insertion\] ---------------> A
    batch0-batch -.-> batch0-insertionBase
    batch1-batch -.-> batch0-insertion
    batch2-batch -.-> batch1-insertion

    %% make the annotation links invisible
    linkStyle 0 stroke-width:2px,fill:none,stroke:none;
```
Split out to #13971
Pulled from the scratch changes in #13864.
Addressing:

> So Synapse is fast enough to merge this MSC2716 Complement test for importing many batches, matrix-org/complement#214 (comment)
Complement tests: matrix-org/complement#214
Dev notes
Why are we seeing some of these historical events being rejected?
I think it's because of changes to the auth-event reconciliation in #12943 (comment), which I was brought in on but didn't realize the magnitude of the change since the MSC2716 tests still passed. Although I'm not sure I realized that it actually removed one of the MSC2716 tests from the Synapse code base.
Why aren't we sorting topologically when receiving backfill events?
See #11114 (comment)
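For reference, a minimal sketch of what topologically sorting a pulled batch could look like (illustrative only, not what Synapse does; the `Event` type here is a hypothetical stand-in), ordering events so each one's in-batch prev_events come first:

```py
from collections import deque
from typing import Dict, List, NamedTuple

class Event(NamedTuple):
    event_id: str
    prev_event_ids: List[str]

def sort_oldest_to_newest(events: List[Event]) -> List[Event]:
    by_id: Dict[str, Event] = {e.event_id: e for e in events}
    # Count how many *in-batch* prev_events each event depends on.
    in_degree = {
        e.event_id: sum(1 for p in e.prev_event_ids if p in by_id) for e in events
    }
    # Map each event to the in-batch events that depend on it.
    dependents: Dict[str, List[str]] = {e.event_id: [] for e in events}
    for e in events:
        for p in e.prev_event_ids:
            if p in by_id:
                dependents[p].append(e.event_id)

    # Kahn's algorithm: start from events with no in-batch dependencies
    # (their prev_events are already persisted or unknown to the batch).
    queue = deque(eid for eid, deg in in_degree.items() if deg == 0)
    ordered: List[Event] = []
    while queue:
        eid = queue.popleft()
        ordered.append(by_id[eid])
        for dep in dependents[eid]:
            in_degree[dep] -= 1
            if in_degree[dep] == 0:
                queue.append(dep)
    return ordered
```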
How is `stream_ordering` given out?

Persisting events: see `_persist_events_and_state_updates` for where we normally assign `stream_ordering` and continue down to `_persist_events_txn`.
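A rough sketch of the idea (names hypothetical, not the real implementation): live events draw positive `stream_ordering` values from a forwards counter at persist time, while backfilled events draw negative values from a backwards counter so they sort before everything live:

```py
from typing import List

class StreamOrderingSketch:
    """Hypothetical stand-in for how persist-time stream orderings are handed out."""

    def __init__(self) -> None:
        self._forwards = 0  # live events: 1, 2, 3, ...
        self._backwards = 0  # backfilled events: -1, -2, -3, ...

    def next_forwards(self, n: int) -> List[int]:
        start = self._forwards + 1
        self._forwards += n
        return list(range(start, start + n))

    def next_backwards(self, n: int) -> List[int]:
        start = self._backwards - 1
        self._backwards -= n
        return list(range(start, start - n, -1))
```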
Pretty print list

This one is great if you're printing a JSON-like thing:
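Presumably the `json.dumps(..., indent=4)` pattern used in the test above, which puts every item on its own indented line:

```py
import json
import logging

logger = logging.getLogger(__name__)

logger.info(
    "pulled_events=%s",
    json.dumps(
        [_debug_event_string(event) for event in pulled_events],
        indent=4,
    ),
)
```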
This one sucks because it doesn't print the first and last items on indented new lines
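This is presumably `pprint`, which wraps a long list with the first item on the same line as the opening bracket and the closing bracket on the last item's line:

```py
import pprint

# For a list too long for one line, pformat produces e.g.:
#     ['item0',
#      'item1',
#      'item2']
logger.info("pulled_events=%s", pprint.pformat(pulled_events))
```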
Random