Skip to content

Commit 12c32fa

Browse files
authored
Increase store ref before snapshotting index commit (#84776) (#85112) (#85365)
Snapshotted commits should also hold a reference to the store, so they are always usable; otherwise, callers need to manage the store's references manually. This change applies only to InternalEngine as we already do this in ReadOnlyEngine.
1 parent eac9031 commit 12c32fa

File tree

5 files changed

+55
-28
lines changed

5 files changed

+55
-28
lines changed

docs/changelog/84776.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 84776
2+
summary: Increase store ref before snapshotting index commit
3+
area: Engine
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2328,14 +2328,40 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En
23282328
flush(false, true);
23292329
logger.trace("finish flush for snapshot");
23302330
}
2331-
final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false);
2332-
return new Engine.IndexCommitRef(lastCommit, () -> releaseIndexCommit(lastCommit));
2331+
store.incRef();
2332+
boolean success = false;
2333+
try {
2334+
final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false);
2335+
final IndexCommitRef commitRef = new IndexCommitRef(
2336+
lastCommit,
2337+
() -> IOUtils.close(() -> releaseIndexCommit(lastCommit), store::decRef)
2338+
);
2339+
success = true;
2340+
return commitRef;
2341+
} finally {
2342+
if (success == false) {
2343+
store.decRef();
2344+
}
2345+
}
23332346
}
23342347

23352348
@Override
23362349
public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
2337-
final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true);
2338-
return new Engine.IndexCommitRef(safeCommit, () -> releaseIndexCommit(safeCommit));
2350+
store.incRef();
2351+
boolean success = false;
2352+
try {
2353+
final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true);
2354+
final IndexCommitRef commitRef = new IndexCommitRef(
2355+
safeCommit,
2356+
() -> IOUtils.close(() -> releaseIndexCommit(safeCommit), store::decRef)
2357+
);
2358+
success = true;
2359+
return commitRef;
2360+
} finally {
2361+
if (success == false) {
2362+
store.decRef();
2363+
}
2364+
}
23392365
}
23402366

23412367
private void releaseIndexCommit(IndexCommit snapshot) throws IOException {

server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,27 +23,16 @@
2323
import java.io.IOException;
2424
import java.util.Collection;
2525
import java.util.Set;
26-
import java.util.concurrent.atomic.AtomicBoolean;
2726

2827
final class LocalShardSnapshot implements Closeable {
2928
private final IndexShard shard;
3029
private final Store store;
3130
private final Engine.IndexCommitRef indexCommit;
32-
private final AtomicBoolean closed = new AtomicBoolean(false);
3331

3432
LocalShardSnapshot(IndexShard shard) {
3533
this.shard = shard;
36-
store = shard.store();
37-
store.incRef();
38-
boolean success = false;
39-
try {
40-
indexCommit = shard.acquireLastIndexCommit(true);
41-
success = true;
42-
} finally {
43-
if (success == false) {
44-
store.decRef();
45-
}
46-
}
34+
this.store = shard.store();
35+
this.indexCommit = shard.acquireLastIndexCommit(true);
4736
}
4837

4938
Index getIndex() {
@@ -117,13 +106,7 @@ public Set<String> getPendingDeletions() throws IOException {
117106

118107
@Override
119108
public void close() throws IOException {
120-
if (closed.compareAndSet(false, true)) {
121-
try {
122-
indexCommit.close();
123-
} finally {
124-
store.decRef();
125-
}
126-
}
109+
indexCommit.close();
127110
}
128111

129112
IndexMetadata getIndexMetadata() {

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6105,6 +6105,7 @@ public void testAcquireIndexCommit() throws Exception {
61056105
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
61066106
final Engine.IndexCommitRef snapshot;
61076107
final boolean closeSnapshotBeforeEngine = randomBoolean();
6108+
final int expectedDocs;
61086109
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
61096110
int numDocs = between(1, 20);
61106111
for (int i = 0; i < numDocs; i++) {
@@ -6120,6 +6121,7 @@ public void testAcquireIndexCommit() throws Exception {
61206121
} else {
61216122
snapshot = engine.acquireLastIndexCommit(flushFirst);
61226123
}
6124+
expectedDocs = flushFirst && safeCommit == false ? numDocs : 0;
61236125
int moreDocs = between(1, 20);
61246126
for (int i = 0; i < moreDocs; i++) {
61256127
index(engine, numDocs + i);
@@ -6128,7 +6130,7 @@ public void testAcquireIndexCommit() throws Exception {
61286130
engine.flush();
61296131
// check that we can still read the commit that we captured
61306132
try (IndexReader reader = DirectoryReader.open(snapshot.getIndexCommit())) {
6131-
assertThat(reader.numDocs(), equalTo(flushFirst && safeCommit == false ? numDocs : 0));
6133+
assertThat(reader.numDocs(), equalTo(expectedDocs));
61326134
}
61336135
assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(2));
61346136

@@ -6140,8 +6142,17 @@ public void testAcquireIndexCommit() throws Exception {
61406142
}
61416143
}
61426144

6145+
if (randomBoolean()) {
6146+
IOUtils.close(store);
6147+
}
6148+
61436149
if (closeSnapshotBeforeEngine == false) {
6144-
snapshot.close(); // shouldn't throw AlreadyClosedException
6150+
// check that we can still read the commit that we captured
6151+
try (DirectoryReader reader = DirectoryReader.open(snapshot.getIndexCommit())) {
6152+
assertThat(reader.numDocs(), equalTo(expectedDocs));
6153+
} finally {
6154+
snapshot.close();
6155+
}
61456156
}
61466157
}
61476158

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,10 @@ public void testGetSessionReader() throws IOException {
167167

168168
byte[] expectedBytes = new byte[(int) fileMetadata.length()];
169169
byte[] actualBytes = new byte[(int) fileMetadata.length()];
170-
Engine.IndexCommitRef indexCommitRef = indexShard1.acquireSafeIndexCommit();
171-
try (IndexInput indexInput = indexCommitRef.getIndexCommit().getDirectory().openInput(fileName, IOContext.READONCE)) {
170+
try (
171+
Engine.IndexCommitRef indexCommitRef = indexShard1.acquireSafeIndexCommit();
172+
IndexInput indexInput = indexCommitRef.getIndexCommit().getDirectory().openInput(fileName, IOContext.READONCE)
173+
) {
172174
indexInput.seek(0);
173175
indexInput.readBytes(expectedBytes, 0, (int) fileMetadata.length());
174176
}

0 commit comments

Comments
 (0)