Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
bf557d2
ExecutorMergeScheduler
albertzaharovits Jan 16, 2025
a3f87df
Merge branch 'main' into threadpool-merge-scheduler
albertzaharovits Jan 16, 2025
f5a1a8d
[CI] Auto commit changes from spotless
Jan 16, 2025
f0b72fe
wrap for merge in the executor merge scheduler
albertzaharovits Jan 16, 2025
9b03950
spotless
albertzaharovits Jan 16, 2025
26e4043
Merge branch 'main' into threadpool-merge-scheduler
albertzaharovits Jan 17, 2025
aba69d0
Fix InternalEngineTests
albertzaharovits Jan 17, 2025
52796b5
Merge branch 'main' into threadpool-merge-scheduler
albertzaharovits Jan 17, 2025
c0667bf
implemented Throttling
albertzaharovits Jan 18, 2025
2da753f
Merge branch 'main' into threadpool-merge-scheduler
albertzaharovits Jan 18, 2025
2c8dc7f
[CI] Auto commit changes from spotless
Jan 18, 2025
81cc0f1
Checkstyle
albertzaharovits Jan 18, 2025
f58120f
Fix threadpool size for SnapshotResiliencyTests
albertzaharovits Jan 19, 2025
5ca992d
Spotless
albertzaharovits Jan 19, 2025
3c203cb
Nit
albertzaharovits Jan 19, 2025
6c21654
Implemented max thread setting
albertzaharovits Jan 19, 2025
68079d9
Throttling ?
albertzaharovits Jan 19, 2025
7b68ba9
Checkstyle
albertzaharovits Jan 19, 2025
9e467a1
Indexing throttling !
albertzaharovits Jan 19, 2025
a8f5297
Better throttling logging
albertzaharovits Jan 19, 2025
928fd32
Merge branch 'main' into threadpool-merge-scheduler
albertzaharovits Jan 20, 2025
3f5b4a8
Don't wrap errors during merging
albertzaharovits Jan 20, 2025
0e714a1
Merge branch 'main' into threadpool-merge-scheduler
albertzaharovits Jan 20, 2025
0297cce
Merge branch 'main' into threadpool-merge-scheduler
albertzaharovits Jan 21, 2025
2b79809
Refresh config
albertzaharovits Jan 21, 2025
537bf66
[CI] Auto commit changes from spotless
Jan 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2824,7 +2824,67 @@ LiveIndexWriterConfig getCurrentIndexWriterConfig() {
}

protected ElasticsearchMergeScheduler createMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
return new EngineMergeScheduler(shardId, indexSettings);
// return new EngineMergeScheduler(shardId, indexSettings);
return new ThreadPoolMergeScheduler(shardId, indexSettings, engineConfig.getThreadPool()) {

@Override
protected synchronized void activateThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {
logger.info(
"now throttling indexing: numRunningMerges={}, numQueuedMerges={}, maxNumMergesConfigured={}",
numRunningMerges,
numQueuedMerges,
configuredMaxMergeCount
);
InternalEngine.this.activateThrottling();
}

@Override
protected synchronized void deactivateThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {
logger.info(
"stop throttling indexing: numRunningMerges={}, numQueuedMerges={}, maxNumMergesConfigured={}",
numRunningMerges,
numQueuedMerges,
configuredMaxMergeCount
);
InternalEngine.this.deactivateThrottling();
}

@Override
public synchronized void afterMerge(OnGoingMerge merge) {
if (indexWriter.hasPendingMerges() == false
&& System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
// NEVER do this on a merge thread since we acquire some locks blocking here and if we concurrently rollback the writer
// we deadlock on engine#close for instance.
engineConfig.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (isClosed.get() == false) {
logger.warn("failed to flush after merge has finished", e);
} else {
logger.info("failed to flush after merge has finished during shard close");
}
}

@Override
protected void doRun() {
// if we have no pending merges and we are supposed to flush once merges have finished to
// free up transient disk usage of the (presumably biggish) segments that were just merged
flush();
}
});
} else if (merge.getTotalBytesSize() >= engineConfig.getIndexSettings().getFlushAfterMergeThresholdSize().getBytes()) {
// we hit a significant merge which would allow us to free up memory if we'd commit it hence on the next change
// we should execute a flush on the next operation if that's a flush after inactive or indexing a document.
// we could fork a thread and do it right away but we try to minimize forking and piggyback on outside events.
shouldPeriodicallyFlushAfterBigMerge.set(true);
}
}

@Override
protected void handleMergeException(final Throwable exc) {
mergeException(exc);
}
};
}

private final class EngineMergeScheduler extends ElasticsearchConcurrentMergeScheduler {
Expand Down
Loading