Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,36 @@

package org.elasticsearch.index.engine;

import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.cluster.DiskUsageIntegTestCase;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.BeforeClass;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.stream.IntStream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand All @@ -40,7 +50,14 @@ public static void setAvailableDiskSpaceBufferLimit() {
// this has to be big in order to potentially accommodate the disk space for a few 100s of docs and a few merges,
// because of the latency to process used disk space updates, and also because we cannot reliably separate indexing from merging
// operations at this high abstraction level (merging is triggered more or less automatically in the background)
MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween(1_000_000L, 2_000_000L);
MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween(10_000_000L, 20_000_000L);
}

/**
 * Returns the plugins configured by the parent test case with {@link TestTelemetryPlugin} appended,
 * so that the tests in this class can read merge metrics through the telemetry plugin.
 */
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    Collection<Class<? extends Plugin>> inheritedPlugins = super.nodePlugins();
    // copy into a fresh list rather than adding to the collection returned by the parent
    List<Class<? extends Plugin>> pluginsWithTelemetry = new ArrayList<>(inheritedPlugins);
    pluginsWithTelemetry.add(TestTelemetryPlugin.class);
    return pluginsWithTelemetry;
}

@Override
Expand Down Expand Up @@ -155,8 +172,111 @@ public void testShardCloseWhenDiskSpaceInsufficient() throws Exception {
});
}

/**
 * Checks that a force merge is held back while the node's available disk space is below the merge disk
 * high watermark, and that it completes once headroom is restored, either by growing the test file store
 * again or by lowering the merge disk high watermark setting to {@code 0b}.
 */
public void testForceMergeIsBlockedThenUnblocked() throws Exception {
    String dataNode = internalCluster().startNode();
    ensureStableCluster(1);
    // begin with the largest possible total disk space
    setTotalSpace(dataNode, Long.MAX_VALUE);
    IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode);
    ThreadPoolMergeExecutorService mergeExecutor = indicesService.getThreadPoolMergeExecutorService();
    TestTelemetryPlugin telemetry = getTelemetryPlugin(dataNode);
    // a single-shard index without replicas
    final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
    Settings singleShardNoReplicas = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
        .build();
    createIndex(indexName, singleShardNoReplicas);
    // store size across all indices on the node (note: sampled before any document is indexed)
    IndicesStatsResponse storeStats = indicesAdmin().prepareStats().clear().setStore(true).get();
    long usedBytes = storeStats.getTotal().getStore().sizeInBytes();
    // cap the total disk space a few bytes short of "used + watermark", so the next merge lacks sufficient disk space
    long shortfallBytes = randomLongBetween(1L, 10L);
    final long constrainedTotalBytes = usedBytes + MERGE_DISK_HIGH_WATERMARK_BYTES - shortfallBytes;
    setTotalSpace(dataNode, constrainedTotalBytes);
    // wait until the node's FS stats reflect the new total and report less available space than the watermark
    assertBusy(() -> {
        NodesStatsResponse fsStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get();
        List<NodeStats> allNodeStats = fsStatsResponse.getNodes();
        assertThat(allNodeStats.size(), equalTo(1));
        var fsTotals = allNodeStats.get(0).getFs().getTotal();
        assertThat(fsTotals.getTotal().getBytes(), equalTo(constrainedTotalBytes));
        assertThat(fsTotals.getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES));
    });
    // index a handful of tiny documents over several rounds
    int indexingRounds = randomIntBetween(2, 5);
    for (int round = 0; round < indexingRounds; round++) {
        int docIdUpperBound = randomIntBetween(2, 5);
        // NOTE(review): the boolean flags are passed through unchanged; confirm their meaning against indexRandom's signature
        indexRandom(
            true,
            true,
            true,
            false,
            IntStream.range(1, docIdUpperBound)
                .mapToObj(ignored -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50)))
                .toList()
        );
    }
    // specifying the max number of segments makes the force merge a blocking call, hence only the future is kept here
    ActionFuture<BroadcastResponse> pendingForceMerge = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).execute();
    assertBusy(() -> {
        // exactly one merge task is enqueued and the executor reports merging as blocked on disk space
        assertThat(mergeExecutor.getMergeTasksQueueLength(), equalTo(1));
        assertTrue(mergeExecutor.isMergingBlockedDueToInsufficientDiskSpace());
        // telemetry agrees that segments are waiting to be merged ...
        telemetry.collect();
        long queuedSegments = telemetry.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong();
        assertThat(queuedSegments, greaterThan(0L));
        // ... while nothing is actually being merged
        long runningSegments = telemetry.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong();
        assertThat(runningSegments, equalTo(0L));
        // indices stats likewise show no running merge (a blocked merge does NOT count as "running")
        IndicesStatsResponse mergeStatsResponse = client().admin().indices().prepareStats(indexName).setMerge(true).get();
        long runningMerges = mergeStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent();
        assertThat(runningMerges, equalTo(0L));
    });
    // the force merge request has neither completed nor been cancelled
    assertFalse(pendingForceMerge.isCancelled());
    assertFalse(pendingForceMerge.isDone());
    // and the executor still reports merging as blocked
    assertTrue(mergeExecutor.isMergingBlockedDueToInsufficientDiskSpace());
    // unblock the merge: either drop the watermark to zero or give the node its disk space back
    if (randomBoolean() == false) {
        String watermarkSettingKey = ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey();
        updateClusterSettings(Settings.builder().put(watermarkSettingKey, "0b"));
    } else {
        setTotalSpace(dataNode, Long.MAX_VALUE);
    }
    // the force merge call must now return
    safeGet(pendingForceMerge);
    IndicesStatsResponse finalStatsResponse = indicesAdmin().prepareStats(indexName).setMerge(true).get();
    telemetry.collect();
    // once the force merge returned, neither index stats nor telemetry report merging in progress
    var primaryMergeStats = finalStatsResponse.getIndices().get(indexName).getPrimaries().merge;
    long mergesStillRunning = primaryMergeStats.getCurrent();
    assertThat(mergesStillRunning, equalTo(0L));
    assertThat(telemetry.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(), equalTo(0L));
    assertThat(telemetry.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(), equalTo(0L));
    // yet merging did happen (possibly including merges triggered automatically before the force merge call)
    long completedMerges = primaryMergeStats.getTotal();
    assertThat(completedMerges, greaterThan(0L));
    assertThat(telemetry.getLongCounterMeasurement(MergeMetrics.MERGE_DOCS_TOTAL).getLast().getLong(), greaterThan(0L));
    // the single shard ends up with exactly one segment
    List<ShardSegments> segmentsPerShard = getShardSegments(indexName);
    assertThat(segmentsPerShard.size(), equalTo(1));
    assertThat(segmentsPerShard.get(0).getSegments().size(), equalTo(1));
    assertAcked(indicesAdmin().prepareDelete(indexName).get());
}

/**
 * Sets the total space of the test file store belonging to the given node, then refreshes the cluster info.
 *
 * @param dataNodeName name of the node whose test file store is updated
 * @param totalSpace   the new total space value handed to the file store (presumably bytes; confirm against the file store)
 */
public void setTotalSpace(String dataNodeName, long totalSpace) {
    var nodeFileStore = getTestFileStore(dataNodeName);
    nodeFileStore.setTotalSpace(totalSpace);
    // refresh right away instead of waiting for the change to be picked up
    refreshClusterInfo();
}

/**
 * Looks up the {@link TestTelemetryPlugin} installed on the given node and resets its meter before returning it.
 *
 * @param dataNodeName name of the node to fetch the plugin from
 * @return the node's telemetry plugin, with its meter reset
 * @throws java.util.NoSuchElementException if the node has no {@link TestTelemetryPlugin} installed
 */
private TestTelemetryPlugin getTelemetryPlugin(String dataNodeName) {
    PluginsService pluginsService = internalCluster().getInstance(PluginsService.class, dataNodeName);
    TestTelemetryPlugin telemetryPlugin = pluginsService.filterPlugins(TestTelemetryPlugin.class).findFirst().orElseThrow();
    // note the side effect: every lookup resets the plugin's meter
    telemetryPlugin.resetMeter();
    return telemetryPlugin;
}
}