@@ -253,15 +253,15 @@ public void testRelocationWhileIndexingRandom() throws Exception {
253253 String [] nodes = new String [numberOfNodes ];
254254 logger .info ("--> starting [node1] ..." );
255255 nodes [0 ] = internalCluster ().startNode (
256- Settings .builder ().put (IndexingMemoryController .PAUSE_INDEXING_ON_THROTTLE .getKey (), randomBoolean ())
257- );
256+ Settings .builder ().put (IndexingMemoryController .PAUSE_INDEXING_ON_THROTTLE .getKey (), randomBoolean ()));
258257
259258 logger .info ("--> creating test index ..." );
260259 prepareCreate ("test" , indexSettings (1 , numberOfReplicas )).get ();
261260
262261 for (int i = 2 ; i <= numberOfNodes ; i ++) {
263262 logger .info ("--> starting [node{}] ..." , i );
264- nodes [i - 1 ] = internalCluster ().startNode ();
263+ nodes [i - 1 ] = internalCluster ().startNode (
264+ Settings .builder ().put (IndexingMemoryController .PAUSE_INDEXING_ON_THROTTLE .getKey (), randomBoolean ()));
265265 if (i != numberOfNodes ) {
266266 ClusterHealthResponse healthResponse = clusterAdmin ().prepareHealth (TEST_REQUEST_TIMEOUT )
267267 .setWaitForEvents (Priority .LANGUID )
@@ -278,15 +278,6 @@ public void testRelocationWhileIndexingRandom() throws Exception {
278278 waitForDocs (numDocs , indexer );
279279 logger .info ("--> {} docs indexed" , numDocs );
280280
281- IndicesService indicesService = internalCluster ().getInstance (IndicesService .class , nodes [0 ]);
282- IndexShard shard = indicesService .indexServiceSafe (resolveIndex ("test" )).getShard (0 );
283- if (throttleIndexing ) {
284- // Activate index throttling on "test" index primary shard
285- shard .activateThrottling ();
286- // Verify that indexing is throttled for this shard
287- Engine engine = shard .getEngineOrNull ();
288- assertThat (engine != null && engine .isThrottled (), equalTo (true ));
289- }
290281 logger .info ("--> starting relocations..." );
291282 int nodeShiftBased = numberOfReplicas ; // if we have replicas shift those
292283 for (int i = 0 ; i < numberOfRelocations ; i ++) {
@@ -295,6 +286,18 @@ public void testRelocationWhileIndexingRandom() throws Exception {
295286 fromNode += nodeShiftBased ;
296287 toNode += nodeShiftBased ;
297288 numDocs = scaledRandomIntBetween (200 , 1000 );
289+
290+ // Throttle indexing on source shard
291+ if (throttleIndexing ) {
292+ IndicesService indicesService = internalCluster ().getInstance (IndicesService .class , nodes [fromNode ]);
293+ IndexShard shard = indicesService .indexServiceSafe (resolveIndex ("test" )).getShard (0 );
294+ // Activate index throttling on "test" index primary shard
295+ logger .info ("--> activate throttling for shard on node {}..." , nodes [fromNode ]);
296+ shard .activateThrottling ();
297+ // Verify that indexing is throttled for this shard
298+ Engine engine = shard .getEngineOrNull ();
299+ assertThat (engine != null && engine .isThrottled (), equalTo (true ));
300+ }
298301 logger .debug ("--> Allow indexer to index [{}] documents" , numDocs );
299302 indexer .continueIndexing (numDocs );
300303 logger .info ("--> START relocate the shard from {} to {}" , nodes [fromNode ], nodes [toNode ]);
@@ -314,23 +317,15 @@ public void testRelocationWhileIndexingRandom() throws Exception {
314317 assertThat (clusterHealthResponse .isTimedOut (), equalTo (false ));
315318 indexer .pauseIndexing ();
316319 logger .info ("--> DONE relocate the shard from {} to {}" , fromNode , toNode );
320+ // Deactivate throttle on source shard
317321 if (throttleIndexing ) {
318- // Deactivate throttling on source shard to allow indexing threads to pass
322+ IndicesService indicesService = internalCluster ().getInstance (IndicesService .class , nodes [fromNode ]);
323+ IndexShard shard = indicesService .indexServiceSafe (resolveIndex ("test" )).getShard (0 );
324+ logger .info ("--> deactivate throttling for shard on node {}..." , nodes [fromNode ]);
319325 shard .deactivateThrottling ();
320- // Activate throttling on target shard before next relocation
321- indicesService = internalCluster ().getInstance (IndicesService .class , nodes [toNode ]);
322- shard = indicesService .indexServiceSafe (resolveIndex ("test" )).getShard (0 );
323- shard .activateThrottling ();
324- // Verify that indexing is throttled for this shard
325- Engine engine = shard .getEngineOrNull ();
326- assertThat (engine != null && engine .isThrottled (), equalTo (true ));
327326 }
328327 }
329328 logger .info ("--> done relocations" );
330- // Deactivate throttling on the primary shard to allow indexing threads to pass
331- if (throttleIndexing ) {
332- shard .deactivateThrottling ();
333- }
334329 logger .info ("--> waiting for indexing threads to stop ..." );
335330 indexer .stopAndAwaitStopped ();
336331 logger .info ("--> indexing threads stopped" );
0 commit comments