@@ -801,13 +801,66 @@ async def get_last_event_in_room_before_stream_ordering(
801801            before this stream ordering. 
802802        """ 
803803
804-         last_row  =  await  self .get_room_event_before_stream_ordering (
805-             room_id = room_id ,
806-             stream_ordering = end_token .stream ,
804+         def  get_last_event_in_room_before_stream_ordering_txn (
805+             txn : LoggingTransaction ,
806+         ) ->  Optional [str ]:
807+             # We need to handle the fact that the stream tokens can be vector 
808+             # clocks. We do this by getting all rows between the minimum and 
809+             # maximum stream ordering in the token, plus one row less than the 
810+             # minimum stream ordering. We then filter the results against the 
811+             # token and return the first row that matches. 
812+ 
813+             sql  =  """ 
814+                 SELECT * FROM ( 
815+                     SELECT instance_name, stream_ordering, topological_ordering, event_id 
816+                     FROM events 
817+                     LEFT JOIN rejections USING (event_id) 
818+                     WHERE room_id = ? 
819+                         AND ? < stream_ordering AND stream_ordering <= ? 
820+                         AND NOT outlier 
821+                         AND rejections.event_id IS NULL 
822+                     ORDER BY stream_ordering DESC 
823+                 ) AS a 
824+                 UNION 
825+                 SELECT * FROM ( 
826+                     SELECT instance_name, stream_ordering, topological_ordering, event_id 
827+                     FROM events 
828+                     LEFT JOIN rejections USING (event_id) 
829+                     WHERE room_id = ? 
830+                         AND stream_ordering <= ? 
831+                         AND NOT outlier 
832+                         AND rejections.event_id IS NULL 
833+                     ORDER BY stream_ordering DESC 
834+                     LIMIT 1 
835+                 ) AS b 
836+             """ 
837+             txn .execute (
838+                 sql ,
839+                 (
840+                     room_id ,
841+                     end_token .stream ,
842+                     end_token .get_max_stream_pos (),
843+                     room_id ,
844+                     end_token .stream ,
845+                 ),
846+             )
847+ 
848+             for  instance_name , stream_ordering , topological_ordering , event_id  in  txn :
849+                 if  _filter_results (
850+                     lower_token = None ,
851+                     upper_token = end_token ,
852+                     instance_name = instance_name ,
853+                     topological_ordering = topological_ordering ,
854+                     stream_ordering = stream_ordering ,
855+                 ):
856+                     return  event_id 
857+ 
858+             return  None 
859+ 
860+         return  await  self .db_pool .runInteraction (
861+             "get_last_event_in_room_before_stream_ordering" ,
862+             get_last_event_in_room_before_stream_ordering_txn ,
807863        )
808-         if  last_row :
809-             return  last_row [2 ]
810-         return  None 
811864
812865    async  def  get_current_room_stream_token_for_room_id (
813866        self , room_id : str 
0 commit comments