99
1010package org .elasticsearch .index .engine ;
1111
12+ import org .elasticsearch .action .ActionFuture ;
1213import org .elasticsearch .action .admin .cluster .node .stats .NodeStats ;
1314import org .elasticsearch .action .admin .cluster .node .stats .NodesStatsResponse ;
15+ import org .elasticsearch .action .admin .indices .segments .ShardSegments ;
1416import org .elasticsearch .action .admin .indices .stats .IndicesStatsResponse ;
17+ import org .elasticsearch .action .support .broadcast .BroadcastResponse ;
1518import org .elasticsearch .cluster .DiskUsageIntegTestCase ;
1619import org .elasticsearch .cluster .metadata .IndexMetadata ;
1720import org .elasticsearch .cluster .routing .allocation .DiskThresholdSettings ;
2326import org .elasticsearch .threadpool .ThreadPool ;
2427import org .junit .BeforeClass ;
2528
29+ import java .util .List ;
2630import java .util .Locale ;
2731import java .util .stream .IntStream ;
2832
2933import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
3034import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertNoFailures ;
3135import static org .hamcrest .Matchers .equalTo ;
36+ import static org .hamcrest .Matchers .greaterThan ;
3237import static org .hamcrest .Matchers .lessThan ;
3338
3439@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST , numDataNodes = 0 )
@@ -40,7 +45,7 @@ public static void setAvailableDiskSpaceBufferLimit() {
4045 // this has to be big in order to potentially accommodate the disk space for a few 100s of docs and a few merges,
4146 // because of the latency to process used disk space updates, and also because we cannot reliably separate indexing from merging
4247 // operations at this high abstraction level (merging is triggered more or less automatically in the background)
43- MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween (1_000_000L , 2_000_000L );
48+ MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween (10_000_000L , 20_000_000L );
4449 }
4550
4651 @ Override
@@ -155,6 +160,84 @@ public void testShardCloseWhenDiskSpaceInsufficient() throws Exception {
155160 });
156161 }
157162
163+ public void testForceMergeIsBlockedThenUnblocked () throws Exception {
164+ String node = internalCluster ().startNode ();
165+ ensureStableCluster (1 );
166+ setTotalSpace (node , Long .MAX_VALUE );
167+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = internalCluster ().getInstance (IndicesService .class , node )
168+ .getThreadPoolMergeExecutorService ();
169+ // create some index
170+ final String indexName = randomAlphaOfLength (10 ).toLowerCase (Locale .ROOT );
171+ createIndex (
172+ indexName ,
173+ Settings .builder ().put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , 0 ).put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , 1 ).build ()
174+ );
175+ // get current disk space usage (for all indices on the node)
176+ IndicesStatsResponse stats = indicesAdmin ().prepareStats ().clear ().setStore (true ).get ();
177+ long usedDiskSpaceAfterIndexing = stats .getTotal ().getStore ().sizeInBytes ();
178+ // restrict the total disk space such that the next merge does not have sufficient disk space
179+ long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween (1L , 10L );
180+ setTotalSpace (node , insufficientTotalDiskSpace );
181+ // node stats' FS stats should report that there is insufficient disk space available
182+ assertBusy (() -> {
183+ NodesStatsResponse nodesStatsResponse = client ().admin ().cluster ().prepareNodesStats ().setFs (true ).get ();
184+ assertThat (nodesStatsResponse .getNodes ().size (), equalTo (1 ));
185+ NodeStats nodeStats = nodesStatsResponse .getNodes ().get (0 );
186+ assertThat (nodeStats .getFs ().getTotal ().getTotal ().getBytes (), equalTo (insufficientTotalDiskSpace ));
187+ assertThat (nodeStats .getFs ().getTotal ().getAvailable ().getBytes (), lessThan (MERGE_DISK_HIGH_WATERMARK_BYTES ));
188+ });
189+ int indexingRounds = randomIntBetween (2 , 5 );
190+ while (indexingRounds -- > 0 ) {
191+ indexRandom (
192+ true ,
193+ true ,
194+ true ,
195+ false ,
196+ IntStream .range (1 , randomIntBetween (2 , 5 ))
197+ .mapToObj (i -> prepareIndex (indexName ).setSource ("field" , randomAlphaOfLength (50 )))
198+ .toList ()
199+ );
200+ }
201+ // the max segments argument makes it a blocking call
202+ ActionFuture <BroadcastResponse > forceMergeFuture = indicesAdmin ().prepareForceMerge (indexName ).setMaxNumSegments (1 ).execute ();
203+ assertBusy (() -> {
204+ // merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued
205+ assertThat (threadPoolMergeExecutorService .getMergeTasksQueueLength (), equalTo (1 ));
206+ assertTrue (threadPoolMergeExecutorService .isMergingBlockedDueToInsufficientDiskSpace ());
207+ // indices stats also says that no merge is currently running (blocked merges are NOT considered as "running")
208+ IndicesStatsResponse indicesStatsResponse = client ().admin ().indices ().prepareStats (indexName ).setMerge (true ).get ();
209+ long currentMergeCount = indicesStatsResponse .getIndices ().get (indexName ).getPrimaries ().merge .getCurrent ();
210+ assertThat (currentMergeCount , equalTo (0L ));
211+ });
212+ // the force merge call is still blocked
213+ assertFalse (forceMergeFuture .isCancelled ());
214+ assertFalse (forceMergeFuture .isDone ());
215+ // merge executor still confirms merging is blocked due to insufficient disk space
216+ assertTrue (threadPoolMergeExecutorService .isMergingBlockedDueToInsufficientDiskSpace ());
217+ // make disk space available in order to unblock the merge
218+ if (randomBoolean ()) {
219+ setTotalSpace (node , Long .MAX_VALUE );
220+ } else {
221+ updateClusterSettings (
222+ Settings .builder ().put (ThreadPoolMergeExecutorService .INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING .getKey (), "0b" )
223+ );
224+ }
225+ // wait for the merge call to return
226+ safeGet (forceMergeFuture );
227+ IndicesStatsResponse indicesStatsResponse = indicesAdmin ().prepareStats (indexName ).setMerge (true ).get ();
228+ // assert index stats and telemetry report no merging in progress (after force merge returned)
229+ long currentMergeCount = indicesStatsResponse .getIndices ().get (indexName ).getPrimaries ().merge .getCurrent ();
230+ assertThat (currentMergeCount , equalTo (0L ));
231+ // but some merging took place (there might have been other merges automatically triggered before the force merge call)
232+ long totalMergeCount = indicesStatsResponse .getIndices ().get (indexName ).getPrimaries ().merge .getTotal ();
233+ assertThat (totalMergeCount , greaterThan (0L ));
234+ // assert there's a single segment after the force merge
235+ List <ShardSegments > shardSegments = getShardSegments (indexName );
236+ assertThat (shardSegments .size (), equalTo (1 ));
237+ assertThat (shardSegments .get (0 ).getSegments ().size (), equalTo (1 ));
238+ assertAcked (indicesAdmin ().prepareDelete (indexName ).get ());
239+ }
240+
158241 public void setTotalSpace (String dataNodeName , long totalSpace ) {
159242 getTestFileStore (dataNodeName ).setTotalSpace (totalSpace );
160243 refreshClusterInfo ();
0 commit comments