Commit b461baf
Skip translog creation and Lucene commits when recovering searchable snapshot shards (#118606)
In order to leverage Lucene N-2 version support for searchable snapshots, we'd like to avoid executing Lucene commits during the recovery of searchable snapshot shards. This is because a Lucene commit requires opening an IndexWriter, which Lucene does not support for N-2 version indices. Today, when searchable snapshot shards recover, they create a translog on disk as well as a Lucene commit:
- the translog is created as an empty translog with a new UUID and an initial global checkpoint equal to the LOCAL_CHECKPOINT_KEY stored in the last Lucene commit data from the snapshot;
- a Lucene commit is executed to associate the translog with the Lucene index, by storing the new translog UUID in the Lucene commit data;
- later during recovery, the replication tracker is initialized with a global checkpoint equal to the LOCAL_CHECKPOINT_KEY stored in the Lucene commit.
We can skip creating the translog because searchable snapshot shards do not need one: it is only used to store the local checkpoint locally so that it can be read back later during recovery. And if there is no translog, there is nothing to associate with the Lucene index, so the commit can be skipped too. This change introduces a hasTranslog method that is used to know when it is safe to NOT create a translog, in which case the global checkpoint is read from the last Lucene commit during primary shard recovery from snapshot, peer recovery, and recovery from an existing store (a simplified sketch of this decision follows below). If an existing translog exists on disk, it is cleaned up. There were also a few discoveries along the way: some assertions introduced with snapshot-based recoveries needed adjusting, and the cached estimation of directory sizes, which used to be refreshed by the Lucene commit, now has to be explicitly "marked as stale".
1 parent cf7edbb commit b461baf
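As a reading aid, here is a minimal, self-contained sketch of the checkpoint-source decision described above, using hypothetical stand-in names (plain booleans and a simplified key) rather than the actual Elasticsearch classes:

import java.util.Map;

// Sketch only: models the decision made by IndexShard#readGlobalCheckpointForRecovery
// in this commit. 'hasTranslog' and 'translogGlobalCheckpoint' stand in for reading
// the on-disk translog checkpoint file; LOCAL_CHECKPOINT_KEY stands in for
// SequenceNumbers.LOCAL_CHECKPOINT_KEY.
public class GlobalCheckpointSourceSketch {

    static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";

    static long readGlobalCheckpointForRecovery(boolean hasTranslog, long translogGlobalCheckpoint, Map<String, String> commitUserData) {
        if (hasTranslog) {
            // Regular shards: the translog checkpoint file on disk is the source of truth.
            return translogGlobalCheckpoint;
        }
        // Searchable snapshot shards: no translog exists, so fall back to the local
        // checkpoint persisted in the last Lucene commit taken from the snapshot.
        return Long.parseLong(commitUserData.get(LOCAL_CHECKPOINT_KEY));
    }

    public static void main(String[] args) {
        Map<String, String> commitUserData = Map.of(LOCAL_CHECKPOINT_KEY, "42");
        System.out.println(readGlobalCheckpointForRecovery(true, 7, commitUserData));  // 7, from the translog
        System.out.println(readGlobalCheckpointForRecovery(false, 7, commitUserData)); // 42, from the Lucene commit
    }
}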

File tree: 15 files changed, +175 -70 lines

server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ public final class NoOpEngine extends ReadOnlyEngine {
     public NoOpEngine(EngineConfig config) {
         this(
             config,
-            config.isPromotableToPrimary() ? null : new TranslogStats(0, 0, 0, 0, 0),
+            config.isPromotableToPrimary() && config.getTranslogConfig().hasTranslog() ? null : new TranslogStats(0, 0, 0, 0, 0),
             config.isPromotableToPrimary()
                 ? null
                 : new SeqNoStats(
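The changed ternary is easy to misread: passing null tells ReadOnlyEngine to derive TranslogStats from the on-disk translog, so that branch must only be taken when a translog can actually exist (hasTranslog is defined in TranslogConfig.java below). A small sketch of the gating, with plain booleans standing in for the two predicates shown in the diff:

// Sketch of the NoOpEngine gating above. 'true' corresponds to passing null
// (derive TranslogStats from the translog); searchable snapshot shards now
// take the zeroed TranslogStats(0, 0, 0, 0, 0) branch instead.
public final class TranslogStatsGatingSketch {
    static boolean deriveStatsFromTranslog(boolean promotableToPrimary, boolean hasTranslog) {
        return promotableToPrimary && hasTranslog;
    }

    public static void main(String[] args) {
        System.out.println(deriveStatsFromTranslog(true, true));  // true: regular primary-promotable shard
        System.out.println(deriveStatsFromTranslog(true, false)); // false: searchable snapshot shard
    }
}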

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 2 additions & 1 deletion
@@ -98,7 +98,7 @@ public class ReadOnlyEngine extends Engine {
     public ReadOnlyEngine(
         EngineConfig config,
         SeqNoStats seqNoStats,
-        TranslogStats translogStats,
+        @Nullable TranslogStats translogStats,
         boolean obtainLock,
         Function<DirectoryReader, DirectoryReader> readerWrapperFunction,
         boolean requireCompleteHistory,
@@ -251,6 +251,7 @@ private static SeqNoStats buildSeqNoStats(EngineConfig config, SegmentInfos info
     }

     private static TranslogStats translogStats(final EngineConfig config, final SegmentInfos infos) throws IOException {
+        assert config.getTranslogConfig().hasTranslog();
         final String translogUuid = infos.getUserData().get(Translog.TRANSLOG_UUID_KEY);
         if (translogUuid == null) {
             throw new IllegalStateException("commit doesn't contain translog unique id");

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 36 additions & 7 deletions
@@ -1487,6 +1487,27 @@ public void flush(FlushRequest request, ActionListener<Boolean> listener) {
         });
     }

+    /**
+     * @return true if the shard has a translog.
+     */
+    public boolean hasTranslog() {
+        return translogConfig.hasTranslog();
+    }
+
+    /**
+     * Reads the global checkpoint from the translog checkpoint file if the shard has a translog. Otherwise, reads the local checkpoint from
+     * the provided commit user data.
+     *
+     * @return the global checkpoint to use for recovery
+     * @throws IOException
+     */
+    public long readGlobalCheckpointForRecovery(Map<String, String> commitUserData) throws IOException {
+        if (hasTranslog()) {
+            return Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), commitUserData.get(Translog.TRANSLOG_UUID_KEY));
+        }
+        return Long.parseLong(commitUserData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
+    }
+
     /**
      * checks and removes translog files that no longer need to be retained. See
      * {@link org.elasticsearch.index.translog.TranslogDeletionPolicy} for details
@@ -1859,8 +1880,7 @@ public void recoverLocallyUpToGlobalCheckpoint(ActionListener<Long> recoveryStar
         }
         assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
         try {
-            final var translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
-            final var globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
+            final var globalCheckpoint = readGlobalCheckpointForRecovery(store.readLastCommittedSegmentsInfo().getUserData());
             final var safeCommit = store.findSafeIndexCommit(globalCheckpoint);
             ActionListener.run(recoveryStartingSeqNoListener.delegateResponse((l, e) -> {
                 logger.debug(() -> format("failed to recover shard locally up to global checkpoint %s", globalCheckpoint), e);
@@ -2084,8 +2104,7 @@ private void loadGlobalCheckpointToReplicationTracker() throws IOException {
             // we have to set it before we open an engine and recover from the translog because
             // acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
             // and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
-            final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
-            final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
+            final long globalCheckpoint = readGlobalCheckpointForRecovery(store.readLastCommittedSegmentsInfo().getUserData());
             replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
         } else {
             replicationTracker.updateGlobalCheckpointOnReplica(globalCheckPointIfUnpromotable, "from CleanFilesRequest");
@@ -2162,7 +2181,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
         // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
         // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
         onSettingsChanged();
-        assert assertSequenceNumbersInCommit();
+        assert assertLastestCommitUserData();
         recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
         checkAndCallWaitForEngineOrClosedShardListeners();
     }
@@ -2183,9 +2202,13 @@ private Engine createEngine(EngineConfig config) {
         }
     }

-    private boolean assertSequenceNumbersInCommit() throws IOException {
+    /**
+     * Asserts that the latest Lucene commit contains the expected information about sequence numbers and the ES version.
+     */
+    private boolean assertLastestCommitUserData() throws IOException {
         final SegmentInfos segmentCommitInfos = SegmentInfos.readLatestCommit(store.directory());
         final Map<String, String> userData = segmentCommitInfos.getUserData();
+        // Ensure sequence numbers are present in commit data
         assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contain a local checkpoint";
         assert userData.containsKey(SequenceNumbers.MAX_SEQ_NO) : "commit point doesn't contain a maximum sequence number";
         assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contain a history uuid";
@@ -2195,10 +2218,16 @@ private boolean assertSequenceNumbersInCommit() throws IOException {
             + "] is different than engine ["
             + getHistoryUUID()
             + "]";
+
         assert userData.containsKey(Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)
             : "opening index which was created post 5.5.0 but " + Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID + " is not found in commit";
+
+        // From 7.16.0, the ES version is included in the Lucene commit user data as well as in the snapshot metadata in the repository.
+        // This is used during primary failover to detect if the latest snapshot can be used to recover the new primary, because the failed
+        // primary may have created new segments in a more recent Lucene version, that may have been later snapshotted, meaning that the
+        // snapshotted files cannot be recovered on a node with a less recent Lucene version. Note that for versions <= 7.15 this assertion
+        // relies on the previous minor having a different lucene version.
         final org.apache.lucene.util.Version commitLuceneVersion = segmentCommitInfos.getCommitLuceneVersion();
-        // This relies in the previous minor having another lucene version
         assert commitLuceneVersion.onOrAfter(RecoverySettings.SEQ_NO_SNAPSHOT_RECOVERIES_SUPPORTED_VERSION.luceneVersion()) == false
             || userData.containsKey(Engine.ES_VERSION)
                 && Engine.readIndexVersion(userData.get(Engine.ES_VERSION)).onOrBefore(IndexVersion.current())

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 35 additions & 18 deletions
@@ -315,7 +315,7 @@ void recoverFromRepository(final IndexShard indexShard, Repository repository, A
             RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
             assert recoveryType == RecoverySource.Type.SNAPSHOT : "expected snapshot recovery type: " + recoveryType;
             SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) indexShard.recoveryState().getRecoverySource();
-            restore(indexShard, repository, recoverySource, recoveryListener(indexShard, listener).map(ignored -> true));
+            recoverFromRepository(indexShard, repository, recoverySource, recoveryListener(indexShard, listener).map(ignored -> true));
         } else {
             listener.onResponse(false);
         }
@@ -459,7 +459,7 @@ private void internalRecoverFromStore(IndexShard indexShard, ActionListener<Void
             }
             if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
                 assert indexShouldExists;
-                bootstrap(indexShard, store);
+                bootstrap(indexShard);
                 writeEmptyRetentionLeasesFile(indexShard);
             } else if (indexShouldExists) {
                 if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
@@ -523,12 +523,13 @@ private static void addRecoveredFileDetails(SegmentInfos si, Store store, Recove
     /**
     * Restores shard from {@link SnapshotRecoverySource} associated with this shard in routing table
     */
-    private void restore(
+    private void recoverFromRepository(
        IndexShard indexShard,
        Repository repository,
        SnapshotRecoverySource restoreSource,
        ActionListener<Void> outerListener
    ) {
+        assert indexShard.shardRouting.primary() : "only primary shards can recover from snapshot";
        logger.debug("restoring from {} ...", indexShard.recoveryState().getRecoverySource());

        record ShardAndIndexIds(IndexId indexId, ShardId shardId) {}
@@ -538,13 +539,13 @@ record ShardAndIndexIds(IndexId indexId, ShardId shardId) {}
            .newForked(indexShard::preRecovery)

            .<ShardAndIndexIds>andThen(shardAndIndexIdsListener -> {
-                final RecoveryState.Translog translogState = indexShard.recoveryState().getTranslog();
                if (restoreSource == null) {
                    throw new IndexShardRestoreFailedException(shardId, "empty restore source");
                }
                if (logger.isTraceEnabled()) {
                    logger.trace("[{}] restoring shard [{}]", restoreSource.snapshot(), shardId);
                }
+                final RecoveryState.Translog translogState = indexShard.recoveryState().getTranslog();
                translogState.totalOperations(0);
                translogState.totalOperationsOnStart(0);
                indexShard.prepareForIndexRecovery();
@@ -588,9 +589,7 @@ record ShardAndIndexIds(IndexId indexId, ShardId shardId) {}

            .<Void>andThen(l -> {
                indexShard.getIndexEventListener().afterFilesRestoredFromRepository(indexShard);
-                final Store store = indexShard.store();
-                bootstrap(indexShard, store);
-                assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
+                bootstrap(indexShard);
                writeEmptyRetentionLeasesFile(indexShard);
                indexShard.openEngineAndRecoverFromTranslog(l);
            })
@@ -610,19 +609,37 @@ record ShardAndIndexIds(IndexId indexId, ShardId shardId) {}
        }));
    }

+    /**
+     * @deprecated use {@link #bootstrap(IndexShard)} instead
+     */
+    @Deprecated(forRemoval = true)
    public static void bootstrap(final IndexShard indexShard, final Store store) throws IOException {
-        if (indexShard.indexSettings.getIndexMetadata().isSearchableSnapshot() == false) {
-            // not bootstrapping new history for searchable snapshots (which are read-only) allows sequence-number based peer recoveries
+        assert indexShard.store() == store;
+        bootstrap(indexShard);
+    }
+
+    private static void bootstrap(final IndexShard indexShard) throws IOException {
+        assert indexShard.routingEntry().primary();
+        final var store = indexShard.store();
+        store.incRef();
+        try {
+            final var translogLocation = indexShard.shardPath().resolveTranslog();
+            if (indexShard.hasTranslog() == false) {
+                Translog.deleteAll(translogLocation);
+                return;
+            }
            store.bootstrapNewHistory();
+            final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
+            final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
+            final String translogUUID = Translog.createEmptyTranslog(
+                translogLocation,
+                localCheckpoint,
+                indexShard.shardId(),
+                indexShard.getPendingPrimaryTerm()
+            );
+            store.associateIndexWithNewTranslog(translogUUID);
+        } finally {
+            store.decRef();
        }
-        final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
-        final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
-        final String translogUUID = Translog.createEmptyTranslog(
-            indexShard.shardPath().resolveTranslog(),
-            localCheckpoint,
-            indexShard.shardId(),
-            indexShard.getPendingPrimaryTerm()
-        );
-        store.associateIndexWithNewTranslog(translogUUID);
    }
}

server/src/main/java/org/elasticsearch/index/store/ByteSizeCachingDirectory.java

Lines changed: 24 additions & 3 deletions
@@ -10,6 +10,7 @@
 package org.elasticsearch.index.store;

 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexOutput;
 import org.elasticsearch.common.lucene.store.FilterIndexOutput;
@@ -19,7 +20,7 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;

-final class ByteSizeCachingDirectory extends ByteSizeDirectory {
+public final class ByteSizeCachingDirectory extends ByteSizeDirectory {

     private static class SizeAndModCount {
         final long size;
@@ -174,9 +175,29 @@ public void deleteFile(String name) throws IOException {
         try {
             super.deleteFile(name);
         } finally {
-            synchronized (this) {
-                modCount++;
+            markEstimatedSizeAsStale();
+        }
+    }
+
+    /**
+     * Mark the cached size as stale so that it is guaranteed to be refreshed the next time.
+     */
+    public void markEstimatedSizeAsStale() {
+        synchronized (this) {
+            modCount++;
+        }
+    }
+
+    public static ByteSizeCachingDirectory unwrapDirectory(Directory dir) {
+        while (dir != null) {
+            if (dir instanceof ByteSizeCachingDirectory) {
+                return (ByteSizeCachingDirectory) dir;
+            } else if (dir instanceof FilterDirectory) {
+                dir = ((FilterDirectory) dir).getDelegate();
+            } else {
+                dir = null;
             }
         }
+        return null;
     }
 }
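A hedged usage sketch for the two new public entry points above: when files change without a Lucene commit (for example, when a stale translog is deleted during searchable snapshot recovery), a caller has to invalidate the cached size itself. The helper class and its name are illustrative, not part of the commit:

import org.apache.lucene.store.Directory;
import org.elasticsearch.index.store.ByteSizeCachingDirectory;

// Illustrative helper (not in this commit): walk the FilterDirectory chain down to
// the ByteSizeCachingDirectory, if one is present, and mark its cached size estimate
// as stale so the next size query recomputes it instead of reusing the cache.
public final class DirectorySizeInvalidator {
    public static void invalidateCachedSize(Directory directory) {
        ByteSizeCachingDirectory cachingDirectory = ByteSizeCachingDirectory.unwrapDirectory(directory);
        if (cachingDirectory != null) {
            cachingDirectory.markEstimatedSizeAsStale();
        }
    }
}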

server/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 2 additions & 0 deletions
@@ -1449,6 +1449,7 @@ public void bootstrapNewHistory() throws IOException {
      * @see SequenceNumbers#MAX_SEQ_NO
      */
     public void bootstrapNewHistory(long localCheckpoint, long maxSeqNo) throws IOException {
+        assert indexSettings.getIndexMetadata().isSearchableSnapshot() == false;
         metadataLock.writeLock().lock();
         try (IndexWriter writer = newTemporaryAppendingIndexWriter(directory, null)) {
             final Map<String, String> map = new HashMap<>();
@@ -1572,6 +1573,7 @@ private IndexWriter newTemporaryEmptyIndexWriter(final Directory dir, final Vers
     }

     private IndexWriterConfig newTemporaryIndexWriterConfig() {
+        assert indexSettings.getIndexMetadata().isSearchableSnapshot() == false;
         // this config is only used for temporary IndexWriter instances, used to initialize the index or update the commit data,
         // so we don't want any merges to happen
         var iwc = indexWriterConfigWithNoMerging(null).setSoftDeletesField(Lucene.SOFT_DELETES_FIELD).setCommitOnClose(false);

server/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 4 additions & 0 deletions
@@ -220,6 +220,10 @@ public Translog(
         }
     }

+    public static void deleteAll(Path translogLocation) throws IOException {
+        IOUtils.rm(translogLocation);
+    }
+
     /** recover all translog files found on disk */
     private ArrayList<TranslogReader> recoverFromFiles(Checkpoint checkpoint) throws IOException {
         boolean success = false;

server/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java

Lines changed: 9 additions & 0 deletions
@@ -143,4 +143,13 @@ public OperationListener getOperationListener() {
     public boolean fsync() {
         return fsync;
     }
+
+    /**
+     * @return {@code true} if the configuration allows the Translog files to exist, {@code false} otherwise. In the case there is no
+     * translog, the shard is not writeable.
+     */
+    public boolean hasTranslog() {
+        // Expect no translog files to exist for searchable snapshots
+        return false == indexSettings.getIndexMetadata().isSearchableSnapshot();
+    }
 }
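For illustration, the predicate reduces to a single check on the index metadata; a simplified stand-in (a hypothetical record, not the real TranslogConfig) behaves like this:

// Simplified stand-in for TranslogConfig#hasTranslog: searchable snapshot indices are
// read-only and are currently the only case where no translog files are expected on disk.
record TranslogConfigSketch(boolean isSearchableSnapshot) {
    boolean hasTranslog() {
        return isSearchableSnapshot == false;
    }
}

The `false == …` form in the real method follows the codebase's convention of avoiding an easily-missed leading `!`.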

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 3 additions & 12 deletions
@@ -49,9 +49,7 @@
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardLongFieldRange;
 import org.elasticsearch.index.shard.ShardNotFoundException;
-import org.elasticsearch.index.shard.StoreRecovery;
 import org.elasticsearch.index.store.Store;
-import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogCorruptedException;
 import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef;
 import org.elasticsearch.tasks.Task;
@@ -385,15 +383,8 @@ record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, Str
             logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
             indexShard.prepareForIndexRecovery();
             if (indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot()) {
-                // for searchable snapshots, peer recovery is treated similarly to recovery from snapshot
+                // for archive indices mounted as searchable snapshots, we need to call this
                 indexShard.getIndexEventListener().afterFilesRestoredFromRepository(indexShard);
-                final Store store = indexShard.store();
-                store.incRef();
-                try {
-                    StoreRecovery.bootstrap(indexShard, store);
-                } finally {
-                    store.decRef();
-                }
             }
             indexShard.recoverLocallyUpToGlobalCheckpoint(ActionListener.assertOnce(l));
         })
@@ -488,8 +479,8 @@ public static StartRecoveryRequest getStartRecoveryRequest(
             // Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene
             // index.
             try {
-                final String expectedTranslogUUID = metadataSnapshot.commitUserData().get(Translog.TRANSLOG_UUID_KEY);
-                final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);
+                final long globalCheckpoint = recoveryTarget.indexShard()
+                    .readGlobalCheckpointForRecovery(metadataSnapshot.commitUserData());
                 assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint;
             } catch (IOException | TranslogCorruptedException e) {
                 logGlobalCheckpointWarning(logger, startingSeqNo, e);
