1111
1212import org .elasticsearch .action .admin .cluster .node .stats .NodeStats ;
1313import org .elasticsearch .action .admin .cluster .node .stats .NodesStatsResponse ;
14+ import org .elasticsearch .action .admin .indices .segments .ShardSegments ;
1415import org .elasticsearch .action .admin .indices .stats .IndicesStatsResponse ;
1516import org .elasticsearch .cluster .DiskUsageIntegTestCase ;
1617import org .elasticsearch .cluster .metadata .IndexMetadata ;
1920import org .elasticsearch .common .util .concurrent .EsExecutors ;
2021import org .elasticsearch .index .IndexNotFoundException ;
2122import org .elasticsearch .indices .IndicesService ;
23+ import org .elasticsearch .plugins .Plugin ;
24+ import org .elasticsearch .plugins .PluginsService ;
25+ import org .elasticsearch .telemetry .TestTelemetryPlugin ;
2226import org .elasticsearch .test .ESIntegTestCase ;
2327import org .elasticsearch .threadpool .ThreadPool ;
2428import org .junit .BeforeClass ;
2529
30+ import java .util .ArrayList ;
31+ import java .util .Collection ;
32+ import java .util .List ;
2633import java .util .Locale ;
2734import java .util .stream .IntStream ;
2835
2936import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
3037import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertNoFailures ;
3138import static org .hamcrest .Matchers .equalTo ;
39+ import static org .hamcrest .Matchers .greaterThan ;
3240import static org .hamcrest .Matchers .lessThan ;
3341
3442@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST , numDataNodes = 0 )
@@ -40,7 +48,14 @@ public static void setAvailableDiskSpaceBufferLimit() {
4048 // this has to be big in order to potentially accommodate the disk space for a few 100s of docs and a few merges,
4149 // because of the latency to process used disk space updates, and also because we cannot reliably separate indexing from merging
4250 // operations at this high abstraction level (merging is triggered more or less automatically in the background)
43- MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween (1_000_000L , 2_000_000L );
51+ MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween (10_000_000L , 20_000_000L );
52+ }
53+
54+ @ Override
55+ protected Collection <Class <? extends Plugin >> nodePlugins () {
56+ List <Class <? extends Plugin >> nodePluginsList = new ArrayList <>(super .nodePlugins ());
57+ nodePluginsList .add (TestTelemetryPlugin .class );
58+ return nodePluginsList ;
4459 }
4560
4661 @ Override
@@ -155,8 +170,128 @@ public void testShardCloseWhenDiskSpaceInsufficient() throws Exception {
155170 });
156171 }
157172
173+ public void testForceMergeIsBlockedThenUnblocked () throws Exception {
174+ String node = internalCluster ().startNode ();
175+ ensureStableCluster (1 );
176+ setTotalSpace (node , Long .MAX_VALUE );
177+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = internalCluster ().getInstance (IndicesService .class , node )
178+ .getThreadPoolMergeExecutorService ();
179+ TestTelemetryPlugin testTelemetryPlugin = getTelemetryPlugin (node );
180+ // create some index
181+ final String indexName = randomAlphaOfLength (10 ).toLowerCase (Locale .ROOT );
182+ createIndex (
183+ indexName ,
184+ Settings .builder ()
185+ .put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , 0 )
186+ .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , 1 )
187+ .build ()
188+ );
189+ // get current disk space usage (for all indices on the node)
190+ IndicesStatsResponse stats = indicesAdmin ().prepareStats ().clear ().setStore (true ).get ();
191+ long usedDiskSpaceAfterIndexing = stats .getTotal ().getStore ().sizeInBytes ();
192+ // restrict the total disk space such that the next merge does not have sufficient disk space
193+ long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween (1L , 10L );
194+ setTotalSpace (node , insufficientTotalDiskSpace );
195+ // node stats' FS stats should report that there is insufficient disk space available
196+ assertBusy (() -> {
197+ NodesStatsResponse nodesStatsResponse = client ().admin ().cluster ().prepareNodesStats ().setFs (true ).get ();
198+ assertThat (nodesStatsResponse .getNodes ().size (), equalTo (1 ));
199+ NodeStats nodeStats = nodesStatsResponse .getNodes ().get (0 );
200+ assertThat (nodeStats .getFs ().getTotal ().getTotal ().getBytes (), equalTo (insufficientTotalDiskSpace ));
201+ assertThat (nodeStats .getFs ().getTotal ().getAvailable ().getBytes (), lessThan (MERGE_DISK_HIGH_WATERMARK_BYTES ));
202+ });
203+ int indexingRounds = randomIntBetween (2 , 5 );
204+ while (indexingRounds -- > 0 ) {
205+ indexRandom (
206+ true ,
207+ true ,
208+ true ,
209+ false ,
210+ IntStream .range (1 , randomIntBetween (2 , 5 ))
211+ .mapToObj (i -> prepareIndex (indexName ).setSource ("field" , randomAlphaOfLength (50 )))
212+ .toList ()
213+ );
214+ }
215+ // start force merging (which is blocking) on a separate thread
216+ Thread forceMergeThread = new Thread (
217+ // the max segments argument makes it a blocking call
218+ () -> assertNoFailures (indicesAdmin ().prepareForceMerge (indexName ).setMaxNumSegments (1 ).get ())
219+ );
220+ forceMergeThread .start ();
221+ assertBusy (() -> {
222+ // merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued
223+ assertThat (threadPoolMergeExecutorService .getMergeTasksQueueLength (), equalTo (1 ));
224+ assertTrue (threadPoolMergeExecutorService .isMergingBlockedDueToInsufficientDiskSpace ());
225+ // telemetry says that there are indeed some segments enqueued to be merged
226+ testTelemetryPlugin .collect ();
227+ assertThat (
228+ testTelemetryPlugin .getLongGaugeMeasurement (MergeMetrics .MERGE_SEGMENTS_QUEUED_USAGE ).getLast ().getLong (),
229+ greaterThan (0L )
230+ );
231+ // but still no merges are currently running
232+ assertThat (
233+ testTelemetryPlugin .getLongGaugeMeasurement (MergeMetrics .MERGE_SEGMENTS_RUNNING_USAGE ).getLast ().getLong (),
234+ equalTo (0L )
235+ );
236+ // indices stats also says that no merge is currently running (blocked merges are NOT considered as "running")
237+ IndicesStatsResponse indicesStatsResponse = client ().admin ().indices ().prepareStats (indexName ).setMerge (true ).get ();
238+ long currentMergeCount = indicesStatsResponse .getIndices ().get (indexName ).getPrimaries ().merge .getCurrent ();
239+ assertThat (currentMergeCount , equalTo (0L ));
240+ });
241+ // the force merge call is still blocked
242+ assertTrue (forceMergeThread .isAlive ());
243+ // merge executor still confirms merging is blocked due to insufficient disk space
244+ assertTrue (threadPoolMergeExecutorService .isMergingBlockedDueToInsufficientDiskSpace ());
245+ // make disk space available in order to unblock the merge
246+ if (randomBoolean ()) {
247+ setTotalSpace (node , Long .MAX_VALUE );
248+ } else {
249+ updateClusterSettings (
250+ Settings .builder ().put (ThreadPoolMergeExecutorService .INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING .getKey (), "0b" )
251+ );
252+ }
253+ // assert that all the merges are now done and that the force-merge call returns
254+ assertBusy (() -> {
255+ IndicesStatsResponse indicesStatsResponse = client ().admin ().indices ().prepareStats (indexName ).setMerge (true ).get ();
256+ long currentMergeCount = indicesStatsResponse .getIndices ().get (indexName ).getPrimaries ().merge .getCurrent ();
257+ // NO merging is in progress
258+ assertThat (currentMergeCount , equalTo (0L ));
259+ long totalMergeCount = indicesStatsResponse .getIndices ().get (indexName ).getPrimaries ().merge .getTotal ();
260+ assertThat (totalMergeCount , greaterThan (0L ));
261+ // force merge call returned
262+ assertFalse (forceMergeThread .isAlive ());
263+ // telemetry also says that some merging took place
264+ testTelemetryPlugin .collect ();
265+ assertThat (testTelemetryPlugin .getLongCounterMeasurement (MergeMetrics .MERGE_DOCS_TOTAL ).getLast ().getLong (), greaterThan (0L ));
266+ // and no further merging
267+ assertThat (
268+ testTelemetryPlugin .getLongGaugeMeasurement (MergeMetrics .MERGE_SEGMENTS_QUEUED_USAGE ).getLast ().getLong (),
269+ equalTo (0L )
270+ );
271+ assertThat (
272+ testTelemetryPlugin .getLongGaugeMeasurement (MergeMetrics .MERGE_SEGMENTS_RUNNING_USAGE ).getLast ().getLong (),
273+ equalTo (0L )
274+ );
275+ });
276+ // assert there's a single segment after the force merge
277+ List <ShardSegments > shardSegments = getShardSegments (indexName );
278+ assertThat (shardSegments .size (), equalTo (1 ));
279+ assertThat (shardSegments .get (0 ).getSegments ().size (), equalTo (1 ));
280+ assertAcked (indicesAdmin ().prepareDelete (indexName ).get ());
281+ forceMergeThread .join ();
282+ }
283+
158284 public void setTotalSpace (String dataNodeName , long totalSpace ) {
159285 getTestFileStore (dataNodeName ).setTotalSpace (totalSpace );
160286 refreshClusterInfo ();
161287 }
288+
289+ private TestTelemetryPlugin getTelemetryPlugin (String dataNodeName ) {
290+ var plugin = internalCluster ().getInstance (PluginsService .class , dataNodeName )
291+ .filterPlugins (TestTelemetryPlugin .class )
292+ .findFirst ()
293+ .orElseThrow ();
294+ plugin .resetMeter ();
295+ return plugin ;
296+ }
162297}
0 commit comments