2323"""
2424import abc
2525import contextlib
26+ import itertools
2627import logging
2728from bisect import bisect
2829from contextlib import contextmanager
@@ -188,15 +189,17 @@ async def user_syncing(
188189 """
189190
190191 @abc .abstractmethod
191- def get_currently_syncing_users_for_replication (self ) -> Iterable [str ]:
192- """Get an iterable of syncing users on this worker, to send to the presence handler
192+ def get_currently_syncing_users_for_replication (
193+ self ,
194+ ) -> Iterable [Tuple [str , Optional [str ]]]:
195+ """Get an iterable of syncing users and devices on this worker, to send to the presence handler
193196
194197 This is called when a replication connection is established. It should return
195- a list of user ids , which are then sent as USER_SYNC commands to inform the
196- process handling presence about those users.
198+ a list of tuples of user ID & device ID , which are then sent as USER_SYNC commands
199+ to inform the process handling presence about those users/devices .
197200
198201 Returns:
199- An iterable of user_id strings .
202+ An iterable of tuples of user ID and device ID .
200203 """
201204
202205 async def get_state (self , target_user : UserID ) -> UserPresenceState :
@@ -284,7 +287,12 @@ async def bump_presence_active_time(
284287 """
285288
286289 async def update_external_syncs_row ( # noqa: B027 (no-op by design)
287- self , process_id : str , user_id : str , is_syncing : bool , sync_time_msec : int
290+ self ,
291+ process_id : str ,
292+ user_id : str ,
293+ device_id : Optional [str ],
294+ is_syncing : bool ,
295+ sync_time_msec : int ,
288296 ) -> None :
289297 """Update the syncing users for an external process as a delta.
290298
@@ -295,6 +303,7 @@ async def update_external_syncs_row( # noqa: B027 (no-op by design)
295303 syncing against. This allows synapse to process updates
296304 as user start and stop syncing against a given process.
297305 user_id: The user who has started or stopped syncing
306+ device_id: The user's device that has started or stopped syncing
298307 is_syncing: Whether or not the user is now syncing
299308 sync_time_msec: Time in ms when the user was last syncing
300309 """
@@ -425,16 +434,18 @@ def __init__(self, hs: "HomeServer"):
425434 hs .config .worker .writers .presence ,
426435 )
427436
428- # The number of ongoing syncs on this process, by user id .
437+ # The number of ongoing syncs on this process, by ( user ID, device ID) .
429438 # Empty if _presence_enabled is false.
430- self ._user_to_num_current_syncs : Dict [str , int ] = {}
439+ self ._user_device_to_num_current_syncs : Dict [
440+ Tuple [str , Optional [str ]], int
441+ ] = {}
431442
432443 self .notifier = hs .get_notifier ()
433444 self .instance_id = hs .get_instance_id ()
434445
435- # user_id -> last_sync_ms. Lists the users that have stopped syncing but
436- # we haven't notified the presence writer of that yet
437- self .users_going_offline : Dict [str , int ] = {}
446+ # ( user_id, device_id) -> last_sync_ms. Lists the devices that have stopped
447+ # syncing but we haven't notified the presence writer of that yet
448+ self ._user_devices_going_offline : Dict [Tuple [ str , Optional [ str ]] , int ] = {}
438449
439450 self ._bump_active_client = ReplicationBumpPresenceActiveTime .make_client (hs )
440451 self ._set_state_client = ReplicationPresenceSetState .make_client (hs )
@@ -457,39 +468,47 @@ async def _on_shutdown(self) -> None:
457468 ClearUserSyncsCommand (self .instance_id )
458469 )
459470
460- def send_user_sync (self , user_id : str , is_syncing : bool , last_sync_ms : int ) -> None :
471+ def send_user_sync (
472+ self ,
473+ user_id : str ,
474+ device_id : Optional [str ],
475+ is_syncing : bool ,
476+ last_sync_ms : int ,
477+ ) -> None :
461478 if self ._presence_enabled :
462479 self .hs .get_replication_command_handler ().send_user_sync (
463- self .instance_id , user_id , is_syncing , last_sync_ms
480+ self .instance_id , user_id , device_id , is_syncing , last_sync_ms
464481 )
465482
466- def mark_as_coming_online (self , user_id : str ) -> None :
483+ def mark_as_coming_online (self , user_id : str , device_id : Optional [ str ] ) -> None :
467484 """A user has started syncing. Send a UserSync to the presence writer,
468485 unless they had recently stopped syncing.
469486 """
470- going_offline = self .users_going_offline .pop (user_id , None )
487+ going_offline = self ._user_devices_going_offline .pop (( user_id , device_id ) , None )
471488 if not going_offline :
472489 # Safe to skip because we haven't yet told the presence writer they
473490 # were offline
474- self .send_user_sync (user_id , True , self .clock .time_msec ())
491+ self .send_user_sync (user_id , device_id , True , self .clock .time_msec ())
475492
476- def mark_as_going_offline (self , user_id : str ) -> None :
493+ def mark_as_going_offline (self , user_id : str , device_id : Optional [ str ] ) -> None :
477494 """A user has stopped syncing. We wait before notifying the presence
478495 writer as its likely they'll come back soon. This allows us to avoid
479496 sending a stopped syncing immediately followed by a started syncing
480497 notification to the presence writer
481498 """
482- self .users_going_offline [ user_id ] = self .clock .time_msec ()
499+ self ._user_devices_going_offline [( user_id , device_id ) ] = self .clock .time_msec ()
483500
484501 def send_stop_syncing (self ) -> None :
485502 """Check if there are any users who have stopped syncing a while ago and
486503 haven't come back yet. If there are poke the presence writer about them.
487504 """
488505 now = self .clock .time_msec ()
489- for user_id , last_sync_ms in list (self .users_going_offline .items ()):
506+ for (user_id , device_id ), last_sync_ms in list (
507+ self ._user_devices_going_offline .items ()
508+ ):
490509 if now - last_sync_ms > UPDATE_SYNCING_USERS_MS :
491- self .users_going_offline .pop (user_id , None )
492- self .send_user_sync (user_id , False , last_sync_ms )
510+ self ._user_devices_going_offline .pop (( user_id , device_id ) , None )
511+ self .send_user_sync (user_id , device_id , False , last_sync_ms )
493512
494513 async def user_syncing (
495514 self ,
@@ -515,23 +534,23 @@ async def user_syncing(
515534 is_sync = True ,
516535 )
517536
518- curr_sync = self ._user_to_num_current_syncs .get (user_id , 0 )
519- self ._user_to_num_current_syncs [ user_id ] = curr_sync + 1
537+ curr_sync = self ._user_device_to_num_current_syncs .get (( user_id , device_id ) , 0 )
538+ self ._user_device_to_num_current_syncs [( user_id , device_id ) ] = curr_sync + 1
520539
521540 # If this is the first in-flight sync, notify replication
522- if self ._user_to_num_current_syncs [ user_id ] == 1 :
523- self .mark_as_coming_online (user_id )
541+ if self ._user_device_to_num_current_syncs [( user_id , device_id ) ] == 1 :
542+ self .mark_as_coming_online (user_id , device_id )
524543
525544 def _end () -> None :
526545 # We check that the user_id is in user_to_num_current_syncs because
527546 # user_to_num_current_syncs may have been cleared if we are
528547 # shutting down.
529- if user_id in self ._user_to_num_current_syncs :
530- self ._user_to_num_current_syncs [ user_id ] -= 1
548+ if ( user_id , device_id ) in self ._user_device_to_num_current_syncs :
549+ self ._user_device_to_num_current_syncs [( user_id , device_id ) ] -= 1
531550
532551 # If there are no more in-flight syncs, notify replication
533- if self ._user_to_num_current_syncs [ user_id ] == 0 :
534- self .mark_as_going_offline (user_id )
552+ if self ._user_device_to_num_current_syncs [( user_id , device_id ) ] == 0 :
553+ self .mark_as_going_offline (user_id , device_id )
535554
536555 @contextlib .contextmanager
537556 def _user_syncing () -> Generator [None , None , None ]:
@@ -598,10 +617,12 @@ async def process_replication_rows(
598617 # If this is a federation sender, notify about presence updates.
599618 await self .maybe_send_presence_to_interested_destinations (state_to_notify )
600619
601- def get_currently_syncing_users_for_replication (self ) -> Iterable [str ]:
620+ def get_currently_syncing_users_for_replication (
621+ self ,
622+ ) -> Iterable [Tuple [str , Optional [str ]]]:
602623 return [
603- user_id
604- for user_id , count in self ._user_to_num_current_syncs .items ()
624+ user_id_device_id
625+ for user_id_device_id , count in self ._user_device_to_num_current_syncs .items ()
605626 if count > 0
606627 ]
607628
@@ -723,17 +744,23 @@ def __init__(self, hs: "HomeServer"):
723744
724745 # Keeps track of the number of *ongoing* syncs on this process. While
725746 # this is non zero a user will never go offline.
726- self .user_to_num_current_syncs : Dict [str , int ] = {}
747+ self ._user_device_to_num_current_syncs : Dict [
748+ Tuple [str , Optional [str ]], int
749+ ] = {}
727750
728751 # Keeps track of the number of *ongoing* syncs on other processes.
752+ #
729753 # While any sync is ongoing on another process the user will never
730754 # go offline.
755+ #
731756 # Each process has a unique identifier and an update frequency. If
732757 # no update is received from that process within the update period then
733758 # we assume that all the sync requests on that process have stopped.
734- # Stored as a dict from process_id to set of user_id, and a dict of
735- # process_id to millisecond timestamp last updated.
736- self .external_process_to_current_syncs : Dict [str , Set [str ]] = {}
759+ # Stored as a dict from process_id to set of (user_id, device_id), and
760+ # a dict of process_id to millisecond timestamp last updated.
761+ self .external_process_to_current_syncs : Dict [
762+ str , Set [Tuple [str , Optional [str ]]]
763+ ] = {}
737764 self .external_process_last_updated_ms : Dict [str , int ] = {}
738765
739766 self .external_sync_linearizer = Linearizer (name = "external_sync_linearizer" )
@@ -938,7 +965,10 @@ async def _handle_timeouts(self) -> None:
938965 # that were syncing on that process to see if they need to be timed
939966 # out.
940967 users_to_check .update (
941- self .external_process_to_current_syncs .pop (process_id , ())
968+ user_id
969+ for user_id , device_id in self .external_process_to_current_syncs .pop (
970+ process_id , ()
971+ )
942972 )
943973 self .external_process_last_updated_ms .pop (process_id )
944974
@@ -951,11 +981,15 @@ async def _handle_timeouts(self) -> None:
951981
952982 syncing_user_ids = {
953983 user_id
954- for user_id , count in self .user_to_num_current_syncs .items ()
984+ for ( user_id , _ ), count in self ._user_device_to_num_current_syncs .items ()
955985 if count
956986 }
957- for user_ids in self .external_process_to_current_syncs .values ():
958- syncing_user_ids .update (user_ids )
987+ syncing_user_ids .update (
988+ user_id
989+ for user_id , _ in itertools .chain (
990+ * self .external_process_to_current_syncs .values ()
991+ )
992+ )
959993
960994 changes = handle_timeouts (
961995 states ,
@@ -1013,8 +1047,8 @@ async def user_syncing(
10131047 if not affect_presence or not self ._presence_enabled :
10141048 return _NullContextManager ()
10151049
1016- curr_sync = self .user_to_num_current_syncs .get (user_id , 0 )
1017- self .user_to_num_current_syncs [ user_id ] = curr_sync + 1
1050+ curr_sync = self ._user_device_to_num_current_syncs .get (( user_id , device_id ) , 0 )
1051+ self ._user_device_to_num_current_syncs [( user_id , device_id ) ] = curr_sync + 1
10181052
10191053 # Note that this causes last_active_ts to be incremented which is not
10201054 # what the spec wants.
@@ -1027,7 +1061,7 @@ async def user_syncing(
10271061
10281062 async def _end () -> None :
10291063 try :
1030- self .user_to_num_current_syncs [ user_id ] -= 1
1064+ self ._user_device_to_num_current_syncs [( user_id , device_id ) ] -= 1
10311065
10321066 prev_state = await self .current_state_for_user (user_id )
10331067 await self ._update_states (
@@ -1049,12 +1083,19 @@ def _user_syncing() -> Generator[None, None, None]:
10491083
10501084 return _user_syncing ()
10511085
1052- def get_currently_syncing_users_for_replication (self ) -> Iterable [str ]:
1086+ def get_currently_syncing_users_for_replication (
1087+ self ,
1088+ ) -> Iterable [Tuple [str , Optional [str ]]]:
10531089 # since we are the process handling presence, there is nothing to do here.
10541090 return []
10551091
10561092 async def update_external_syncs_row (
1057- self , process_id : str , user_id : str , is_syncing : bool , sync_time_msec : int
1093+ self ,
1094+ process_id : str ,
1095+ user_id : str ,
1096+ device_id : Optional [str ],
1097+ is_syncing : bool ,
1098+ sync_time_msec : int ,
10581099 ) -> None :
10591100 """Update the syncing users for an external process as a delta.
10601101
@@ -1063,6 +1104,7 @@ async def update_external_syncs_row(
10631104 syncing against. This allows synapse to process updates
10641105 as user start and stop syncing against a given process.
10651106 user_id: The user who has started or stopped syncing
1107+ device_id: The user's device that has started or stopped syncing
10661108 is_syncing: Whether or not the user is now syncing
10671109 sync_time_msec: Time in ms when the user was last syncing
10681110 """
@@ -1073,26 +1115,27 @@ async def update_external_syncs_row(
10731115 process_id , set ()
10741116 )
10751117
1076- # USER_SYNC is sent when a user starts or stops syncing on a remote
1077- # process. (But only for the initial and last device.)
1118+ # USER_SYNC is sent when a user's device starts or stops syncing on
1119+ # a remote # process. (But only for the initial and last sync for that
1120+ # device.)
10781121 #
1079- # When a user *starts* syncing it also calls set_state(...) which
1122+ # When a device *starts* syncing it also calls set_state(...) which
10801123 # will update the state, last_active_ts, and last_user_sync_ts.
1081- # Simply ensure the user is tracked as syncing in this case.
1124+ # Simply ensure the user & device is tracked as syncing in this case.
10821125 #
1083- # When a user *stops* syncing, update the last_user_sync_ts and mark
1126+ # When a device *stops* syncing, update the last_user_sync_ts and mark
10841127 # them as no longer syncing. Note this doesn't quite match the
10851128 # monolith behaviour, which updates last_user_sync_ts at the end of
10861129 # every sync, not just the last in-flight sync.
1087- if is_syncing and user_id not in process_presence :
1088- process_presence .add (user_id )
1089- elif not is_syncing and user_id in process_presence :
1130+ if is_syncing and ( user_id , device_id ) not in process_presence :
1131+ process_presence .add (( user_id , device_id ) )
1132+ elif not is_syncing and ( user_id , device_id ) in process_presence :
10901133 new_state = prev_state .copy_and_replace (
10911134 last_user_sync_ts = sync_time_msec
10921135 )
10931136 await self ._update_states ([new_state ])
10941137
1095- process_presence .discard (user_id )
1138+ process_presence .discard (( user_id , device_id ) )
10961139
10971140 self .external_process_last_updated_ms [process_id ] = self .clock .time_msec ()
10981141
@@ -1106,7 +1149,9 @@ async def update_external_syncs_clear(self, process_id: str) -> None:
11061149 process_presence = self .external_process_to_current_syncs .pop (
11071150 process_id , set ()
11081151 )
1109- prev_states = await self .current_state_for_users (process_presence )
1152+ prev_states = await self .current_state_for_users (
1153+ {user_id for user_id , device_id in process_presence }
1154+ )
11101155 time_now_ms = self .clock .time_msec ()
11111156
11121157 await self ._update_states (
0 commit comments