4242from synapse .metrics import LaterGauge
4343from synapse .metrics .background_process_metrics import run_as_background_process
4444from synapse .streams .config import PaginationConfig
45- from synapse .types import Collection , RoomStreamToken , StreamToken , UserID
45+ from synapse .types import (
46+ Collection ,
47+ PersistedEventPosition ,
48+ RoomStreamToken ,
49+ StreamToken ,
50+ UserID ,
51+ )
4652from synapse .util .async_helpers import ObservableDeferred , timeout_deferred
4753from synapse .util .metrics import Measure
4854from synapse .visibility import filter_events_for_client
@@ -187,7 +193,7 @@ def __init__(self, hs: "synapse.server.HomeServer"):
187193 self .store = hs .get_datastore ()
188194 self .pending_new_room_events = (
189195 []
190- ) # type: List[Tuple[int , EventBase, Collection[UserID]]]
196+ ) # type: List[Tuple[PersistedEventPosition , EventBase, Collection[UserID]]]
191197
192198 # Called when there are new things to stream over replication
193199 self .replication_callbacks = [] # type: List[Callable[[], None]]
@@ -246,8 +252,8 @@ def add_replication_callback(self, cb: Callable[[], None]):
246252 def on_new_room_event (
247253 self ,
248254 event : EventBase ,
249- room_stream_id : int ,
250- max_room_stream_id : int ,
255+ event_pos : PersistedEventPosition ,
256+ max_room_stream_token : RoomStreamToken ,
251257 extra_users : Collection [UserID ] = [],
252258 ):
253259 """ Used by handlers to inform the notifier something has happened
@@ -261,16 +267,16 @@ def on_new_room_event(
261267 until all previous events have been persisted before notifying
262268 the client streams.
263269 """
264- self .pending_new_room_events .append ((room_stream_id , event , extra_users ))
265- self ._notify_pending_new_room_events (max_room_stream_id )
270+ self .pending_new_room_events .append ((event_pos , event , extra_users ))
271+ self ._notify_pending_new_room_events (max_room_stream_token )
266272
267273 self .notify_replication ()
268274
269- def _notify_pending_new_room_events (self , max_room_stream_id : int ):
275+ def _notify_pending_new_room_events (self , max_room_stream_token : RoomStreamToken ):
270276 """Notify for the room events that were queued waiting for a previous
271277 event to be persisted.
272278 Args:
273- max_room_stream_id : The highest stream_id below which all
279+ max_room_stream_token : The highest stream_id below which all
274280 events have been persisted.
275281 """
276282 pending = self .pending_new_room_events
@@ -279,11 +285,9 @@ def _notify_pending_new_room_events(self, max_room_stream_id: int):
279285 users = set () # type: Set[UserID]
280286 rooms = set () # type: Set[str]
281287
282- for room_stream_id , event , extra_users in pending :
283- if room_stream_id > max_room_stream_id :
284- self .pending_new_room_events .append (
285- (room_stream_id , event , extra_users )
286- )
288+ for event_pos , event , extra_users in pending :
289+ if event_pos .persisted_after (max_room_stream_token ):
290+ self .pending_new_room_events .append ((event_pos , event , extra_users ))
287291 else :
288292 if (
289293 event .type == EventTypes .Member
@@ -296,39 +300,38 @@ def _notify_pending_new_room_events(self, max_room_stream_id: int):
296300
297301 if users or rooms :
298302 self .on_new_event (
299- "room_key" ,
300- RoomStreamToken (None , max_room_stream_id ),
301- users = users ,
302- rooms = rooms ,
303+ "room_key" , max_room_stream_token , users = users , rooms = rooms ,
303304 )
304- self ._on_updated_room_token (max_room_stream_id )
305+ self ._on_updated_room_token (max_room_stream_token )
305306
306- def _on_updated_room_token (self , max_room_stream_id : int ):
307+ def _on_updated_room_token (self , max_room_stream_token : RoomStreamToken ):
307308 """Poke services that might care that the room position has been
308309 updated.
309310 """
310311
311312 # poke any interested application service.
312313 run_as_background_process (
313- "_notify_app_services" , self ._notify_app_services , max_room_stream_id
314+ "_notify_app_services" , self ._notify_app_services , max_room_stream_token
314315 )
315316
316317 run_as_background_process (
317- "_notify_pusher_pool" , self ._notify_pusher_pool , max_room_stream_id
318+ "_notify_pusher_pool" , self ._notify_pusher_pool , max_room_stream_token
318319 )
319320
320321 if self .federation_sender :
321- self .federation_sender .notify_new_events (max_room_stream_id )
322+ self .federation_sender .notify_new_events (max_room_stream_token . stream )
322323
323- async def _notify_app_services (self , max_room_stream_id : int ):
324+ async def _notify_app_services (self , max_room_stream_token : RoomStreamToken ):
324325 try :
325- await self .appservice_handler .notify_interested_services (max_room_stream_id )
326+ await self .appservice_handler .notify_interested_services (
327+ max_room_stream_token .stream
328+ )
326329 except Exception :
327330 logger .exception ("Error notifying application services of event" )
328331
329- async def _notify_pusher_pool (self , max_room_stream_id : int ):
332+ async def _notify_pusher_pool (self , max_room_stream_token : RoomStreamToken ):
330333 try :
331- await self ._pusher_pool .on_new_notifications (max_room_stream_id )
334+ await self ._pusher_pool .on_new_notifications (max_room_stream_token . stream )
332335 except Exception :
333336 logger .exception ("Error pusher pool of event" )
334337
0 commit comments