@@ -467,27 +467,26 @@ public void testNonThrottleStats() throws Exception {
467467 assertThat (stats .getPrimaries ().getIndexing ().getTotal ().getThrottleTime ().millis (), equalTo (0L ));
468468 }
469469
470- public void testThrottleStats () {
470+ public void testThrottleStats () throws Exception {
471471 assertAcked (
472- prepareCreate ("test " ).setSettings (
472+ prepareCreate ("test_throttle_stats_index " ).setSettings (
473473 settingsBuilder ().put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , "1" )
474474 .put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , "0" )
475475 .put (MergePolicyConfig .INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING .getKey (), "2" )
476476 .put (MergePolicyConfig .INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING .getKey (), "2" )
477477 .put (MergeSchedulerConfig .MAX_THREAD_COUNT_SETTING .getKey (), "1" )
478478 .put (MergeSchedulerConfig .MAX_MERGE_COUNT_SETTING .getKey (), "1" )
479+ .put (MergeSchedulerConfig .AUTO_THROTTLE_SETTING .getKey (), "true" )
479480 .put (IndexSettings .INDEX_TRANSLOG_DURABILITY_SETTING .getKey (), Translog .Durability .ASYNC .name ())
480481 )
481482 );
482- ensureGreen ();
483+ ensureGreen ("test_throttle_stats_index" );
483484 // make sure we see throttling kicking in:
484485 AtomicBoolean done = new AtomicBoolean ();
485486 AtomicLong termUpTo = new AtomicLong ();
486- long start = System .currentTimeMillis ();
487- for (int threadIdx = 0 ; threadIdx < 5 ; threadIdx ++) {
488- int finalThreadIdx = threadIdx ;
489- new Thread (() -> {
490- IndicesStatsResponse stats ;
487+ Thread [] indexingThreads = new Thread [5 ];
488+ for (int threadIdx = 0 ; threadIdx < indexingThreads .length ; threadIdx ++) {
489+ indexingThreads [threadIdx ] = new Thread (() -> {
491490 while (done .get () == false ) {
492491 for (int i = 0 ; i < 100 ; i ++) {
493492 // Provoke slowish merging by making many unique terms:
@@ -496,30 +495,35 @@ public void testThrottleStats() {
496495 sb .append (' ' );
497496 sb .append (termUpTo .incrementAndGet ());
498497 }
499- prepareIndex ("test" ).setId ("" + termUpTo .get ()).setSource ("field" + (i % 10 ), sb .toString ()).get ();
498+ prepareIndex ("test_throttle_stats_index" ).setId ("" + termUpTo .get ())
499+ .setSource ("field" + (i % 10 ), sb .toString ())
500+ .get ();
500501 if (i % 2 == 0 ) {
501- refresh ();
502+ refresh ("test_throttle_stats_index" );
502503 }
503504 }
504- refresh ();
505- if (finalThreadIdx == 0 ) {
506- stats = indicesAdmin ().prepareStats ().get ();
507- done .set (stats .getPrimaries ().getIndexing ().getTotal ().getThrottleTime ().millis () > 0 );
508- }
509- if (System .currentTimeMillis () - start > 300 * 1000 ) { // Wait 5 minutes for throttling to kick in
510- done .set (true );
511- fail ("index throttling didn't kick in after 5 minutes of intense merging" );
512- }
505+ refresh ("test_throttle_stats_index" );
513506 }
514- }).start ();
507+ });
508+ indexingThreads [threadIdx ].start ();
509+ }
510+
511+ assertBusy (() -> {
512+ IndicesStatsResponse stats = indicesAdmin ().prepareStats ("test_throttle_stats_index" ).get ();
513+ assertTrue (stats .getPrimaries ().getIndexing ().getTotal ().getThrottleTime ().millis () > 0 );
514+ done .set (true );
515+ }, 5L , TimeUnit .MINUTES );
516+
517+ for (Thread indexingThread : indexingThreads ) {
518+ indexingThread .join ();
515519 }
516520
517521 // Optimize & flush and wait; else we sometimes get a "Delete Index failed - not acked"
518522 // when ESIntegTestCase.after tries to remove indices created by the test:
519- logger .info ("test: now optimize" );
520- indicesAdmin ().prepareForceMerge ("test " ).get ();
521- flush ();
522- logger .info ("test: test done" );
523+ logger .info ("test throttle stats : now optimize" );
524+ indicesAdmin ().prepareForceMerge ("test_throttle_stats_index " ).get ();
525+ flush ("test_throttle_stats_index" );
526+ logger .info ("test throttle stats : test done" );
523527 }
524528
525529 public void testSimpleStats () throws Exception {
0 commit comments