From 9ad2375cd6b014975b0d11175febf817eb69a6f2 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Thu, 19 Jun 2025 17:51:13 +0300 Subject: [PATCH] Threadpool merge executor does not block aborted merges (#129613) This PR addresses a bug where aborted merges are blocked if there's insufficient disk space. Previously, the merge disk space estimation did not consider if the operation has been aborted when/while it was enqueued for execution. Consequently, aborted merges, for e.g. when closing a shard, were blocked if their disk space estimation was exceeding the available disk space threshold. In this case, the shard close operation would itself block. This fix estimates a disk space budget of `0` for aborted merges, and it periodically checks if any enqueued merge tasks have been aborted (more generally, it checks if the budget estimate for any merge tasks has changed, and reorders the queue if so). This way aborted merges are prioritized and are never blocked. Closes https://github.com/elastic/elasticsearch/issues/129335 --- .../index/engine/MergeWithLowDiskSpaceIT.java | 162 ++++++++++++++++++ .../ThreadPoolMergeExecutorService.java | 68 ++++++-- .../engine/ThreadPoolMergeScheduler.java | 9 +- ...oolMergeExecutorServiceDiskSpaceTests.java | 100 +++++++++++ .../elasticsearch/test/ESIntegTestCase.java | 49 ++++-- 5 files changed, 358 insertions(+), 30 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java new file mode 100644 index 0000000000000..c2687468a7f26 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java @@ -0,0 +1,162 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.cluster.DiskUsageIntegTestCase; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.BeforeClass; + +import java.util.Locale; +import java.util.stream.IntStream; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase { + protected static long MERGE_DISK_HIGH_WATERMARK_BYTES; + + @BeforeClass + public static void setAvailableDiskSpaceBufferLimit() { + // this has to be big in order to potentially accommodate the disk space for a few 100s of docs and a few merges, + // because of the latency to process used disk space updates, and also because we cannot reliably separate indexing from merging + // operations at this high abstraction level (merging is triggered more or less automatically in the background) + MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween(1_000_000L, 2_000_000L); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + // only the threadpool-based merge scheduler has the capability to block merges when disk space is insufficient + .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) + // the very short disk space polling interval ensures timely blocking of merges + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "10ms") + // merges pile up more easily when there's only a few threads executing them + .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), randomIntBetween(1, 2)) + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), MERGE_DISK_HIGH_WATERMARK_BYTES + "b") + // let's not worry about allocation watermarks (e.g. read-only shards) in this test suite + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "0b") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "0b") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "0b") + .build(); + } + + public void testShardCloseWhenDiskSpaceInsufficient() throws Exception { + String node = internalCluster().startNode(); + setTotalSpace(node, Long.MAX_VALUE); + var indicesService = internalCluster().getInstance(IndicesService.class, node); + ensureStableCluster(1); + // create index + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex( + indexName, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build() + ); + // do some indexing + indexRandom( + false, + false, + false, + false, + IntStream.range(1, randomIntBetween(2, 10)) + .mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50))) + .toList() + ); + // get current disk space usage + IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get(); + long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes(); + // restrict the total disk space such that the next merge does not have sufficient disk space + long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L); + setTotalSpace(node, insufficientTotalDiskSpace); + // node stats' FS stats should report that there is insufficient disk space available + assertBusy(() -> { + NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get(); + assertThat(nodesStatsResponse.getNodes().size(), equalTo(1)); + NodeStats nodeStats = nodesStatsResponse.getNodes().get(0); + assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace)); + assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES)); + }); + while (true) { + // maybe trigger a merge (this still depends on the merge policy, i.e. it is not 100% guaranteed) + assertNoFailures(indicesAdmin().prepareForceMerge(indexName).get()); + // keep indexing and ask for merging until node stats' threadpool stats reports enqueued merges, + // and the merge executor says they're blocked due to insufficient disk space if (nodesStatsResponse.getNodes() + NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get(); + if (nodesStatsResponse.getNodes() + .getFirst() + .getThreadPool() + .stats() + .stream() + .filter(s -> ThreadPool.Names.MERGE.equals(s.name())) + .findAny() + .get() + .queue() > 0 + && indicesService.getThreadPoolMergeExecutorService().isMergingBlockedDueToInsufficientDiskSpace()) { + break; + } + // more indexing + indexRandom( + false, + false, + false, + false, + IntStream.range(1, randomIntBetween(2, 10)) + .mapToObj(i -> prepareIndex(indexName).setSource("another_field", randomAlphaOfLength(50))) + .toList() + ); + } + // now delete the index in this state, i.e. with merges enqueued and blocked + assertAcked(indicesAdmin().prepareDelete(indexName).get()); + // index should now be gone + assertBusy(() -> { + expectThrows( + IndexNotFoundException.class, + () -> indicesAdmin().prepareGetIndex(TEST_REQUEST_TIMEOUT).setIndices(indexName).get() + ); + }); + assertBusy(() -> { + // merge thread pool should be done with the enqueue merge tasks + NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get(); + assertThat( + nodesStatsResponse.getNodes() + .getFirst() + .getThreadPool() + .stats() + .stream() + .filter(s -> ThreadPool.Names.MERGE.equals(s.name())) + .findAny() + .get() + .queue(), + equalTo(0) + ); + // and the merge executor should also report that merging is done now + assertFalse(indicesService.getThreadPoolMergeExecutorService().isMergingBlockedDueToInsufficientDiskSpace()); + assertTrue(indicesService.getThreadPoolMergeExecutorService().allDone()); + }); + } + + public void setTotalSpace(String dataNodeName, long totalSpace) { + getTestFileStore(dataNodeName).setTotalSpace(totalSpace); + refreshClusterInfo(); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java index c2b1f5e9abb4f..f6e9257200002 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java @@ -20,6 +20,7 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask; import org.elasticsearch.monitor.fs.FsInfo; @@ -28,6 +29,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.IdentityHashMap; @@ -59,10 +61,7 @@ public class ThreadPoolMergeExecutorService implements Closeable { /** How frequently we check disk usage (default: 5 seconds). */ public static final Setting INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING = Setting.positiveTimeSetting( "indices.merge.disk.check_interval", - // disabled by default - // there's currently a problem where (aborting) merges are blocked when shards are closed (because disk space is insufficient) - // see: https://github.com/elastic/elasticsearch/issues/129335 - TimeValue.timeValueSeconds(0), + TimeValue.timeValueSeconds(5), Property.Dynamic, Property.NodeScope ); @@ -294,6 +293,10 @@ public boolean allDone() { return queuedMergeTasks.isQueueEmpty() && runningMergeTasks.isEmpty() && ioThrottledMergeTasksCount.get() == 0L; } + public boolean isMergingBlockedDueToInsufficientDiskSpace() { + return availableDiskSpacePeriodicMonitor.isScheduled() && queuedMergeTasks.queueHeadIsOverTheAvailableBudget(); + } + /** * Enqueues a runnable that executes exactly one merge task, the smallest that is runnable at some point in time. * A merge task is not runnable if its scheduler already reached the configured max-allowed concurrency level. @@ -550,9 +553,8 @@ private static ByteSizeValue getFreeBytesThreshold( static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget { MergeTaskPriorityBlockingQueue() { - // start with 0 budget (so takes on this queue will always block until {@link #updateBudget} is invoked) - // use the estimated *remaining* merge size as the budget function so that the disk space budget of taken (in-use) elements is - // updated according to the remaining disk space requirements of the currently running merge tasks + // by default, start with 0 budget (so takes on this queue will always block until the first {@link #updateBudget} is invoked) + // use the estimated *remaining* merge size as the budget function so that the disk space budget of elements is updated super(MergeTask::estimatedRemainingMergeSize, 0L); } @@ -563,7 +565,7 @@ long getAvailableBudget() { // exposed for tests MergeTask peekQueue() { - return enqueuedByBudget.peek(); + return enqueuedByBudget.peek().v1(); } } @@ -573,7 +575,7 @@ MergeTask peekQueue() { */ static class PriorityBlockingQueueWithBudget { private final ToLongFunction budgetFunction; - protected final PriorityQueue enqueuedByBudget; + protected final PriorityQueue> enqueuedByBudget; private final IdentityHashMap unreleasedBudgetPerElement; private final ReentrantLock lock; private final Condition elementAvailable; @@ -581,7 +583,7 @@ static class PriorityBlockingQueueWithBudget { PriorityBlockingQueueWithBudget(ToLongFunction budgetFunction, long initialAvailableBudget) { this.budgetFunction = budgetFunction; - this.enqueuedByBudget = new PriorityQueue<>(64, Comparator.comparingLong(budgetFunction)); + this.enqueuedByBudget = new PriorityQueue<>(64, Comparator.comparingLong(Tuple::v2)); this.unreleasedBudgetPerElement = new IdentityHashMap<>(); this.lock = new ReentrantLock(); this.elementAvailable = lock.newCondition(); @@ -592,7 +594,7 @@ boolean enqueue(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { - enqueuedByBudget.offer(e); + enqueuedByBudget.offer(new Tuple<>(e, budgetFunction.applyAsLong(e))); elementAvailable.signal(); } finally { lock.unlock(); @@ -608,14 +610,14 @@ ElementWithReleasableBudget take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { - E peek; - long peekBudget; + Tuple head; // blocks until the smallest budget element fits the currently available budget - while ((peek = enqueuedByBudget.peek()) == null || (peekBudget = budgetFunction.applyAsLong(peek)) > availableBudget) { + while ((head = enqueuedByBudget.peek()) == null || head.v2() > availableBudget) { elementAvailable.await(); } + head = enqueuedByBudget.poll(); // deducts and holds up that element's budget from the available budget - return newElementWithReleasableBudget(enqueuedByBudget.poll(), peekBudget); + return newElementWithReleasableBudget(head.v1(), head.v2()); } finally { lock.unlock(); } @@ -623,7 +625,7 @@ ElementWithReleasableBudget take() throws InterruptedException { /** * Updates the available budged given the passed-in argument, from which it deducts the budget hold up by taken elements - * that are still in use. The budget of in-use elements is also updated (by re-applying the budget function). + * that are still in use. The elements budget is also updated by re-applying the budget function. * The newly updated budget is used to potentially block {@link #take()} operations if the smallest-budget enqueued element * is over this newly computed available budget. */ @@ -632,9 +634,11 @@ void updateBudget(long availableBudget) { lock.lock(); try { this.availableBudget = availableBudget; - // update the per-element budget (these are all the elements that are using any budget) + // updates the budget of enqueued elements (and possibly reorders the priority queue) + updateBudgetOfEnqueuedElementsAndReorderQueue(); + // update the budget of dequeued, but still in-use elements (these are the elements that are consuming budget) unreleasedBudgetPerElement.replaceAll((e, v) -> budgetFunction.applyAsLong(e.element())); - // available budget is decreased by the used per-element budget (for all dequeued elements that are still in use) + // the available budget is decreased by the budget of still in-use elements (dequeued elements that are still in-use) this.availableBudget -= unreleasedBudgetPerElement.values().stream().mapToLong(i -> i).sum(); elementAvailable.signalAll(); } finally { @@ -642,10 +646,38 @@ void updateBudget(long availableBudget) { } } + private void updateBudgetOfEnqueuedElementsAndReorderQueue() { + assert this.lock.isHeldByCurrentThread(); + int queueSizeBefore = enqueuedByBudget.size(); + var it = enqueuedByBudget.iterator(); + List> elementsToReorder = new ArrayList<>(); + while (it.hasNext()) { + var elementWithBudget = it.next(); + Long previousBudget = elementWithBudget.v2(); + long latestBudget = budgetFunction.applyAsLong(elementWithBudget.v1()); + if (previousBudget.equals(latestBudget) == false) { + // the budget (estimation) of an enqueued element has changed + // this element will be reordered by removing and reinserting using the latest budget (estimation) + it.remove(); + elementsToReorder.add(new Tuple<>(elementWithBudget.v1(), latestBudget)); + } + } + // reinsert elements based on the latest budget (estimation) + for (var reorderedElement : elementsToReorder) { + enqueuedByBudget.offer(reorderedElement); + } + assert queueSizeBefore == enqueuedByBudget.size(); + } + boolean isQueueEmpty() { return enqueuedByBudget.isEmpty(); } + boolean queueHeadIsOverTheAvailableBudget() { + var head = enqueuedByBudget.peek(); + return head != null && head.v2() > availableBudget; + } + int queueSize() { return enqueuedByBudget.size(); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index a73d19181a57a..b21172c7007e8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -537,8 +537,13 @@ void abort() { long estimatedRemainingMergeSize() { // TODO is it possible that `estimatedMergeBytes` be `0` for correctly initialize merges, // or is it always the case that if `estimatedMergeBytes` is `0` that means that the merge has not yet been initialized? - long estimatedMergeSize = onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes(); - return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten()); + if (onGoingMerge.getMerge().isAborted()) { + // if the merge is aborted the assumption is that merging will soon stop with negligible further writing + return 0L; + } else { + long estimatedMergeSize = onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes(); + return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten()); + } } public long getMergeMemoryEstimateBytes() { diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceDiskSpaceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceDiskSpaceTests.java index d5df3b9e95607..6eaafd6e3a3ac 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceDiskSpaceTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceDiskSpaceTests.java @@ -896,6 +896,106 @@ public void testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution() thro } } + public void testEnqueuedMergeTasksAreUnblockedWhenEstimatedMergeSizeChanges() throws Exception { + long diskSpaceLimitBytes = randomLongBetween(10L, 100L); + aFileStore.usableSpace = diskSpaceLimitBytes + randomLongBetween(1L, 100L); + aFileStore.totalSpace = aFileStore.usableSpace + randomLongBetween(1L, 10L); + bFileStore.usableSpace = diskSpaceLimitBytes + randomLongBetween(1L, 100L); + bFileStore.totalSpace = bFileStore.usableSpace + randomLongBetween(1L, 10L); + boolean aHasMoreSpace = aFileStore.usableSpace > bFileStore.usableSpace; + Settings.Builder settingsBuilder = Settings.builder().put(settings); + settingsBuilder.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), diskSpaceLimitBytes + "b"); + try ( + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService + .maybeCreateThreadPoolMergeExecutorService( + testThreadPool, + ClusterSettings.createBuiltInClusterSettings(settingsBuilder.build()), + nodeEnvironment + ) + ) { + assert threadPoolMergeExecutorService != null; + assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), greaterThanOrEqualTo(1)); + final long availableBudget = aHasMoreSpace + ? aFileStore.usableSpace - diskSpaceLimitBytes + : bFileStore.usableSpace - diskSpaceLimitBytes; + final AtomicLong expectedAvailableBudget = new AtomicLong(availableBudget); + assertBusy( + () -> assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(expectedAvailableBudget.get())) + ); + List tasksRunList = new ArrayList<>(); + List tasksAbortList = new ArrayList<>(); + int submittedMergesCount = randomIntBetween(1, 5); + long[] mergeSizeEstimates = new long[submittedMergesCount]; + for (int i = 0; i < submittedMergesCount; i++) { + // all these merge estimates are over-budget + mergeSizeEstimates[i] = availableBudget + randomLongBetween(1L, 10L); + } + for (int i = 0; i < submittedMergesCount;) { + ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class); + when(mergeTask.supportsIOThrottling()).thenReturn(randomBoolean()); + doAnswer(mock -> { + Schedule schedule = randomFrom(Schedule.values()); + if (schedule == BACKLOG) { + testThreadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + // re-enqueue backlogged merge task + threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask); + }); + } else if (schedule == RUN) { + tasksRunList.add(mergeTask); + } else if (schedule == ABORT) { + tasksAbortList.add(mergeTask); + } + return schedule; + }).when(mergeTask).schedule(); + // randomly let some task complete + if (randomBoolean()) { + // this task is not blocked + when(mergeTask.estimatedRemainingMergeSize()).thenReturn(randomLongBetween(0L, availableBudget)); + } else { + // this task will initially be blocked because over-budget + int finalI = i; + doAnswer(mock -> mergeSizeEstimates[finalI]).when(mergeTask).estimatedRemainingMergeSize(); + i++; + } + assertTrue(threadPoolMergeExecutorService.submitMergeTask(mergeTask)); + } + // assert tasks are blocked because their estimated merge size is over the available budget + assertBusy(() -> { + assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace()); + assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(submittedMergesCount)); + assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(availableBudget)); + assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(0)); + }); + // change estimates to be under the available budget + for (int i = 0; i < submittedMergesCount; i++) { + mergeSizeEstimates[i] = randomLongBetween(0L, availableBudget); + } + // assert tasks are all unblocked because their estimated merge size is now under the available budget + assertBusy(() -> { + assertFalse(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace()); + assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(0)); + assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(availableBudget)); + }); + // assert all merge tasks are either run or aborted + assertBusy(() -> { + for (ThreadPoolMergeScheduler.MergeTask mergeTask : tasksRunList) { + verify(mergeTask, times(1)).run(); + verify(mergeTask, times(0)).abort(); + } + for (ThreadPoolMergeScheduler.MergeTask mergeTask : tasksAbortList) { + verify(mergeTask, times(0)).run(); + verify(mergeTask, times(1)).abort(); + } + }); + } + if (setThreadPoolMergeSchedulerSetting) { + assertWarnings( + "[indices.merge.scheduler.use_thread_pool] setting was deprecated in Elasticsearch " + + "and will be removed in a future release. See the breaking changes documentation for the next major version." + ); + } + } + public void testMergeTasksAreUnblockedWhenMoreDiskSpaceBecomesAvailable() throws Exception { aFileStore.totalSpace = randomLongBetween(300L, 1_000L); bFileStore.totalSpace = randomLongBetween(300L, 1_000L); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 9be119a4cbf35..a9c8a9043cccd 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -1754,7 +1754,7 @@ public void indexRandom(boolean forceRefresh, List builders * @param builders the documents to index. */ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List builders) { - indexRandom(forceRefresh, dummyDocuments, true, builders); + indexRandom(forceRefresh, dummyDocuments, true, true, builders); } /** @@ -1764,13 +1764,37 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List builders) { + indexRandom(forceRefresh, dummyDocuments, maybeFlush, true, builders); + } + + /** + * Indexes the given {@link IndexRequestBuilder} instances randomly. It shuffles the given builders and either + * indexes them in a blocking or async fashion. This is very useful to catch problems that relate to internal document + * ids or index segment creations. Some features might have bug when a given document is the first or the last in a + * segment or if only one document is in a segment etc. This method prevents issues like this by randomizing the index + * layout. + * + * @param forceRefresh if {@code true} all involved indices are refreshed once the documents are indexed. + * @param dummyDocuments if {@code true} some empty dummy documents may be randomly inserted into the document list and deleted once + * all documents are indexed. This is useful to produce deleted documents on the server side. + * @param maybeFlush if {@code true} this method may randomly execute full flushes after index operations. + * @param maybeForceMerge if {@code true} this method may randomly execute force merges after index operations. + * @param builders the documents to index. + */ + public void indexRandom( + boolean forceRefresh, + boolean dummyDocuments, + boolean maybeFlush, + boolean maybeForceMerge, + List builders + ) { Random random = random(); Set indices = new HashSet<>(); builders = new ArrayList<>(builders); @@ -1803,13 +1827,13 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma new LatchedActionListener(ActionListener.noop(), newLatch(inFlightAsyncOperations)) .delegateResponse((l, e) -> fail(e)) ); - postIndexAsyncActions(indicesArray, inFlightAsyncOperations, maybeFlush); + postIndexAsyncActions(indicesArray, inFlightAsyncOperations, maybeFlush, maybeForceMerge); } } else { logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), false, false); for (IndexRequestBuilder indexRequestBuilder : builders) { indexRequestBuilder.get(); - postIndexAsyncActions(indicesArray, inFlightAsyncOperations, maybeFlush); + postIndexAsyncActions(indicesArray, inFlightAsyncOperations, maybeFlush, maybeForceMerge); } } } else { @@ -1888,7 +1912,12 @@ private static CountDownLatch newLatch(List latches) { /** * Maybe refresh, force merge, or flush then always make sure there aren't too many in flight async operations. */ - private void postIndexAsyncActions(String[] indices, List inFlightAsyncOperations, boolean maybeFlush) { + private void postIndexAsyncActions( + String[] indices, + List inFlightAsyncOperations, + boolean maybeFlush, + boolean maybeForceMerge + ) { if (rarely()) { if (rarely()) { indicesAdmin().prepareRefresh(indices) @@ -1898,7 +1927,7 @@ private void postIndexAsyncActions(String[] indices, List inFlig indicesAdmin().prepareFlush(indices) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .execute(new LatchedActionListener<>(ActionListener.noop(), newLatch(inFlightAsyncOperations))); - } else if (rarely()) { + } else if (maybeForceMerge && rarely()) { indicesAdmin().prepareForceMerge(indices) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setMaxNumSegments(between(1, 10))