|
73 | 73 | import java.util.concurrent.TimeUnit; |
74 | 74 | import java.util.concurrent.atomic.AtomicBoolean; |
75 | 75 | import java.util.concurrent.atomic.AtomicInteger; |
| 76 | +import java.util.concurrent.atomic.AtomicLong; |
76 | 77 | import java.util.concurrent.atomic.AtomicReference; |
77 | 78 |
|
78 | 79 | import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; |
@@ -568,50 +569,61 @@ public void testNonThrottleStats() throws Exception { |
568 | 569 |
|
569 | 570 | public void testThrottleStats() throws Exception { |
570 | 571 | assertAcked( |
571 | | - prepareCreate("test").setSettings( |
| 572 | + prepareCreate("test_throttle_stats_index").setSettings( |
572 | 573 | settingsBuilder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") |
573 | 574 | .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0") |
574 | 575 | .put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2") |
575 | 576 | .put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2") |
576 | 577 | .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1") |
577 | 578 | .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "1") |
| 579 | + .put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), "true") |
578 | 580 | .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC.name()) |
579 | 581 | ) |
580 | 582 | ); |
581 | | - ensureGreen(); |
582 | | - long termUpto = 0; |
583 | | - IndicesStatsResponse stats; |
| 583 | + ensureGreen("test_throttle_stats_index"); |
584 | 584 | // make sure we see throttling kicking in: |
585 | | - boolean done = false; |
586 | | - long start = System.currentTimeMillis(); |
587 | | - while (done == false) { |
588 | | - for (int i = 0; i < 100; i++) { |
589 | | - // Provoke slowish merging by making many unique terms: |
590 | | - StringBuilder sb = new StringBuilder(); |
591 | | - for (int j = 0; j < 100; j++) { |
592 | | - sb.append(' '); |
593 | | - sb.append(termUpto++); |
594 | | - } |
595 | | - client().prepareIndex("test", "type", "" + termUpto).setSource("field" + (i % 10), sb.toString()).get(); |
596 | | - if (i % 2 == 0) { |
597 | | - refresh(); |
| 585 | + AtomicBoolean done = new AtomicBoolean(); |
| 586 | + AtomicLong termUpTo = new AtomicLong(); |
| 587 | + Thread[] indexingThreads = new Thread[5]; |
| 588 | + for (int threadIdx = 0; threadIdx < indexingThreads.length; threadIdx++) { |
| 589 | + indexingThreads[threadIdx] = new Thread(() -> { |
| 590 | + while (done.get() == false) { |
| 591 | + for (int i = 0; i < 100; i++) { |
| 592 | + // Provoke slowish merging by making many unique terms: |
| 593 | + StringBuilder sb = new StringBuilder(); |
| 594 | + for (int j = 0; j < 100; j++) { |
| 595 | + sb.append(' '); |
| 596 | + sb.append(termUpTo.incrementAndGet()); |
| 597 | + } |
| 598 | + client().prepareIndex("test_throttle_stats_index", "type", "" + termUpTo.get()) |
| 599 | + .setSource("field" + (i % 10), sb.toString()) |
| 600 | + .get(); |
| 601 | + if (i % 2 == 0) { |
| 602 | + refresh("test_throttle_stats_index"); |
| 603 | + } |
| 604 | + } |
| 605 | + refresh("test_throttle_stats_index"); |
598 | 606 | } |
599 | | - } |
600 | | - refresh(); |
601 | | - stats = client().admin().indices().prepareStats().execute().actionGet(); |
602 | | - // nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get(); |
603 | | - done = stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis() > 0; |
604 | | - if (System.currentTimeMillis() - start > 300 * 1000) { // Wait 5 minutes for throttling to kick in |
605 | | - fail("index throttling didn't kick in after 5 minutes of intense merging"); |
606 | | - } |
| 607 | + }); |
| 608 | + indexingThreads[threadIdx].start(); |
| 609 | + } |
| 610 | + |
| 611 | + assertBusy(() -> { |
| 612 | + IndicesStatsResponse stats = client().admin().indices().prepareStats("test_throttle_stats_index").get(); |
| 613 | + assertTrue(stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis() > 0); |
| 614 | + done.set(true); |
| 615 | + }, 5L, TimeUnit.MINUTES); |
| 616 | + |
| 617 | + for (Thread indexingThread : indexingThreads) { |
| 618 | + indexingThread.join(); |
607 | 619 | } |
608 | 620 |
|
609 | 621 | // Optimize & flush and wait; else we sometimes get a "Delete Index failed - not acked" |
610 | 622 | // when ESIntegTestCase.after tries to remove indices created by the test: |
611 | | - logger.info("test: now optimize"); |
612 | | - client().admin().indices().prepareForceMerge("test").get(); |
613 | | - flush(); |
614 | | - logger.info("test: test done"); |
| 623 | + logger.info("test throttle stats: now optimize"); |
| 624 | + client().admin().indices().prepareForceMerge("test_throttle_stats_index").get(); |
| 625 | + flush("test_throttle_stats_index"); |
| 626 | + logger.info("test throttle stats: test done"); |
615 | 627 | } |
616 | 628 |
|
617 | 629 | public void testSimpleStats() throws Exception { |
|
0 commit comments