Skip to content

Commit 3452dce

Browse files
force merge future + avoid assert busy for calls after force merge
1 parent 1c01153 commit 3452dce

File tree

1 file changed

+19
-31
lines changed

1 file changed

+19
-31
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java

Lines changed: 19 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@
99

1010
package org.elasticsearch.index.engine;
1111

12+
import org.elasticsearch.action.ActionFuture;
1213
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
1314
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
1415
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
1516
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
17+
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
1618
import org.elasticsearch.cluster.DiskUsageIntegTestCase;
1719
import org.elasticsearch.cluster.metadata.IndexMetadata;
1820
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
@@ -209,12 +211,8 @@ public void testForceMergeIsBlockedThenUnblocked() throws Exception {
209211
.toList()
210212
);
211213
}
212-
// start force merging (which is blocking) on a separate thread
213-
Thread forceMergeThread = new Thread(
214-
// the max segments argument makes it a blocking call
215-
() -> assertNoFailures(indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get())
216-
);
217-
forceMergeThread.start();
214+
// the max segments argument makes it a blocking call
215+
ActionFuture<BroadcastResponse> forceMergeFuture = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).execute();
218216
assertBusy(() -> {
219217
// merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued
220218
assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), equalTo(1));
@@ -236,7 +234,8 @@ public void testForceMergeIsBlockedThenUnblocked() throws Exception {
236234
assertThat(currentMergeCount, equalTo(0L));
237235
});
238236
// the force merge call is still blocked
239-
assertTrue(forceMergeThread.isAlive());
237+
assertFalse(forceMergeFuture.isCancelled());
238+
assertFalse(forceMergeFuture.isDone());
240239
// merge executor still confirms merging is blocked due to insufficient disk space
241240
assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
242241
// make disk space available in order to unblock the merge
@@ -247,35 +246,24 @@ public void testForceMergeIsBlockedThenUnblocked() throws Exception {
247246
Settings.builder().put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), "0b")
248247
);
249248
}
250-
// assert that all the merges are now done and that the force-merge call returns
251-
assertBusy(() -> {
252-
IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(indexName).setMerge(true).get();
253-
long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent();
254-
// NO merging is in progress
255-
assertThat(currentMergeCount, equalTo(0L));
256-
long totalMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getTotal();
257-
assertThat(totalMergeCount, greaterThan(0L));
258-
// force merge call returned
259-
assertFalse(forceMergeThread.isAlive());
260-
// telemetry also says that some merging took place
261-
testTelemetryPlugin.collect();
262-
assertThat(testTelemetryPlugin.getLongCounterMeasurement(MergeMetrics.MERGE_DOCS_TOTAL).getLast().getLong(), greaterThan(0L));
263-
// and no further merging
264-
assertThat(
265-
testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(),
266-
equalTo(0L)
267-
);
268-
assertThat(
269-
testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(),
270-
equalTo(0L)
271-
);
272-
});
249+
// wait for the merge call to return
250+
safeGet(forceMergeFuture);
251+
IndicesStatsResponse indicesStatsResponse = indicesAdmin().prepareStats(indexName).setMerge(true).get();
252+
testTelemetryPlugin.collect();
253+
// assert index stats and telemetry report no merging in progress (after force merge returned)
254+
long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent();
255+
assertThat(currentMergeCount, equalTo(0L));
256+
assertThat(testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(), equalTo(0L));
257+
assertThat(testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(), equalTo(0L));
258+
// but some merging took place (there might have been other merges automatically triggered before the force merge call)
259+
long totalMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getTotal();
260+
assertThat(totalMergeCount, greaterThan(0L));
261+
assertThat(testTelemetryPlugin.getLongCounterMeasurement(MergeMetrics.MERGE_DOCS_TOTAL).getLast().getLong(), greaterThan(0L));
273262
// assert there's a single segment after the force merge
274263
List<ShardSegments> shardSegments = getShardSegments(indexName);
275264
assertThat(shardSegments.size(), equalTo(1));
276265
assertThat(shardSegments.get(0).getSegments().size(), equalTo(1));
277266
assertAcked(indicesAdmin().prepareDelete(indexName).get());
278-
forceMergeThread.join();
279267
}
280268

281269
public void setTotalSpace(String dataNodeName, long totalSpace) {

0 commit comments

Comments
 (0)