7676import java .util .concurrent .Semaphore ;
7777import java .util .concurrent .TimeUnit ;
7878import java .util .concurrent .atomic .AtomicBoolean ;
79- import java .util .concurrent .locks .LockSupport ;
8079import java .util .stream .Collectors ;
8180import java .util .stream .Stream ;
8281
@@ -173,8 +172,8 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception {
173172 // index throttling for a shard on this node, it will pause indexing for that shard until throttling
174173 // is deactivated.
175174 final String node_1 = internalCluster ().startNode (
176- Settings .builder ()
177- . put ( IndexingMemoryController . PAUSE_INDEXING_ON_THROTTLE . getKey (), true ) );
175+ Settings .builder (). put ( IndexingMemoryController . PAUSE_INDEXING_ON_THROTTLE . getKey (), true )
176+ );
178177
179178 logger .info ("--> creating test index ..." );
180179 prepareCreate ("test" , indexSettings (1 , 0 )).get ();
@@ -204,8 +203,7 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception {
204203
205204 // Activate index throttling on "test" index primary shard
206205 IndicesService indicesService = internalCluster ().getInstance (IndicesService .class , node_1 );
207- IndexService indexService = indicesService .indexService (resolveIndex ("test" ));
208- IndexShard shard = indexService .getShard (0 );
206+ IndexShard shard = indicesService .indexServiceSafe (resolveIndex ("test" )).getShard (0 );
209207 shard .activateThrottling ();
210208 // Verify that indexing is paused for the throttled shard
211209 assertBusy (() -> { assertThat (shard .isIndexingPaused (), equalTo (true )); });
@@ -230,16 +228,18 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception {
230228 .get ();
231229 assertThat (clusterHealthResponse .isTimedOut (), equalTo (false ));
232230
231+ // Relocated shard is not throttled
232+ assertThat (shard .isIndexingPaused (), equalTo (false ));
233233 logger .info ("--> verifying count after relocation ..." );
234234 indicesAdmin ().prepareRefresh ().get ();
235235 assertHitCount (prepareSearch ("test" ).setSize (0 ), 21 );
236- logger .info ("--> Test finished ..." );
237236 }
238237
239238 public void testRelocationWhileIndexingRandom () throws Exception {
240239 int numberOfRelocations = scaledRandomIntBetween (1 , rarely () ? 10 : 4 );
241240 int numberOfReplicas = randomBoolean () ? 0 : 1 ;
242241 int numberOfNodes = numberOfReplicas == 0 ? 2 : 3 ;
242+ boolean throttleIndexing = randomBoolean ();
243243
244244 logger .info (
245245 "testRelocationWhileIndexingRandom(numRelocations={}, numberOfReplicas={}, numberOfNodes={})" ,
@@ -248,9 +248,12 @@ public void testRelocationWhileIndexingRandom() throws Exception {
248248 numberOfNodes
249249 );
250250
251+ // Start node with PAUSE_INDEXING_ON_THROTTLE setting set to true. This means that if we activate
252+ // index throttling for a shard on this node, it will pause indexing for that shard until throttling
253+ // is deactivated.
251254 String [] nodes = new String [numberOfNodes ];
252255 logger .info ("--> starting [node1] ..." );
253- nodes [0 ] = internalCluster ().startNode ();
256+ nodes [0 ] = internalCluster ().startNode (Settings . builder (). put ( IndexingMemoryController . PAUSE_INDEXING_ON_THROTTLE . getKey (), true ) );
254257
255258 logger .info ("--> creating test index ..." );
256259 prepareCreate ("test" , indexSettings (1 , numberOfReplicas )).get ();
@@ -274,6 +277,14 @@ public void testRelocationWhileIndexingRandom() throws Exception {
274277 waitForDocs (numDocs , indexer );
275278 logger .info ("--> {} docs indexed" , numDocs );
276279
280+ if (throttleIndexing ) {
281+ // Activate index throttling on "test" index primary shard
282+ IndicesService indicesService = internalCluster ().getInstance (IndicesService .class , nodes [0 ]);
283+ IndexShard shard = indicesService .indexServiceSafe (resolveIndex ("test" )).getShard (0 );
284+ shard .activateThrottling ();
285+ // Verify that indexing is paused for the throttled shard
286+ assertBusy (() -> { assertThat (shard .isIndexingPaused (), equalTo (true )); });
287+ }
277288 logger .info ("--> starting relocations..." );
278289 int nodeShiftBased = numberOfReplicas ; // if we have replicas shift those
279290 for (int i = 0 ; i < numberOfRelocations ; i ++) {
0 commit comments