99
1010package org .elasticsearch .index .engine ;
1111
12+ import org .elasticsearch .action .ActionFuture ;
1213import org .elasticsearch .action .admin .cluster .node .stats .NodeStats ;
1314import org .elasticsearch .action .admin .cluster .node .stats .NodesStatsResponse ;
15+ import org .elasticsearch .action .admin .indices .segments .ShardSegments ;
1416import org .elasticsearch .action .admin .indices .stats .IndicesStatsResponse ;
17+ import org .elasticsearch .action .support .broadcast .BroadcastResponse ;
1518import org .elasticsearch .cluster .DiskUsageIntegTestCase ;
1619import org .elasticsearch .cluster .metadata .IndexMetadata ;
1720import org .elasticsearch .cluster .routing .allocation .DiskThresholdSettings ;
2326import org .elasticsearch .threadpool .ThreadPool ;
2427import org .junit .BeforeClass ;
2528
29+ import java .util .List ;
2630import java .util .Locale ;
2731import java .util .stream .IntStream ;
2832
2933import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
3034import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertNoFailures ;
3135import static org .hamcrest .Matchers .equalTo ;
36+ import static org .hamcrest .Matchers .greaterThan ;
3237import static org .hamcrest .Matchers .lessThan ;
3338
3439@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST , numDataNodes = 0 )
@@ -40,7 +45,7 @@ public static void setAvailableDiskSpaceBufferLimit() {
4045 // this has to be big in order to potentially accommodate the disk space for a few 100s of docs and a few merges,
4146 // because of the latency to process used disk space updates, and also because we cannot reliably separate indexing from merging
4247 // operations at this high abstraction level (merging is triggered more or less automatically in the background)
43- MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween (1_000_000L , 2_000_000L );
48+ MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween (10_000_000L , 20_000_000L );
4449 }
4550
4651 @ Override
@@ -152,6 +157,84 @@ public void testShardCloseWhenDiskSpaceInsufficient() throws Exception {
152157 });
153158 }
154159
160+ public void testForceMergeIsBlockedThenUnblocked () throws Exception {
161+ String node = internalCluster ().startNode ();
162+ ensureStableCluster (1 );
163+ setTotalSpace (node , Long .MAX_VALUE );
164+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = internalCluster ().getInstance (IndicesService .class , node )
165+ .getThreadPoolMergeExecutorService ();
166+ // create some index
167+ final String indexName = randomAlphaOfLength (10 ).toLowerCase (Locale .ROOT );
168+ createIndex (
169+ indexName ,
170+ Settings .builder ().put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , 0 ).put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , 1 ).build ()
171+ );
172+ // get current disk space usage (for all indices on the node)
173+ IndicesStatsResponse stats = indicesAdmin ().prepareStats ().clear ().setStore (true ).get ();
174+ long usedDiskSpaceAfterIndexing = stats .getTotal ().getStore ().sizeInBytes ();
175+ // restrict the total disk space such that the next merge does not have sufficient disk space
176+ long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween (1L , 10L );
177+ setTotalSpace (node , insufficientTotalDiskSpace );
178+ // node stats' FS stats should report that there is insufficient disk space available
179+ assertBusy (() -> {
180+ NodesStatsResponse nodesStatsResponse = client ().admin ().cluster ().prepareNodesStats ().setFs (true ).get ();
181+ assertThat (nodesStatsResponse .getNodes ().size (), equalTo (1 ));
182+ NodeStats nodeStats = nodesStatsResponse .getNodes ().get (0 );
183+ assertThat (nodeStats .getFs ().getTotal ().getTotal ().getBytes (), equalTo (insufficientTotalDiskSpace ));
184+ assertThat (nodeStats .getFs ().getTotal ().getAvailable ().getBytes (), lessThan (MERGE_DISK_HIGH_WATERMARK_BYTES ));
185+ });
186+ int indexingRounds = randomIntBetween (2 , 5 );
187+ while (indexingRounds -- > 0 ) {
188+ indexRandom (
189+ true ,
190+ true ,
191+ true ,
192+ false ,
193+ IntStream .range (1 , randomIntBetween (2 , 5 ))
194+ .mapToObj (i -> prepareIndex (indexName ).setSource ("field" , randomAlphaOfLength (50 )))
195+ .toList ()
196+ );
197+ }
198+ // the max segments argument makes it a blocking call
199+ ActionFuture <BroadcastResponse > forceMergeFuture = indicesAdmin ().prepareForceMerge (indexName ).setMaxNumSegments (1 ).execute ();
200+ assertBusy (() -> {
201+ // merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued
202+ assertThat (threadPoolMergeExecutorService .getMergeTasksQueueLength (), equalTo (1 ));
203+ assertTrue (threadPoolMergeExecutorService .isMergingBlockedDueToInsufficientDiskSpace ());
204+ // indices stats also says that no merge is currently running (blocked merges are NOT considered as "running")
205+ IndicesStatsResponse indicesStatsResponse = client ().admin ().indices ().prepareStats (indexName ).setMerge (true ).get ();
206+ long currentMergeCount = indicesStatsResponse .getIndices ().get (indexName ).getPrimaries ().merge .getCurrent ();
207+ assertThat (currentMergeCount , equalTo (0L ));
208+ });
209+ // the force merge call is still blocked
210+ assertFalse (forceMergeFuture .isCancelled ());
211+ assertFalse (forceMergeFuture .isDone ());
212+ // merge executor still confirms merging is blocked due to insufficient disk space
213+ assertTrue (threadPoolMergeExecutorService .isMergingBlockedDueToInsufficientDiskSpace ());
214+ // make disk space available in order to unblock the merge
215+ if (randomBoolean ()) {
216+ setTotalSpace (node , Long .MAX_VALUE );
217+ } else {
218+ updateClusterSettings (
219+ Settings .builder ().put (ThreadPoolMergeExecutorService .INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING .getKey (), "0b" )
220+ );
221+ }
222+ // wait for the merge call to return
223+ safeGet (forceMergeFuture );
224+ IndicesStatsResponse indicesStatsResponse = indicesAdmin ().prepareStats (indexName ).setMerge (true ).get ();
225+ // assert index stats reports no merging in progress (after force merge returned)
226+ long currentMergeCount = indicesStatsResponse .getIndices ().get (indexName ).getPrimaries ().merge .getCurrent ();
227+ assertThat (currentMergeCount , equalTo (0L ));
228+ // but some merging took place (there might have been other merges automatically triggered before the force merge call)
229+ long totalMergeCount = indicesStatsResponse .getIndices ().get (indexName ).getPrimaries ().merge .getTotal ();
230+ assertThat (totalMergeCount , greaterThan (0L ));
231+ // assert there's a single segment after the force merge
232+ List <ShardSegments > shardSegments = getShardSegments (indexName );
233+ assertThat (shardSegments .size (), equalTo (1 ));
234+ assertThat (shardSegments .get (0 ).getSegments ().size (), equalTo (1 ));
235+ assertAcked (indicesAdmin ().prepareDelete (indexName ).get ());
236+ }
237+
155238 public void setTotalSpace (String dataNodeName , long totalSpace ) {
156239 getTestFileStore (dataNodeName ).setTotalSpace (totalSpace );
157240 refreshClusterInfo ();
0 commit comments