|  | 
| 75 | 75 | from synapse.storage.databases.main.events import PartialStateConflictError | 
| 76 | 76 | from synapse.storage.databases.main.events_worker import EventRedactBehaviour | 
| 77 | 77 | from synapse.storage.state import StateFilter | 
|  | 78 | +from synapse.storage.util.id_generators import AbstractStreamIdGenerator | 
| 78 | 79 | from synapse.types import ( | 
| 79 | 80 |     PersistedEventPosition, | 
| 80 | 81 |     RoomStreamToken, | 
| @@ -644,9 +645,71 @@ async def backfill( | 
| 644 | 645 |                         f"room {ev.room_id}, when we were backfilling in {room_id}" | 
| 645 | 646 |                     ) | 
| 646 | 647 | 
 | 
|  | 648 | +            # We expect the events from the `/backfill` response to start from | 
|  | 649 | +            # `?v` and include events that preceded it (so the list will be | 
|  | 650 | +            # newest -> oldest, reverse-chronological). It's described in the | 
|  | 651 | +            # spec this way so we can rely on people doing it the right way for | 
|  | 652 | +            # the historical messages to show up correctly. | 
|  | 653 | +            reverse_chronological_events = events | 
|  | 654 | +            # `[::-1]` is just syntax to reverse the list and give us a copy | 
|  | 655 | +            chronological_events = reverse_chronological_events[::-1] | 
|  | 656 | + | 
|  | 657 | +            # We want to calculate the `stream_ordering` from newest -> oldest | 
|  | 658 | +            # (reverse-chronological) (so MSC2716 historical events end up | 
|  | 659 | +            # sorting in the correct order) and persist oldest -> newest | 
|  | 660 | +            # (chronological) to get the least missing `prev_event` fetch | 
|  | 661 | +            # thrashing. | 
|  | 662 | +            # ------------------------------------------------------------------ | 
|  | 663 | + | 
|  | 664 | +            # Since we have been configured to write, we ought to have id generators, | 
|  | 665 | +            # rather than id trackers. | 
|  | 666 | +            assert ( | 
|  | 667 | +                self._instance_name in self._config.worker.writers.events | 
|  | 668 | +            ), "Can only write stream IDs on master" | 
|  | 669 | +            assert isinstance(self._store._backfill_id_gen, AbstractStreamIdGenerator) | 
|  | 670 | +            stream_ordering_manager = self._store._backfill_id_gen.get_next_mult( | 
|  | 671 | +                len(reverse_chronological_events) | 
|  | 672 | +            ) | 
|  | 673 | +            async with stream_ordering_manager as stream_orderings: | 
|  | 674 | +                # Calculate the `stream_ordering` from newest -> oldest | 
|  | 675 | +                # (reverse-chronological) (so historical events end up sorting | 
|  | 676 | +                # in the correct order). | 
|  | 677 | +                # | 
|  | 678 | +                # Backfilled events start with `stream_ordering=-1` and | 
|  | 679 | +                # decrement. For events that we backfill at the same `depth` | 
|  | 680 | +                # (like chains of historical messages) in order for them to have | 
|  | 681 | +                # the best chance of ending up in the correct order, assign | 
|  | 682 | +                # `stream_ordering` to the assumed reverse-chronological list of | 
|  | 683 | +                # events to backfill (where the newest events get | 
|  | 684 | +                # stream_ordering assigned first) | 
|  | 685 | +                # | 
|  | 686 | +                # depth : stream_ordering : event | 
|  | 687 | +                # ----- : --------------- : ----------------------- | 
|  | 688 | +                # 1     :  1              : Event before 1 | 
|  | 689 | +                # 2     :  2              : Event before 2 | 
|  | 690 | +                # 3     : -5              : Historical message 1 | 
|  | 691 | +                # 3     : -4              : Historical message 2 | 
|  | 692 | +                # 3     : -3              : Historical message 3 | 
|  | 693 | +                # 3     : -2              : Historical message 4 | 
|  | 694 | +                # 3     : -1              : Historical message 5 | 
|  | 695 | +                # 3     :  3              : Event after 1 | 
|  | 696 | +                # 4     :  4              : Event after 2 | 
|  | 697 | +                # | 
|  | 698 | +                for event, stream in zip( | 
|  | 699 | +                    reverse_chronological_events, stream_orderings | 
|  | 700 | +                ): | 
|  | 701 | +                    event.internal_metadata.stream_ordering = stream | 
|  | 702 | + | 
| 647 | 703 |             await self._process_pulled_events( | 
| 648 | 704 |                 dest, | 
| 649 |  | -                events, | 
|  | 705 | +                # Persist events from oldest -> newest (chronological) to get | 
|  | 706 | +                # the least missing `prev_event` fetch thrashing. | 
|  | 707 | +                # `_process_pulled_events` does some sorting of its own by | 
|  | 708 | +                # `depth` but if we let it sort the reverse-chronological list | 
|  | 709 | +                # of events, it naively orders events with the same depth in the | 
|  | 710 | +                # opposite order we want. If we pass it a list already sorted | 
|  | 711 | +                # by depth, then everything lines up. | 
|  | 712 | +                chronological_events, | 
| 650 | 713 |                 backfilled=True, | 
| 651 | 714 |             ) | 
| 652 | 715 | 
 | 
|  | 
0 commit comments