Closed
Changes from all 34 commits
bf557d2  ExecutorMergeScheduler (albertzaharovits, Jan 16, 2025)
a3f87df  Merge branch 'main' into threadpool-merge-scheduler (albertzaharovits, Jan 16, 2025)
f5a1a8d  [CI] Auto commit changes from spotless (Jan 16, 2025)
f0b72fe  wrap for merge in the executor merge scheduler (albertzaharovits, Jan 16, 2025)
9b03950  spotless (albertzaharovits, Jan 16, 2025)
26e4043  Merge branch 'main' into threadpool-merge-scheduler (albertzaharovits, Jan 17, 2025)
aba69d0  Fix InternalEngineTests (albertzaharovits, Jan 17, 2025)
52796b5  Merge branch 'main' into threadpool-merge-scheduler (albertzaharovits, Jan 17, 2025)
c0667bf  implemented Throttling (albertzaharovits, Jan 18, 2025)
2da753f  Merge branch 'main' into threadpool-merge-scheduler (albertzaharovits, Jan 18, 2025)
2c8dc7f  [CI] Auto commit changes from spotless (Jan 18, 2025)
81cc0f1  Checkstyle (albertzaharovits, Jan 18, 2025)
f58120f  Fix threadpool size for SnapshotResiliencyTests (albertzaharovits, Jan 19, 2025)
5ca992d  Spotless (albertzaharovits, Jan 19, 2025)
3c203cb  Nit (albertzaharovits, Jan 19, 2025)
6c21654  Implemented max thread setting (albertzaharovits, Jan 19, 2025)
68079d9  Throttling ? (albertzaharovits, Jan 19, 2025)
7b68ba9  Checkstyle (albertzaharovits, Jan 19, 2025)
9e467a1  Indexing throttling ! (albertzaharovits, Jan 19, 2025)
a8f5297  Better throttling logging (albertzaharovits, Jan 19, 2025)
928fd32  Merge branch 'main' into threadpool-merge-scheduler (albertzaharovits, Jan 20, 2025)
3f5b4a8  Don't wrap errors during merging (albertzaharovits, Jan 20, 2025)
0e714a1  Merge branch 'main' into threadpool-merge-scheduler (albertzaharovits, Jan 20, 2025)
0297cce  Merge branch 'main' into threadpool-merge-scheduler (albertzaharovits, Jan 21, 2025)
2b79809  Refresh config (albertzaharovits, Jan 21, 2025)
57c2a5c  Nit (albertzaharovits, Jan 21, 2025)
60a71b8  WIP (albertzaharovits, Jan 23, 2025)
68db209  Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges (albertzaharovits, Jan 23, 2025)
4099ac5  IO throttling (albertzaharovits, Jan 24, 2025)
5554bc2  Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges (albertzaharovits, Jan 24, 2025)
17b682f  [CI] Auto commit changes from spotless (Jan 24, 2025)
f3506da  this-escape (albertzaharovits, Jan 24, 2025)
6a3911a  Unregister (albertzaharovits, Jan 24, 2025)
00070cf  Sort merge schedulers (albertzaharovits, Jan 24, 2025)
@@ -625,6 +625,7 @@ public static final IndexShard newIndexShard(
indexService.getIndexEventListener(),
wrapper,
indexService.getThreadPool(),
indexService.getThreadPoolMergeExecutor(),
indexService.getBigArrays(),
null,
Collections.emptyList(),
@@ -59,6 +59,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
return new EngineConfig(
config.getShardId(),
config.getThreadPool(),
config.getThreadPoolMergeExecutor(),
indexSettings,
config.getWarmer(),
config.getStore(),
@@ -49,6 +49,7 @@
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.ThreadPoolMergeExecutor;
import org.elasticsearch.index.fielddata.FieldDataContext;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
@@ -154,6 +155,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust

private final AsyncTrimTranslogTask trimTranslogTask;
private final ThreadPool threadPool;
private final ThreadPoolMergeExecutor threadPoolMergeExecutor;
private final BigArrays bigArrays;
private final ScriptService scriptService;
private final ClusterService clusterService;
@@ -260,6 +262,7 @@ public IndexService(
this.indexFoldersDeletionListener = indexFoldersDeletionListener;
this.bigArrays = bigArrays;
this.threadPool = threadPool;
this.threadPoolMergeExecutor = new ThreadPoolMergeExecutor(threadPool);
this.scriptService = scriptService;
this.clusterService = clusterService;
this.client = client;
@@ -555,6 +558,7 @@ public synchronized IndexShard createShard(
eventListener,
readerWrapper,
threadPool,
threadPoolMergeExecutor,
bigArrays,
engineWarmer,
searchOperationListeners,
@@ -819,6 +823,10 @@ public ThreadPool getThreadPool() {
return threadPool;
}

public ThreadPoolMergeExecutor getThreadPoolMergeExecutor() {
return threadPoolMergeExecutor;
}

/**
* The {@link BigArrays} to use for this index.
*/
@@ -58,6 +58,7 @@ public final class EngineConfig {
private final MapperService mapperService;
private final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier;
private final ThreadPool threadPool;
private final ThreadPoolMergeExecutor threadPoolMergeExecutor;
private final Engine.Warmer warmer;
private final Store store;
private final MergePolicy mergePolicy;
@@ -150,6 +151,7 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
public EngineConfig(
ShardId shardId,
ThreadPool threadPool,
ThreadPoolMergeExecutor threadPoolMergeExecutor,
IndexSettings indexSettings,
Engine.Warmer warmer,
Store store,
@@ -179,6 +181,7 @@ public EngineConfig(
this.shardId = shardId;
this.indexSettings = indexSettings;
this.threadPool = threadPool;
this.threadPoolMergeExecutor = threadPoolMergeExecutor;
this.warmer = warmer == null ? (a) -> {} : warmer;
this.store = store;
this.mergePolicy = mergePolicy;
@@ -287,6 +290,10 @@ public ThreadPool getThreadPool() {
return threadPool;
}

public ThreadPoolMergeExecutor getThreadPoolMergeExecutor() {
return threadPoolMergeExecutor;
}

/**
* Returns an {@link org.elasticsearch.index.engine.Engine.Warmer} used to warm new searchers before they are used for searching.
*/
@@ -253,7 +253,11 @@ public InternalEngine(EngineConfig engineConfig) {
boolean success = false;
try {
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
mergeScheduler = createMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
mergeScheduler = createMergeScheduler(
engineConfig.getShardId(),
engineConfig.getIndexSettings(),
engineConfig.getThreadPoolMergeExecutor()
);
scheduler = mergeScheduler.getMergeScheduler();
throttle = new IndexThrottle();
try {
@@ -2823,8 +2827,72 @@ LiveIndexWriterConfig getCurrentIndexWriterConfig() {
return indexWriter.getConfig();
}

protected ElasticsearchMergeScheduler createMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
return new EngineMergeScheduler(shardId, indexSettings);
protected ElasticsearchMergeScheduler createMergeScheduler(
ShardId shardId,
IndexSettings indexSettings,
ThreadPoolMergeExecutor threadPoolMergeExecutor
) {
// return new EngineMergeScheduler(shardId, indexSettings);
return new ThreadPoolMergeScheduler(shardId, indexSettings, threadPoolMergeExecutor) {

@Override
protected synchronized void activateThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {
logger.info(
"now throttling indexing: numRunningMerges={}, numQueuedMerges={}, maxNumMergesConfigured={}",
numRunningMerges,
numQueuedMerges,
configuredMaxMergeCount
);
InternalEngine.this.activateThrottling();
}

@Override
protected synchronized void deactivateThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {
logger.info(
"stop throttling indexing: numRunningMerges={}, numQueuedMerges={}, maxNumMergesConfigured={}",
numRunningMerges,
numQueuedMerges,
configuredMaxMergeCount
);
InternalEngine.this.deactivateThrottling();
}

@Override
public synchronized void afterMerge(OnGoingMerge merge) {
if (indexWriter.hasPendingMerges() == false
&& System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
// NEVER do this on a merge thread since we acquire some locks blocking here and if we concurrently rollback the writer
// we deadlock on engine#close for instance.
engineConfig.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (isClosed.get() == false) {
logger.warn("failed to flush after merge has finished", e);
} else {
logger.info("failed to flush after merge has finished during shard close");
}
}

@Override
protected void doRun() {
// if we have no pending merges and we are supposed to flush once merges have finished to
// free up transient disk usage of the (presumably biggish) segments that were just merged
flush();
}
});
} else if (merge.getTotalBytesSize() >= engineConfig.getIndexSettings().getFlushAfterMergeThresholdSize().getBytes()) {
// we hit a significant merge which would allow us to free up memory if we'd commit it hence on the next change
// we should execute a flush on the next operation if that's a flush after inactive or indexing a document.
// we could fork a thread and do it right away but we try to minimize forking and piggyback on outside events.
shouldPeriodicallyFlushAfterBigMerge.set(true);
}
}

@Override
protected void handleMergeException(final Throwable exc) {
mergeException(exc);
}
};
}

private final class EngineMergeScheduler extends ElasticsearchConcurrentMergeScheduler {
@@ -0,0 +1,156 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

public class ThreadPoolMergeExecutor {
/**
* Floor for IO write rate limit (we will never go any lower than this)
*/
private static final double MIN_MERGE_MB_PER_SEC = 5.0;
/**
* Ceiling for IO write rate limit (we will never go any higher than this)
*/
private static final double MAX_MERGE_MB_PER_SEC = 10240.0;
/**
* Initial value for IO write rate limit when doAutoIOThrottle is true
*/
private static final double START_MB_PER_SEC = 20.0;
/**
* Current IO write throttle rate, for all merge, across all merge schedulers (shards) on the node
*/
private double targetMBPerSec = START_MB_PER_SEC;
private final SortedSet<ThreadPoolMergeScheduler> registeredMergeSchedulers = new TreeSet<>(new Comparator<ThreadPoolMergeScheduler>() {
@Override
public int compare(ThreadPoolMergeScheduler tpms1, ThreadPoolMergeScheduler tpms2) {
MergeTask mergeTask1 = tpms1.peekMergeTaskToExecute();
MergeTask mergeTask2 = tpms2.peekMergeTaskToExecute();
if (mergeTask1 == null && mergeTask2 == null) {
// arbitrary order between schedulers that cannot run any merge right now
return System.identityHashCode(tpms1) - System.identityHashCode(tpms2);
} else if (mergeTask1 == null) {
// "merge task 2" can run because "merge scheduler 1" cannot run any merges
return 1;
} else if (mergeTask2 == null) {
// "merge task 1" can run because "merge scheduler 2" cannot run any merges
return -1;
} else {
// run smaller merge task first
return mergeTask1.compareTo(mergeTask2);
}
}
});
private final ExecutorService executorService;
private final int maxConcurrentMerges;
private int currentlyExecutingMergesCount;
private int currentlyActiveIOThrottledMergesCount;

public ThreadPoolMergeExecutor(ThreadPool threadPool) {
this.executorService = threadPool.executor(ThreadPool.Names.MERGE);
this.maxConcurrentMerges = threadPool.info(ThreadPool.Names.MERGE).getMax();
}

public double getTargetMBPerSec() {
return targetMBPerSec;
}

public synchronized void registerMergeScheduler(ThreadPoolMergeScheduler threadPoolMergeScheduler) {
if (registeredMergeSchedulers.add(threadPoolMergeScheduler) == false) {
throw new IllegalStateException("cannot register the same scheduler multiple times");
}
}

public synchronized void unregisterMergeScheduler(ThreadPoolMergeScheduler threadPoolMergeScheduler) {
if (registeredMergeSchedulers.remove(threadPoolMergeScheduler) == false) {
throw new IllegalStateException("cannot unregister if the scheduler has not been registered");
}
}

public synchronized void updateMergeScheduler(
Contributor:
I have some perhaps unqualified anxiety around this central mutex across the node. Probably not a real issue but noting it anyway.

Contributor Author:
Yeah, this is an issue, but I don't see a better way in the current design.

ThreadPoolMergeScheduler threadPoolMergeScheduler,
Consumer<ThreadPoolMergeScheduler> updater
) {
boolean removed = registeredMergeSchedulers.remove(threadPoolMergeScheduler);
if (false == removed) {
throw new IllegalStateException("Cannot update a merge scheduler that is not registered");
}
currentlyExecutingMergesCount -= threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size();
currentlyActiveIOThrottledMergesCount -= getIOThrottledMergeTasksCount(threadPoolMergeScheduler);
updater.accept(threadPoolMergeScheduler);
boolean added = registeredMergeSchedulers.add(threadPoolMergeScheduler);
if (false == added) {
throw new IllegalStateException("Found duplicate registered merge scheduler");
}
currentlyExecutingMergesCount += threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size();
currentlyActiveIOThrottledMergesCount += getIOThrottledMergeTasksCount(threadPoolMergeScheduler);
double newTargetMBPerSec = maybeUpdateTargetMBPerSec();
if (newTargetMBPerSec != targetMBPerSec) {
targetMBPerSec = newTargetMBPerSec;
threadPoolMergeScheduler.setIORateLimitForAllMergeTasks(newTargetMBPerSec);
Contributor:
Should this not set it for all merge schedulers or running tasks?

I wonder if we should keep a list of the running tasks in this class so we can easily set it?

Contributor Author:
That's right, it should be set for all merge schedulers, and, as you say, it would be better to set it for the running tasks only (and set it when they start).
(A sketch of this suggestion appears below, after the end of this file's diff.)

}
maybeExecuteNextMerges();
}
Comment on lines +84 to +107

Contributor Author:
An update to a threadpool merge scheduler has the potential to reorder it in the sorted set.

It also has the potential to change the stats related to IO throttling, because that is done "globally", i.e. across all shards on the node.


public synchronized void maybeExecuteNextMerges() {
while (true) {
if (currentlyExecutingMergesCount >= maxConcurrentMerges) {
// all merge threads are busy
return;
}
if (registeredMergeSchedulers.first().peekMergeTaskToExecute() == null) {
// no merges available to run
return;
}
ThreadPoolMergeScheduler threadPoolMergeScheduler = registeredMergeSchedulers.removeFirst();
currentlyExecutingMergesCount -= threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size();
MergeTask mergeTask = threadPoolMergeScheduler.executeNextMergeTask();
assert mergeTask != null;
executorService.execute(mergeTask);
registeredMergeSchedulers.add(threadPoolMergeScheduler);
currentlyExecutingMergesCount += threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size();
}
}

private int getIOThrottledMergeTasksCount(ThreadPoolMergeScheduler mergeScheduler) {
if (mergeScheduler.shouldIOThrottleMergeTasks() == false) {
return 0;
} else {
int count = 0;
for (MergeTask runningMergeTask : mergeScheduler.getCurrentlyRunningMergeTasks()) {
if (runningMergeTask.supportsIOThrottling()) {
count++;
}
}
for (MergeTask queuedMergeTask : mergeScheduler.getQueuedMergeTasks()) {
if (queuedMergeTask.supportsIOThrottling()) {
count++;
}
}
return count;
}
}

private double maybeUpdateTargetMBPerSec() {
if (currentlyActiveIOThrottledMergesCount < maxConcurrentMerges * 2 && targetMBPerSec > MIN_MERGE_MB_PER_SEC) {
return Math.max(MIN_MERGE_MB_PER_SEC, targetMBPerSec / 1.1);
} else if (currentlyActiveIOThrottledMergesCount > maxConcurrentMerges * 4 && targetMBPerSec < MAX_MERGE_MB_PER_SEC) {
return Math.min(MAX_MERGE_MB_PER_SEC, targetMBPerSec * 1.1);
}
return targetMBPerSec;
}
}
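
Following up on the review thread about applying a changed IO rate limit everywhere: below is a minimal sketch, not code from this PR, of how that suggestion could be wired into ThreadPoolMergeExecutor.updateMergeScheduler. It reuses only names visible in the diff above (registeredMergeSchedulers, maybeUpdateTargetMBPerSec, setIORateLimitForAllMergeTasks, and so on); the broadcast loop near the end is the assumed change.

// Sketch only (not part of this PR): push a changed IO rate to every registered
// merge scheduler, rather than only to the scheduler currently being updated.
public synchronized void updateMergeScheduler(
    ThreadPoolMergeScheduler threadPoolMergeScheduler,
    Consumer<ThreadPoolMergeScheduler> updater
) {
    // remove, mutate, and re-add so the TreeSet re-sorts the scheduler by its next merge task
    if (registeredMergeSchedulers.remove(threadPoolMergeScheduler) == false) {
        throw new IllegalStateException("Cannot update a merge scheduler that is not registered");
    }
    currentlyExecutingMergesCount -= threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size();
    currentlyActiveIOThrottledMergesCount -= getIOThrottledMergeTasksCount(threadPoolMergeScheduler);
    updater.accept(threadPoolMergeScheduler);
    if (registeredMergeSchedulers.add(threadPoolMergeScheduler) == false) {
        throw new IllegalStateException("Found duplicate registered merge scheduler");
    }
    currentlyExecutingMergesCount += threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size();
    currentlyActiveIOThrottledMergesCount += getIOThrottledMergeTasksCount(threadPoolMergeScheduler);
    double newTargetMBPerSec = maybeUpdateTargetMBPerSec();
    if (newTargetMBPerSec != targetMBPerSec) {
        targetMBPerSec = newTargetMBPerSec;
        // broadcast the new rate so every registered scheduler (and, through it, its merge tasks)
        // picks up the node-wide target, not just the scheduler that triggered the update
        for (ThreadPoolMergeScheduler scheduler : registeredMergeSchedulers) {
            scheduler.setIORateLimitForAllMergeTasks(newTargetMBPerSec);
        }
    }
    maybeExecuteNextMerges();
}

The other idea raised in the thread, keeping a collection of the currently running, IO-throttleable merge tasks inside the executor and setting the limit on those directly (including when a task starts), is not sketched here.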