6060import org .elasticsearch .threadpool .ScalingExecutorBuilder ;
6161import org .elasticsearch .threadpool .TestThreadPool ;
6262import org .elasticsearch .threadpool .ThreadPool ;
63+ import org .junit .After ;
64+ import org .junit .Before ;
6365
6466import java .nio .file .Path ;
6567import java .util .ArrayList ;
9294@ LuceneTestCase .SuppressFileSystems (value = "HandleLimitFS" ) // we sometimes have >2048 open files
9395public class SnapshotStressTestsIT extends AbstractSnapshotIntegTestCase {
9496
// Per-test value assigned by randomInitialShardSnapshotPerNodeLimit() before each test; nodeSettings() supplies it as the
// node-level value of SnapshotsService.SHARD_SNAPSHOT_PER_NODE_LIMIT_SETTING and testRandomActivities() logs it.
97+ private int initialShardSnapshotPerNodeLimit ;
98+
99+ @ Before
100+ public void randomInitialShardSnapshotPerNodeLimit () {
101+ initialShardSnapshotPerNodeLimit = between (0 , 10 );
102+ }
103+
104+ @ After
105+ public void clearShardSnapshotPerNodeLimitSetting () {
106+ // Clear any persistent setting that may have been set during the test. The teardown process does not like it.
107+ safeGet (
108+ clusterAdmin ().prepareUpdateSettings (TEST_REQUEST_TIMEOUT , TEST_REQUEST_TIMEOUT )
109+ .setPersistentSettings (Settings .builder ().putNull (SnapshotsService .SHARD_SNAPSHOT_PER_NODE_LIMIT_SETTING .getKey ()))
110+ .execute ()
111+ );
112+ }
113+
95114 public void testRandomActivities () throws InterruptedException {
115+ logger .info ("--> initial shard snapshot per node limit: [{}]" , initialShardSnapshotPerNodeLimit );
96116 final DiscoveryNodes discoveryNodes = clusterAdmin ().prepareState (TEST_REQUEST_TIMEOUT )
97117 .clear ()
98118 .setNodes (true )
@@ -108,7 +128,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
108128 return Settings .builder ()
109129 .put (super .nodeSettings (nodeOrdinal , otherSettings ))
110130 .put (EnableAllocationDecider .CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING .getKey (), EnableAllocationDecider .Rebalance .ALL )
111- .put (SnapshotsService .SHARD_SNAPSHOT_PER_NODE_LIMIT_SETTING .getKey (), 5 ) // more aggressive limit
131+ .put (SnapshotsService .SHARD_SNAPSHOT_PER_NODE_LIMIT_SETTING .getKey (), initialShardSnapshotPerNodeLimit )
112132 .build ();
113133 }
114134
@@ -341,6 +361,10 @@ public void run() throws InterruptedException {
341361 startAllocationFiltering ();
342362 }
343363
364+ if (randomBoolean ()) {
365+ startUpdateShardSnapshotPerNodeLimit ();
366+ }
367+
344368 if (completedSnapshotLatch .await (30 , TimeUnit .SECONDS )) {
345369 logger .info ("--> completed target snapshot count, finishing test" );
346370 } else {
@@ -1394,6 +1418,43 @@ private void pollForAllocationFilterCompletion(
13941418 }));
13951419 }
13961420
1421+ private void startUpdateShardSnapshotPerNodeLimit () {
1422+ enqueueAction (() -> {
1423+ boolean rerun = true ;
1424+ try (TransferableReleasables localReleasables = new TransferableReleasables ()) {
1425+ if (usually ()) {
1426+ return ;
1427+ }
1428+
1429+ if (localReleasables .add (blockNodeRestarts ()) == null ) {
1430+ return ;
1431+ }
1432+
1433+ final Releasable releaseAll = localReleasables .transfer ();
1434+
1435+ final int newLimit = between (0 , 10 );
1436+ logger .info ("--> updating shard snapshot per node limit to [{}]" , newLimit );
1437+
1438+ clusterAdmin ().prepareUpdateSettings (TEST_REQUEST_TIMEOUT , TEST_REQUEST_TIMEOUT )
1439+ .setPersistentSettings (
1440+ Settings .builder ().put (SnapshotsService .SHARD_SNAPSHOT_PER_NODE_LIMIT_SETTING .getKey (), newLimit )
1441+ )
1442+ .execute (mustSucceed (response -> {
1443+ assertTrue (response .isAcknowledged ());
1444+ logger .info ("--> updated shard snapshot per node limit to [{}]" , newLimit );
1445+ Releasables .close (releaseAll );
1446+ startUpdateShardSnapshotPerNodeLimit ();
1447+ }));
1448+
1449+ rerun = false ;
1450+ } finally {
1451+ if (rerun ) {
1452+ startUpdateShardSnapshotPerNodeLimit ();
1453+ }
1454+ }
1455+ });
1456+ }
1457+
13971458 @ Nullable // if we couldn't block node restarts
13981459 private Releasable blockNodeRestarts () {
13991460 try (TransferableReleasables localReleasables = new TransferableReleasables ()) {
0 commit comments