@@ -495,9 +495,6 @@ async def notify_from_replication(
495495 users = users_to_states .keys (),
496496 )
497497
498- # If this is a federation sender, notify about presence updates.
499- await self .maybe_send_presence_to_interested_destinations (states )
500-
501498 async def process_replication_rows (
502499 self , stream_name : str , instance_name : str , token : int , rows : list
503500 ):
@@ -519,11 +516,27 @@ async def process_replication_rows(
519516 for row in rows
520517 ]
521518
522- for state in states :
523- self .user_to_current_state [state .user_id ] = state
519+ # The list of states to notify sync streams and remote servers about.
520+ # This is calculated by comparing the old and new states for each user
521+ # using `should_notify(..)`.
522+ #
523+ # Note that this is necessary as the presence writer will periodically
524+ # flush presence state changes that should not be notified about to the
525+ # DB, and so will be sent over the replication stream.
526+ state_to_notify = []
527+
528+ for new_state in states :
529+ old_state = self .user_to_current_state .get (new_state .user_id )
530+ self .user_to_current_state [new_state .user_id ] = new_state
531+
532+ if not old_state or should_notify (old_state , new_state ):
533+ state_to_notify .append (new_state )
524534
525535 stream_id = token
526- await self .notify_from_replication (states , stream_id )
536+ await self .notify_from_replication (state_to_notify , stream_id )
537+
538+ # If this is a federation sender, notify about presence updates.
539+ await self .maybe_send_presence_to_interested_destinations (state_to_notify )
527540
528541 def get_currently_syncing_users_for_replication (self ) -> Iterable [str ]:
529542 return [
0 commit comments