From 99fa485ad7741181ebf0d5372fbb4e3ed3a3e9c5 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Tue, 25 Mar 2025 17:51:26 +0200 Subject: [PATCH] Tweaks --- .../org/elasticsearch/index/MergeSchedulerConfig.java | 10 +++++++--- .../index/engine/ThreadPoolMergeExecutorService.java | 4 ++-- .../index/engine/ThreadPoolMergeScheduler.java | 2 +- .../threadpool/DefaultBuiltInExecutorBuilders.java | 9 ++++++++- .../java/org/elasticsearch/threadpool/ThreadPool.java | 4 ++-- 5 files changed, 20 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/MergeSchedulerConfig.java b/server/src/main/java/org/elasticsearch/index/MergeSchedulerConfig.java index ff6987d7d7ff1..709d98a380002 100644 --- a/server/src/main/java/org/elasticsearch/index/MergeSchedulerConfig.java +++ b/server/src/main/java/org/elasticsearch/index/MergeSchedulerConfig.java @@ -15,6 +15,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; +import static org.elasticsearch.common.util.concurrent.EsExecutors.allocatedProcessors; +import static org.elasticsearch.threadpool.ThreadPool.halfAllocatedProcessorsMaxFive; +import static org.elasticsearch.threadpool.ThreadPool.twiceAllocatedProcessors; + /** * The merge scheduler (ConcurrentMergeScheduler) controls the execution of * merge operations once they are needed (according to the merge policy). Merges @@ -27,7 +31,7 @@ *
  • index.merge.scheduler.max_thread_count: * * The maximum number of threads that may be merging at once. Defaults to - * Math.max(1, Math.min(4, {@link EsExecutors#allocatedProcessors(Settings)} / 2)) + * Math.max(1, Math.min(5, {@link EsExecutors#allocatedProcessors(Settings)} / 2)) * which works well for a good solid-state-disk (SSD). If your index is on * spinning platter drives instead, decrease this to 1. * @@ -45,14 +49,14 @@ public final class MergeSchedulerConfig { public static final Setting MAX_THREAD_COUNT_SETTING = new Setting<>( "index.merge.scheduler.max_thread_count", - (s) -> Integer.toString(Math.max(1, Math.min(4, EsExecutors.allocatedProcessors(s) / 2))), + (s) -> Integer.toString(halfAllocatedProcessorsMaxFive(allocatedProcessors(s))), (s) -> Setting.parseInt(s, 1, "index.merge.scheduler.max_thread_count"), Property.Dynamic, Property.IndexScope ); public static final Setting MAX_MERGE_COUNT_SETTING = new Setting<>( "index.merge.scheduler.max_merge_count", - (s) -> Integer.toString(MAX_THREAD_COUNT_SETTING.get(s) + 5), + (s) -> Integer.toString(MAX_THREAD_COUNT_SETTING.get(s) + twiceAllocatedProcessors(allocatedProcessors(s))), (s) -> Setting.parseInt(s, 1, "index.merge.scheduler.max_merge_count"), Property.Dynamic, Property.IndexScope diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java index 5217edb5490dc..33749d3d4a47a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java @@ -33,7 +33,7 @@ public class ThreadPoolMergeExecutorService { /** * Floor for IO write rate limit of individual merge tasks (we will never go any lower than this) */ - static final ByteSizeValue MIN_IO_RATE = ByteSizeValue.ofMb(5L); + static final ByteSizeValue MIN_IO_RATE = ByteSizeValue.ofMb(10L); /** * Ceiling for IO write rate limit of individual merge tasks (we will never go any higher than this) */ @@ -41,7 +41,7 @@ public class ThreadPoolMergeExecutorService { /** * Initial value for IO write rate limit of individual merge tasks when doAutoIOThrottle is true */ - static final ByteSizeValue START_IO_RATE = ByteSizeValue.ofMb(20L); + static final ByteSizeValue START_IO_RATE = ByteSizeValue.ofMb(40L); /** * Total number of submitted merge tasks that support IO auto throttling and that have not yet been run (or aborted). * This includes merge tasks that are currently running and that are backlogged (by their respective merge schedulers). diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index 8cfdc59268365..7d0a5adf6ef2d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -50,7 +50,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics ); private final ShardId shardId; private final MergeSchedulerConfig config; - private final Logger logger; + protected final Logger logger; private final MergeTracking mergeTracking; private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService; private final PriorityQueue backloggedMergeTasks = new PriorityQueue<>( diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index b8dddc20cc51d..e2d14be6db91f 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -21,6 +21,7 @@ import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING; +import static org.elasticsearch.threadpool.ThreadPool.halfAllocatedProcessors; import static org.elasticsearch.threadpool.ThreadPool.searchAutoscalingEWMA; public class DefaultBuiltInExecutorBuilders implements BuiltInExecutorBuilders { @@ -145,7 +146,13 @@ public Map getBuilders(Settings settings, int allocated if (ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.get(settings)) { result.put( ThreadPool.Names.MERGE, - new ScalingExecutorBuilder(ThreadPool.Names.MERGE, 1, allocatedProcessors, TimeValue.timeValueMinutes(5), true) + new ScalingExecutorBuilder( + ThreadPool.Names.MERGE, + 1, + halfAllocatedProcessors(allocatedProcessors), + TimeValue.timeValueMinutes(5), + true + ) ); } result.put( diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 85ee02b6db856..302a5d5c6d235 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -655,7 +655,7 @@ static int halfAllocatedProcessors(final int allocatedProcessors) { return (allocatedProcessors + 1) / 2; } - static int halfAllocatedProcessorsMaxFive(final int allocatedProcessors) { + public static int halfAllocatedProcessorsMaxFive(final int allocatedProcessors) { return boundedBy(halfAllocatedProcessors(allocatedProcessors), 1, 5); } @@ -663,7 +663,7 @@ static int halfAllocatedProcessorsMaxTen(final int allocatedProcessors) { return boundedBy(halfAllocatedProcessors(allocatedProcessors), 1, 10); } - static int twiceAllocatedProcessors(final int allocatedProcessors) { + public static int twiceAllocatedProcessors(final int allocatedProcessors) { return boundedBy(2 * allocatedProcessors, 2, Integer.MAX_VALUE); }