Skip to content

Commit ad6cf4c

Browse files
Trimming code
1 parent 023f042 commit ad6cf4c

File tree

12 files changed

+55
-138
lines changed

12 files changed

+55
-138
lines changed

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,8 @@
8888
import org.elasticsearch.index.IndexSettings;
8989
import org.elasticsearch.index.IndexingPressure;
9090
import org.elasticsearch.index.MergePolicyConfig;
91-
import org.elasticsearch.index.engine.MergeDiskSpaceMonitor;
92-
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler;
9391
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
92+
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler;
9493
import org.elasticsearch.index.shard.IndexingStatsSettings;
9594
import org.elasticsearch.indices.IndexingMemoryController;
9695
import org.elasticsearch.indices.IndicesQueryCache;
@@ -630,9 +629,9 @@ public void apply(Settings value, Settings current, Settings previous) {
630629
MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT_SETTING,
631630
MergePolicyConfig.DEFAULT_MAX_TIME_BASED_MERGED_SEGMENT_SETTING,
632631
ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING,
633-
MergeDiskSpaceMonitor.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING,
634-
MergeDiskSpaceMonitor.INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING,
635-
MergeDiskSpaceMonitor.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING,
632+
ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING,
633+
ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING,
634+
ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING,
636635
TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE,
637636
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING,
638637
DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,

server/src/main/java/org/elasticsearch/index/engine/MergeDiskSpaceMonitor.java

Lines changed: 0 additions & 114 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.util.PriorityQueue;
3434
import java.util.Set;
3535
import java.util.concurrent.ExecutorService;
36-
import java.util.concurrent.PriorityBlockingQueue;
3736
import java.util.concurrent.RejectedExecutionException;
3837
import java.util.concurrent.atomic.AtomicInteger;
3938
import java.util.concurrent.atomic.AtomicLong;
@@ -348,6 +347,10 @@ class DiskSpaceMonitor implements Runnable {
348347
public void run() {
349348
FsInfo.Path leastAvailablePath = null;
350349
IOException fsInfoException = null;
350+
if (dataPaths == null) {
351+
LOGGER.warn("Cannot read filesystem info because data path is not set in the env");
352+
return;
353+
}
351354
for (NodeEnvironment.DataPath dataPath : dataPaths) {
352355
try {
353356
FsInfo.Path fsInfo = getFSInfo(dataPath); // uncached
@@ -450,6 +453,10 @@ void updateMaxPriorityLimit(long maxPriorityLimit) {
450453
boolean isEmpty() {
451454
return priorityQueue.isEmpty();
452455
}
456+
457+
int size() {
458+
return priorityQueue.size();
459+
}
453460
}
454461

455462
@Override
@@ -525,8 +532,8 @@ Set<MergeTask> getRunningMergeTasks() {
525532
}
526533

527534
// exposed for tests
528-
PriorityBlockingQueue<MergeTask> getQueuedMergeTasks() {
529-
return queuedMergeTasks;
535+
int getMergeTasksQueueLength() {
536+
return queuedMergeTasks.size();
530537
}
531538

532539
// exposed for tests and stats

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -443,14 +443,12 @@ void abort() {
443443
}
444444
}
445445

446-
// TODO javadoc
447446
long estimatedMergeSize() {
448447
// TODO is it possible that `estimatedMergeBytes` be `0` for correctly initialize merges,
449448
// or is it always the case that if `estimatedMergeBytes` is `0` that means that the merge has not yet been initialized?
450449
return onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
451450
}
452451

453-
// TODO javadoc
454452
long estimatedRemainingMergeSize() {
455453
return Math.max(0L, estimatedMergeSize() - rateLimiter.getTotalBytesWritten());
456454
}

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,12 +293,13 @@ protected void doStart() {
293293
IndicesService(IndicesServiceBuilder builder) {
294294
this.settings = builder.settings;
295295
this.threadPool = builder.threadPool;
296+
this.pluginsService = builder.pluginsService;
297+
this.nodeEnv = builder.nodeEnv;
296298
this.threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
297299
threadPool,
298-
settings
300+
settings,
301+
nodeEnv
299302
);
300-
this.pluginsService = builder.pluginsService;
301-
this.nodeEnv = builder.nodeEnv;
302303
this.parserConfig = XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE)
303304
.withRegistry(builder.xContentRegistry);
304305
this.valuesSourceRegistry = builder.valuesSourceRegistry;

server/src/test/java/org/elasticsearch/index/IndexModuleTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,13 +194,17 @@ public void setUp() throws Exception {
194194
emptyMap()
195195
);
196196
threadPool = new TestThreadPool("test");
197-
threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(threadPool, settings);
198197
circuitBreakerService = new NoneCircuitBreakerService();
199198
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
200199
bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST);
201200
scriptService = new ScriptService(settings, Collections.emptyMap(), Collections.emptyMap(), () -> 1L);
202201
clusterService = ClusterServiceUtils.createClusterService(threadPool);
203202
nodeEnvironment = new NodeEnvironment(settings, environment);
203+
threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
204+
threadPool,
205+
settings,
206+
nodeEnvironment
207+
);
204208
mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry();
205209
indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance(threadPool.getThreadContext());
206210
}

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
1414
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
1515
import org.elasticsearch.common.util.concurrent.EsExecutors;
16+
import org.elasticsearch.env.NodeEnvironment;
1617
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
1718
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule;
1819
import org.elasticsearch.test.ESTestCase;
@@ -450,7 +451,7 @@ public void testMergeTasksRunConcurrently() throws Exception {
450451
assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(mergeExecutorThreadCount));
451452
// with the other merge tasks enqueued
452453
assertThat(
453-
threadPoolMergeExecutorService.getQueuedMergeTasks().size(),
454+
threadPoolMergeExecutorService.getMergeTasksQueueLength(),
454455
is(totalMergeTasksCount - mergeExecutorThreadCount - finalCompletedTasksCount)
455456
);
456457
// also check thread-pool stats for the same
@@ -470,7 +471,7 @@ public void testMergeTasksRunConcurrently() throws Exception {
470471
// there are fewer available merges than available threads
471472
assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(finalRemainingMergeTasksCount));
472473
// no more merges enqueued
473-
assertThat(threadPoolMergeExecutorService.getQueuedMergeTasks().size(), is(0));
474+
assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(0));
474475
// also check thread-pool stats for the same
475476
assertThat(threadPoolExecutor.getActiveCount(), is(finalRemainingMergeTasksCount));
476477
assertThat(threadPoolExecutor.getQueue().size(), is(0));
@@ -518,7 +519,7 @@ public void testThreadPoolStatsWithBackloggedMergeTasks() throws Exception {
518519
assertThat(threadPoolExecutor.getActiveCount(), is(backloggedMergeTasksList.size()));
519520
assertThat(threadPoolExecutor.getQueue().size(), is(0));
520521
}
521-
assertThat(threadPoolMergeExecutorService.getQueuedMergeTasks().size(), is(0));
522+
assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(0));
522523
});
523524
// re-enqueue backlogged merge tasks
524525
for (MergeTask backloggedMergeTask : backloggedMergeTasksList) {
@@ -666,7 +667,8 @@ static ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService(ThreadPo
666667
threadPool,
667668
randomBoolean()
668669
? Settings.EMPTY
669-
: Settings.builder().put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true).build()
670+
: Settings.builder().put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true).build(),
671+
mock(NodeEnvironment.class)
670672
);
671673
assertNotNull(threadPoolMergeExecutorService);
672674
assertTrue(threadPoolMergeExecutorService.allDone());

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,7 @@ public void testMergesRunConcurrently() throws Exception {
455455
// also check the same for the thread-pool executor
456456
assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(mergeSchedulerMaxThreadCount));
457457
// queued merge tasks do not include backlogged merges
458-
assertThat(threadPoolMergeExecutorService.getQueuedMergeTasks().size(), is(0));
458+
assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength().size(), is(0));
459459
// also check thread-pool stats for the same
460460
// there are active thread-pool threads waiting for the backlogged merge tasks to be re-enqueued
461461
int activeMergeThreads = Math.min(mergeCount - finalCompletedMergesCount, mergeExecutorThreadCount);
@@ -476,7 +476,7 @@ public void testMergesRunConcurrently() throws Exception {
476476
// also check thread-pool executor for the same
477477
assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(finalRemainingMergesCount));
478478
// no more backlogged merges
479-
assertThat(threadPoolMergeExecutorService.getQueuedMergeTasks().size(), is(0));
479+
assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength().size(), is(0));
480480
// also check thread-pool stats for the same
481481
assertThat(threadPoolExecutor.getActiveCount(), is(finalRemainingMergesCount));
482482
assertThat(threadPoolExecutor.getQueue().size(), is(0));

server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.core.Releasable;
3434
import org.elasticsearch.core.Strings;
3535
import org.elasticsearch.core.TimeValue;
36+
import org.elasticsearch.env.NodeEnvironment;
3637
import org.elasticsearch.index.Index;
3738
import org.elasticsearch.index.IndexModule;
3839
import org.elasticsearch.index.IndexSettings;
@@ -82,6 +83,7 @@
8283
import static org.hamcrest.Matchers.arrayContaining;
8384
import static org.hamcrest.Matchers.equalTo;
8485
import static org.hamcrest.Matchers.instanceOf;
86+
import static org.mockito.Mockito.mock;
8587

8688
/**
8789
* Tests how {@linkplain RefreshListeners} interacts with {@linkplain InternalEngine}.
@@ -104,7 +106,11 @@ public void setupListeners() throws Exception {
104106
.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), randomBoolean())
105107
.build();
106108
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("index", settings);
107-
threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(threadPool, settings);
109+
threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
110+
threadPool,
111+
settings,
112+
mock(NodeEnvironment.class)
113+
);
108114
listeners = new RefreshListeners(
109115
() -> maxListeners,
110116
() -> engine.refresh("too-many-listeners"),

test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.elasticsearch.core.IOUtils;
6969
import org.elasticsearch.core.Nullable;
7070
import org.elasticsearch.core.TimeValue;
71+
import org.elasticsearch.env.NodeEnvironment;
7172
import org.elasticsearch.index.Index;
7273
import org.elasticsearch.index.IndexModule;
7374
import org.elasticsearch.index.IndexSettings;
@@ -147,6 +148,7 @@
147148
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
148149
import static org.hamcrest.Matchers.lessThanOrEqualTo;
149150
import static org.hamcrest.Matchers.notNullValue;
151+
import static org.mockito.Mockito.mock;
150152

151153
@SuppressWarnings("HiddenField")
152154
public abstract class EngineTestCase extends ESTestCase {
@@ -246,7 +248,8 @@ public void setUp() throws Exception {
246248
threadPool = new TestThreadPool(getClass().getName());
247249
threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
248250
threadPool,
249-
defaultSettings.getNodeSettings()
251+
defaultSettings.getNodeSettings(),
252+
mock(NodeEnvironment.class)
250253
);
251254

252255
store = createStore();

0 commit comments

Comments
 (0)