Skip to content

Commit ba7ebb5

Browse files
committed
call listener before adding to queue
1 parent 5395c0a commit ba7ebb5

File tree

1 file changed

+7
-17
lines changed

1 file changed

+7
-17
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,6 @@ public class ThreadPoolMergeExecutorService {
7676
private final int concurrentMergesCeilLimitForThrottling;
7777

7878
private final List<MergeEventListener> mergeEventListeners = new CopyOnWriteArrayList<>();
79-
/**
80-
* To ensure that for a given merge {@link org.elasticsearch.index.engine.MergeEventListener#onMergeAborted} or
81-
* {@link org.elasticsearch.index.engine.MergeEventListener#onMergeCompleted} is not called before
82-
* {@link org.elasticsearch.index.engine.MergeEventListener#onMergeQueued}.
83-
*/
84-
private final Object mergeEventsMutex = new Object();
8579

8680
public static @Nullable ThreadPoolMergeExecutorService maybeCreateThreadPoolMergeExecutorService(
8781
ThreadPool threadPool,
@@ -147,11 +141,11 @@ void reEnqueueBackloggedMergeTask(MergeTask mergeTask) {
147141
}
148142

149143
private void enqueueMergeTask(MergeTask mergeTask) {
150-
synchronized (mergeEventsMutex) {
151-
if (queuedMergeTasks.add(mergeTask)) {
152-
mergeEventListeners.forEach(l -> l.onMergeQueued(mergeTask.getOnGoingMerge(), mergeTask.getMergeMemoryEstimateBytes()));
153-
}
154-
}
144+
// To ensure that for a given merge onMergeQueued is called before onMergeAborted or onMergeCompleted, we call onMergeQueued
145+
// before adding the merge task to the queue. Adding to the queue should not fail.
146+
mergeEventListeners.forEach(l -> l.onMergeQueued(mergeTask.getOnGoingMerge(), mergeTask.getMergeMemoryEstimateBytes()));
147+
boolean added = queuedMergeTasks.add(mergeTask);
148+
assert added;
155149
}
156150

157151
public boolean allDone() {
@@ -219,9 +213,7 @@ private void runMergeTask(MergeTask mergeTask) {
219213
if (mergeTask.supportsIOThrottling()) {
220214
ioThrottledMergeTasksCount.decrementAndGet();
221215
}
222-
synchronized (mergeEventsMutex) {
223-
mergeEventListeners.forEach(l -> l.onMergeCompleted(mergeTask.getOnGoingMerge()));
224-
}
216+
mergeEventListeners.forEach(l -> l.onMergeCompleted(mergeTask.getOnGoingMerge()));
225217
}
226218
}
227219

@@ -234,9 +226,7 @@ private void abortMergeTask(MergeTask mergeTask) {
234226
if (mergeTask.supportsIOThrottling()) {
235227
ioThrottledMergeTasksCount.decrementAndGet();
236228
}
237-
synchronized (mergeEventsMutex) {
238-
mergeEventListeners.forEach(l -> l.onMergeAborted(mergeTask.getOnGoingMerge()));
239-
}
229+
mergeEventListeners.forEach(l -> l.onMergeAborted(mergeTask.getOnGoingMerge()));
240230
}
241231
}
242232

0 commit comments

Comments
 (0)