@@ -43,6 +43,9 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
43
43
private final SoftDeletesPolicy softDeletesPolicy ;
44
44
private final LongSupplier globalCheckpointSupplier ;
45
45
private final Map <IndexCommit , Integer > acquiredIndexCommits ; // Number of references held against each commit point.
46
+ // Index commits internally acquired by the commits listener. We want to track them separately to be able to disregard them
47
+ // when checking for externally acquired index commits that haven't been released
48
+ private final Set <IndexCommit > internallyAcquiredIndexCommits ;
46
49
47
50
interface CommitsListener {
48
51
@@ -72,6 +75,7 @@ interface CommitsListener {
72
75
this .globalCheckpointSupplier = globalCheckpointSupplier ;
73
76
this .commitsListener = commitsListener ;
74
77
this .acquiredIndexCommits = new HashMap <>();
78
+ this .internallyAcquiredIndexCommits = new HashSet <>();
75
79
}
76
80
77
81
@ Override
@@ -114,7 +118,7 @@ public void onCommit(List<? extends IndexCommit> commits) throws IOException {
114
118
this .maxSeqNoOfNextSafeCommit = Long .parseLong (commits .get (keptPosition + 1 ).getUserData ().get (SequenceNumbers .MAX_SEQ_NO ));
115
119
}
116
120
if (commitsListener != null && previousLastCommit != this .lastCommit ) {
117
- newCommit = acquireIndexCommit (false );
121
+ newCommit = acquireIndexCommit (false , true );
118
122
} else {
119
123
newCommit = null ;
120
124
}
@@ -210,15 +214,25 @@ SafeCommitInfo getSafeCommitInfo() {
210
214
* @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point.
211
215
*/
212
216
synchronized IndexCommit acquireIndexCommit (boolean acquiringSafeCommit ) {
217
+ return acquireIndexCommit (acquiringSafeCommit , false );
218
+ }
219
+
220
+ private synchronized IndexCommit acquireIndexCommit (boolean acquiringSafeCommit , boolean acquiredInternally ) {
213
221
assert safeCommit != null : "Safe commit is not initialized yet" ;
214
222
assert lastCommit != null : "Last commit is not initialized yet" ;
215
223
final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit ;
216
224
acquiredIndexCommits .merge (snapshotting , 1 , Integer ::sum ); // increase refCount
217
- return wrapCommit (snapshotting );
225
+ assert acquiredInternally == false || internallyAcquiredIndexCommits .add (snapshotting )
226
+ : "commit [" + snapshotting + "] already added" ;
227
+ return wrapCommit (snapshotting , acquiredInternally );
218
228
}
219
229
220
230
protected IndexCommit wrapCommit (IndexCommit indexCommit ) {
221
- return new SnapshotIndexCommit (indexCommit );
231
+ return wrapCommit (indexCommit , false );
232
+ }
233
+
234
+ protected IndexCommit wrapCommit (IndexCommit indexCommit , boolean acquiredInternally ) {
235
+ return new SnapshotIndexCommit (indexCommit , acquiredInternally );
222
236
}
223
237
224
238
/**
@@ -227,7 +241,8 @@ protected IndexCommit wrapCommit(IndexCommit indexCommit) {
227
241
* @return true if the acquired commit can be clean up.
228
242
*/
229
243
synchronized boolean releaseCommit (final IndexCommit acquiredCommit ) {
230
- final IndexCommit releasingCommit = ((SnapshotIndexCommit ) acquiredCommit ).getIndexCommit ();
244
+ final SnapshotIndexCommit snapshotIndexCommit = (SnapshotIndexCommit ) acquiredCommit ;
245
+ final IndexCommit releasingCommit = snapshotIndexCommit .getIndexCommit ();
231
246
assert acquiredIndexCommits .containsKey (releasingCommit )
232
247
: "Release non-acquired commit;"
233
248
+ "acquired commits ["
@@ -242,6 +257,8 @@ synchronized boolean releaseCommit(final IndexCommit acquiredCommit) {
242
257
}
243
258
return count - 1 ;
244
259
});
260
+ assert snapshotIndexCommit .acquiredInternally == false || internallyAcquiredIndexCommits .remove (releasingCommit )
261
+ : "Trying to release a commit [" + releasingCommit + "] that hasn't been previously acquired internally" ;
245
262
246
263
assert refCount == null || refCount > 0 : "Number of references for acquired commit can not be negative [" + refCount + "]" ;
247
264
// The commit can be clean up only if no refCount and it is neither the safe commit nor last commit.
@@ -296,10 +313,16 @@ private static Set<String> listOfNewFileNames(IndexCommit previous, IndexCommit
296
313
}
297
314
298
315
/**
299
- * Checks whether the deletion policy is holding on to acquired index commits
316
+ * Checks whether the deletion policy is holding on to externally acquired index commits
300
317
*/
301
- synchronized boolean hasAcquiredIndexCommits () {
302
- return acquiredIndexCommits .isEmpty () == false ;
318
+ synchronized boolean hasAcquiredIndexCommitsForTesting () {
319
+ // We explicitly check only external commits and disregard internal commits acquired by the commits listener
320
+ for (var e : acquiredIndexCommits .entrySet ()) {
321
+ if (internallyAcquiredIndexCommits .contains (e .getKey ()) == false || e .getValue () > 1 ) {
322
+ return true ;
323
+ }
324
+ }
325
+ return false ;
303
326
}
304
327
305
328
/**
@@ -320,8 +343,12 @@ public static String commitDescription(IndexCommit commit) throws IOException {
320
343
* A wrapper of an index commit that prevents it from being deleted.
321
344
*/
322
345
private static class SnapshotIndexCommit extends FilterIndexCommit {
323
- SnapshotIndexCommit (IndexCommit delegate ) {
346
+
347
+ private final boolean acquiredInternally ;
348
+
349
+ SnapshotIndexCommit (IndexCommit delegate , boolean acquiredInternally ) {
324
350
super (delegate );
351
+ this .acquiredInternally = acquiredInternally ;
325
352
}
326
353
327
354
@ Override
0 commit comments