@@ -63,6 +63,11 @@ public class ThreadPoolMergeExecutorService implements Closeable {
6363 Property .Dynamic ,
6464 Property .NodeScope
6565 );
66+ /**
67+ * The occupied disk space threshold beyond which NO new merges are started.
68+ * Conservatively, the estimated temporary disk space required for the to-be-started merge is counted as occupied disk space.
69+ * Defaults to the routing allocation flood stage watermark value (beyond which shards are toggled read-only).
70+ */
6671 public static final Setting <RelativeByteSizeValue > INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING = new Setting <>(
6772 "indices.merge.disk.watermark.high" ,
6873 CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING ,
@@ -91,9 +96,16 @@ public Iterator<Setting<?>> settings() {
9196 Property .Dynamic ,
9297 Property .NodeScope
9398 );
99+ /**
100+ * The available disk space headroom below which NO new merges are started.
101+ * Conservatively, the estimated temporary disk space required for the to-be-started merge is NOT counted as available disk space.
102+ * Defaults to the routing allocation flood stage headroom value (below which shards are toggled read-only),
103+ * unless the merge occupied disk space threshold is explicitly set, in which case the headroom defaults to {@code -1} (i.e. it is disabled).
104+ */
94105 public static final Setting <ByteSizeValue > INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING = new Setting <>(
95106 "indices.merge.disk.watermark.high.max_headroom" ,
96107 (settings ) -> {
108+ // if the user explicitly set a value for the occupied disk space threshold, disable the implicit headroom value
97109 if (INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING .exists (settings )) {
98110 return "-1" ;
99111 } else {
@@ -119,11 +131,13 @@ public void validate(final ByteSizeValue value, final Map<Setting<?>, Object> se
119131 );
120132 }
121133 }
122- final ByteSizeValue highHeadroom = (ByteSizeValue ) settings .get (INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING );
123134 final RelativeByteSizeValue highWatermark = (RelativeByteSizeValue ) settings .get (INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING );
135+ final ByteSizeValue highHeadroom = (ByteSizeValue ) settings .get (INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING );
124136 if (highWatermark .isAbsolute () && highHeadroom .equals (ByteSizeValue .MINUS_ONE ) == false ) {
125137 throw new IllegalArgumentException (
126- "indices merge max headroom setting is set, but disk watermark value is not a relative value"
138+ "indices merge max headroom setting is set, but indices merge disk watermark value is not a relative value ["
139+ + highWatermark .getStringRep ()
140+ + "]"
127141 );
128142 }
129143 }
@@ -194,9 +208,27 @@ public Iterator<Setting<?>> settings() {
194208 NodeEnvironment nodeEnvironment
195209 ) {
196210 if (clusterSettings .get (USE_THREAD_POOL_MERGE_SCHEDULER_SETTING )) {
197- return new ThreadPoolMergeExecutorService (threadPool , clusterSettings , nodeEnvironment );
211+ ThreadPoolMergeExecutorService threadPoolMergeExecutorService = new ThreadPoolMergeExecutorService (
212+ threadPool ,
213+ clusterSettings ,
214+ nodeEnvironment
215+ );
216+ clusterSettings .addSettingsUpdateConsumer (
217+ INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING ,
218+ threadPoolMergeExecutorService .getAvailableDiskSpacePeriodicMonitor ()::setHighStageWatermark
219+ );
220+ clusterSettings .addSettingsUpdateConsumer (
221+ INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING ,
222+ threadPoolMergeExecutorService .getAvailableDiskSpacePeriodicMonitor ()::setHighStageMaxHeadroom
223+ );
224+ clusterSettings .addSettingsUpdateConsumer (
225+ INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING ,
226+ threadPoolMergeExecutorService .getAvailableDiskSpacePeriodicMonitor ()::setCheckInterval
227+ );
228+ return threadPoolMergeExecutorService ;
198229 } else {
199- // register no-op settings consumer so that settings validations works properly,
230+ // register no-op setting update consumers so that setting validations work properly
231+ // (some validations are bypassed if there are no update consumers registered),
200232 // i.e. to reject watermark and max headroom updates if the thread pool merge scheduler is disabled
201233 clusterSettings .addSettingsUpdateConsumer (INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING , (ignored ) -> {});
202234 clusterSettings .addSettingsUpdateConsumer (INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING , (ignored ) -> {});
@@ -226,18 +258,6 @@ private ThreadPoolMergeExecutorService(ThreadPool threadPool, ClusterSettings cl
226258 clusterSettings .get (INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING ),
227259 (availableDiskSpaceByteSize ) -> queuedMergeTasks .updateBudget (availableDiskSpaceByteSize .getBytes ())
228260 );
229- clusterSettings .addSettingsUpdateConsumer (
230- INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING ,
231- this .availableDiskSpacePeriodicMonitor ::setHighStageWatermark
232- );
233- clusterSettings .addSettingsUpdateConsumer (
234- INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING ,
235- this .availableDiskSpacePeriodicMonitor ::setHighStageMaxHeadroom
236- );
237- clusterSettings .addSettingsUpdateConsumer (
238- INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING ,
239- this .availableDiskSpacePeriodicMonitor ::setCheckInterval
240- );
241261 }
242262
243263 boolean submitMergeTask (MergeTask mergeTask ) {
@@ -284,6 +304,10 @@ void reEnqueueBackloggedMergeTask(MergeTask mergeTask) {
284304 enqueueMergeTask (mergeTask );
285305 }
286306
307+ AvailableDiskSpacePeriodicMonitor getAvailableDiskSpacePeriodicMonitor () {
308+ return this .availableDiskSpacePeriodicMonitor ;
309+ }
310+
287311 private void enqueueMergeTask (MergeTask mergeTask ) {
288312 // To ensure that for a given merge onMergeQueued is called before onMergeAborted or onMergeCompleted, we call onMergeQueued
289313 // before adding the merge task to the queue. Adding to the queue should not fail.
@@ -413,16 +437,16 @@ static class AvailableDiskSpacePeriodicMonitor implements Closeable {
413437 reschedule ();
414438 }
415439
416- public void setCheckInterval (TimeValue checkInterval ) {
440+ void setCheckInterval (TimeValue checkInterval ) {
417441 this .checkInterval = checkInterval ;
418442 reschedule ();
419443 }
420444
421- public void setHighStageWatermark (RelativeByteSizeValue highStageWatermark ) {
445+ void setHighStageWatermark (RelativeByteSizeValue highStageWatermark ) {
422446 this .highStageWatermark = highStageWatermark ;
423447 }
424448
425- public void setHighStageMaxHeadroom (ByteSizeValue highStageMaxHeadroom ) {
449+ void setHighStageMaxHeadroom (ByteSizeValue highStageMaxHeadroom ) {
426450 this .highStageMaxHeadroom = highStageMaxHeadroom ;
427451 }
428452
0 commit comments