@@ -650,47 +650,50 @@ async def _handle_successful_auth(self, response, connection):
650650 connection .remote_uuid = remote_uuid
651651
652652 existing = self .active_connections .get (remote_uuid , None )
653- if existing :
654- await connection .close_connection (
655- "Duplicate connection to same member with UUID: %s" % remote_uuid , None
656- )
657- return existing
658-
659- new_cluster_id = response ["cluster_id" ]
660- changed_cluster = self ._cluster_id is not None and self ._cluster_id != new_cluster_id
661- if changed_cluster :
662- await self ._check_client_state_on_cluster_change (connection )
663- _logger .warning (
664- "Switching from current cluster: %s to new cluster: %s" ,
665- self ._cluster_id ,
666- new_cluster_id ,
667- )
668- self ._on_cluster_restart ()
669653
654+ if existing :
655+ await connection .close_connection (
656+ "Duplicate connection to same member with UUID: %s" % remote_uuid , None
657+ )
658+ return existing
659+
660+ new_cluster_id = response ["cluster_id" ]
661+ changed_cluster = self ._cluster_id is not None and self ._cluster_id != new_cluster_id
662+ if changed_cluster :
663+ await self ._check_client_state_on_cluster_change (connection )
664+ _logger .warning (
665+ "Switching from current cluster: %s to new cluster: %s" ,
666+ self ._cluster_id ,
667+ new_cluster_id ,
668+ )
669+ self ._on_cluster_restart ()
670+
671+ async with self ._lock :
670672 is_initial_connection = not self .active_connections
671673 self .active_connections [remote_uuid ] = connection
672674 fire_connected_lifecycle_event = False
673- if is_initial_connection :
674- self ._cluster_id = new_cluster_id
675- # In split brain, the client might connect to the one half
676- # of the cluster, and then later might reconnect to the
677- # other half, after the half it was connected to is
678- # completely dead. Since the cluster id is preserved in
679- # split brain scenarios, it is impossible to distinguish
680- # reconnection to the same cluster vs reconnection to the
681- # other half of the split brain. However, in the latter,
682- # we might need to send some state to the other half of
683- # the split brain (like Compact schemas). That forces us
684- # to send the client state to the cluster after the first
685- # cluster connection, regardless the cluster id is
686- # changed or not.
687- if self ._established_initial_cluster_connection :
688- self ._client_state = ClientState .CONNECTED_TO_CLUSTER
689- await self ._initialize_on_cluster (new_cluster_id )
690- else :
691- fire_connected_lifecycle_event = True
692- self ._established_initial_cluster_connection = True
693- self ._client_state = ClientState .INITIALIZED_ON_CLUSTER
675+
676+ if is_initial_connection :
677+ self ._cluster_id = new_cluster_id
678+ # In split brain, the client might connect to the one half
679+ # of the cluster, and then later might reconnect to the
680+ # other half, after the half it was connected to is
681+ # completely dead. Since the cluster id is preserved in
682+ # split brain scenarios, it is impossible to distinguish
683+ # reconnection to the same cluster vs reconnection to the
684+ # other half of the split brain. However, in the latter,
685+ # we might need to send some state to the other half of
686+ # the split brain (like Compact schemas). That forces us
687+ # to send the client state to the cluster after the first
688+ # cluster connection, regardless the cluster id is
689+ # changed or not.
690+ if self ._established_initial_cluster_connection :
691+ self ._client_state = ClientState .CONNECTED_TO_CLUSTER
692+ await self ._initialize_on_cluster (new_cluster_id )
693+ else :
694+ fire_connected_lifecycle_event = True
695+ self ._established_initial_cluster_connection = True
696+ self ._client_state = ClientState .INITIALIZED_ON_CLUSTER
694697
695698 if fire_connected_lifecycle_event :
696699 self ._lifecycle_service .fire_lifecycle_event (LifecycleState .CONNECTED )
0 commit comments