Skip to content

Commit ca095be

Browse files
committed
draft
1 parent 9b6ce86 commit ca095be

File tree

2 files changed

+23
-0
lines changed

2 files changed

+23
-0
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
1515
import org.elasticsearch.core.Nullable;
1616
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
17+
import org.elasticsearch.index.merge.OnGoingMerge;
1718
import org.elasticsearch.threadpool.ThreadPool;
1819

1920
import java.util.Comparator;
@@ -73,6 +74,8 @@ public class ThreadPoolMergeExecutorService {
7374
private final int concurrentMergesFloorLimitForThrottling;
7475
private final int concurrentMergesCeilLimitForThrottling;
7576

77+
private volatile MergeEventConsumer mergeEventConsumer;
78+
7679
public static @Nullable ThreadPoolMergeExecutorService maybeCreateThreadPoolMergeExecutorService(
7780
ThreadPool threadPool,
7881
Settings settings
@@ -278,6 +281,17 @@ public boolean usingMaxTargetIORateBytesPerSec() {
278281
return MAX_IO_RATE.getBytes() == targetIORateBytesPerSec.get();
279282
}
280283

284+
public void registerMergeEventConsumer(MergeEventConsumer consumer) {
285+
assert this.mergeEventConsumer == null;
286+
this.mergeEventConsumer = consumer;
287+
}
288+
289+
public interface MergeEventConsumer {
290+
void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes);
291+
292+
void onMergeCompleted(OnGoingMerge merge);
293+
}
294+
281295
// exposed for tests
282296
Set<MergeTask> getRunningMergeTasks() {
283297
return runningMergeTasks;

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
6565
private final AtomicLong doneMergeTaskCount = new AtomicLong();
6666
private final CountDownLatch closedWithNoRunningMerges = new CountDownLatch(1);
6767
private volatile boolean closed = false;
68+
private final MergeMemoryEstimator mergeMemoryEstimator;
6869

6970
public ThreadPoolMergeScheduler(
7071
ShardId shardId,
@@ -449,6 +450,14 @@ long estimatedMergeSize() {
449450
return onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
450451
}
451452

453+
public long getEstimateMergeMemoryBytes() {
454+
return mergeMemoryEstimator.estimateMergeMemoryBytes(onGoingMerge.getMerge());
455+
}
456+
457+
public OnGoingMerge getOnGoingMerge() {
458+
return onGoingMerge;
459+
}
460+
452461
@Override
453462
public String toString() {
454463
return name + (onGoingMerge.getMerge().isAborted() ? " (aborted)" : "");

0 commit comments

Comments
 (0)