 
 package org.elasticsearch.index.engine;
 
+import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.admin.indices.segments.ShardSegments;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.action.support.broadcast.BroadcastResponse;
 import org.elasticsearch.cluster.DiskUsageIntegTestCase;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.PluginsService;
+import org.elasticsearch.telemetry.TestTelemetryPlugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.BeforeClass;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Locale;
 import java.util.stream.IntStream;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.lessThan;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
@@ -40,7 +50,14 @@ public static void setAvailableDiskSpaceBufferLimit() {
         // this has to be big in order to potentially accommodate the disk space for a few 100s of docs and a few merges,
         // because of the latency to process used disk space updates, and also because we cannot reliably separate indexing from merging
         // operations at this high abstraction level (merging is triggered more or less automatically in the background)
-        MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween(1_000_000L, 2_000_000L);
+        MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween(10_000_000L, 20_000_000L);
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        List<Class<? extends Plugin>> nodePluginsList = new ArrayList<>(super.nodePlugins());
+        nodePluginsList.add(TestTelemetryPlugin.class);
+        return nodePluginsList;
     }
 
     @Override
@@ -152,8 +169,111 @@ public void testShardCloseWhenDiskSpaceInsufficient() throws Exception {
         });
     }
 
+    public void testForceMergeIsBlockedThenUnblocked() throws Exception {
+        String node = internalCluster().startNode();
+        ensureStableCluster(1);
+        setTotalSpace(node, Long.MAX_VALUE);
+        ThreadPoolMergeExecutorService threadPoolMergeExecutorService = internalCluster().getInstance(IndicesService.class, node)
+            .getThreadPoolMergeExecutorService();
+        TestTelemetryPlugin testTelemetryPlugin = getTelemetryPlugin(node);
+        // create an index
+        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        createIndex(
+            indexName,
+            Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()
+        );
+        // get current disk space usage (for all indices on the node)
+        IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get();
+        long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes();
+        // restrict the total disk space such that the next merge does not have sufficient disk space
+        long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L);
+        setTotalSpace(node, insufficientTotalDiskSpace);
+        // node stats' FS stats should report that there is insufficient disk space available
+        assertBusy(() -> {
+            NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get();
+            assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));
+            NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
+            assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace));
+            assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES));
+        });
+        int indexingRounds = randomIntBetween(2, 5);
+        while (indexingRounds-- > 0) {
+            indexRandom(
+                true,
+                true,
+                true,
+                false,
+                IntStream.range(1, randomIntBetween(2, 5))
+                    .mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50)))
+                    .toList()
+            );
+        }
+        // the max segments argument makes it a blocking call
+        ActionFuture<BroadcastResponse> forceMergeFuture = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).execute();
+        assertBusy(() -> {
+            // merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued
+            assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), equalTo(1));
+            assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
+            // telemetry says that there are indeed some segments enqueued to be merged
+            testTelemetryPlugin.collect();
+            assertThat(
+                testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(),
+                greaterThan(0L)
+            );
+            // but still no merges are currently running
+            assertThat(
+                testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(),
+                equalTo(0L)
+            );
+            // indices stats also says that no merge is currently running (blocked merges are NOT considered as "running")
+            IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(indexName).setMerge(true).get();
+            long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent();
+            assertThat(currentMergeCount, equalTo(0L));
+        });
+        // the force merge call is still blocked
+        assertFalse(forceMergeFuture.isCancelled());
+        assertFalse(forceMergeFuture.isDone());
+        // merge executor still confirms merging is blocked due to insufficient disk space
+        assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
+        // make disk space available in order to unblock the merge
+        if (randomBoolean()) {
+            setTotalSpace(node, Long.MAX_VALUE);
+        } else {
+            updateClusterSettings(
+                Settings.builder().put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), "0b")
+            );
+        }
+        // wait for the merge call to return
+        safeGet(forceMergeFuture);
+        IndicesStatsResponse indicesStatsResponse = indicesAdmin().prepareStats(indexName).setMerge(true).get();
+        testTelemetryPlugin.collect();
+        // assert index stats and telemetry report no merging in progress (after force merge returned)
+        long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent();
+        assertThat(currentMergeCount, equalTo(0L));
+        assertThat(testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(), equalTo(0L));
+        assertThat(testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(), equalTo(0L));
+        // but some merging took place (there might have been other merges automatically triggered before the force merge call)
+        long totalMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getTotal();
+        assertThat(totalMergeCount, greaterThan(0L));
+        assertThat(testTelemetryPlugin.getLongCounterMeasurement(MergeMetrics.MERGE_DOCS_TOTAL).getLast().getLong(), greaterThan(0L));
+        // assert there's a single segment after the force merge
+        List<ShardSegments> shardSegments = getShardSegments(indexName);
+        assertThat(shardSegments.size(), equalTo(1));
+        assertThat(shardSegments.get(0).getSegments().size(), equalTo(1));
+        assertAcked(indicesAdmin().prepareDelete(indexName).get());
+    }
+
     public void setTotalSpace(String dataNodeName, long totalSpace) {
         getTestFileStore(dataNodeName).setTotalSpace(totalSpace);
         refreshClusterInfo();
     }
+
+    private TestTelemetryPlugin getTelemetryPlugin(String dataNodeName) {
+        var plugin = internalCluster().getInstance(PluginsService.class, dataNodeName)
+            .filterPlugins(TestTelemetryPlugin.class)
+            .findFirst()
+            .orElseThrow();
+        plugin.resetMeter();
+        return plugin;
+    }
 }