Skip to content

Commit 6a0aa1e

Browse files
Use protected methods instead of new constructors
1 parent f0b027a commit 6a0aa1e

File tree

6 files changed

+57
-74
lines changed

6 files changed

+57
-74
lines changed

server/src/main/java/org/elasticsearch/index/IndexSettings.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1053,13 +1053,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
10531053
flushThresholdAge = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_AGE_SETTING);
10541054
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
10551055
flushAfterMergeThresholdSize = scopedSettings.get(INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING);
1056-
if (DiscoveryNode.isStateless(nodeSettings)) {
1057-
// disable IO throttling, and thread and merge count throttling for stateless. By setting max merge threads and count to
1058-
// Integer.MAX_VALUE we effectively disable them
1059-
mergeSchedulerConfig = new MergeSchedulerConfig(false, Integer.MAX_VALUE, Integer.MAX_VALUE);
1060-
} else {
1061-
mergeSchedulerConfig = new MergeSchedulerConfig(this);
1062-
}
1056+
mergeSchedulerConfig = new MergeSchedulerConfig(this);
10631057
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
10641058
softDeleteEnabled = scopedSettings.get(INDEX_SOFT_DELETES_SETTING);
10651059
assert softDeleteEnabled || version.before(IndexVersions.V_8_0_0) : "soft deletes must be enabled in version " + version;

server/src/main/java/org/elasticsearch/index/MergeSchedulerConfig.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,6 @@ public final class MergeSchedulerConfig {
7575
this.autoThrottle = indexSettings.getValue(AUTO_THROTTLE_SETTING);
7676
}
7777

78-
MergeSchedulerConfig(boolean autoThrottle, int maxThreadCount, int maxMergeCount) {
79-
this.autoThrottle = autoThrottle;
80-
this.maxThreadCount = maxThreadCount;
81-
this.maxMergeCount = maxMergeCount;
82-
setMaxThreadAndMergeCount(maxThreadCount, maxMergeCount);
83-
}
84-
8578
/**
8679
* Returns <code>true</code> iff auto throttle is enabled.
8780
*

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2909,7 +2909,7 @@ private final class EngineThreadPoolMergeScheduler extends ThreadPoolMergeSchedu
29092909
IndexSettings indexSettings,
29102910
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
29112911
) {
2912-
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes, () -> false);
2912+
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes);
29132913
}
29142914

29152915
@Override

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import java.util.concurrent.TimeUnit;
4242
import java.util.concurrent.atomic.AtomicBoolean;
4343
import java.util.concurrent.atomic.AtomicLong;
44-
import java.util.function.BooleanSupplier;
4544

4645
public class ThreadPoolMergeScheduler extends MergeScheduler implements ElasticsearchMergeScheduler {
4746
public static final Setting<Boolean> USE_THREAD_POOL_MERGE_SCHEDULER_SETTING = Setting.boolSetting(
@@ -67,24 +66,20 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
6766
private final CountDownLatch closedWithNoRunningMerges = new CountDownLatch(1);
6867
private volatile boolean closed = false;
6968
private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider;
70-
// if true, scheduled merges will be aborted
71-
private final BooleanSupplier shouldSkipMerge;
7269

7370
/**
7471
* Creates a thread-pool-based merge scheduler that runs merges in a thread pool.
7572
*
76-
* @param shardId the shard id associated with this merge scheduler
77-
* @param indexSettings used to obtain the {@link MergeSchedulerConfig}
73+
* @param shardId the shard id associated with this merge scheduler
74+
* @param indexSettings used to obtain the {@link MergeSchedulerConfig}
7875
* @param threadPoolMergeExecutorService the executor service used to execute merge tasks from this scheduler
79-
* @param mergeMemoryEstimateProvider provides an estimate for how much memory a merge will take
80-
* @param shouldSkipMerge if true, scheduled merges will be skipped i.e. aborted by the executor
76+
* @param mergeMemoryEstimateProvider provides an estimate for how much memory a merge will take
8177
*/
8278
public ThreadPoolMergeScheduler(
8379
ShardId shardId,
8480
IndexSettings indexSettings,
8581
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
86-
MergeMemoryEstimateProvider mergeMemoryEstimateProvider,
87-
BooleanSupplier shouldSkipMerge
82+
MergeMemoryEstimateProvider mergeMemoryEstimateProvider
8883
) {
8984
this.shardId = shardId;
9085
this.config = indexSettings.getMergeSchedulerConfig();
@@ -97,7 +92,6 @@ public ThreadPoolMergeScheduler(
9792
);
9893
this.threadPoolMergeExecutorService = threadPoolMergeExecutorService;
9994
this.mergeMemoryEstimateProvider = mergeMemoryEstimateProvider;
100-
this.shouldSkipMerge = shouldSkipMerge;
10195
}
10296

10397
@Override
@@ -181,6 +175,34 @@ protected void enableIndexingThrottling(int numRunningMerges, int numQueuedMerge
181175
*/
182176
protected void disableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {}
183177

178+
/**
179+
* Returns true if scheduled merges should be skipped (aborted)
180+
*/
181+
protected boolean shouldSkipMerge() {
182+
return false;
183+
}
184+
185+
/**
186+
* Returns true if IO-throttling is enabled
187+
*/
188+
protected boolean isAutoThrottle() {
189+
return config.isAutoThrottle();
190+
}
191+
192+
/**
193+
* Returns the maximum number of active merges before being throttled
194+
*/
195+
protected int getMaxMergeCount() {
196+
return config.getMaxMergeCount();
197+
}
198+
199+
/**
200+
* Returns the maximum number of threads running merges before being throttled
201+
*/
202+
protected int getMaxThreadCount() {
203+
return config.getMaxThreadCount();
204+
}
205+
184206
/**
185207
* A callback for exceptions thrown while merging.
186208
*/
@@ -208,7 +230,7 @@ MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, Merg
208230
return new MergeTask(
209231
mergeSource,
210232
merge,
211-
isAutoThrottle && config.isAutoThrottle(),
233+
isAutoThrottle && isAutoThrottle(),
212234
"Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId,
213235
estimateMergeMemoryBytes
214236
);
@@ -218,7 +240,7 @@ private void checkMergeTaskThrottling() {
218240
long submittedMergesCount = submittedMergeTaskCount.get();
219241
long doneMergesCount = doneMergeTaskCount.get();
220242
int runningMergesCount = runningMergeTasks.size();
221-
int configuredMaxMergeCount = config.getMaxMergeCount();
243+
int configuredMaxMergeCount = getMaxMergeCount();
222244
// both currently running and enqueued merge tasks are considered "active" for throttling purposes
223245
int activeMerges = (int) (submittedMergesCount - doneMergesCount);
224246
if (activeMerges > configuredMaxMergeCount
@@ -248,12 +270,12 @@ synchronized Schedule schedule(MergeTask mergeTask) {
248270
if (closed) {
249271
// do not run or backlog tasks when closing the merge scheduler, instead abort them
250272
return Schedule.ABORT;
251-
} else if (shouldSkipMerge.getAsBoolean()) {
273+
} else if (shouldSkipMerge()) {
252274
if (verbose()) {
253275
message(String.format(Locale.ROOT, "skipping merge task %s", mergeTask));
254276
}
255277
return Schedule.ABORT;
256-
} else if (runningMergeTasks.size() < config.getMaxThreadCount()) {
278+
} else if (runningMergeTasks.size() < getMaxThreadCount()) {
257279
boolean added = runningMergeTasks.put(mergeTask.onGoingMerge.getMerge(), mergeTask) == null;
258280
assert added : "starting merge task [" + mergeTask + "] registered as already running";
259281
return Schedule.RUN;
@@ -286,7 +308,7 @@ private synchronized void maybeSignalAllMergesDoneAfterClose() {
286308
}
287309

288310
private synchronized void enqueueBackloggedTasks() {
289-
int maxBackloggedTasksToEnqueue = config.getMaxThreadCount() - runningMergeTasks.size();
311+
int maxBackloggedTasksToEnqueue = getMaxThreadCount() - runningMergeTasks.size();
290312
// enqueue all backlogged tasks when closing, as the queue expects all backlogged tasks to always be enqueued back
291313
while (closed || maxBackloggedTasksToEnqueue-- > 0) {
292314
MergeTask backloggedMergeTask = backloggedMergeTasks.poll();

server/src/test/java/org/elasticsearch/index/IndexSettingsTests.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -442,26 +442,6 @@ public void testStatelessFastRefreshDisableRefreshInterval() {
442442
assertEquals(TimeValue.MINUS_ONE, settings.getRefreshInterval());
443443
}
444444

445-
public void testStatelessMergeSchedulerConfig() {
446-
IndexMetadata metadata = IndexMetadata.builder("index")
447-
.system(true)
448-
.settings(
449-
indexSettings(IndexVersion.current(), 1, 1).put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
450-
// set auto throttling to true and set merge/thread count to a low number
451-
.put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), true)
452-
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), 2)
453-
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), 2)
454-
.build()
455-
)
456-
.build();
457-
IndexSettings settings = new IndexSettings(metadata, Settings.builder().put(STATELESS_ENABLED_SETTING_NAME, true).build());
458-
// for stateless we want to ensure that auto throttling, merge count throttling, and thread throttling are all effectively
459-
// disabled, ignoring the specified settings
460-
assertFalse(settings.getMergeSchedulerConfig().isAutoThrottle());
461-
assertEquals(Integer.MAX_VALUE, settings.getMergeSchedulerConfig().getMaxMergeCount());
462-
assertEquals(Integer.MAX_VALUE, settings.getMergeSchedulerConfig().getMaxThreadCount());
463-
}
464-
465445
private String getRandomTimeString() {
466446
int refreshIntervalInt = randomFrom(-1, Math.abs(randomInt()));
467447
String refreshInterval = Integer.toString(refreshIntervalInt);

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ public void testMergesExecuteInSizeOrder() throws IOException {
6363
new ShardId("index", "_na_", 1),
6464
IndexSettingsModule.newIndexSettings("index", Settings.EMPTY),
6565
threadPoolMergeExecutorService,
66-
merge -> 0,
67-
() -> false
66+
merge -> 0
6867
)
6968
) {
7069
List<OneMerge> executedMergesList = new ArrayList<>();
@@ -107,8 +106,7 @@ public void testSimpleMergeTaskBacklogging() {
107106
new ShardId("index", "_na_", 1),
108107
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
109108
threadPoolMergeExecutorService,
110-
merge -> 0,
111-
() -> false
109+
merge -> 0
112110
);
113111
// more merge tasks than merge threads
114112
int mergeCount = mergeExecutorThreadCount + randomIntBetween(1, 5);
@@ -142,8 +140,7 @@ public void testSimpleMergeTaskReEnqueueingBySize() {
142140
new ShardId("index", "_na_", 1),
143141
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
144142
threadPoolMergeExecutorService,
145-
merge -> 0,
146-
() -> false
143+
merge -> 0
147144
);
148145
// sort backlogged merges by size
149146
PriorityQueue<MergeTask> backloggedMergeTasks = new PriorityQueue<>(16, Comparator.comparingLong(MergeTask::estimatedMergeSize));
@@ -357,8 +354,7 @@ public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception
357354
new ShardId("index", "_na_", 1),
358355
IndexSettingsModule.newIndexSettings("index", settings),
359356
threadPoolMergeExecutorService,
360-
merge -> 0,
361-
() -> false
357+
merge -> 0
362358
)
363359
) {
364360
MergeSource mergeSource = mock(MergeSource.class);
@@ -432,8 +428,7 @@ public void testMergesRunConcurrently() throws Exception {
432428
new ShardId("index", "_na_", 1),
433429
IndexSettingsModule.newIndexSettings("index", settings),
434430
threadPoolMergeExecutorService,
435-
merge -> 0,
436-
() -> false
431+
merge -> 0
437432
)
438433
) {
439434
// at least 1 extra merge than there are concurrently allowed
@@ -518,8 +513,7 @@ public void testSchedulerCloseWaitsForRunningMerge() throws Exception {
518513
new ShardId("index", "_na_", 1),
519514
IndexSettingsModule.newIndexSettings("index", settings),
520515
threadPoolMergeExecutorService,
521-
merge -> 0,
522-
() -> false
516+
merge -> 0
523517
)
524518
) {
525519
CountDownLatch mergeDoneLatch = new CountDownLatch(1);
@@ -592,8 +586,7 @@ public void testAutoIOThrottleForMergeTasksWhenSchedulerDisablesIt() throws Exce
592586
new ShardId("index", "_na_", 1),
593587
indexSettings,
594588
threadPoolMergeExecutorService,
595-
merge -> 0,
596-
() -> false
589+
merge -> 0
597590
)
598591
) {
599592
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
@@ -623,8 +616,7 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
623616
new ShardId("index", "_na_", 1),
624617
indexSettings,
625618
threadPoolMergeExecutorService,
626-
merge -> 0,
627-
() -> false
619+
merge -> 0
628620
)
629621
) {
630622
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
@@ -641,8 +633,7 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
641633
new ShardId("index", "_na_", 1),
642634
indexSettings,
643635
threadPoolMergeExecutorService,
644-
merge -> 0,
645-
() -> false
636+
merge -> 0
646637
)
647638
) {
648639
// merge submitted upon closing
@@ -659,8 +650,7 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
659650
new ShardId("index", "_na_", 1),
660651
indexSettings,
661652
threadPoolMergeExecutorService,
662-
merge -> 0,
663-
() -> false
653+
merge -> 0
664654
)
665655
) {
666656
// merge submitted upon closing
@@ -682,9 +672,13 @@ public void testMergeSchedulerAbortsMergeWhenShouldSkipMergeIsTrue() {
682672
new ShardId("index", "_na_", 1),
683673
IndexSettingsModule.newIndexSettings("index", Settings.builder().build()),
684674
threadPoolMergeExecutorService,
685-
merge -> 0,
686-
() -> true
687-
);
675+
merge -> 0
676+
) {
677+
@Override
678+
protected boolean shouldSkipMerge() {
679+
return true;
680+
}
681+
};
688682
MergeSource mergeSource = mock(MergeSource.class);
689683
OneMerge oneMerge = mock(OneMerge.class);
690684
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
@@ -713,7 +707,7 @@ static class TestThreadPoolMergeScheduler extends ThreadPoolMergeScheduler {
713707
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
714708
BooleanSupplier shouldSkipMerge
715709
) {
716-
super(shardId, indexSettings, threadPoolMergeExecutorService, merge -> 0, shouldSkipMerge);
710+
super(shardId, indexSettings, threadPoolMergeExecutorService, merge -> 0);
717711
}
718712

719713
@Override

0 commit comments

Comments
 (0)