@@ -801,13 +801,66 @@ async def get_last_event_in_room_before_stream_ordering(
801801 before this stream ordering.
802802 """
803803
804- last_row = await self .get_room_event_before_stream_ordering (
805- room_id = room_id ,
806- stream_ordering = end_token .stream ,
804+ def get_last_event_in_room_before_stream_ordering_txn (
805+ txn : LoggingTransaction ,
806+ ) -> Optional [str ]:
807+ # We need to handle the fact that the stream tokens can be vector
808+ # clocks. We do this by getting all rows between the minimum and
809+ # maximum stream ordering in the token, plus one row less than the
810+ # minimum stream ordering. We then filter the results against the
811+ # token and return the first row that matches.
812+
813+ sql = """
814+ SELECT * FROM (
815+ SELECT instance_name, stream_ordering, topological_ordering, event_id
816+ FROM events
817+ LEFT JOIN rejections USING (event_id)
818+ WHERE room_id = ?
819+ AND ? < stream_ordering AND stream_ordering <= ?
820+ AND NOT outlier
821+ AND rejections.event_id IS NULL
822+ ORDER BY stream_ordering DESC
823+ ) AS a
824+ UNION
825+ SELECT * FROM (
826+ SELECT instance_name, stream_ordering, topological_ordering, event_id
827+ FROM events
828+ LEFT JOIN rejections USING (event_id)
829+ WHERE room_id = ?
830+ AND stream_ordering <= ?
831+ AND NOT outlier
832+ AND rejections.event_id IS NULL
833+ ORDER BY stream_ordering DESC
834+ LIMIT 1
835+ ) AS b
836+ """
837+ txn .execute (
838+ sql ,
839+ (
840+ room_id ,
841+ end_token .stream ,
842+ end_token .get_max_stream_pos (),
843+ room_id ,
844+ end_token .stream ,
845+ ),
846+ )
847+
848+ for instance_name , stream_ordering , topological_ordering , event_id in txn :
849+ if _filter_results (
850+ lower_token = None ,
851+ upper_token = end_token ,
852+ instance_name = instance_name ,
853+ topological_ordering = topological_ordering ,
854+ stream_ordering = stream_ordering ,
855+ ):
856+ return event_id
857+
858+ return None
859+
860+ return await self .db_pool .runInteraction (
861+ "get_last_event_in_room_before_stream_ordering" ,
862+ get_last_event_in_room_before_stream_ordering_txn ,
807863 )
808- if last_row :
809- return last_row [2 ]
810- return None
811864
812865 async def get_current_room_stream_token_for_room_id (
813866 self , room_id : str
0 commit comments