 
 package org.elasticsearch.index.engine;
 
+import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.admin.indices.segments.ShardSegments;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.action.support.broadcast.BroadcastResponse;
 import org.elasticsearch.cluster.DiskUsageIntegTestCase;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.PluginsService;
+import org.elasticsearch.telemetry.TestTelemetryPlugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.BeforeClass;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Locale;
 import java.util.stream.IntStream;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.lessThan;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
@@ -40,7 +50,14 @@ public static void setAvailableDiskSpaceBufferLimit() {
         // this has to be big in order to potentially accommodate the disk space for a few 100s of docs and a few merges,
         // because of the latency to process used disk space updates, and also because we cannot reliably separate indexing from merging
         // operations at this high abstraction level (merging is triggered more or less automatically in the background)
-        MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween(1_000_000L, 2_000_000L);
+        MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween(10_000_000L, 20_000_000L);
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        List<Class<? extends Plugin>> nodePluginsList = new ArrayList<>(super.nodePlugins());
+        nodePluginsList.add(TestTelemetryPlugin.class);
+        return nodePluginsList;
     }
 
     @Override
@@ -155,8 +172,111 @@ public void testShardCloseWhenDiskSpaceInsufficient() throws Exception {
         });
     }
 
+    public void testForceMergeIsBlockedThenUnblocked() throws Exception {
+        String node = internalCluster().startNode();
+        ensureStableCluster(1);
+        setTotalSpace(node, Long.MAX_VALUE);
+        ThreadPoolMergeExecutorService threadPoolMergeExecutorService = internalCluster().getInstance(IndicesService.class, node)
+            .getThreadPoolMergeExecutorService();
+        TestTelemetryPlugin testTelemetryPlugin = getTelemetryPlugin(node);
+        // create some index
+        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        createIndex(
+            indexName,
+            Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()
+        );
+        // get current disk space usage (for all indices on the node)
+        IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get();
+        long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes();
+        // restrict the total disk space such that the next merge does not have sufficient disk space
+        long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L);
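+        // available disk space is then (high watermark - a few bytes), i.e. just under the merge disk watermark, so merging must block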
+        setTotalSpace(node, insufficientTotalDiskSpace);
+        // node stats' FS stats should report that there is insufficient disk space available
+        assertBusy(() -> {
+            NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get();
+            assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));
+            NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
+            assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace));
+            assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES));
+        });
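+        // index documents over a few rounds so that multiple segments accumulate and the force merge below has work to do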
+        int indexingRounds = randomIntBetween(2, 5);
+        while (indexingRounds-- > 0) {
+            indexRandom(
+                true,
+                true,
+                true,
+                false,
+                IntStream.range(1, randomIntBetween(2, 5))
+                    .mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50)))
+                    .toList()
+            );
+        }
+        // the max segments argument makes it a blocking call
+        ActionFuture<BroadcastResponse> forceMergeFuture = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).execute();
+        assertBusy(() -> {
+            // merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued
+            assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), equalTo(1));
+            assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
+            // telemetry says that there are indeed some segments enqueued to be merged
+            testTelemetryPlugin.collect();
+            assertThat(
+                testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(),
+                greaterThan(0L)
+            );
+            // but still no merges are currently running
+            assertThat(
+                testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(),
+                equalTo(0L)
+            );
+            // indices stats also says that no merge is currently running (blocked merges are NOT considered as "running")
+            IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(indexName).setMerge(true).get();
+            long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent();
+            assertThat(currentMergeCount, equalTo(0L));
+        });
+        // the force merge call is still blocked
+        assertFalse(forceMergeFuture.isCancelled());
+        assertFalse(forceMergeFuture.isDone());
+        // merge executor still confirms merging is blocked due to insufficient disk space
+        assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
+        // make disk space available in order to unblock the merge
+        if (randomBoolean()) {
+            setTotalSpace(node, Long.MAX_VALUE);
+        } else {
+            updateClusterSettings(
+                Settings.builder().put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), "0b")
+            );
+        }
+        // wait for the merge call to return
+        safeGet(forceMergeFuture);
+        IndicesStatsResponse indicesStatsResponse = indicesAdmin().prepareStats(indexName).setMerge(true).get();
+        testTelemetryPlugin.collect();
+        // assert index stats and telemetry report no merging in progress (after force merge returned)
+        long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent();
+        assertThat(currentMergeCount, equalTo(0L));
+        assertThat(testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(), equalTo(0L));
+        assertThat(testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(), equalTo(0L));
+        // but some merging took place (there might have been other merges automatically triggered before the force merge call)
+        long totalMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getTotal();
+        assertThat(totalMergeCount, greaterThan(0L));
+        assertThat(testTelemetryPlugin.getLongCounterMeasurement(MergeMetrics.MERGE_DOCS_TOTAL).getLast().getLong(), greaterThan(0L));
+        // assert there's a single segment after the force merge
+        List<ShardSegments> shardSegments = getShardSegments(indexName);
+        assertThat(shardSegments.size(), equalTo(1));
+        assertThat(shardSegments.get(0).getSegments().size(), equalTo(1));
+        assertAcked(indicesAdmin().prepareDelete(indexName).get());
+    }
+
     public void setTotalSpace(String dataNodeName, long totalSpace) {
         getTestFileStore(dataNodeName).setTotalSpace(totalSpace);
         refreshClusterInfo();
     }
+
+    private TestTelemetryPlugin getTelemetryPlugin(String dataNodeName) {
+        var plugin = internalCluster().getInstance(PluginsService.class, dataNodeName)
+            .filterPlugins(TestTelemetryPlugin.class)
+            .findFirst()
+            .orElseThrow();
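+        // clear any previously recorded measurements so that assertions only observe metrics produced by this test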
+        plugin.resetMeter();
+        return plugin;
+    }
 }