Merged
Changes from 2 commits
6 changes: 6 additions & 0 deletions lucene/CHANGES.txt
@@ -69,6 +69,12 @@ Improvements

 * GITHUB#15124: Use RamUsageEstimator to calculate size for non-accountable queries. (Sagar Upadhyaya)

+* GITHUB#14515: IndexWriter.forceMergeDeletes() now returns MergePolicy.MergeObserver,
Contributor

This change can go into 10.4. Let's move this entry to the "API Changes" section under 10.4?

Contributor Author

@salvatorecampagna Oct 31, 2025

I see the PR is targeting main (milestone 11.0.0). According to CONTRIBUTING.md:

"You should open a pull request against the main branch. Committers will backport it to the maintenance branches once the change is merged into main."

Should I keep the CHANGES.txt entry in 11.0.0 for now, and then it gets moved to 10.4 in a backport PR if needed? Or should I change it to 10.4 now and update the milestone as well?

Contributor

We add the changes entry under the Lucene version we are targeting. All changes go to the main branch regardless, but the ones for a specific target branch (like 10.4 in this case) additionally get backported to 10.x.

So you should just add the entry to 10.4 in this PR itself and remove it from 11.0. After I merge this into main, I'll backport the commit to the 10.x branch. I will reach out to you if I need any help with the backport.

Contributor Author

Quick question for future contributions: How do I know upfront which version a change is targeting?

Contributor

Generally default to the next minor version. If it's a breaking change or a major change in existing behavior, we defer it for a major version release.

+  allowing applications to monitor merge progress, wait for completion (synchronously
+  via await() or asynchronously via CompletableFuture), and inspect individual merges.
+  Backward compatible - existing code that ignores the return value works unchanged.
+  (Salvatore Campagna)
+
 Optimizations
 ---------------------
 * GITHUB#14011: Reduce allocation rate in HNSW concurrent merge. (Viliam Durina)
28 changes: 17 additions & 11 deletions lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -2221,8 +2221,10 @@ private synchronized boolean maxNumSegmentsMergesPending() {
    * Just like {@link #forceMergeDeletes()}, except you can specify whether the call should block
    * until the operation completes. This is only meaningful with a {@link MergeScheduler} that is
    * able to run merges in background threads.
+   *
+   * @return a {@link MergePolicy.MergeObserver} to monitor merge progress and wait for completion
    */
-  public void forceMergeDeletes(boolean doWait) throws IOException {
+  public MergePolicy.MergeObserver forceMergeDeletes(boolean doWait) throws IOException {
     ensureOpen();

     flush(true, true);
@@ -2234,20 +2236,20 @@ public void forceMergeDeletes(boolean doWait) throws IOException {
     final MergePolicy mergePolicy = config.getMergePolicy();
     final CachingMergeContext cachingMergeContext = new CachingMergeContext(this);
     MergePolicy.MergeSpecification spec;
-    boolean newMergesFound = false;
+    MergePolicy.MergeObserver observer;
     synchronized (this) {
       spec = mergePolicy.findForcedDeletesMerges(segmentInfos, cachingMergeContext);
-      newMergesFound = spec != null;
-      if (newMergesFound) {
-        final int numMerges = spec.merges.size();
-        for (int i = 0; i < numMerges; i++) registerMerge(spec.merges.get(i));
+      observer = new MergePolicy.MergeObserver(spec);
+      if (observer.hasNewMerges()) {
+        final int numMerges = observer.numMerges();
+        for (int i = 0; i < numMerges; i++) registerMerge(observer.getMerge(i));
       }
     }

     mergeScheduler.merge(mergeSource, MergeTrigger.EXPLICIT);

-    if (spec != null && doWait) {
-      final int numMerges = spec.merges.size();
+    if (observer.hasNewMerges() && doWait) {
+      final int numMerges = observer.numMerges();
       synchronized (this) {
         boolean running = true;
         while (running) {
@@ -2263,7 +2265,7 @@ public void forceMergeDeletes(boolean doWait) throws IOException {
           // if any of them have hit an exception.
           running = false;
           for (int i = 0; i < numMerges; i++) {
-            final MergePolicy.OneMerge merge = spec.merges.get(i);
+            final MergePolicy.OneMerge merge = observer.getMerge(i);
             if (pendingMerges.contains(merge) || runningMerges.contains(merge)) {
               running = true;
             }
@@ -2282,6 +2284,7 @@ public void forceMergeDeletes(boolean doWait) throws IOException {
     // NOTE: in the ConcurrentMergeScheduler case, when
     // doWait is false, we can return immediately while
     // background threads accomplish the merging
+    return observer;
   }

   /**
@@ -2296,9 +2299,12 @@ public void forceMergeDeletes(boolean doWait) throws IOException {
    *
    * <p><b>NOTE</b>: this method first flushes a new segment (if there are indexed documents), and
    * applies all buffered deletes.
+   *
+   * @return a {@link MergePolicy.MergeObserver} to monitor merge progress. Since this method blocks
+   *     until completion, merges will already be complete when it returns.
    */
-  public void forceMergeDeletes() throws IOException {
-    forceMergeDeletes(true);
+  public MergePolicy.MergeObserver forceMergeDeletes() throws IOException {
+    return forceMergeDeletes(true);
   }

   /**
92 changes: 92 additions & 0 deletions lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -939,4 +939,96 @@ static final class MergeReader {
       this.hardLiveDocs = hardLiveDocs;
     }
   }
+
+  /**
+   * Observer for merge operations returned by {@link IndexWriter#forceMergeDeletes(boolean)}.
+   * Provides methods to query merge status and wait for completion.
+   *
+   * <p>When no merges are needed, {@link #hasNewMerges()} returns {@code false} and {@link
+   * #numMerges()} returns 0. In this case, {@link #await()} returns {@code true} immediately since
+   * there is nothing to wait for.
+   *
+   * @lucene.experimental
+   */
+  public static final class MergeObserver {
+    private final MergePolicy.MergeSpecification spec;
+
+    MergeObserver(MergePolicy.MergeSpecification spec) {
+      this.spec = spec;
+    }
+
+    /**
+     * Returns the number of merges in this specification.
+     *
+     * @return number of merges, or 0 if no merges were scheduled
+     */
+    public int numMerges() {
Contributor

How about we also expose the number of completed merges? You will need to run through all the OneMerge objects in merges and check mergeCompleted for each. It could be useful for tracking overall merge progress.

Contributor Author

@salvatorecampagna Oct 31, 2025

Excellent suggestion! This is a far better way to provide progress tracking without exposing mutable OneMerge objects. I'll add numCompletedMerges() that iterates through the merges and checks mergeCompleted.isDone().

Users can then track progress as: 100 * numCompletedMerges() / numMerges()

I'll also update toString() to show the completion count for better debugging.

Contributor Author

@salvatorecampagna Nov 1, 2025

I later decided not to include the completed-merge count in toString(), because computing it could be costly with a large number of merges.
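
A minimal sketch of the progress calculation discussed in this thread. MergeProgressSketch is a hypothetical stand-in, not Lucene's MergeObserver: it assumes each merge is represented by a CompletableFuture held in a plain list. It also guards the zero-merge case, where 100 * numCompletedMerges() / numMerges() would otherwise divide by zero.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in (not Lucene's class): reports progress by counting completed futures. */
final class MergeProgressSketch {
  // Assumption: one future per scheduled merge, completed when that merge finishes.
  private final List<CompletableFuture<Void>> mergeFutures;

  MergeProgressSketch(List<CompletableFuture<Void>> mergeFutures) {
    this.mergeFutures = List.copyOf(mergeFutures);
  }

  int numMerges() {
    return mergeFutures.size();
  }

  /** Walks every merge on each call, so the cost grows with the number of merges. */
  int numCompletedMerges() {
    int done = 0;
    for (CompletableFuture<Void> future : mergeFutures) {
      if (future.isDone()) {
        done++;
      }
    }
    return done;
  }

  /** Integer percentage; treats "no merges" as fully complete to avoid dividing by zero. */
  int percentComplete() {
    int total = numMerges();
    return total == 0 ? 100 : 100 * numCompletedMerges() / total;
  }
}
```

Because numCompletedMerges() iterates over all merges on every call, the linear cost is consistent with the decision above to leave the count out of toString().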

+      return spec == null ? 0 : spec.merges.size();
+    }
+
+    /**
+     * Returns whether any new merges were scheduled.
+     *
+     * @return {@code true} if merges were scheduled, {@code false} if no merges needed
+     */
+    public boolean hasNewMerges() {
Contributor

I think this naming can get confusing: is it still a "new" merge once the merge is underway, does it count completed ones, etc.? Instead of this API, can we just use numMerges() > 0? Or how about an isEmpty() API?

Contributor Author

You're right, the naming is confusing. "New" is ambiguous: does it mean not yet started? Does it include in-progress or completed merges?

numMerges() > 0 is much clearer and eliminates the ambiguity.

I'll remove hasNewMerges() and update all test call sites to use numMerges() > 0 instead.
One less method to maintain and better clarity.

+      return spec != null;
+    }
+
+    /**
+     * Waits for all merges in this specification to complete. Returns immediately if no merges were
+     * scheduled.
+     *
+     * @return {@code true} if all merges completed successfully or no merges were needed, {@code
+     *     false} on error
+     */
+    public boolean await() {
+      return spec == null || spec.await();
+    }
+
+    /**
+     * Waits for all merges in this specification to complete, with timeout. Returns immediately if
+     * no merges were scheduled.
+     *
+     * @param timeout maximum time to wait
+     * @param unit time unit for timeout
+     * @return {@code true} if all merges completed within timeout or no merges were needed, {@code
+     *     false} on timeout or error
+     */
+    public boolean await(long timeout, TimeUnit unit) {
+      return spec == null || spec.await(timeout, unit);
+    }
+
+    /**
+     * Returns a {@link CompletableFuture} that completes when all merges finish. Returns an
+     * already-completed future if no merges were scheduled.
+     *
+     * @return future that completes when merges finish
+     */
+    public CompletableFuture<Void> awaitAsync() {
+      return spec == null
+          ? CompletableFuture.completedFuture(null)
+          : spec.getMergeCompletedFutures();
+    }
+
+    @Override
+    public String toString() {
+      return spec == null ? "MergeObserver: no merges" : spec.toString();
+    }
+
+    /**
+     * Returns the merge at the specified index. Caller must ensure {@link #hasNewMerges()} returns
+     * {@code true} and index is within bounds.
+     *
+     * @param i merge index (0 to {@link #numMerges()} - 1)
+     * @return the merge at index i
+     * @throws IndexOutOfBoundsException if index is invalid or no merges exist
+     */
+    public MergePolicy.OneMerge getMerge(int i) {
Contributor

I'm not sure if providing this method is really useful. The caller can't know the "index" location of a OneMerge unless they already have access to the MergeSpec.

Contributor

Also, I don't think that the OneMerge object is immutable?

Contributor Author

You're right on both points. My original intent was to provide detailed observability, but as you noted, the caller can't meaningfully use index-based access without already having the MergeSpec. And OneMerge is definitely mutable, so exposing it breaks encapsulation.

The numMerges()/numCompletedMerges() approach you suggested is much better, as it provides progress tracking without exposing internals.

+      if (spec == null) {
+        throw new IndexOutOfBoundsException("No merges available");
+      }
+      return spec.merges.get(i);
+    }
+  }
 }
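
For illustration, the null handling that await(long, TimeUnit) and awaitAsync() document in this diff can be modeled without Lucene on the classpath. ObserverSketch below is a hypothetical stand-in: it wraps a single nullable CompletableFuture in place of a MergeSpecification, so it shows only the "no merges means nothing to wait for" contract, not how Lucene actually waits on merges.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for the observer in this diff; not Lucene code. */
final class ObserverSketch {
  // Assumption: a null future models "no merges were needed" (spec == null in the diff).
  private final CompletableFuture<Void> allMerges;

  ObserverSketch(CompletableFuture<Void> allMerges) {
    this.allMerges = allMerges;
  }

  /** Returns true if there was nothing to wait for or the work finished in time. */
  boolean await(long timeout, TimeUnit unit) {
    if (allMerges == null) {
      return true;
    }
    try {
      allMerges.get(timeout, unit);
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    } catch (ExecutionException | TimeoutException e) {
      return false; // error or timeout
    }
  }

  /** Returns an already-completed future when there is nothing to wait for. */
  CompletableFuture<Void> awaitAsync() {
    return allMerges == null ? CompletableFuture.completedFuture(null) : allMerges;
  }
}
```

A caller that ignores the returned observer behaves as before, which is the backward-compatibility point made in the CHANGES entry. A caller that wants a bounded wait can use the timeout variant and fall back to awaitAsync() for a callback.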