Skip to content

Commit ab1041b

Browse files
authored
Tighten up ref counting on recovery target (elastic#113767)
Today we throw an exception in some places if the `RecoveryTarget` is used after all its refs have been released, but really (a) hitting this exception always indicates a bug and (b) the consequences of such a bug are usually not severe. This commit replaces the exception with an assertion, and properly acquires refs in the few places that might not already hold one.
1 parent dc62810 commit ab1041b

File tree

8 files changed

+37
-41
lines changed

8 files changed

+37
-41
lines changed

server/src/main/java/org/elasticsearch/indices/recovery/MultiFileWriter.java

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ public class MultiFileWriter extends AbstractRefCounted implements Releasable {
4242
private final RecoveryState.Index indexState;
4343
private final String tempFilePrefix;
4444
private final Logger logger;
45-
private final Runnable ensureOpen;
4645
private final boolean verifyOutput;
4746

4847
private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -52,23 +51,15 @@ public class MultiFileWriter extends AbstractRefCounted implements Releasable {
5251

5352
final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap();
5453

55-
public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) {
56-
this(store, indexState, tempFilePrefix, logger, ensureOpen, true);
54+
public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempFilePrefix, Logger logger) {
55+
this(store, indexState, tempFilePrefix, logger, true);
5756
}
5857

59-
public MultiFileWriter(
60-
Store store,
61-
RecoveryState.Index indexState,
62-
String tempFilePrefix,
63-
Logger logger,
64-
Runnable ensureOpen,
65-
boolean verifyOutput
66-
) {
58+
public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempFilePrefix, Logger logger, boolean verifyOutput) {
6759
this.store = store;
6860
this.indexState = indexState;
6961
this.tempFilePrefix = tempFilePrefix;
7062
this.logger = logger;
71-
this.ensureOpen = ensureOpen;
7263
this.verifyOutput = verifyOutput;
7364
}
7465

@@ -85,7 +76,7 @@ public void writeFileChunk(StoreFileMetadata fileMetadata, long position, Releas
8576
}
8677

8778
public void writeFile(StoreFileMetadata fileMetadata, long readSnapshotFileBufferSize, InputStream stream) throws IOException {
88-
ensureOpen.run();
79+
assert hasReferences();
8980
assert Transports.assertNotTransportThread("multi_file_writer");
9081

9182
String fileName = fileMetadata.name();
@@ -146,13 +137,13 @@ String getTempNameForFile(String origFile) {
146137
}
147138

148139
public IndexOutput getOpenIndexOutput(String key) {
149-
ensureOpen.run();
140+
assert hasReferences();
150141
return openIndexOutputs.get(key);
151142
}
152143

153144
/** remove an {@link IndexOutput} for a given file. It is the caller's responsibility to close it */
154145
public IndexOutput removeOpenIndexOutputs(String name) {
155-
ensureOpen.run();
146+
assert hasReferences();
156147
return openIndexOutputs.remove(name);
157148
}
158149

@@ -164,7 +155,7 @@ public IndexOutput removeOpenIndexOutputs(String name) {
164155
* at a later stage
165156
*/
166157
public IndexOutput openAndPutIndexOutput(String fileName, StoreFileMetadata metadata, Store store) throws IOException {
167-
ensureOpen.run();
158+
assert hasReferences();
168159
String tempFileName = getTempNameForFile(fileName);
169160
if (tempFileNames.containsKey(tempFileName)) {
170161
throw new IllegalStateException("output for file [" + fileName + "] has already been created");
@@ -178,6 +169,7 @@ public IndexOutput openAndPutIndexOutput(String fileName, StoreFileMetadata meta
178169

179170
private void innerWriteFileChunk(StoreFileMetadata fileMetadata, long position, BytesReference content, boolean lastChunk)
180171
throws IOException {
172+
assert hasReferences();
181173
final String name = fileMetadata.name();
182174
IndexOutput indexOutput;
183175
if (position == 0) {
@@ -242,7 +234,7 @@ protected void closeInternal() {
242234

243235
/** renames all temporary files to their true name, potentially overriding existing files */
244236
public void renameAllTempFiles() throws IOException {
245-
ensureOpen.run();
237+
assert hasReferences();
246238
store.renameTempFilesSafe(tempFileNames);
247239
}
248240

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -401,8 +401,13 @@ record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, Str
401401
.andThenApply(startingSeqNo -> {
402402
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
403403
: "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
404-
final var startRequest = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);
405-
return new StartRecoveryRequestToSend(startRequest, PeerRecoverySourceService.Actions.START_RECOVERY, startRequest);
404+
try {
405+
recoveryTarget.incRef();
406+
final var startRequest = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);
407+
return new StartRecoveryRequestToSend(startRequest, PeerRecoverySourceService.Actions.START_RECOVERY, startRequest);
408+
} finally {
409+
recoveryTarget.decRef();
410+
}
406411
})
407412
// finally send the start-recovery request
408413
.addListener(toSendListener);
@@ -572,7 +577,14 @@ protected void handleRequest(
572577

573578
@Override
574579
protected CheckedFunction<Void, TransportResponse, Exception> responseMapping(RecoveryTarget recoveryTarget) {
575-
return v -> new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint());
580+
return v -> {
581+
try {
582+
recoveryTarget.incRef();
583+
return new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint());
584+
} finally {
585+
recoveryTarget.decRef();
586+
}
587+
};
576588
}
577589

578590
private void performTranslogOps(

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.apache.lucene.index.CorruptIndexException;
1414
import org.apache.lucene.index.IndexFormatTooNewException;
1515
import org.apache.lucene.index.IndexFormatTooOldException;
16-
import org.elasticsearch.ElasticsearchException;
1716
import org.elasticsearch.ExceptionsHelper;
1817
import org.elasticsearch.action.ActionListener;
1918
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
@@ -142,7 +141,7 @@ private void recreateMultiFileWriter() {
142141

143142
private MultiFileWriter createMultiFileWriter() {
144143
final String tempFilePrefix = RECOVERY_PREFIX + UUIDs.randomBase64UUID() + ".";
145-
return new MultiFileWriter(indexShard.store(), indexShard.recoveryState().getIndex(), tempFilePrefix, logger, this::ensureRefCount);
144+
return new MultiFileWriter(indexShard.store(), indexShard.recoveryState().getIndex(), tempFilePrefix, logger);
146145
}
147146

148147
/**
@@ -179,7 +178,7 @@ public ShardId shardId() {
179178
}
180179

181180
public IndexShard indexShard() {
182-
ensureRefCount();
181+
assert hasReferences();
183182
return indexShard;
184183
}
185184

@@ -232,7 +231,7 @@ public Releasable disableRecoveryMonitor() {
232231
}
233232

234233
public Store store() {
235-
ensureRefCount();
234+
assert hasReferences();
236235
return store;
237236
}
238237

@@ -361,14 +360,6 @@ public String toString() {
361360
return shardId + " [" + recoveryId + "]";
362361
}
363362

364-
private void ensureRefCount() {
365-
if (refCount() <= 0) {
366-
throw new ElasticsearchException(
367-
"RecoveryStatus is used but it's refcount is 0. Probably a mismatch between incRef/decRef " + "calls"
368-
);
369-
}
370-
}
371-
372363
/*** Implementation of {@link RecoveryTargetHandler } */
373364

374365
@Override

server/src/test/java/org/elasticsearch/indices/recovery/MultiFileWriterTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public void testWritesFileWithIncorrectChecksumWithoutVerification() throws IOEx
9090
}
9191

9292
private MultiFileWriter createMultiFileWriter(boolean verifyOutput) {
93-
return new MultiFileWriter(store, mock(RecoveryState.Index.class), "temp_", logger, mock(Runnable.class), verifyOutput);
93+
return new MultiFileWriter(store, mock(RecoveryState.Index.class), "temp_", logger, verifyOutput);
9494
}
9595

9696
private record FileAndMetadata(byte[] bytes, StoreFileMetadata metadata) {}

server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ public void testSendFiles() throws Throwable {
191191
metas.add(md);
192192
}
193193
Store targetStore = newStore(createTempDir());
194-
MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, mock(RecoveryState.Index.class), "", logger, () -> {});
194+
MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, mock(RecoveryState.Index.class), "", logger);
195195
RecoveryTargetHandler target = new TestRecoveryTargetHandler() {
196196
@Override
197197
public void writeFileChunk(
@@ -578,7 +578,7 @@ public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable {
578578
)
579579
);
580580
Store targetStore = newStore(createTempDir(), false);
581-
MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, mock(RecoveryState.Index.class), "", logger, () -> {});
581+
MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, mock(RecoveryState.Index.class), "", logger);
582582
RecoveryTargetHandler target = new TestRecoveryTargetHandler() {
583583
@Override
584584
public void writeFileChunk(

server/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@ public void testRenameTempFiles() throws IOException {
3636
indexShard.store(),
3737
indexShard.recoveryState().getIndex(),
3838
"recovery.test.",
39-
logger,
40-
() -> {}
39+
logger
4140
);
4241
try (
4342
IndexOutput indexOutput = multiFileWriter.openAndPutIndexOutput(

server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
*/
99
package org.elasticsearch.recovery;
1010

11-
import org.elasticsearch.ElasticsearchException;
1211
import org.elasticsearch.cluster.node.DiscoveryNode;
12+
import org.elasticsearch.core.Assertions;
1313
import org.elasticsearch.core.TimeValue;
1414
import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
1515
import org.elasticsearch.index.shard.IndexShard;
@@ -139,8 +139,10 @@ public void testResetRecovery() throws Exception {
139139
assertEquals(referencesToStore, resetRecovery.store().refCount());
140140
assertEquals(currentAsTarget, shard.recoveryStats().currentAsTarget());
141141
assertEquals(recoveryTarget.refCount(), 0);
142-
expectThrows(ElasticsearchException.class, () -> recoveryTarget.store());
143-
expectThrows(ElasticsearchException.class, () -> recoveryTarget.indexShard());
142+
if (Assertions.ENABLED) {
143+
expectThrows(AssertionError.class, recoveryTarget::store);
144+
expectThrows(AssertionError.class, recoveryTarget::indexShard);
145+
}
144146
String resetTempFileName = resetRecovery.getTempNameForFile("foobar");
145147
assertNotEquals(tempFileName, resetTempFileName);
146148
assertEquals(currentAsTarget, shard.recoveryStats().currentAsTarget());

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -740,7 +740,7 @@ protected void restoreFiles(List<FileInfo> filesToRecover, Store store, ActionLi
740740
mds
741741
) {
742742

743-
final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {});
743+
final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger);
744744
long offset = 0;
745745

746746
@Override

0 commit comments

Comments
 (0)