|
| 1 | +/* |
| 2 | + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one |
| 3 | + * or more contributor license agreements. Licensed under the "Elastic License |
| 4 | + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side |
| 5 | + * Public License v 1"; you may not use this file except in compliance with, at |
| 6 | + * your election, the "Elastic License 2.0", the "GNU Affero General Public |
| 7 | + * License v3.0 only", or the "Server Side Public License, v 1". |
| 8 | + */ |
| 9 | + |
| 10 | +package org.elasticsearch.index.engine; |
| 11 | + |
| 12 | +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; |
| 13 | +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; |
| 14 | +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; |
| 15 | +import org.elasticsearch.cluster.DiskUsageIntegTestCase; |
| 16 | +import org.elasticsearch.cluster.metadata.IndexMetadata; |
| 17 | +import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; |
| 18 | +import org.elasticsearch.common.settings.Settings; |
| 19 | +import org.elasticsearch.common.util.concurrent.EsExecutors; |
| 20 | +import org.elasticsearch.index.IndexNotFoundException; |
| 21 | +import org.elasticsearch.indices.IndicesService; |
| 22 | +import org.elasticsearch.test.ESIntegTestCase; |
| 23 | +import org.elasticsearch.threadpool.ThreadPool; |
| 24 | +import org.junit.BeforeClass; |
| 25 | + |
| 26 | +import java.util.Locale; |
| 27 | +import java.util.stream.IntStream; |
| 28 | + |
| 29 | +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; |
| 30 | +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; |
| 31 | +import static org.hamcrest.Matchers.equalTo; |
| 32 | +import static org.hamcrest.Matchers.lessThan; |
| 33 | + |
| 34 | +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) |
| 35 | +public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase { |
| 36 | + protected static long MERGE_DISK_HIGH_WATERMARK_BYTES; |
| 37 | + |
| 38 | + @BeforeClass |
| 39 | + public static void setAvailableDiskSpaceBufferLimit() { |
| 40 | + // this has to be big in order to potentially accommodate the disk space for a few 100s of docs and a few merges, |
| 41 | + // because of the latency to process used disk space updates, and also because we cannot reliably separate indexing from merging |
| 42 | + // operations at this high abstraction level (merging is triggered more or less automatically in the background) |
| 43 | + MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween(1_000_000L, 2_000_000L); |
| 44 | + } |
| 45 | + |
| 46 | + @Override |
| 47 | + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { |
| 48 | + return Settings.builder() |
| 49 | + .put(super.nodeSettings(nodeOrdinal, otherSettings)) |
| 50 | + // only the threadpool-based merge scheduler has the capability to block merges when disk space is insufficient |
| 51 | + .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) |
| 52 | + // the very short disk space polling interval ensures timely blocking of merges |
| 53 | + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "10ms") |
| 54 | + // merges pile up more easily when there's only a few threads executing them |
| 55 | + .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), randomIntBetween(1, 2)) |
| 56 | + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), MERGE_DISK_HIGH_WATERMARK_BYTES + "b") |
| 57 | + // let's not worry about allocation watermarks (e.g. read-only shards) in this test suite |
| 58 | + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "0b") |
| 59 | + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "0b") |
| 60 | + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "0b") |
| 61 | + .build(); |
| 62 | + } |
| 63 | + |
| 64 | + public void testShardCloseWhenDiskSpaceInsufficient() throws Exception { |
| 65 | + String node = internalCluster().startNode(); |
| 66 | + setTotalSpace(node, Long.MAX_VALUE); |
| 67 | + var indicesService = internalCluster().getInstance(IndicesService.class, node); |
| 68 | + ensureStableCluster(1); |
| 69 | + // create index |
| 70 | + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); |
| 71 | + createIndex( |
| 72 | + indexName, |
| 73 | + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build() |
| 74 | + ); |
| 75 | + // do some indexing |
| 76 | + indexRandom( |
| 77 | + false, |
| 78 | + false, |
| 79 | + false, |
| 80 | + false, |
| 81 | + IntStream.range(1, randomIntBetween(2, 10)) |
| 82 | + .mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50))) |
| 83 | + .toList() |
| 84 | + ); |
| 85 | + // get current disk space usage |
| 86 | + IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get(); |
| 87 | + long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes(); |
| 88 | + // restrict the total disk space such that the next merge does not have sufficient disk space |
| 89 | + long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L); |
| 90 | + setTotalSpace(node, insufficientTotalDiskSpace); |
| 91 | + // node stats' FS stats should report that there is insufficient disk space available |
| 92 | + assertBusy(() -> { |
| 93 | + NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get(); |
| 94 | + assertThat(nodesStatsResponse.getNodes().size(), equalTo(1)); |
| 95 | + NodeStats nodeStats = nodesStatsResponse.getNodes().get(0); |
| 96 | + assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace)); |
| 97 | + assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES)); |
| 98 | + }); |
| 99 | + while (true) { |
| 100 | + // maybe trigger a merge (this still depends on the merge policy, i.e. it is not 100% guaranteed) |
| 101 | + assertNoFailures(indicesAdmin().prepareForceMerge(indexName).get()); |
| 102 | + // keep indexing and ask for merging until node stats' threadpool stats reports enqueued merges, |
| 103 | + // and the merge executor says they're blocked due to insufficient disk space if (nodesStatsResponse.getNodes() |
| 104 | + NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get(); |
| 105 | + if (nodesStatsResponse.getNodes() |
| 106 | + .getFirst() |
| 107 | + .getThreadPool() |
| 108 | + .stats() |
| 109 | + .stream() |
| 110 | + .filter(s -> ThreadPool.Names.MERGE.equals(s.name())) |
| 111 | + .findAny() |
| 112 | + .get() |
| 113 | + .queue() > 0 |
| 114 | + && indicesService.getThreadPoolMergeExecutorService().isMergingBlockedDueToInsufficientDiskSpace()) { |
| 115 | + break; |
| 116 | + } |
| 117 | + // more indexing |
| 118 | + indexRandom( |
| 119 | + false, |
| 120 | + false, |
| 121 | + false, |
| 122 | + false, |
| 123 | + IntStream.range(1, randomIntBetween(2, 10)) |
| 124 | + .mapToObj(i -> prepareIndex(indexName).setSource("another_field", randomAlphaOfLength(50))) |
| 125 | + .toList() |
| 126 | + ); |
| 127 | + } |
| 128 | + // now delete the index in this state, i.e. with merges enqueued and blocked |
| 129 | + assertAcked(indicesAdmin().prepareDelete(indexName).get()); |
| 130 | + // index should now be gone |
| 131 | + assertBusy(() -> { |
| 132 | + expectThrows( |
| 133 | + IndexNotFoundException.class, |
| 134 | + () -> indicesAdmin().prepareGetIndex(TEST_REQUEST_TIMEOUT).setIndices(indexName).get() |
| 135 | + ); |
| 136 | + }); |
| 137 | + assertBusy(() -> { |
| 138 | + // merge thread pool should be done with the enqueue merge tasks |
| 139 | + NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get(); |
| 140 | + assertThat( |
| 141 | + nodesStatsResponse.getNodes() |
| 142 | + .getFirst() |
| 143 | + .getThreadPool() |
| 144 | + .stats() |
| 145 | + .stream() |
| 146 | + .filter(s -> ThreadPool.Names.MERGE.equals(s.name())) |
| 147 | + .findAny() |
| 148 | + .get() |
| 149 | + .queue(), |
| 150 | + equalTo(0) |
| 151 | + ); |
| 152 | + // and the merge executor should also report that merging is done now |
| 153 | + assertFalse(indicesService.getThreadPoolMergeExecutorService().isMergingBlockedDueToInsufficientDiskSpace()); |
| 154 | + assertTrue(indicesService.getThreadPoolMergeExecutorService().allDone()); |
| 155 | + }); |
| 156 | + } |
| 157 | + |
| 158 | + public void setTotalSpace(String dataNodeName, long totalSpace) { |
| 159 | + getTestFileStore(dataNodeName).setTotalSpace(totalSpace); |
| 160 | + refreshClusterInfo(); |
| 161 | + } |
| 162 | +} |
0 commit comments