diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java
new file mode 100644
index 0000000000000..c2687468a7f26
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
+import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.cluster.DiskUsageIntegTestCase;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.BeforeClass;
+
+import java.util.Locale;
+import java.util.stream.IntStream;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThan;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase {
+    protected static long MERGE_DISK_HIGH_WATERMARK_BYTES;
+
+    @BeforeClass
+    public static void setAvailableDiskSpaceBufferLimit() {
+        // this has to be big in order to potentially accommodate the disk space for a few 100s of docs and a few merges,
+        // because of the latency to process used disk space updates, and also because we cannot reliably separate indexing from merging
+        // operations at this high abstraction level (merging is triggered more or less automatically in the background)
+        MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween(1_000_000L, 2_000_000L);
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal, otherSettings))
+            // only the threadpool-based merge scheduler has the capability to block merges when disk space is insufficient
+            .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
+            // the very short disk space polling interval ensures timely blocking of merges
+            .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "10ms")
+            // merges pile up more easily when there's only a few threads executing them
+            .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), randomIntBetween(1, 2))
+            .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), MERGE_DISK_HIGH_WATERMARK_BYTES + "b")
+            // let's not worry about allocation watermarks (e.g. read-only shards) in this test suite
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "0b")
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "0b")
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "0b")
+            .build();
+    }
+
+    public void testShardCloseWhenDiskSpaceInsufficient() throws Exception {
+        String node = internalCluster().startNode();
+        setTotalSpace(node, Long.MAX_VALUE);
+        var indicesService = internalCluster().getInstance(IndicesService.class, node);
+        ensureStableCluster(1);
+        // create index
+        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        createIndex(
+            indexName,
+            Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()
+        );
+        // do some indexing
+        indexRandom(
+            false,
+            false,
+            false,
+            false,
+            IntStream.range(1, randomIntBetween(2, 10))
+                .mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50)))
+                .toList()
+        );
+        // get current disk space usage
+        IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get();
+        long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes();
+        // restrict the total disk space such that the next merge does not have sufficient disk space
+        long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L);
+        setTotalSpace(node, insufficientTotalDiskSpace);
+        // node stats' FS stats should report that there is insufficient disk space available
+        assertBusy(() -> {
+            NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get();
+            assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));
+            NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
+            assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace));
+            assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES));
+        });
+        while (true) {
+            // maybe trigger a merge (this still depends on the merge policy, i.e. it is not 100% guaranteed)
+            assertNoFailures(indicesAdmin().prepareForceMerge(indexName).get());
+            // keep indexing and ask for merging until node stats' threadpool stats reports enqueued merges,
+            // and the merge executor says they're blocked due to insufficient disk space
+            NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get();
+            if (nodesStatsResponse.getNodes()
+                .getFirst()
+                .getThreadPool()
+                .stats()
+                .stream()
+                .filter(s -> ThreadPool.Names.MERGE.equals(s.name()))
+                .findAny()
+                .get()
+                .queue() > 0
+                && indicesService.getThreadPoolMergeExecutorService().isMergingBlockedDueToInsufficientDiskSpace()) {
+                break;
+            }
+            // more indexing
+            indexRandom(
+                false,
+                false,
+                false,
+                false,
+                IntStream.range(1, randomIntBetween(2, 10))
+                    .mapToObj(i -> prepareIndex(indexName).setSource("another_field", randomAlphaOfLength(50)))
+                    .toList()
+            );
+        }
+        // now delete the index in this state, i.e. with merges enqueued and blocked
+        assertAcked(indicesAdmin().prepareDelete(indexName).get());
+        // index should now be gone
+        assertBusy(() -> {
+            expectThrows(
+                IndexNotFoundException.class,
+                () -> indicesAdmin().prepareGetIndex(TEST_REQUEST_TIMEOUT).setIndices(indexName).get()
+            );
+        });
+        assertBusy(() -> {
+            // merge thread pool should be done with the enqueued merge tasks
+            NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get();
+            assertThat(
+                nodesStatsResponse.getNodes()
+                    .getFirst()
+                    .getThreadPool()
+                    .stats()
+                    .stream()
+                    .filter(s -> ThreadPool.Names.MERGE.equals(s.name()))
+                    .findAny()
+                    .get()
+                    .queue(),
+                equalTo(0)
+            );
+            // and the merge executor should also report that merging is done now
+            assertFalse(indicesService.getThreadPoolMergeExecutorService().isMergingBlockedDueToInsufficientDiskSpace());
+            assertTrue(indicesService.getThreadPoolMergeExecutorService().allDone());
+        });
+    }
+
+    public void setTotalSpace(String dataNodeName, long totalSpace) {
+        getTestFileStore(dataNodeName).setTotalSpace(totalSpace);
+        refreshClusterInfo();
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java
index c2b1f5e9abb4f..f6e9257200002 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java
@@ -20,6 +20,7 @@
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
 import org.elasticsearch.monitor.fs.FsInfo;
@@ -28,6 +29,7 @@

 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.IdentityHashMap;
@@ -59,10 +61,7 @@ public class ThreadPoolMergeExecutorService implements Closeable {
     /** How frequently we check disk usage (default: 5 seconds). */
     public static final Setting<TimeValue> INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING = Setting.positiveTimeSetting(
         "indices.merge.disk.check_interval",
-        // disabled by default
-        // there's currently a problem where (aborting) merges are blocked when shards are closed (because disk space is insufficient)
-        // see: https://github.com/elastic/elasticsearch/issues/129335
-        TimeValue.timeValueSeconds(0),
+        TimeValue.timeValueSeconds(5),
         Property.Dynamic,
         Property.NodeScope
     );
@@ -294,6 +293,10 @@ public boolean allDone() {
         return queuedMergeTasks.isQueueEmpty() && runningMergeTasks.isEmpty() && ioThrottledMergeTasksCount.get() == 0L;
     }

+    public boolean isMergingBlockedDueToInsufficientDiskSpace() {
+        return availableDiskSpacePeriodicMonitor.isScheduled() && queuedMergeTasks.queueHeadIsOverTheAvailableBudget();
+    }
+
     /**
      * Enqueues a runnable that executes exactly one merge task, the smallest that is runnable at some point in time.
      * A merge task is not runnable if its scheduler already reached the configured max-allowed concurrency level.
@@ -550,9 +553,8 @@ private static ByteSizeValue getFreeBytesThreshold(

     static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget<MergeTask> {
         MergeTaskPriorityBlockingQueue() {
-            // start with 0 budget (so takes on this queue will always block until {@link #updateBudget} is invoked)
-            // use the estimated *remaining* merge size as the budget function so that the disk space budget of taken (in-use) elements is
-            // updated according to the remaining disk space requirements of the currently running merge tasks
+            // by default, start with 0 budget (so takes on this queue will always block until the first {@link #updateBudget} is invoked)
+            // use the estimated *remaining* merge size as the budget function so that the disk space budget of elements is updated
             super(MergeTask::estimatedRemainingMergeSize, 0L);
         }

@@ -563,7 +565,7 @@ long getAvailableBudget() {

         // exposed for tests
         MergeTask peekQueue() {
-            return enqueuedByBudget.peek();
+            return enqueuedByBudget.peek().v1();
         }
     }

@@ -573,7 +575,7 @@ MergeTask peekQueue() {
      */
     static class PriorityBlockingQueueWithBudget<E> {
         private final ToLongFunction<? super E> budgetFunction;
-        protected final PriorityQueue<E> enqueuedByBudget;
+        protected final PriorityQueue<Tuple<E, Long>> enqueuedByBudget;
         private final IdentityHashMap<ElementWithReleasableBudget, Long> unreleasedBudgetPerElement;
         private final ReentrantLock lock;
         private final Condition elementAvailable;
@@ -581,7 +583,7 @@ static class PriorityBlockingQueueWithBudget<E> {

         PriorityBlockingQueueWithBudget(ToLongFunction<? super E> budgetFunction, long initialAvailableBudget) {
             this.budgetFunction = budgetFunction;
-            this.enqueuedByBudget = new PriorityQueue<>(64, Comparator.comparingLong(budgetFunction));
+            this.enqueuedByBudget = new PriorityQueue<>(64, Comparator.comparingLong(Tuple::v2));
             this.unreleasedBudgetPerElement = new IdentityHashMap<>();
             this.lock = new ReentrantLock();
             this.elementAvailable = lock.newCondition();
@@ -592,7 +594,7 @@ boolean enqueue(E e) {
             final ReentrantLock lock = this.lock;
             lock.lock();
             try {
-                enqueuedByBudget.offer(e);
+                enqueuedByBudget.offer(new Tuple<>(e, budgetFunction.applyAsLong(e)));
                 elementAvailable.signal();
             } finally {
                 lock.unlock();
@@ -608,14 +610,14 @@ ElementWithReleasableBudget take() throws InterruptedException {
             final ReentrantLock lock = this.lock;
             lock.lockInterruptibly();
             try {
-                E peek;
-                long peekBudget;
+                Tuple<E, Long> head;
                 // blocks until the smallest budget element fits the currently available budget
-                while ((peek = enqueuedByBudget.peek()) == null || (peekBudget = budgetFunction.applyAsLong(peek)) > availableBudget) {
+                while ((head = enqueuedByBudget.peek()) == null || head.v2() > availableBudget) {
                     elementAvailable.await();
                 }
+                head = enqueuedByBudget.poll();
                 // deducts and holds up that element's budget from the available budget
-                return newElementWithReleasableBudget(enqueuedByBudget.poll(), peekBudget);
+                return newElementWithReleasableBudget(head.v1(), head.v2());
             } finally {
                 lock.unlock();
             }
@@ -623,7 +625,7 @@ ElementWithReleasableBudget take() throws InterruptedException {

         /**
          * Updates the available budged given the passed-in argument, from which it deducts the budget hold up by taken elements
-         * that are still in use. The budget of in-use elements is also updated (by re-applying the budget function).
+         * that are still in use. The elements' budgets are also updated by re-applying the budget function.
          * The newly updated budget is used to potentially block {@link #take()} operations if the smallest-budget enqueued element
          * is over this newly computed available budget.
         */
@@ -632,9 +634,11 @@ void updateBudget(long availableBudget) {
             lock.lock();
             try {
                 this.availableBudget = availableBudget;
-                // update the per-element budget (these are all the elements that are using any budget)
+                // updates the budget of enqueued elements (and possibly reorders the priority queue)
+                updateBudgetOfEnqueuedElementsAndReorderQueue();
+                // update the budget of dequeued, but still in-use elements (these are the elements that are consuming budget)
                 unreleasedBudgetPerElement.replaceAll((e, v) -> budgetFunction.applyAsLong(e.element()));
-                // available budget is decreased by the used per-element budget (for all dequeued elements that are still in use)
+                // the available budget is decreased by the budget of dequeued elements that are still in use
                 this.availableBudget -= unreleasedBudgetPerElement.values().stream().mapToLong(i -> i).sum();
                 elementAvailable.signalAll();
             } finally {
@@ -642,10 +646,38 @@ void updateBudget(long availableBudget) {
             }
         }

+        private void updateBudgetOfEnqueuedElementsAndReorderQueue() {
+            assert this.lock.isHeldByCurrentThread();
+            int queueSizeBefore = enqueuedByBudget.size();
+            var it = enqueuedByBudget.iterator();
+            List<Tuple<E, Long>> elementsToReorder = new ArrayList<>();
+            while (it.hasNext()) {
+                var elementWithBudget = it.next();
+                Long previousBudget = elementWithBudget.v2();
+                long latestBudget = budgetFunction.applyAsLong(elementWithBudget.v1());
+                if (previousBudget.equals(latestBudget) == false) {
+                    // the budget (estimation) of an enqueued element has changed
+                    // this element will be reordered by removing and reinserting it using the latest budget (estimation)
+                    it.remove();
+                    elementsToReorder.add(new Tuple<>(elementWithBudget.v1(), latestBudget));
+                }
+            }
+            // reinsert elements based on the latest budget (estimation)
+            for (var reorderedElement : elementsToReorder) {
+                enqueuedByBudget.offer(reorderedElement);
+            }
+            assert queueSizeBefore == enqueuedByBudget.size();
+        }
+
         boolean isQueueEmpty() {
             return enqueuedByBudget.isEmpty();
         }

+        boolean queueHeadIsOverTheAvailableBudget() {
+            var head = enqueuedByBudget.peek();
+            return head != null && head.v2() > availableBudget;
+        }
+
         int queueSize() {
             return enqueuedByBudget.size();
         }
diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java
index a73d19181a57a..b21172c7007e8 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java
@@ -537,8 +537,13 @@ void abort() {
     long estimatedRemainingMergeSize() {
         // TODO is it possible that `estimatedMergeBytes` be `0` for correctly initialize merges,
         // or is it always the case that if `estimatedMergeBytes` is `0` that means that the merge has not yet been initialized?
-        long estimatedMergeSize = onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
-        return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten());
+        if (onGoingMerge.getMerge().isAborted()) {
+            // if the merge is aborted the assumption is that merging will soon stop with negligible further writing
+            return 0L;
+        } else {
+            long estimatedMergeSize = onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
+            return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten());
+        }
     }

     public long getMergeMemoryEstimateBytes() {
diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceDiskSpaceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceDiskSpaceTests.java
index d5df3b9e95607..6eaafd6e3a3ac 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceDiskSpaceTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceDiskSpaceTests.java
@@ -896,6 +896,106 @@ public void testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution() thro
         }
     }

+    public void testEnqueuedMergeTasksAreUnblockedWhenEstimatedMergeSizeChanges() throws Exception {
+        long diskSpaceLimitBytes = randomLongBetween(10L, 100L);
+        aFileStore.usableSpace = diskSpaceLimitBytes + randomLongBetween(1L, 100L);
+        aFileStore.totalSpace = aFileStore.usableSpace + randomLongBetween(1L, 10L);
+        bFileStore.usableSpace = diskSpaceLimitBytes + randomLongBetween(1L, 100L);
+        bFileStore.totalSpace = bFileStore.usableSpace + randomLongBetween(1L, 10L);
+        boolean aHasMoreSpace = aFileStore.usableSpace > bFileStore.usableSpace;
+        Settings.Builder settingsBuilder = Settings.builder().put(settings);
+        settingsBuilder.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), diskSpaceLimitBytes + "b");
+        try (
+            ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
+                .maybeCreateThreadPoolMergeExecutorService(
+                    testThreadPool,
+                    ClusterSettings.createBuiltInClusterSettings(settingsBuilder.build()),
+                    nodeEnvironment
+                )
+        ) {
+            assert threadPoolMergeExecutorService != null;
+            assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), greaterThanOrEqualTo(1));
+            final long availableBudget = aHasMoreSpace
+                ? aFileStore.usableSpace - diskSpaceLimitBytes
+                : bFileStore.usableSpace - diskSpaceLimitBytes;
+            final AtomicLong expectedAvailableBudget = new AtomicLong(availableBudget);
+            assertBusy(
+                () -> assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(expectedAvailableBudget.get()))
+            );
+            List<ThreadPoolMergeScheduler.MergeTask> tasksRunList = new ArrayList<>();
+            List<ThreadPoolMergeScheduler.MergeTask> tasksAbortList = new ArrayList<>();
+            int submittedMergesCount = randomIntBetween(1, 5);
+            long[] mergeSizeEstimates = new long[submittedMergesCount];
+            for (int i = 0; i < submittedMergesCount; i++) {
+                // all these merge estimates are over-budget
+                mergeSizeEstimates[i] = availableBudget + randomLongBetween(1L, 10L);
+            }
+            for (int i = 0; i < submittedMergesCount;) {
+                ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class);
+                when(mergeTask.supportsIOThrottling()).thenReturn(randomBoolean());
+                doAnswer(mock -> {
+                    Schedule schedule = randomFrom(Schedule.values());
+                    if (schedule == BACKLOG) {
+                        testThreadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
+                            // re-enqueue backlogged merge task
+                            threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask);
+                        });
+                    } else if (schedule == RUN) {
+                        tasksRunList.add(mergeTask);
+                    } else if (schedule == ABORT) {
+                        tasksAbortList.add(mergeTask);
+                    }
+                    return schedule;
+                }).when(mergeTask).schedule();
+                // randomly let some task complete
+                if (randomBoolean()) {
+                    // this task is not blocked
+                    when(mergeTask.estimatedRemainingMergeSize()).thenReturn(randomLongBetween(0L, availableBudget));
+                } else {
+                    // this task will initially be blocked because over-budget
+                    int finalI = i;
+                    doAnswer(mock -> mergeSizeEstimates[finalI]).when(mergeTask).estimatedRemainingMergeSize();
+                    i++;
+                }
+                assertTrue(threadPoolMergeExecutorService.submitMergeTask(mergeTask));
+            }
+            // assert tasks are blocked because their estimated merge size is over the available budget
+            assertBusy(() -> {
+                assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
+                assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(submittedMergesCount));
+                assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(availableBudget));
+                assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(0));
+            });
+            // change estimates to be under the available budget
+            for (int i = 0; i < submittedMergesCount; i++) {
+                mergeSizeEstimates[i] = randomLongBetween(0L, availableBudget);
+            }
+            // assert tasks are all unblocked because their estimated merge size is now under the available budget
+            assertBusy(() -> {
+                assertFalse(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
+                assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(0));
+                assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(availableBudget));
+            });
+            // assert all merge tasks are either run or aborted
+            assertBusy(() -> {
+                for (ThreadPoolMergeScheduler.MergeTask mergeTask : tasksRunList) {
+                    verify(mergeTask, times(1)).run();
+                    verify(mergeTask, times(0)).abort();
+                }
+                for (ThreadPoolMergeScheduler.MergeTask mergeTask : tasksAbortList) {
+                    verify(mergeTask, times(0)).run();
+                    verify(mergeTask, times(1)).abort();
+                }
+            });
+        }
+        if (setThreadPoolMergeSchedulerSetting) {
+            assertWarnings(
+                "[indices.merge.scheduler.use_thread_pool] setting was deprecated in Elasticsearch "
+                    + "and will be removed in a future release. See the breaking changes documentation for the next major version."
+            );
+        }
+    }
+
     public void testMergeTasksAreUnblockedWhenMoreDiskSpaceBecomesAvailable() throws Exception {
         aFileStore.totalSpace = randomLongBetween(300L, 1_000L);
         bFileStore.totalSpace = randomLongBetween(300L, 1_000L);
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
index 9be119a4cbf35..a9c8a9043cccd 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
@@ -1754,7 +1754,7 @@ public void indexRandom(boolean forceRefresh, List<IndexRequestBuilder> builders
      * @param builders the documents to index.
      */
     public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List<IndexRequestBuilder> builders) {
-        indexRandom(forceRefresh, dummyDocuments, true, builders);
+        indexRandom(forceRefresh, dummyDocuments, true, true, builders);
     }

     /**
@@ -1764,13 +1764,37 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List<IndexRequestBuilder> builders) {
     public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean maybeFlush, List<IndexRequestBuilder> builders) {
+        indexRandom(forceRefresh, dummyDocuments, maybeFlush, true, builders);
+    }
+
+    /**
+     * Indexes the given {@link IndexRequestBuilder} instances randomly. It shuffles the given builders and either
+     * indexes them in a blocking or async fashion. This is very useful to catch problems that relate to internal document
+     * ids or index segment creations. Some features might have bug when a given document is the first or the last in a
+     * segment or if only one document is in a segment etc. This method prevents issues like this by randomizing the index
+     * layout.
+     *
+     * @param forceRefresh    if {@code true} all involved indices are refreshed once the documents are indexed.
+     * @param dummyDocuments  if {@code true} some empty dummy documents may be randomly inserted into the document list and deleted once
+     *                        all documents are indexed. This is useful to produce deleted documents on the server side.
+     * @param maybeFlush      if {@code true} this method may randomly execute full flushes after index operations.
+     * @param maybeForceMerge if {@code true} this method may randomly execute force merges after index operations.
+     * @param builders        the documents to index.
+     */
+    public void indexRandom(
+        boolean forceRefresh,
+        boolean dummyDocuments,
+        boolean maybeFlush,
+        boolean maybeForceMerge,
+        List<IndexRequestBuilder> builders
+    ) {
         Random random = random();
         Set<String> indices = new HashSet<>();
         builders = new ArrayList<>(builders);
@@ -1803,13 +1827,13 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma
                         new LatchedActionListener<DocWriteResponse>(ActionListener.noop(), newLatch(inFlightAsyncOperations))
                             .delegateResponse((l, e) -> fail(e))
                     );
-                    postIndexAsyncActions(indicesArray, inFlightAsyncOperations, maybeFlush);
+                    postIndexAsyncActions(indicesArray, inFlightAsyncOperations, maybeFlush, maybeForceMerge);
                 }
             } else {
                 logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), false, false);
                 for (IndexRequestBuilder indexRequestBuilder : builders) {
                     indexRequestBuilder.get();
-                    postIndexAsyncActions(indicesArray, inFlightAsyncOperations, maybeFlush);
+                    postIndexAsyncActions(indicesArray, inFlightAsyncOperations, maybeFlush, maybeForceMerge);
                 }
             }
         } else {
@@ -1888,7 +1912,12 @@ private static CountDownLatch newLatch(List<CountDownLatch> latches) {
     /**
      * Maybe refresh, force merge, or flush then always make sure there aren't too many in flight async operations.
      */
-    private void postIndexAsyncActions(String[] indices, List<CountDownLatch> inFlightAsyncOperations, boolean maybeFlush) {
+    private void postIndexAsyncActions(
+        String[] indices,
+        List<CountDownLatch> inFlightAsyncOperations,
+        boolean maybeFlush,
+        boolean maybeForceMerge
+    ) {
         if (rarely()) {
             if (rarely()) {
                 indicesAdmin().prepareRefresh(indices)
@@ -1898,7 +1927,7 @@ private void postIndexAsyncActions(String[] indices, List<CountDownLatch> inFlig
                 indicesAdmin().prepareFlush(indices)
                     .setIndicesOptions(IndicesOptions.lenientExpandOpen())
                     .execute(new LatchedActionListener<>(ActionListener.noop(), newLatch(inFlightAsyncOperations)));
-            } else if (rarely()) {
+            } else if (maybeForceMerge && rarely()) {
                 indicesAdmin().prepareForceMerge(indices)
                     .setIndicesOptions(IndicesOptions.lenientExpandOpen())
                     .setMaxNumSegments(between(1, 10))
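
The heart of the ThreadPoolMergeExecutorService change above is that the queue now stores each merge task together with the budget computed when it was enqueued, and updateBudget() re-applies the budget function so that a task whose estimated remaining size has shrunk (for example to 0 once its merge is aborted) can float to the head of the queue and unblock. The following is a minimal, self-contained sketch of that idea, illustrative only: the class and names (BudgetedQueueSketch, Budgeted) are invented for the example and it deliberately leaves out the locking, blocking take(), and available-budget accounting of the real PriorityBlockingQueueWithBudget.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.ToLongFunction;

// Illustrative sketch only (not part of the patch, not Elasticsearch code).
final class BudgetedQueueSketch<E> {

    // pairs an element with the budget estimate captured when it was (re)inserted
    private record Budgeted<E>(E element, long budget) {}

    private final ToLongFunction<E> budgetFunction;
    private final PriorityQueue<Budgeted<E>> queue = new PriorityQueue<>(Comparator.comparingLong(Budgeted::budget));

    BudgetedQueueSketch(ToLongFunction<E> budgetFunction) {
        this.budgetFunction = budgetFunction;
    }

    void enqueue(E element) {
        // capture the estimate at enqueue time, like new Tuple<>(e, budgetFunction.applyAsLong(e)) in the patch
        queue.offer(new Budgeted<>(element, budgetFunction.applyAsLong(element)));
    }

    // re-applies the budget function; entries whose estimate changed are removed and re-inserted
    void refreshBudgets() {
        List<Budgeted<E>> toReorder = new ArrayList<>();
        Iterator<Budgeted<E>> it = queue.iterator();
        while (it.hasNext()) {
            Budgeted<E> entry = it.next();
            long latest = budgetFunction.applyAsLong(entry.element());
            if (latest != entry.budget()) {
                // a PriorityQueue does not re-sort in place, so remove and re-offer with the new estimate
                it.remove();
                toReorder.add(new Budgeted<>(entry.element(), latest));
            }
        }
        queue.addAll(toReorder);
    }

    // true while the cheapest queued element still does not fit under the given budget
    boolean headIsOverBudget(long availableBudget) {
        Budgeted<E> head = queue.peek();
        return head != null && head.budget() > availableBudget;
    }
}

Capturing the budget alongside the element matters because java.util.PriorityQueue orders entries only at insertion time; a changed estimate has to be removed and re-offered to be re-sorted, which is what updateBudgetOfEnqueuedElementsAndReorderQueue() does in the patch.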