@@ -519,7 +519,7 @@ async def _load_filtered_recents(
519519 if len (recents ) > timeline_limit :
520520 limited = True
521521 recents = recents [- timeline_limit :]
522- room_key = RoomStreamToken . parse ( recents [0 ].internal_metadata .before )
522+ room_key = recents [0 ].internal_metadata .before
523523
524524 prev_batch_token = now_token .copy_and_replace ("room_key" , room_key )
525525
@@ -1595,16 +1595,24 @@ async def _get_rooms_changed(
15951595
15961596 if leave_events :
15971597 leave_event = leave_events [- 1 ]
1598- leave_stream_token = await self .store .get_stream_token_for_event (
1598+ leave_position = await self .store .get_position_for_event (
15991599 leave_event .event_id
16001600 )
1601- leave_token = since_token .copy_and_replace (
1602- "room_key" , leave_stream_token
1603- )
16041601
1605- if since_token and since_token .is_after (leave_token ):
1602+ # If the leave event happened before the since token then we
1603+ # bail.
1604+ if since_token and not leave_position .persisted_after (
1605+ since_token .room_key
1606+ ):
16061607 continue
16071608
1609+ # We can safely convert the position of the leave event into a
1610+ # stream token as it'll only be used in the context of this
1611+ # room. (c.f. the docstring of `to_room_stream_token`).
1612+ leave_token = since_token .copy_and_replace (
1613+ "room_key" , leave_position .to_room_stream_token ()
1614+ )
1615+
16081616 # If this is an out of band message, like a remote invite
16091617 # rejection, we include it in the recents batch. Otherwise, we
16101618 # let _load_filtered_recents handle fetching the correct
0 commit comments