Skip to content

Commit c85c69f

Browse files
Return MergeObserver from IndexWriter.forceMergeDeletes() (#15378)
Return MergeObserver from IndexWriter.forceMergeDeletes() IndexWriter.forceMergeDeletes() now returns MergePolicy.MergeObserver instead of void, allowing applications to monitor merge progress and wait for completion. Addresses (#14515).
1 parent d0e7099 commit c85c69f

File tree

5 files changed

+370
-7
lines changed

5 files changed

+370
-7
lines changed

lucene/CHANGES.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,12 @@ API Changes
145145
* GITHUB#15234: Remove unused method which does unsafe XML parsing.
146146
(Uwe Schindler, Isaac David)
147147

148+
* GITHUB#14515: IndexWriter.forceMergeDeletes() now returns MergePolicy.MergeObserver,
149+
allowing applications to monitor merge progress and wait for completion. The observer
150+
provides methods to query merge count, check completion status, and wait synchronously
151+
(with optional timeout) or asynchronously via CompletableFuture. Backward compatible -
152+
existing code that ignores the return value works unchanged. (Salvatore Campagna)
153+
148154
New Features
149155
---------------------
150156
* GITHUB#15328: VectorSimilarityFunction.getValues() now implements doubleVal allowing its

lucene/core/src/java/org/apache/lucene/index/IndexWriter.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2221,8 +2221,10 @@ private synchronized boolean maxNumSegmentsMergesPending() {
22212221
* Just like {@link #forceMergeDeletes()}, except you can specify whether the call should block
22222222
* until the operation completes. This is only meaningful with a {@link MergeScheduler} that is
22232223
* able to run merges in background threads.
2224+
*
2225+
* @return a {@link MergePolicy.MergeObserver} to monitor merge progress and wait for completion
22242226
*/
2225-
public void forceMergeDeletes(boolean doWait) throws IOException {
2227+
public MergePolicy.MergeObserver forceMergeDeletes(boolean doWait) throws IOException {
22262228
ensureOpen();
22272229

22282230
flush(true, true);
@@ -2282,6 +2284,7 @@ public void forceMergeDeletes(boolean doWait) throws IOException {
22822284
// NOTE: in the ConcurrentMergeScheduler case, when
22832285
// doWait is false, we can return immediately while
22842286
// background threads accomplish the merging
2287+
return new MergePolicy.MergeObserver(spec);
22852288
}
22862289

22872290
/**
@@ -2296,9 +2299,12 @@ public void forceMergeDeletes(boolean doWait) throws IOException {
22962299
*
22972300
* <p><b>NOTE</b>: this method first flushes a new segment (if there are indexed documents), and
22982301
* applies all buffered deletes.
2302+
*
2303+
* @return a {@link MergePolicy.MergeObserver} to monitor merge progress. Since this method blocks
2304+
* until completion, merges will already be complete when it returns.
22992305
*/
2300-
public void forceMergeDeletes() throws IOException {
2301-
forceMergeDeletes(true);
2306+
public MergePolicy.MergeObserver forceMergeDeletes() throws IOException {
2307+
return forceMergeDeletes(true);
23022308
}
23032309

23042310
/**

lucene/core/src/java/org/apache/lucene/index/MergePolicy.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.ArrayList;
2121
import java.util.EnumMap;
2222
import java.util.List;
23+
import java.util.Locale;
2324
import java.util.Map;
2425
import java.util.Map.Entry;
2526
import java.util.Optional;
@@ -939,4 +940,94 @@ static final class MergeReader {
939940
this.hardLiveDocs = hardLiveDocs;
940941
}
941942
}
943+
944+
/**
945+
* Observer for merge operations returned by {@link IndexWriter#forceMergeDeletes(boolean)}.
946+
* Provides methods to query merge status and wait for completion.
947+
*
948+
* <p>When no merges are needed, {@link #numMerges()} returns 0. In this case, {@link #await()}
949+
* returns {@code true} immediately since there is nothing to wait for.
950+
*
951+
* @lucene.experimental
952+
*/
953+
public static final class MergeObserver {
954+
private final MergePolicy.MergeSpecification spec;
955+
956+
MergeObserver(MergePolicy.MergeSpecification spec) {
957+
this.spec = spec;
958+
}
959+
960+
/**
961+
* Returns the number of merges in this specification.
962+
*
963+
* @return number of merges, or 0 if no merges were scheduled
964+
*/
965+
public int numMerges() {
966+
return spec == null ? 0 : spec.merges.size();
967+
}
968+
969+
/**
970+
* Returns the number of completed merges in this specification. Useful for tracking merge
971+
* progress: {@code numCompletedMerges() / numMerges()}.
972+
*
973+
* @return number of completed merges
974+
*/
975+
public int numCompletedMerges() {
976+
if (spec == null) {
977+
return 0;
978+
}
979+
int completed = 0;
980+
for (OneMerge merge : spec.merges) {
981+
if (merge.mergeCompleted.isDone()) {
982+
completed++;
983+
}
984+
}
985+
return completed;
986+
}
987+
988+
/**
989+
* Waits for all merges in this specification to complete. Returns immediately if no merges were
990+
* scheduled.
991+
*
992+
* @return {@code true} if all merges completed successfully or no merges were needed, {@code
993+
* false} on error
994+
*/
995+
public boolean await() {
996+
return spec == null || spec.await();
997+
}
998+
999+
/**
1000+
* Waits for all merges in this specification to complete, with timeout. Returns immediately if
1001+
* no merges were scheduled.
1002+
*
1003+
* @param timeout maximum time to wait
1004+
* @param unit time unit for timeout
1005+
* @return {@code true} if all merges completed within timeout or no merges were needed, {@code
1006+
* false} on timeout or error
1007+
*/
1008+
public boolean await(long timeout, TimeUnit unit) {
1009+
return spec == null || spec.await(timeout, unit);
1010+
}
1011+
1012+
/**
1013+
* Returns a {@link CompletableFuture} that completes when all merges finish. Returns an
1014+
* already-completed future if no merges were scheduled.
1015+
*
1016+
* @return future that completes when merges finish
1017+
*/
1018+
public CompletableFuture<Void> awaitAsync() {
1019+
return spec == null
1020+
? CompletableFuture.completedFuture(null)
1021+
: spec.getMergeCompletedFutures();
1022+
}
1023+
1024+
@Override
1025+
public String toString() {
1026+
if (spec == null) {
1027+
return "MergeObserver: no merges";
1028+
}
1029+
return String.format(
1030+
Locale.ROOT, "MergeObserver: %d merges\n%s", numMerges(), spec.toString());
1031+
}
1032+
}
9421033
}

0 commit comments

Comments
 (0)