diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java
index e956999ebf2d2..b5f10002e217e 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java
@@ -79,6 +79,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
@@ -466,7 +467,7 @@ public void testNonThrottleStats() throws Exception {
         assertThat(stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis(), equalTo(0L));
     }
 
-    public void testThrottleStats() throws Exception {
+    public void testThrottleStats() {
         assertAcked(
             prepareCreate("test").setSettings(
                 settingsBuilder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
@@ -479,31 +480,38 @@ public void testThrottleStats() throws Exception {
             )
         );
         ensureGreen();
-        long termUpto = 0;
-        IndicesStatsResponse stats;
         // make sure we see throttling kicking in:
-        boolean done = false;
+        AtomicBoolean done = new AtomicBoolean();
+        AtomicLong termUpTo = new AtomicLong();
         long start = System.currentTimeMillis();
-        while (done == false) {
-            for (int i = 0; i < 100; i++) {
-                // Provoke slowish merging by making many unique terms:
-                StringBuilder sb = new StringBuilder();
-                for (int j = 0; j < 100; j++) {
-                    sb.append(' ');
-                    sb.append(termUpto++);
-                }
-                prepareIndex("test").setId("" + termUpto).setSource("field" + (i % 10), sb.toString()).get();
-                if (i % 2 == 0) {
+        for (int threadIdx = 0; threadIdx < 5; threadIdx++) {
+            int finalThreadIdx = threadIdx;
+            new Thread(() -> {
+                IndicesStatsResponse stats;
+                while (done.get() == false) {
+                    for (int i = 0; i < 100; i++) {
+                        // Provoke slowish merging by making many unique terms:
+                        StringBuilder sb = new StringBuilder();
+                        for (int j = 0; j < 100; j++) {
+                            sb.append(' ');
+                            sb.append(termUpTo.incrementAndGet());
+                        }
+                        prepareIndex("test").setId("" + termUpTo.get()).setSource("field" + (i % 10), sb.toString()).get();
+                        if (i % 2 == 0) {
+                            refresh();
+                        }
+                    }
                     refresh();
+                    if (finalThreadIdx == 0) {
+                        stats = indicesAdmin().prepareStats().get();
+                        done.set(stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis() > 0);
+                    }
+                    if (System.currentTimeMillis() - start > 300 * 1000) { // Wait 5 minutes for throttling to kick in
+                        done.set(true);
+                        fail("index throttling didn't kick in after 5 minutes of intense merging");
+                    }
                 }
-            }
-            refresh();
-            stats = indicesAdmin().prepareStats().get();
-            // nodesStats = clusterAdmin().prepareNodesStats().setIndices(true).get();
-            done = stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis() > 0;
-            if (System.currentTimeMillis() - start > 300 * 1000) { // Wait 5 minutes for throttling to kick in
-                fail("index throttling didn't kick in after 5 minutes of intense merging");
-            }
+            }).start();
         }
 
         // Optimize & flush and wait; else we sometimes get a "Delete Index failed - not acked"
diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java
index 5217edb5490dc..7c78698ac6f66 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java
@@ -272,6 +272,10 @@ interface UpdateConsumer {
         }
     }
 
+    public boolean usingMaxTargetIORateBytesPerSec() {
+        return MAX_IO_RATE.getBytes() == targetIORateBytesPerSec.get();
+    }
+
     // exposed for tests
     Set<MergeTask> getRunningMergeTasks() {
         return runningMergeTasks;
diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java
index 8cfdc59268365..f645edaff64a8 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java
@@ -50,7 +50,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
     );
     private final ShardId shardId;
     private final MergeSchedulerConfig config;
-    private final Logger logger;
+    protected final Logger logger;
     private final MergeTracking mergeTracking;
     private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService;
    private final PriorityQueue<MergeTask> backloggedMergeTasks = new PriorityQueue<>(
@@ -191,7 +191,10 @@ private void checkMergeTaskThrottling() {
         int configuredMaxMergeCount = config.getMaxMergeCount();
         // both currently running and enqueued merge tasks are considered "active" for throttling purposes
         int activeMerges = (int) (submittedMergesCount - doneMergesCount);
-        if (activeMerges > configuredMaxMergeCount && shouldThrottleIncomingMerges.get() == false) {
+        if (activeMerges > configuredMaxMergeCount
+            // only throttle indexing if disk IO is un-throttled, and we still can't keep up with the merge load
+            && threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec()
+            && shouldThrottleIncomingMerges.get() == false) {
             // maybe enable merge task throttling
             synchronized (shouldThrottleIncomingMerges) {
                 if (shouldThrottleIncomingMerges.getAndSet(true) == false) {
diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java
index 5e085c083b785..ae9168357eb32 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java
@@ -173,6 +173,165 @@ public void testSimpleMergeTaskReEnqueueingBySize() {
         }
     }
 
+    public void testIndexingThrottlingWhenSubmittingMerges() {
+        final int maxThreadCount = randomIntBetween(1, 5);
+        // settings validation requires maxMergeCount >= maxThreadCount
+        final int maxMergeCount = maxThreadCount + randomIntBetween(0, 5);
+        List<MergeTask> submittedMergeTasks = new ArrayList<>();
+        AtomicBoolean isUsingMaxTargetIORate = new AtomicBoolean(false);
+        ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mockThreadPoolMergeExecutorService(
+            submittedMergeTasks,
+            isUsingMaxTargetIORate
+        );
+        Settings mergeSchedulerSettings = Settings.builder()
+            .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), maxThreadCount)
+            .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount)
+            .build();
+        TestThreadPoolMergeScheduler threadPoolMergeScheduler = new TestThreadPoolMergeScheduler(
+            new ShardId("index", "_na_", 1),
+            IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
+            threadPoolMergeExecutorService
+        );
+        // make sure there are more merges submitted than the max merge count limit (which triggers IO throttling)
+        int excessMerges = randomIntBetween(1, 10);
+        int mergesToSubmit = maxMergeCount + excessMerges;
+        boolean expectIndexThrottling = false;
+        int submittedMerges = 0;
+        // merges are submitted, while some are also scheduled (but none is run)
+        while (submittedMerges < mergesToSubmit - 1) {
+            isUsingMaxTargetIORate.set(randomBoolean());
+            if (submittedMergeTasks.isEmpty() == false && randomBoolean()) {
+                // maybe schedule one submitted merge
+                MergeTask mergeTask = randomFrom(submittedMergeTasks);
+                submittedMergeTasks.remove(mergeTask);
+                mergeTask.schedule();
+            } else {
+                // submit one merge
+                MergeSource mergeSource = mock(MergeSource.class);
+                OneMerge oneMerge = mock(OneMerge.class);
+                when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
+                when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
+                when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
+                threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
+                submittedMerges++;
+                if (isUsingMaxTargetIORate.get() && submittedMerges > maxMergeCount) {
+                    expectIndexThrottling = true;
+                } else if (submittedMerges <= maxMergeCount) {
+                    expectIndexThrottling = false;
+                }
+            }
+            // assert IO throttle state
+            assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(expectIndexThrottling));
+        }
+        // submit one last merge when IO throttling is at max value
+        isUsingMaxTargetIORate.set(true);
+        MergeSource mergeSource = mock(MergeSource.class);
+        OneMerge oneMerge = mock(OneMerge.class);
+        when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
+        when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
+        when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
+        threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
+        // assert index throttling because IO throttling is at max value
+        assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(true));
+    }
+
+    public void testIndexingThrottlingWhileMergesAreRunning() {
+        final int maxThreadCount = randomIntBetween(1, 5);
+        // settings validation requires maxMergeCount >= maxThreadCount
+        final int maxMergeCount = maxThreadCount + randomIntBetween(0, 5);
+        List<MergeTask> submittedMergeTasks = new ArrayList<>();
+        List<MergeTask> scheduledToRunMergeTasks = new ArrayList<>();
+        AtomicBoolean isUsingMaxTargetIORate = new AtomicBoolean(false);
+        ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mockThreadPoolMergeExecutorService(
+            submittedMergeTasks,
+            isUsingMaxTargetIORate
+        );
+        Settings mergeSchedulerSettings = Settings.builder()
+            .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), maxThreadCount)
+            .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount)
+            .build();
+        TestThreadPoolMergeScheduler threadPoolMergeScheduler = new TestThreadPoolMergeScheduler(
+            new ShardId("index", "_na_", 1),
+            IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
+            threadPoolMergeExecutorService
+        );
+        int mergesToRun = randomIntBetween(0, 5);
+        // make sure there are more merges submitted and not run
+        int excessMerges = randomIntBetween(1, 10);
+        int mergesToSubmit = maxMergeCount + mergesToRun + excessMerges;
+        int mergesOutstanding = 0;
+        boolean expectIndexThrottling = false;
+        // merges are submitted, while some are also scheduled and run
+        while (mergesToSubmit > 0) {
+            isUsingMaxTargetIORate.set(randomBoolean());
+            if (submittedMergeTasks.isEmpty() == false && randomBoolean()) {
+                // maybe schedule one submitted merge
+                MergeTask mergeTask = randomFrom(submittedMergeTasks);
+                submittedMergeTasks.remove(mergeTask);
+                Schedule schedule = mergeTask.schedule();
+                if (schedule == Schedule.RUN) {
+                    scheduledToRunMergeTasks.add(mergeTask);
+                }
+            } else {
+                if (mergesToRun > 0 && scheduledToRunMergeTasks.isEmpty() == false && randomBoolean()) {
+                    // maybe run one scheduled merge
+                    MergeTask mergeTask = randomFrom(scheduledToRunMergeTasks);
+                    scheduledToRunMergeTasks.remove(mergeTask);
+                    mergeTask.run();
+                    mergesToRun--;
+                    mergesOutstanding--;
+                } else {
+                    // submit one merge
+                    MergeSource mergeSource = mock(MergeSource.class);
+                    OneMerge oneMerge = mock(OneMerge.class);
+                    when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
+                    when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
+                    when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
+                    threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
+                    mergesToSubmit--;
+                    mergesOutstanding++;
+                }
+                if (isUsingMaxTargetIORate.get() && mergesOutstanding > maxMergeCount) {
+                    expectIndexThrottling = true;
+                } else if (mergesOutstanding <= maxMergeCount) {
+                    expectIndexThrottling = false;
+                }
+            }
+            // assert IO throttle state
+            assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(expectIndexThrottling));
+        }
+        // execute all remaining merges (submitted or scheduled)
+        while (mergesToRun > 0 || submittedMergeTasks.isEmpty() == false || scheduledToRunMergeTasks.isEmpty() == false) {
+            // simulate that the {@link ThreadPoolMergeExecutorService} maybe peaked IO un-throttling
+            isUsingMaxTargetIORate.set(randomBoolean());
+            if (submittedMergeTasks.isEmpty() == false && (scheduledToRunMergeTasks.isEmpty() || randomBoolean())) {
+                // maybe schedule one submitted merge
+                MergeTask mergeTask = randomFrom(submittedMergeTasks);
+                submittedMergeTasks.remove(mergeTask);
+                Schedule schedule = mergeTask.schedule();
+                if (schedule == Schedule.RUN) {
+                    scheduledToRunMergeTasks.add(mergeTask);
+                }
+            } else {
+                // maybe run one scheduled merge
+                MergeTask mergeTask = randomFrom(scheduledToRunMergeTasks);
+                scheduledToRunMergeTasks.remove(mergeTask);
+                mergeTask.run();
+                mergesToRun--;
+                mergesOutstanding--;
+                if (isUsingMaxTargetIORate.get() && mergesOutstanding > maxMergeCount) {
+                    expectIndexThrottling = true;
+                } else if (mergesOutstanding <= maxMergeCount) {
+                    expectIndexThrottling = false;
+                }
+            }
+            // assert IO throttle state
+            assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(expectIndexThrottling));
+        }
+        // all merges done
+        assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(false));
+    }
+
     public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception {
         // test with min 2 allowed concurrent merges
         int mergeExecutorThreadCount = randomIntBetween(2, 5);
@@ -493,4 +652,49 @@ private static MergeInfo getNewMergeInfo(long estimatedMergeBytes) {
     private static MergeInfo getNewMergeInfo(long estimatedMergeBytes, int maxNumSegments) {
         return new MergeInfo(randomNonNegativeInt(), estimatedMergeBytes, randomBoolean(), maxNumSegments);
     }
+
+    static class TestThreadPoolMergeScheduler extends ThreadPoolMergeScheduler {
+        AtomicBoolean isIndexingThrottlingEnabled = new AtomicBoolean(false);
+
+        TestThreadPoolMergeScheduler(
+            ShardId shardId,
+            IndexSettings indexSettings,
+            ThreadPoolMergeExecutorService threadPoolMergeExecutorService
+        ) {
+            super(shardId, indexSettings, threadPoolMergeExecutorService);
+        }
+
+        @Override
+        protected void enableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {
+            isIndexingThrottlingEnabled.set(true);
+        }
+
+        @Override
+        protected void disableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {
+            isIndexingThrottlingEnabled.set(false);
+        }
+
+        boolean isIndexingThrottlingEnabled() {
+            return isIndexingThrottlingEnabled.get();
+        }
+    }
+
+    static ThreadPoolMergeExecutorService mockThreadPoolMergeExecutorService(
+        List<MergeTask> submittedMergeTasks,
+        AtomicBoolean isUsingMaxTargetIORate
+    ) {
+        ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class);
+        doAnswer(invocation -> {
+            MergeTask mergeTask = (MergeTask) invocation.getArguments()[0];
+            submittedMergeTasks.add(mergeTask);
+            return null;
+        }).when(threadPoolMergeExecutorService).submitMergeTask(any(MergeTask.class));
+        doAnswer(invocation -> {
+            MergeTask mergeTask = (MergeTask) invocation.getArguments()[0];
+            submittedMergeTasks.add(mergeTask);
+            return null;
+        }).when(threadPoolMergeExecutorService).reEnqueueBackloggedMergeTask(any(MergeTask.class));
+        doAnswer(invocation -> isUsingMaxTargetIORate.get()).when(threadPoolMergeExecutorService).usingMaxTargetIORateBytesPerSec();
+        return threadPoolMergeExecutorService;
+    }
 }