Skip to content

Commit bf557d2

Browse files
ExecutorMergeScheduler
1 parent 7253cb6 commit bf557d2

File tree

4 files changed

+330
-1
lines changed

4 files changed

+330
-1
lines changed
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.lucene.index.MergePolicy;
14+
import org.apache.lucene.index.MergeRateLimiter;
15+
import org.apache.lucene.index.MergeScheduler;
16+
import org.apache.lucene.index.MergeTrigger;
17+
import org.elasticsearch.common.logging.Loggers;
18+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
19+
import org.elasticsearch.core.TimeValue;
20+
import org.elasticsearch.index.IndexSettings;
21+
import org.elasticsearch.index.MergeSchedulerConfig;
22+
import org.elasticsearch.index.merge.MergeStats;
23+
import org.elasticsearch.index.merge.OnGoingMerge;
24+
import org.elasticsearch.index.shard.ShardId;
25+
26+
import java.io.IOException;
27+
import java.util.Locale;
28+
import java.util.Set;
29+
import java.util.concurrent.ExecutorService;
30+
import java.util.concurrent.TimeUnit;
31+
32+
public class ExecutorMergeScheduler extends MergeScheduler implements ElasticsearchMergeScheduler {
33+
34+
private final MergeSchedulerConfig config;
35+
private final Logger logger;
36+
private final MergeTracking mergeTracking;
37+
private final ExecutorService executorService;
38+
private final ThreadLocal<MergeRateLimiter> onGoingMergeRateLimiter = new ThreadLocal<>();
39+
40+
public ExecutorMergeScheduler(ShardId shardId, IndexSettings indexSettings, ExecutorService executorService) {
41+
this.config = indexSettings.getMergeSchedulerConfig();
42+
this.logger = Loggers.getLogger(getClass(), shardId);
43+
// TODO: use real IO rate here
44+
this.mergeTracking = new MergeTracking(logger, () -> Double.POSITIVE_INFINITY);
45+
this.executorService = executorService;
46+
}
47+
@Override
48+
public Set<OnGoingMerge> onGoingMerges() {
49+
return mergeTracking.onGoingMerges();
50+
}
51+
52+
@Override
53+
public MergeStats stats() {
54+
return mergeTracking.stats();
55+
}
56+
57+
@Override
58+
public MergeScheduler getMergeScheduler() {
59+
return this;
60+
}
61+
62+
@Override
63+
public void refreshConfig() {
64+
// No-op
65+
}
66+
67+
@Override
68+
public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
69+
MergePolicy.OneMerge merge = mergeSource.getNextMerge();
70+
if (merge != null) {
71+
submitNewMergeTask(mergeSource, merge);
72+
}
73+
}
74+
75+
/**
76+
* A callback allowing for custom logic before an actual merge starts.
77+
*/
78+
protected void beforeMerge(OnGoingMerge merge) {}
79+
80+
/**
81+
* A callback allowing for custom logic after an actual merge starts.
82+
*/
83+
protected void afterMerge(OnGoingMerge merge) {}
84+
85+
public synchronized int getMaxMergeCount() {
86+
return config.getMaxMergeCount();
87+
}
88+
89+
@Override
90+
public MergeScheduler clone() {
91+
// Lucene IW makes a clone internally but since we hold on to this instance
92+
// the clone will just be the identity.
93+
return this;
94+
}
95+
96+
protected void handleMergeException(Throwable exc) {
97+
if (exc instanceof MergePolicy.MergeException mergeException) {
98+
throw mergeException;
99+
} else {
100+
throw new MergePolicy.MergeException(exc);
101+
}
102+
}
103+
104+
private void submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge) {
105+
MergeTask mergeTask = mergeTask(mergeSource, merge);
106+
executorService.execute(mergeTask);
107+
}
108+
109+
private MergeTask mergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge) {
110+
return new MergeTask(mergeSource, merge, "TODO");
111+
}
112+
113+
@Override
114+
/** Overridden to route messages to our logger too, in addition to the {@link org.apache.lucene.util.InfoStream} that lucene uses. */
115+
protected boolean verbose() {
116+
if (logger.isTraceEnabled()) {
117+
return true;
118+
}
119+
return super.verbose();
120+
}
121+
122+
@Override
123+
/** Overridden to route messages to our logger too, in addition to the {@link org.apache.lucene.util.InfoStream} that lucene uses. */
124+
protected void message(String message) {
125+
if (logger.isTraceEnabled()) {
126+
logger.trace("{}", message);
127+
}
128+
super.message(message);
129+
}
130+
131+
/**
132+
* Does the actual merge, by calling {@link org.apache.lucene.index.MergeScheduler.MergeSource#merge}
133+
*/
134+
protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
135+
mergeSource.merge(merge);
136+
}
137+
138+
protected class MergeTask extends AbstractRunnable implements Comparable<MergeTask> {
139+
private final MergeSource mergeSource;
140+
private final OnGoingMerge onGoingMerge;
141+
private final MergeRateLimiter rateLimiter;
142+
private final String name;
143+
144+
public MergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, String name) {
145+
this.mergeSource = mergeSource;
146+
this.onGoingMerge = new OnGoingMerge(merge);
147+
this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress());
148+
this.name = name;
149+
}
150+
151+
@Override
152+
public int compareTo(MergeTask other) {
153+
// sort smaller merges (per shard) first, so they are completed before larger ones
154+
return Long.compare(onGoingMerge.getMerge().estimatedMergeBytes, other.onGoingMerge.getMerge().estimatedMergeBytes);
155+
}
156+
157+
@Override
158+
public void doRun() {
159+
final long startTimeNS = System.nanoTime();
160+
try {
161+
onGoingMergeRateLimiter.set(this.rateLimiter);
162+
beforeMerge(onGoingMerge);
163+
mergeTracking.mergeStarted(onGoingMerge);
164+
if (verbose()) {
165+
message(String.format(Locale.ROOT, "merge task %s start", getName()));
166+
}
167+
doMerge(mergeSource, onGoingMerge.getMerge());
168+
if (verbose()) {
169+
message(
170+
String.format(
171+
Locale.ROOT,
172+
"merge task %s merge segment [%s] done estSize=%.1f MB (written=%.1f MB) runTime=%.1fs (stopped=%.1fs, paused=%.1fs) rate=%s",
173+
getName(),
174+
getSegmentName(onGoingMerge.getMerge()),
175+
bytesToMB(onGoingMerge.getMerge().estimatedMergeBytes),
176+
bytesToMB(rateLimiter.getTotalBytesWritten()),
177+
nsToSec(System.nanoTime() - startTimeNS),
178+
nsToSec(rateLimiter.getTotalStoppedNS()),
179+
nsToSec(rateLimiter.getTotalPausedNS()),
180+
rateToString(rateLimiter.getMBPerSec())));
181+
}
182+
if (verbose()) {
183+
message(String.format(Locale.ROOT, "merge task %s end", getName()));
184+
}
185+
} catch (Throwable exc) {
186+
if (exc instanceof MergePolicy.MergeAbortedException) {
187+
// OK to ignore. This is what Lucene's ConcurrentMergeScheduler does
188+
} else {
189+
handleMergeException(exc);
190+
}
191+
} finally {
192+
try {
193+
afterMerge(onGoingMerge);
194+
} finally {
195+
onGoingMergeRateLimiter.remove();
196+
long tookMS = TimeValue.nsecToMSec(System.nanoTime() - startTimeNS);
197+
mergeTracking.mergeFinished(onGoingMerge.getMerge(), onGoingMerge, tookMS);
198+
}
199+
}
200+
}
201+
202+
@Override
203+
public void onAfter() {
204+
MergePolicy.OneMerge nextMerge;
205+
try {
206+
nextMerge = mergeSource.getNextMerge();
207+
} catch (IllegalStateException e) {
208+
if (verbose()) {
209+
message("merge task poll failed, likely that index writer is failed");
210+
}
211+
return; // ignore exception, we expect the IW failure to be logged elsewhere
212+
}
213+
if (nextMerge != null) {
214+
submitNewMergeTask(mergeSource, nextMerge);
215+
}
216+
}
217+
218+
@Override
219+
public void onFailure(Exception e) {
220+
// doRun already handles exceptions, this is just to be extra defensive from any future code modifications
221+
handleMergeException(e);
222+
}
223+
224+
@Override
225+
public void onRejection(Exception e) {
226+
if (verbose()) {
227+
message(String.format(Locale.ROOT, "merge task [%s] rejected by thread pool, aborting", onGoingMerge.getId()));
228+
}
229+
abortOnGoingMerge();
230+
}
231+
232+
private void abortOnGoingMerge() {
233+
// This would interrupt an IndexWriter if it were actually performing the merge. We just set this here because it seems
234+
// appropriate as we are not going to move forward with the merge.
235+
onGoingMerge.getMerge().setAborted();
236+
// It is fine to mark this merge as finished. Lucene will eventually produce a new merge including this segment even if
237+
// this merge did not actually execute.
238+
mergeSource.onMergeFinished(onGoingMerge.getMerge());
239+
}
240+
241+
private String getName() {
242+
return name;
243+
}
244+
}
245+
246+
private static double nsToSec(long ns) {
247+
return ns / (double) TimeUnit.SECONDS.toNanos(1);
248+
}
249+
250+
private static double bytesToMB(long bytes) {
251+
return bytes / 1024. / 1024.;
252+
}
253+
254+
private static String getSegmentName(MergePolicy.OneMerge merge) {
255+
return merge.getMergeInfo() != null ? merge.getMergeInfo().info.name : "_na_";
256+
}
257+
258+
private static String rateToString(double mbPerSec) {
259+
if (mbPerSec == 0.0) {
260+
return "stopped";
261+
} else if (mbPerSec == Double.POSITIVE_INFINITY) {
262+
return "unlimited";
263+
} else {
264+
return String.format(Locale.ROOT, "%.1f MB/sec", mbPerSec);
265+
}
266+
}
267+
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2824,7 +2824,64 @@ LiveIndexWriterConfig getCurrentIndexWriterConfig() {
28242824
}
28252825

28262826
protected ElasticsearchMergeScheduler createMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
2827-
return new EngineMergeScheduler(shardId, indexSettings);
2827+
// return new EngineMergeScheduler(shardId, indexSettings);
2828+
return new ExecutorMergeScheduler(shardId, indexSettings, engineConfig.getThreadPool().executor(ThreadPool.Names.MERGE)) {
2829+
private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
2830+
private final AtomicBoolean isThrottling = new AtomicBoolean();
2831+
@Override
2832+
public synchronized void beforeMerge(OnGoingMerge merge) {
2833+
int maxNumMerges = getMaxMergeCount();
2834+
if (numMergesInFlight.incrementAndGet() > maxNumMerges) {
2835+
if (isThrottling.getAndSet(true) == false) {
2836+
logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
2837+
activateThrottling();
2838+
}
2839+
}
2840+
}
2841+
2842+
@Override
2843+
public synchronized void afterMerge(OnGoingMerge merge) {
2844+
int maxNumMerges = getMaxMergeCount();
2845+
if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
2846+
if (isThrottling.getAndSet(false)) {
2847+
logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
2848+
deactivateThrottling();
2849+
}
2850+
}
2851+
if (indexWriter.hasPendingMerges() == false
2852+
&& System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
2853+
// NEVER do this on a merge thread since we acquire some locks blocking here and if we concurrently rollback the writer
2854+
// we deadlock on engine#close for instance.
2855+
engineConfig.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
2856+
@Override
2857+
public void onFailure(Exception e) {
2858+
if (isClosed.get() == false) {
2859+
logger.warn("failed to flush after merge has finished", e);
2860+
} else {
2861+
logger.info("failed to flush after merge has finished during shard close");
2862+
}
2863+
}
2864+
2865+
@Override
2866+
protected void doRun() {
2867+
// if we have no pending merges and we are supposed to flush once merges have finished to
2868+
// free up transient disk usage of the (presumably biggish) segments that were just merged
2869+
flush();
2870+
}
2871+
});
2872+
} else if (merge.getTotalBytesSize() >= engineConfig.getIndexSettings().getFlushAfterMergeThresholdSize().getBytes()) {
2873+
// we hit a significant merge which would allow us to free up memory if we'd commit it hence on the next change
2874+
// we should execute a flush on the next operation if that's a flush after inactive or indexing a document.
2875+
// we could fork a thread and do it right away but we try to minimize forking and piggyback on outside events.
2876+
shouldPeriodicallyFlushAfterBigMerge.set(true);
2877+
}
2878+
}
2879+
2880+
@Override
2881+
protected void handleMergeException(final Throwable exc) {
2882+
mergeException(exc);
2883+
}
2884+
};
28282885
}
28292886

28302887
private final class EngineMergeScheduler extends ElasticsearchConcurrentMergeScheduler {

server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,10 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
145145
false
146146
)
147147
);
148+
result.put(
149+
ThreadPool.Names.MERGE,
150+
new ScalingExecutorBuilder(ThreadPool.Names.MERGE, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5), false)
151+
);
148152
result.put(
149153
ThreadPool.Names.FORCE_MERGE,
150154
new FixedExecutorBuilder(

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ public static class Names {
101101
public static final String WARMER = "warmer";
102102
public static final String SNAPSHOT = "snapshot";
103103
public static final String SNAPSHOT_META = "snapshot_meta";
104+
public static final String MERGE = "merge";
104105
public static final String FORCE_MERGE = "force_merge";
105106
public static final String FETCH_SHARD_STARTED = "fetch_shard_started";
106107
public static final String FETCH_SHARD_STORE = "fetch_shard_store";

0 commit comments

Comments
 (0)