Threadpool merge executor does not block aborted merges #129613
Merged: elasticsearchmachine merged 21 commits into elastic:main from albertzaharovits:consider-aborting-merges-while-enqueued on Jun 19, 2025.
Commits (21)
838fae3  update budget of enqueued elements (albertzaharovits)
e222ab9  Test scaffold (albertzaharovits)
4baa30c  Unmute test (albertzaharovits)
c7530d2  Restore setting default (albertzaharovits)
1363bbb  Merge branch 'main' into consider-aborting-merges-while-enqueued (albertzaharovits)
8d7b18d  [CI] Auto commit changes from spotless
51b5d62  Update docs/changelog/129613.yaml (albertzaharovits)
73abc28  ES|QL: No plain strings in Literal (#129399) (luigidellaquila)
ea996c1  More test scaffolding (albertzaharovits)
31fa7ae  Merge branch 'main' into consider-aborting-merges-while-enqueued (albertzaharovits)
d870bfb  boolean test for queue head over the available budget (albertzaharovits)
96b6cb9  Even more test scaffolding (albertzaharovits)
9297945  Merge branch 'main' into consider-aborting-merges-while-enqueued (albertzaharovits)
8b0507c  [CI] Auto commit changes from spotless
b6aed72  IT done (albertzaharovits)
1fc3b64  Fix testShardCloseWhenDiskSpaceInsufficient (albertzaharovits)
12119c5  [CI] Auto commit changes from spotless
328ac82  do not force merge when indexing (albertzaharovits)
f47a419  testEnqueuedMergeTasksAreUnblockedWhenEstimatedMergeSizeChanges (albertzaharovits)
f2afd19  Merge branch 'main' into consider-aborting-merges-while-enqueued (albertzaharovits)
2527c37  Delete docs/changelog/129613.yaml (albertzaharovits)
.../src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java (162 additions, 0 deletions)
```java
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.index.engine;

import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.cluster.DiskUsageIntegTestCase;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.BeforeClass;

import java.util.Locale;
import java.util.stream.IntStream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase {
    protected static long MERGE_DISK_HIGH_WATERMARK_BYTES;

    @BeforeClass
    public static void setAvailableDiskSpaceBufferLimit() {
        // this has to be big in order to potentially accommodate the disk space for a few 100s of docs and a few merges,
        // because of the latency to process used disk space updates, and also because we cannot reliably separate indexing from merging
        // operations at this high abstraction level (merging is triggered more or less automatically in the background)
        MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween(1_000_000L, 2_000_000L);
    }

    @Override
    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
        return Settings.builder()
            .put(super.nodeSettings(nodeOrdinal, otherSettings))
            // only the threadpool-based merge scheduler has the capability to block merges when disk space is insufficient
            .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
            // the very short disk space polling interval ensures timely blocking of merges
            .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "10ms")
            // merges pile up more easily when there's only a few threads executing them
            .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), randomIntBetween(1, 2))
            .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), MERGE_DISK_HIGH_WATERMARK_BYTES + "b")
            // let's not worry about allocation watermarks (e.g. read-only shards) in this test suite
            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "0b")
            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "0b")
            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "0b")
            .build();
    }

    public void testShardCloseWhenDiskSpaceInsufficient() throws Exception {
        String node = internalCluster().startNode();
        setTotalSpace(node, Long.MAX_VALUE);
        var indicesService = internalCluster().getInstance(IndicesService.class, node);
        ensureStableCluster(1);
        // create index
        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        createIndex(
            indexName,
            Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()
        );
        // do some indexing
        indexRandom(
            false,
            false,
            false,
            false,
            IntStream.range(1, randomIntBetween(2, 10))
                .mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50)))
                .toList()
        );
        // get current disk space usage
        IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get();
        long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes();
        // restrict the total disk space such that the next merge does not have sufficient disk space
        long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L);
        setTotalSpace(node, insufficientTotalDiskSpace);
        // node stats' FS stats should report that there is insufficient disk space available
        assertBusy(() -> {
            NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get();
            assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));
            NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
            assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace));
            assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES));
        });
        while (true) {
            // maybe trigger a merge (this still depends on the merge policy, i.e. it is not 100% guaranteed)
            assertNoFailures(indicesAdmin().prepareForceMerge(indexName).get());
            // keep indexing and ask for merging until node stats' threadpool stats reports enqueued merges,
            // and the merge executor says they're blocked due to insufficient disk space
            NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get();
            if (nodesStatsResponse.getNodes()
                .getFirst()
                .getThreadPool()
                .stats()
                .stream()
                .filter(s -> ThreadPool.Names.MERGE.equals(s.name()))
                .findAny()
                .get()
                .queue() > 0
                && indicesService.getThreadPoolMergeExecutorService().isMergingBlockedDueToInsufficientDiskSpace()) {
                break;
            }
            // more indexing
            indexRandom(
                false,
                false,
                false,
                false,
                IntStream.range(1, randomIntBetween(2, 10))
                    .mapToObj(i -> prepareIndex(indexName).setSource("another_field", randomAlphaOfLength(50)))
                    .toList()
            );
        }
        // now delete the index in this state, i.e. with merges enqueued and blocked
        assertAcked(indicesAdmin().prepareDelete(indexName).get());
        // index should now be gone
        assertBusy(() -> {
            expectThrows(
                IndexNotFoundException.class,
                () -> indicesAdmin().prepareGetIndex(TEST_REQUEST_TIMEOUT).setIndices(indexName).get()
            );
        });
        assertBusy(() -> {
            // merge thread pool should be done with the enqueued merge tasks
            NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get();
            assertThat(
                nodesStatsResponse.getNodes()
                    .getFirst()
                    .getThreadPool()
                    .stats()
                    .stream()
                    .filter(s -> ThreadPool.Names.MERGE.equals(s.name()))
                    .findAny()
                    .get()
                    .queue(),
                equalTo(0)
            );
            // and the merge executor should also report that merging is done now
            assertFalse(indicesService.getThreadPoolMergeExecutorService().isMergingBlockedDueToInsufficientDiskSpace());
            assertTrue(indicesService.getThreadPoolMergeExecutorService().allDone());
        });
    }

    public void setTotalSpace(String dataNodeName, long totalSpace) {
        getTestFileStore(dataNodeName).setTotalSpace(totalSpace);
        refreshClusterInfo();
    }
}
```
This change will also adjust the budget of running merges that have been aborted to 0. That's a bit optimistic, but I find the alternative implementation convoluted, and it's probably counter-intuitive to estimate 0 for to-be-run merges but not for already-running ones.
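For illustration only, here is a minimal, self-contained sketch of the budgeting idea discussed in this comment. The class and method names (MergeTaskBudget, effectiveBudgetBytes, and so on) are invented for the example and are not the actual Elasticsearch implementation; the point is simply that an aborted merge contributes 0 bytes to the disk-space budget, so an over-budget merge at the head of the queue stops blocking the tasks behind it once it is aborted.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for a merge task's disk-space accounting.
class MergeTaskBudget {
    private final long estimatedDiskSpaceBytes;
    private volatile boolean aborted;

    MergeTaskBudget(long estimatedDiskSpaceBytes) {
        this.estimatedDiskSpaceBytes = estimatedDiskSpaceBytes;
    }

    void abort() {
        aborted = true;
    }

    // An aborted merge will not write its merged segment, so it reserves no
    // disk-space budget, whether it is still enqueued or already running.
    long effectiveBudgetBytes() {
        return aborted ? 0L : estimatedDiskSpaceBytes;
    }
}

public class AbortedMergeBudgetSketch {
    public static void main(String[] args) {
        long availableBudgetBytes = 1_000L;
        Queue<MergeTaskBudget> queue = new ArrayDeque<>();
        MergeTaskBudget big = new MergeTaskBudget(5_000L);  // would exceed the available budget
        MergeTaskBudget small = new MergeTaskBudget(200L);  // fits within the available budget
        queue.add(big);
        queue.add(small);

        // Aborting the over-budget task at the head (e.g. because its index was deleted)
        // drops its effective budget to 0, so it no longer blocks the tasks behind it.
        big.abort();

        while (!queue.isEmpty() && queue.peek().effectiveBudgetBytes() <= availableBudgetBytes) {
            MergeTaskBudget task = queue.poll();
            availableBudgetBytes -= task.effectiveBudgetBytes();
            System.out.println("dequeued task, remaining budget = " + availableBudgetBytes + "b");
        }
    }
}
```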