@@ -1397,66 +1397,63 @@ protected final void doEnsureClusterStateConsistency(NamedWriteableRegistry name
13971397        final  List <SubscribableListener <ClusterStateResponse >> localStates  = new  ArrayList <>(cluster ().size ());
13981398        final  var  masterName  = internalCluster ().getMasterName ();
13991399        for  (Client  client  : cluster ().getClients ()) {
1400-             localStates .add (
1401-                 SubscribableListener .newForked (l  -> prepareClusterStateRequest (client )
1402-                     .execute (l ))
1403-             );
1400+             localStates .add (SubscribableListener .newForked (l  -> prepareClusterStateRequest (client ).execute (l )));
14041401        }
14051402        try  (RefCountingListener  refCountingListener  = new  RefCountingListener (future )) {
1406-             SubscribableListener .<ClusterStateResponse >newForked (
1407-                 l  -> prepareClusterStateRequest ( client ( masterName )). execute ( l ) 
1408-             ). andThenAccept (masterStateResponse  -> { 
1409-                 byte []  masterClusterStateBytes  =  ClusterState . Builder . toBytes ( masterStateResponse . getState ()); 
1410-                 // remove local node reference 
1411-                 final   ClusterState   masterClusterState  =  ClusterState . Builder . fromBytes ( 
1412-                     masterClusterStateBytes ,
1413-                     null , 
1414-                     namedWriteableRegistry 
1415-                 );
1416-                 Map < String ,  Object >  masterStateMap  =  convertToMap ( masterClusterState ,  xContentParams () );
1417-                 String   masterId  =  masterClusterState . nodes (). getMasterNodeId (); 
1418-                 if  ( masterId  ==  null ) { 
1419-                     logger . warn ( "Failed  to find an elected  master in the  cluster state: "  +  masterClusterState );
1420-                     throw   new   AssertionError ( "Unable to find master in cluster state. Expecting a stable master node" ); 
1421-                 } 
1422-                 for  ( SubscribableListener < ClusterStateResponse >  localStateListener  :  localStates )  {
1423-                     localStateListener . andThenAccept (localClusterStateResponse  -> { 
1424-                         byte []  localClusterStateBytes  =  ClusterState . Builder . toBytes ( localClusterStateResponse . getState ()); 
1425-                         // remove local node reference 
1426-                         final   ClusterState   localClusterState  =  ClusterState . Builder . fromBytes ( 
1427-                             localClusterStateBytes ,
1428-                             null , 
1429-                             namedWriteableRegistry 
1430-                         );
1431-                         final   Map < String ,  Object >  localStateMap  =  convertToMap ( localClusterState ,  xContentParams ()); 
1432-                         // Check  that the non- master node has  the same version of the cluster state as  the master and  
1433-                         // that the master node matches the master (otherwise there is no requirement for the cluster state to 
1434-                         // match )
1435-                         if  ( masterClusterState . version () ==  localClusterState . version () 
1436-                             &&  masterId . equals ( localClusterState . nodes (). getMasterNodeId ()))  {
1437-                             try  { 
1438-                                 assertEquals ( 
1439-                                     "cluster state UUID does not match" ,
1440-                                     masterClusterState .stateUUID (), 
1441-                                     localClusterState . stateUUID () 
1442-                                 ); 
1443-                                 // Compare JSON serialization 
1444-                                 assertNull ( 
1445-                                     "cluster state JSON serialization does not match" , 
1446-                                     differenceBetweenMapsIgnoringArrayOrder ( masterStateMap ,  localStateMap ) 
1447-                                 ); 
1448-                             }  catch  ( final   AssertionError   error ) { 
1449-                                 logger . error ( 
1450-                                     "Cluster state from master: \n {} \n Local cluster state: \n {}"  ,
1451-                                     masterClusterState .toString (), 
1452-                                     localClusterState . toString () 
1453-                                 ) ;
1454-                                 throw   error ; 
1403+             SubscribableListener .<ClusterStateResponse >newForked (l  ->  prepareClusterStateRequest ( client ( masterName )). execute ( l )) 
1404+                 . andThenAccept ( masterStateResponse  -> { 
1405+                      byte []  masterClusterStateBytes  =  ClusterState . Builder . toBytes (masterStateResponse . getState ()); 
1406+                      // remove local node reference 
1407+                      final   ClusterState   masterClusterState  =  ClusterState . Builder . fromBytes ( 
1408+                          masterClusterStateBytes , 
1409+                          null ,
1410+                          namedWriteableRegistry 
1411+                     ); 
1412+                      Map < String ,  Object >  masterStateMap  =  convertToMap ( masterClusterState ,  xContentParams () );
1413+                      String   masterId  =  masterClusterState . nodes (). getMasterNodeId ( );
1414+                      if  ( masterId  ==  null ) { 
1415+                          logger . warn ( "Failed to find an elected master in the cluster state: "  +  masterClusterState ); 
1416+                          throw   new   AssertionError ( "Unable  to find master in cluster state. Expecting a stable master node"  );
1417+                     } 
1418+                      for  ( SubscribableListener < ClusterStateResponse >  localStateListener  :  localStates ) { 
1419+                          localStateListener . andThenAccept ( localClusterStateResponse  ->  {
1420+                              byte []  localClusterStateBytes  =  ClusterState . Builder . toBytes (localClusterStateResponse . getState ()); 
1421+                              // remove local node reference 
1422+                              final   ClusterState   localClusterState  =  ClusterState . Builder . fromBytes ( 
1423+                                  localClusterStateBytes , 
1424+                                  null ,
1425+                                  namedWriteableRegistry 
1426+                             ); 
1427+                              final   Map < String ,  Object >  localStateMap  =  convertToMap ( localClusterState ,  xContentParams () );
1428+                              // Check that the non-master node has the same version of the cluster state as the master and 
1429+                              // that the master node matches  the master (otherwise there is no requirement for  the cluster state to  
1430+                              // match) 
1431+                              if  ( masterClusterState . version () ==  localClusterState . version ( )
1432+                                 &&  masterId . equals ( localClusterState . nodes (). getMasterNodeId ())) { 
1433+                                  try  {
1434+                                      assertEquals ( 
1435+                                          "cluster state UUID does not match" , 
1436+                                          masterClusterState . stateUUID () ,
1437+                                          localClusterState .stateUUID ()
1438+                                     ); 
1439+                                      // Compare JSON serialization 
1440+                                      assertNull ( 
1441+                                          "cluster state JSON serialization does not match" , 
1442+                                          differenceBetweenMapsIgnoringArrayOrder ( masterStateMap ,  localStateMap ) 
1443+                                     ); 
1444+                                 }  catch  ( final   AssertionError   error ) { 
1445+                                      logger . error ( 
1446+                                          "Cluster state from master: \n {} \n Local cluster state: \n {}" , 
1447+                                          masterClusterState . toString () ,
1448+                                          localClusterState .toString ()
1449+                                     ); 
1450+                                      throw   error ;
1451+                                 } 
14551452                            }
1456-                         }
1457-                     }). addListener ( refCountingListener . acquire ()); 
1458-                 }
1459-             }) .addListener (refCountingListener .acquire ());
1453+                         }). addListener ( refCountingListener . acquire ()); 
1454+                     }
1455+                 }) 
1456+                  .addListener (refCountingListener .acquire ());
14601457        }
14611458        safeGet (future );
14621459    }
0 commit comments