3131import org .elasticsearch .cluster .ClusterState ;
3232import org .elasticsearch .cluster .ClusterStateUpdateTask ;
3333import org .elasticsearch .cluster .metadata .NodesShutdownMetadata ;
34+ import org .elasticsearch .cluster .metadata .ProjectId ;
3435import org .elasticsearch .cluster .metadata .SingleNodeShutdownMetadata ;
3536import org .elasticsearch .cluster .node .DiscoveryNode ;
3637import org .elasticsearch .cluster .node .DiscoveryNodes ;
38+ import org .elasticsearch .cluster .routing .IndexShardRoutingTable ;
3739import org .elasticsearch .cluster .routing .allocation .decider .EnableAllocationDecider ;
3840import org .elasticsearch .cluster .service .ClusterService ;
3941import org .elasticsearch .common .Priority ;
@@ -335,6 +337,8 @@ public void run() throws InterruptedException {
335337
336338 if (randomBoolean ()) {
337339 startNodeShutdownMarker ();
340+ } else {
341+ startAllocationFiltering ();
338342 }
339343
340344 if (completedSnapshotLatch .await (30 , TimeUnit .SECONDS )) {
@@ -1316,6 +1320,80 @@ public void clusterStateProcessed(ClusterState initialState, ClusterState newSta
13161320 });
13171321 }
13181322
1323+ private void startAllocationFiltering () {
1324+ enqueueAction (() -> {
1325+ boolean rerun = true ;
1326+ try (TransferableReleasables localReleasables = new TransferableReleasables ()) {
1327+ if (usually ()) {
1328+ return ;
1329+ }
1330+
1331+ final List <TrackedIndex > trackedIndices = indices .values ()
1332+ .stream ()
1333+ .filter (index -> index .supportsAllocationFilter )
1334+ .toList ();
1335+ if (trackedIndices .isEmpty ()) {
1336+ return ;
1337+ }
1338+ final var trackedIndex = randomFrom (trackedIndices );
1339+ if (localReleasables .add (tryAcquirePermit (trackedIndex .permits )) == null ) {
1340+ return ;
1341+ }
1342+
1343+ if (localReleasables .add (blockNodeRestarts ()) == null ) {
1344+ return ;
1345+ }
1346+ final var trackedNode = randomFrom (shuffledNodes .stream ().filter (node -> node .isDataNode ).toList ());
1347+
1348+ final Releasable releaseAll = localReleasables .transfer ();
1349+
1350+ logger .info (" --> moving index [{}] away from node [{}]" , trackedIndex .indexName , trackedNode .nodeName );
1351+
1352+ SubscribableListener .<AcknowledgedResponse >newForked (
1353+ l -> indicesAdmin ().prepareUpdateSettings (trackedIndex .indexName )
1354+ .setSettings (Settings .builder ().put ("index.routing.allocation.exclude._name" , trackedNode .nodeName ))
1355+ .execute (l )
1356+ ).addListener (mustSucceed (acknowledgedResponse -> {
1357+ assertTrue (acknowledgedResponse .isAcknowledged ());
1358+ logger .info ("--> updated index [{}] settings to exclude node [{}]" , trackedIndex .indexName , trackedNode .nodeName );
1359+ pollForIndexRebalanceCompletion (trackedNode , trackedIndex .indexName , releaseAll , this ::startAllocationFiltering );
1360+ }));
1361+ rerun = false ;
1362+
1363+ } finally {
1364+ if (rerun ) {
1365+ startAllocationFiltering ();
1366+ }
1367+ }
1368+ });
1369+ }
1370+
1371+ private void pollForIndexRebalanceCompletion (
1372+ TrackedNode excludedNode ,
1373+ String indexName ,
1374+ Releasable onCompletion ,
1375+ Runnable onSuccess
1376+ ) {
1377+ clientExecutor .execute (mustSucceed (() -> {
1378+ final var clusterService = internalCluster ().getCurrentMasterNodeInstance (ClusterService .class );
1379+ final ClusterState state = clusterService .state ();
1380+
1381+ final var allRebalanced = state .routingTable (ProjectId .DEFAULT )
1382+ .index (indexName )
1383+ .allShards ()
1384+ .flatMap (IndexShardRoutingTable ::allShards )
1385+ .allMatch (shardRouting -> shardRouting .active () && excludedNode .nodeId .equals (shardRouting .currentNodeId ()) == false );
1386+
1387+ if (allRebalanced ) {
1388+ logger .info ("--> moved index [{}] away from node [{}]" , indexName , excludedNode .nodeName );
1389+ Releasables .close (onCompletion );
1390+ onSuccess .run ();
1391+ } else {
1392+ pollForIndexRebalanceCompletion (excludedNode , indexName , onCompletion , onSuccess );
1393+ }
1394+ }));
1395+ }
1396+
13191397 @ Nullable // if we couldn't block node restarts
13201398 private Releasable blockNodeRestarts () {
13211399 try (TransferableReleasables localReleasables = new TransferableReleasables ()) {
@@ -1459,6 +1537,7 @@ private class TrackedIndex {
14591537
14601538 // these fields are only changed when all permits held by the delete/recreate process:
14611539 private int shardCount ;
1540+ private boolean supportsAllocationFilter ;
14621541 private Semaphore docPermits ;
14631542
14641543 private TrackedIndex (String indexName ) {
@@ -1482,8 +1561,10 @@ private void createIndexAndContinue(Releasable releasable) {
14821561 shardCount = between (1 , 5 );
14831562 docPermits = new Semaphore (between (1000 , 3000 ));
14841563 logger .info ("--> create index [{}] with max [{}] docs" , indexName , docPermits .availablePermits ());
1564+ final int replicaCount = between (0 , cluster .numDataNodes () - 1 );
1565+ supportsAllocationFilter = 1 + replicaCount < cluster .numDataNodes ();
14851566 indicesAdmin ().prepareCreate (indexName )
1486- .setSettings (indexSettings (shardCount , between ( 0 , cluster . numDataNodes () - 1 ) ))
1567+ .setSettings (indexSettings (shardCount , replicaCount ))
14871568 .execute (mustSucceed (response -> {
14881569 assertTrue (response .isAcknowledged ());
14891570 logger .info ("--> finished create index [{}]" , indexName );
@@ -1763,11 +1844,13 @@ private static class TrackedNode {
17631844 private final String nodeName ;
17641845 private final boolean isMasterNode ;
17651846 private final boolean isDataNode ;
1847+ private final String nodeId ;
17661848
17671849 TrackedNode (String nodeName , boolean isMasterNode , boolean isDataNode ) {
17681850 this .nodeName = nodeName ;
17691851 this .isMasterNode = isMasterNode ;
17701852 this .isDataNode = isDataNode ;
1853+ this .nodeId = getNodeId (nodeName );
17711854 }
17721855
17731856 Semaphore getPermits () {
0 commit comments