diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 5633bd8b89e1e..9dc500eef26fd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -443,7 +443,7 @@ public static Collection createAllocationDeciders( addAllocationDecider(deciders, new EnableAllocationDecider(clusterSettings)); addAllocationDecider(deciders, new IndexVersionAllocationDecider()); addAllocationDecider(deciders, new NodeVersionAllocationDecider()); - addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider()); + addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider(settings)); addAllocationDecider(deciders, new RestoreInProgressAllocationDecider()); addAllocationDecider(deciders, new NodeShutdownAllocationDecider()); addAllocationDecider(deciders, new NodeReplacementAllocationDecider()); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java index 03ae7c6f1d32c..455abe2c8917e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java @@ -14,10 +14,13 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.FixForMultiProject; import java.util.Objects; +import static org.elasticsearch.snapshots.SnapshotsService.STATELESS_SNAPSHOT_ENABLED_SETTING_NAME; + /** * This {@link org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider} prevents shards that * are currently being snapshotted from being moved to other nodes.
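A minimal construction sketch of the new behaviour (the wrapper class name StatelessDeciderWiringExample is hypothetical; the constructor, setting constant, and decision text come from this diff):

```java
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.snapshots.SnapshotsService;

// Hypothetical sketch, not part of this change: shows how the decider is wired on a node
// that sets the stateless flag.
class StatelessDeciderWiringExample {
    static AllocationDecider buildDecider() {
        Settings nodeSettings = Settings.builder()
            .put(SnapshotsService.STATELESS_SNAPSHOT_ENABLED_SETTING_NAME, true)
            .build();
        // With the flag on, canAllocate / canRemain / canForceAllocateDuringReplace all resolve to
        // a YES decision ("stateless snapshot is enabled") before SnapshotsInProgress is consulted.
        return new SnapshotInProgressAllocationDecider(nodeSettings);
    }
}
```

Note that the flag is read once from node settings in the constructor rather than registered as a dynamic cluster setting, so flipping it requires a node restart.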
@@ -25,6 +28,11 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider { public static final String NAME = "snapshot_in_progress"; + private final boolean statelessSnapshotEnabled; + + public SnapshotInProgressAllocationDecider(Settings settings) { + this.statelessSnapshotEnabled = settings.getAsBoolean(STATELESS_SNAPSHOT_ENABLED_SETTING_NAME, false); + } /** * Returns a {@link Decision} whether the given shard routing can be @@ -50,10 +58,19 @@ public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, Routing return canAllocate(shardRouting, node, allocation); } + private static final Decision YES_STATELESS_SNAPSHOT_ENABLED = Decision.single( + Decision.Type.YES, + NAME, + "stateless snapshot is enabled" + ); private static final Decision YES_NOT_RUNNING = Decision.single(Decision.Type.YES, NAME, "no snapshots are currently running"); private static final Decision YES_NOT_SNAPSHOTTED = Decision.single(Decision.Type.YES, NAME, "the shard is not being snapshotted"); - private static Decision canMove(ShardRouting shardRouting, RoutingAllocation allocation) { + private Decision canMove(ShardRouting shardRouting, RoutingAllocation allocation) { + if (statelessSnapshotEnabled) { + return YES_STATELESS_SNAPSHOT_ENABLED; + } + if (allocation.isSimulating()) { return allocation.decision(Decision.YES, NAME, "allocation is always enabled when simulating"); } diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index d2fd9b8ad0b58..3816fe7eaaaac 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -187,8 +187,10 @@ import org.elasticsearch.plugins.internal.RestExtension; import org.elasticsearch.plugins.internal.SettingsExtension; import org.elasticsearch.readiness.ReadinessService; +import org.elasticsearch.repositories.LocalPrimarySnapshotShardContextFactory; import org.elasticsearch.repositories.RepositoriesModule; import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.SnapshotShardContextFactory; import org.elasticsearch.reservedstate.ReservedClusterStateHandler; import org.elasticsearch.reservedstate.ReservedProjectStateHandler; import org.elasticsearch.reservedstate.ReservedStateHandlerProvider; @@ -251,6 +253,7 @@ import static java.util.Collections.newSetFromMap; import static java.util.function.Predicate.not; import static org.elasticsearch.core.Types.forciblyCast; +import static org.elasticsearch.snapshots.SnapshotsService.STATELESS_SNAPSHOT_ENABLED_SETTING_NAME; /** * Class uses to perform all the operations needed to construct a {@link Node} instance. 
@@ -1128,6 +1131,18 @@ public Map queryFields() { ); final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule); + final boolean statelessSnapshotEnabled = settings.getAsBoolean(STATELESS_SNAPSHOT_ENABLED_SETTING_NAME, false); + final SnapshotShardContextFactory snapshotShardContextFactory; + if (statelessSnapshotEnabled) { + snapshotShardContextFactory = pluginsService.loadSingletonServiceProvider(SnapshotShardContextFactory.class, () -> { + throw new IllegalStateException( + STATELESS_SNAPSHOT_ENABLED_SETTING_NAME + " is enabled, but no SnapshotShardContextFactory is registered" + ); + }); + } else { + snapshotShardContextFactory = new LocalPrimarySnapshotShardContextFactory(indicesService); + } + SnapshotsService snapshotsService = new SnapshotsService( settings, clusterService, @@ -1137,14 +1152,16 @@ public Map queryFields() { transportService, actionModule.getActionFilters(), systemIndices, - projectResolver.supportsMultipleProjects() + projectResolver.supportsMultipleProjects(), + statelessSnapshotEnabled ); SnapshotShardsService snapshotShardsService = new SnapshotShardsService( settings, clusterService, repositoriesService, transportService, - indicesService + indicesService, + snapshotShardContextFactory ); actionModule.getReservedClusterStateService().installProjectStateHandler(new ReservedRepositoryAction(repositoriesService)); diff --git a/server/src/main/java/org/elasticsearch/repositories/LocalPrimarySnapshotShardContext.java b/server/src/main/java/org/elasticsearch/repositories/LocalPrimarySnapshotShardContext.java new file mode 100644 index 0000000000000..105d0ceae46d4 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/LocalPrimarySnapshotShardContext.java @@ -0,0 +1,198 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.repositories; + +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.core.IOUtils; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetadata; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.snapshots.AbortedSnapshotException; +import org.elasticsearch.snapshots.SnapshotId; + +import java.io.IOException; +import java.util.Collection; + +/** + * Context holding the state for creating a shard snapshot via {@link Repository#snapshotShard(SnapshotShardContext)}. + * Wraps a {@link org.elasticsearch.index.engine.Engine.IndexCommitRef} that is released once this instances is completed by invoking + * either its {@link #onResponse(ShardSnapshotResult)} or {@link #onFailure(Exception)} callback. + */ +public final class LocalPrimarySnapshotShardContext extends SnapshotShardContext { + + private static final Logger logger = LogManager.getLogger(LocalPrimarySnapshotShardContext.class); + + private final Store store; + private final MapperService mapperService; + private final SnapshotIndexCommit commitRef; + + /** + * @param store store to be snapshotted + * @param mapperService the shards mapper service + * @param snapshotId snapshot id + * @param indexId id for the index being snapshotted + * @param commitRef commit point reference + * @param shardStateIdentifier a unique identifier of the state of the shard that is stored with the shard's snapshot and used + * to detect if the shard has changed between snapshots. 
If {@code null} is passed as the identifier + * snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit} + * @param snapshotStatus snapshot status + * @param repositoryMetaVersion version of the updated repository metadata to write + * @param snapshotStartTime start time of the snapshot found in + * {@link org.elasticsearch.cluster.SnapshotsInProgress.Entry#startTime()} + * @param listener listener invoked on completion + */ + public LocalPrimarySnapshotShardContext( + Store store, + MapperService mapperService, + SnapshotId snapshotId, + IndexId indexId, + SnapshotIndexCommit commitRef, + @Nullable String shardStateIdentifier, + IndexShardSnapshotStatus snapshotStatus, + IndexVersion repositoryMetaVersion, + final long snapshotStartTime, + ActionListener listener + ) { + super( + snapshotId, + indexId, + shardStateIdentifier, + snapshotStatus, + repositoryMetaVersion, + snapshotStartTime, + commitRef.closingBefore(listener) + ); + this.store = store; + this.mapperService = mapperService; + this.commitRef = commitRef; + } + + @Override + public ShardId shardId() { + return store.shardId(); + } + + @Override + public Store store() { + return store; + } + + @Override + public MapperService mapperService() { + return mapperService; + } + + @Override + public IndexCommit indexCommit() { + return commitRef.indexCommit(); + } + + @Override + public Releasable withCommitRef() { + status().ensureNotAborted(); // check this first to avoid acquiring a ref when aborted even if refs are available + if (commitRef.tryIncRef()) { + return Releasables.releaseOnce(commitRef::decRef); + } else { + status().ensureNotAborted(); + assert false : "commit ref closed early in state " + status(); + throw new IndexShardSnapshotFailedException(shardId(), "Store got closed concurrently"); + } + } + + @Override + public boolean isSearchableSnapshot() { + return store.indexSettings().getIndexMetadata().isSearchableSnapshot(); + } + + @Override + public Store.MetadataSnapshot metadataSnapshot() { + final IndexCommit snapshotIndexCommit = indexCommit(); + logger.trace("[{}] [{}] Loading store metadata using index commit [{}]", shardId(), snapshotId(), snapshotIndexCommit); + try { + return store.getMetadata(snapshotIndexCommit); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId(), "Failed to get store file metadata", e); + } + } + + @Override + public Collection fileNames() { + final IndexCommit snapshotIndexCommit = indexCommit(); + try { + return snapshotIndexCommit.getFileNames(); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId(), "Failed to get store file names", e); + } + } + + @Override + public boolean assertFileContentsMatchHash(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { + if (store.tryIncRef()) { + try (IndexInput indexInput = store.openVerifyingInput(fileInfo.physicalName(), IOContext.READONCE, fileInfo.metadata())) { + final byte[] tmp = new byte[Math.toIntExact(fileInfo.metadata().length())]; + indexInput.readBytes(tmp, 0, tmp.length); + assert fileInfo.metadata().hash().bytesEquals(new BytesRef(tmp)); + } catch (IOException e) { + throw new AssertionError(e); + } finally { + store.decRef(); + } + } else { + try { + status().ensureNotAborted(); + assert false : "if the store is already closed we must have been aborted"; + } catch (Exception e) { + assert e instanceof AbortedSnapshotException : e; + } + } + return true; + } + + @Override + public void failStoreIfCorrupted(Exception e) { + if 
(Lucene.isCorruptionException(e)) { + try { + store.markStoreCorrupted((IOException) e); + } catch (IOException inner) { + inner.addSuppressed(e); + logger.warn("store cannot be marked as corrupted", inner); + } + } + } + + @Override + public SnapshotShardContext.FileReader fileReader(String file, StoreFileMetadata metadata) throws IOException { + Releasable commitRefReleasable = null; + IndexInput indexInput = null; + try { + commitRefReleasable = withCommitRef(); + indexInput = store.openVerifyingInput(file, IOContext.DEFAULT, metadata); + return new IndexInputFileReader(commitRefReleasable, indexInput); + } catch (Exception e) { + IOUtils.close(e, indexInput, commitRefReleasable); + throw e; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/repositories/LocalPrimarySnapshotShardContextFactory.java b/server/src/main/java/org/elasticsearch/repositories/LocalPrimarySnapshotShardContextFactory.java new file mode 100644 index 0000000000000..5aec229b9919d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/LocalPrimarySnapshotShardContextFactory.java @@ -0,0 +1,137 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.Strings; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.function.Consumer; + +import static org.elasticsearch.snapshots.SnapshotShardsService.getShardStateId; + +public class LocalPrimarySnapshotShardContextFactory implements SnapshotShardContextFactory { + + private static final Logger logger = LogManager.getLogger(LocalPrimarySnapshotShardContextFactory.class); + + private final IndicesService indicesService; + + public LocalPrimarySnapshotShardContextFactory(IndicesService indicesService) { + this.indicesService = indicesService; + } + + @Override + public void asyncCreate( + ShardId shardId, + Snapshot snapshot, + IndexId indexId, + IndexShardSnapshotStatus snapshotStatus, + IndexVersion repositoryMetaVersion, + long snapshotStartTime, + ActionListener listener, + Consumer snapshotShardContextConsumer + ) throws IOException { + + final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id()); + if (indexShard.routingEntry().primary() == false) { + throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary"); + } + if (indexShard.routingEntry().relocating()) { + // do not snapshot when in the process of relocation of primaries so we won't get conflicts + 
throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot while relocating"); + } + + final IndexShardState indexShardState = indexShard.state(); + if (indexShardState == IndexShardState.CREATED || indexShardState == IndexShardState.RECOVERING) { + // shard has just been created, or still recovering + throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet"); + } + + SnapshotIndexCommit snapshotIndexCommit = null; + try { + snapshotStatus.updateStatusDescription("acquiring commit reference from IndexShard: triggers a shard flush"); + snapshotIndexCommit = new SnapshotIndexCommit(indexShard.acquireIndexCommitForSnapshot()); + snapshotStatus.updateStatusDescription("commit reference acquired, proceeding with snapshot"); + final var shardStateId = getShardStateId(indexShard, snapshotIndexCommit.indexCommit()); // not aborted so indexCommit() ok + snapshotStatus.addAbortListener(makeAbortListener(indexShard.shardId(), snapshot, snapshotIndexCommit)); + snapshotStatus.ensureNotAborted(); + snapshotShardContextConsumer.accept( + new LocalPrimarySnapshotShardContext( + indexShard.store(), + indexShard.mapperService(), + snapshot.getSnapshotId(), + indexId, + snapshotIndexCommit, + shardStateId, + snapshotStatus, + repositoryMetaVersion, + snapshotStartTime, + listener + ) + ); + snapshotIndexCommit = null; + } finally { + if (snapshotIndexCommit != null) { + snapshotIndexCommit.closingBefore(new ActionListener() { + @Override + public void onResponse(Void unused) {} + + @Override + public void onFailure(Exception e) { + // we're already failing exceptionally, and prefer to propagate the original exception instead of this one + logger.warn(Strings.format("exception closing commit for [%s] in [%s]", shardId, snapshot), e); + } + }).onResponse(null); + } + } + } + + static ActionListener makeAbortListener( + ShardId shardId, + Snapshot snapshot, + SnapshotIndexCommit snapshotIndexCommit + ) { + return new ActionListener<>() { + @Override + public void onResponse(IndexShardSnapshotStatus.AbortStatus abortStatus) { + if (abortStatus == IndexShardSnapshotStatus.AbortStatus.ABORTED) { + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.GENERIC, ThreadPool.Names.SNAPSHOT); + snapshotIndexCommit.onAbort(); + } + } + + @Override + public void onFailure(Exception e) { + logger.error(() -> Strings.format("unexpected failure in %s", description()), e); + assert false : e; + } + + @Override + public String toString() { + return description(); + } + + private String description() { + return Strings.format("abort listener for [%s] in [%s]", shardId, snapshot); + } + }; + } +} diff --git a/server/src/main/java/org/elasticsearch/repositories/SnapshotShardContext.java b/server/src/main/java/org/elasticsearch/repositories/SnapshotShardContext.java index 85692603b1e14..d87500aece156 100644 --- a/server/src/main/java/org/elasticsearch/repositories/SnapshotShardContext.java +++ b/server/src/main/java/org/elasticsearch/repositories/SnapshotShardContext.java @@ -10,83 +10,55 @@ package org.elasticsearch.repositories; import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.store.IndexInput; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DelegatingActionListener; +import org.elasticsearch.common.lucene.store.InputStreamIndexInput; +import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; -import org.elasticsearch.core.Releasables; import 
org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.snapshots.SnapshotId; -/** - * Context holding the state for creating a shard snapshot via {@link Repository#snapshotShard(SnapshotShardContext)}. - * Wraps a {@link org.elasticsearch.index.engine.Engine.IndexCommitRef} that is released once this instances is completed by invoking - * either its {@link #onResponse(ShardSnapshotResult)} or {@link #onFailure(Exception)} callback. - */ -public final class SnapshotShardContext extends DelegatingActionListener { +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; + +public abstract class SnapshotShardContext extends DelegatingActionListener { - private final Store store; - private final MapperService mapperService; private final SnapshotId snapshotId; private final IndexId indexId; - private final SnapshotIndexCommit commitRef; @Nullable private final String shardStateIdentifier; private final IndexShardSnapshotStatus snapshotStatus; private final IndexVersion repositoryMetaVersion; private final long snapshotStartTime; - /** - * @param store store to be snapshotted - * @param mapperService the shards mapper service - * @param snapshotId snapshot id - * @param indexId id for the index being snapshotted - * @param commitRef commit point reference - * @param shardStateIdentifier a unique identifier of the state of the shard that is stored with the shard's snapshot and used - * to detect if the shard has changed between snapshots. 
If {@code null} is passed as the identifier - * snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit} - * @param snapshotStatus snapshot status - * @param repositoryMetaVersion version of the updated repository metadata to write - * @param snapshotStartTime start time of the snapshot found in - * {@link org.elasticsearch.cluster.SnapshotsInProgress.Entry#startTime()} - * @param listener listener invoked on completion - */ - public SnapshotShardContext( - Store store, - MapperService mapperService, + protected SnapshotShardContext( SnapshotId snapshotId, IndexId indexId, - SnapshotIndexCommit commitRef, @Nullable String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus, IndexVersion repositoryMetaVersion, - final long snapshotStartTime, + long snapshotStartTime, ActionListener listener ) { - super(commitRef.closingBefore(listener)); - this.store = store; - this.mapperService = mapperService; + super(listener); this.snapshotId = snapshotId; this.indexId = indexId; - this.commitRef = commitRef; this.shardStateIdentifier = shardStateIdentifier; this.snapshotStatus = snapshotStatus; this.repositoryMetaVersion = repositoryMetaVersion; this.snapshotStartTime = snapshotStartTime; } - public Store store() { - return store; - } - - public MapperService mapperService() { - return mapperService; - } - public SnapshotId snapshotId() { return snapshotId; } @@ -95,10 +67,6 @@ public IndexId indexId() { return indexId; } - public IndexCommit indexCommit() { - return commitRef.indexCommit(); - } - @Nullable public String stateIdentifier() { return shardStateIdentifier; @@ -121,14 +89,58 @@ public void onResponse(ShardSnapshotResult result) { delegate.onResponse(result); } - public Releasable withCommitRef() { - snapshotStatus.ensureNotAborted(); // check this first to avoid acquiring a ref when aborted even if refs are available - if (commitRef.tryIncRef()) { - return Releasables.releaseOnce(commitRef::decRef); - } else { - snapshotStatus.ensureNotAborted(); - assert false : "commit ref closed early in state " + snapshotStatus; - throw new IndexShardSnapshotFailedException(store.shardId(), "Store got closed concurrently"); + public abstract ShardId shardId(); + + public abstract Store store(); + + public abstract MapperService mapperService(); + + public abstract IndexCommit indexCommit(); + + public abstract Releasable withCommitRef(); + + public abstract boolean isSearchableSnapshot(); + + public abstract Store.MetadataSnapshot metadataSnapshot(); + + public abstract Collection fileNames(); + + public abstract boolean assertFileContentsMatchHash(BlobStoreIndexShardSnapshot.FileInfo fileInfo); + + public abstract void failStoreIfCorrupted(Exception e); + + public abstract FileReader fileReader(String file, StoreFileMetadata metadata) throws IOException; + + public interface FileReader extends Closeable { + + InputStream openInput(long limit) throws IOException; + + void verify() throws IOException; + } + + public static class IndexInputFileReader implements FileReader { + + private final Releasable commitRefReleasable; + private final IndexInput indexInput; + + public IndexInputFileReader(Releasable commitRefReleasable, IndexInput indexInput) { + this.commitRefReleasable = commitRefReleasable; + this.indexInput = indexInput; + } + + @Override + public InputStream openInput(long limit) throws IOException { + return new InputStreamIndexInput(indexInput, limit); + } + + @Override + public void close() throws IOException { + IOUtils.close(indexInput, 
commitRefReleasable); + } + + @Override + public void verify() throws IOException { + Store.verify(indexInput); } } } diff --git a/server/src/main/java/org/elasticsearch/repositories/SnapshotShardContextFactory.java b/server/src/main/java/org/elasticsearch/repositories/SnapshotShardContextFactory.java new file mode 100644 index 0000000000000..20e64b4fe31e6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/SnapshotShardContextFactory.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.snapshots.Snapshot; + +import java.io.IOException; +import java.util.function.Consumer; + +public interface SnapshotShardContextFactory { + + default boolean isStateless() { + return false; + }; + + void asyncCreate( + ShardId shardId, + Snapshot snapshot, + IndexId indexId, + IndexShardSnapshotStatus snapshotStatus, + IndexVersion repositoryMetaVersion, + long snapshotStartTime, + ActionListener listener, + Consumer snapshotShardContextConsumer + ) throws IOException; +} diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 43cb1a7d3caec..74e5fa215bf20 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -12,12 +12,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.BytesRef; @@ -71,8 +69,6 @@ import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -3241,8 +3237,7 @@ private void doSnapshotShard(SnapshotShardContext context) { context.onFailure(new RepositoryException(metadata.name(), "cannot snapshot shard on a readonly repository")); return; } - final Store store = context.store(); - final ShardId shardId = store.shardId(); + final ShardId shardId = context.shardId(); final SnapshotId snapshotId 
= context.snapshotId(); final IndexShardSnapshotStatus snapshotStatus = context.status(); snapshotStatus.updateStatusDescription("snapshot task runner: setting up shard snapshot"); @@ -3303,7 +3298,7 @@ private void doSnapshotShard(SnapshotShardContext context) { int filesInShardMetadataCount = 0; long filesInShardMetadataSize = 0; - if (store.indexSettings().getIndexMetadata().isSearchableSnapshot()) { + if (context.isSearchableSnapshot()) { indexCommitPointFiles = Collections.emptyList(); } else if (filesFromSegmentInfos == null) { // If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files @@ -3313,14 +3308,8 @@ private void doSnapshotShard(SnapshotShardContext context) { final Store.MetadataSnapshot metadataFromStore; try (Releasable ignored = context.withCommitRef()) { // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should - try { - final IndexCommit snapshotIndexCommit = context.indexCommit(); - logger.trace("[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit); - metadataFromStore = store.getMetadata(snapshotIndexCommit); - fileNames = snapshotIndexCommit.getFileNames(); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e); - } + metadataFromStore = context.metadataSnapshot(); + fileNames = context.fileNames(); } for (String fileName : fileNames) { ensureNotAborted(shardId, snapshotId, snapshotStatus, fileName); @@ -3348,7 +3337,7 @@ private void doSnapshotShard(SnapshotShardContext context) { if (needsWrite) { filesToSnapshot.add(snapshotFileInfo); } else { - assert assertFileContentsMatchHash(snapshotStatus, snapshotFileInfo, store); + assert context.assertFileContentsMatchHash(snapshotFileInfo); filesInShardMetadataCount += 1; filesInShardMetadataSize += md.length(); } @@ -3581,32 +3570,6 @@ protected void snapshotFiles( } } - private static boolean assertFileContentsMatchHash( - IndexShardSnapshotStatus snapshotStatus, - BlobStoreIndexShardSnapshot.FileInfo fileInfo, - Store store - ) { - if (store.tryIncRef()) { - try (IndexInput indexInput = store.openVerifyingInput(fileInfo.physicalName(), IOContext.READONCE, fileInfo.metadata())) { - final byte[] tmp = new byte[Math.toIntExact(fileInfo.metadata().length())]; - indexInput.readBytes(tmp, 0, tmp.length); - assert fileInfo.metadata().hash().bytesEquals(new BytesRef(tmp)); - } catch (IOException e) { - throw new AssertionError(e); - } finally { - store.decRef(); - } - } else { - try { - snapshotStatus.ensureNotAborted(); - assert false : "if the store is already closed we must have been aborted"; - } catch (Exception e) { - assert e instanceof AbortedSnapshotException : e; - } - } - return true; - } - @Override public void restoreShard( Store store, @@ -4119,23 +4082,17 @@ private Tuple buildBlobStoreIndexShardSnapsh */ protected void snapshotFile(SnapshotShardContext context, FileInfo fileInfo) throws IOException { final IndexId indexId = context.indexId(); - final Store store = context.store(); - final ShardId shardId = store.shardId(); + final ShardId shardId = context.shardId(); final IndexShardSnapshotStatus snapshotStatus = context.status(); final SnapshotId snapshotId = context.snapshotId(); final BlobContainer shardContainer = shardContainer(indexId, shardId); final String file = fileInfo.physicalName(); - try ( - Releasable ignored = context.withCommitRef(); - IndexInput indexInput = 
store.openVerifyingInput(file, IOContext.DEFAULT, fileInfo.metadata()) - ) { + try (var fileReader = context.fileReader(file, fileInfo.metadata())) { for (int i = 0; i < fileInfo.numberOfParts(); i++) { final long partBytes = fileInfo.partBytes(i); // Make reads abortable by mutating the snapshotStatus object - final InputStream inputStream = new FilterInputStream( - maybeRateLimitSnapshots(new InputStreamIndexInput(indexInput, partBytes)) - ) { + final InputStream inputStream = new FilterInputStream(maybeRateLimitSnapshots(fileReader.openInput(partBytes))) { @Override public int read() throws IOException { checkAborted(); @@ -4165,26 +4122,15 @@ private void checkAborted() { threadPool.relativeTimeInMillis() - startMS ); } - Store.verify(indexInput); + fileReader.verify(); snapshotStatus.addProcessedFile(fileInfo.length()); } catch (Exception t) { - failStoreIfCorrupted(store, t); + context.failStoreIfCorrupted(t); snapshotStatus.addProcessedFile(0); throw t; } } - private static void failStoreIfCorrupted(Store store, Exception e) { - if (Lucene.isCorruptionException(e)) { - try { - store.markStoreCorrupted((IOException) e); - } catch (IOException inner) { - inner.addSuppressed(e); - logger.warn("store cannot be marked as corrupted", inner); - } - } - } - public boolean supportURLRepo() { return supportURLRepo; } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ShardSnapshotTaskRunner.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ShardSnapshotTaskRunner.java index 715769665b117..4c7c8097e08e8 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ShardSnapshotTaskRunner.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ShardSnapshotTaskRunner.java @@ -62,7 +62,7 @@ private String snapshotUUID() { @SuppressWarnings("resource") private ShardId shardId() { - return context().store().shardId(); + return context().shardId(); } private static final Comparator COMPARATOR = Comparator diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 3fd5a226936ac..d39ef94c7c5f0 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -37,9 +37,8 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; +import org.elasticsearch.index.snapshots.IndexShardSnapshotException; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus.Stage; import org.elasticsearch.indices.IndicesService; @@ -50,8 +49,7 @@ import org.elasticsearch.repositories.ShardGeneration; import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.ShardSnapshotResult; -import org.elasticsearch.repositories.SnapshotIndexCommit; -import org.elasticsearch.repositories.SnapshotShardContext; +import org.elasticsearch.repositories.SnapshotShardContextFactory; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; @@ -103,18 +101,21 @@ 
public final class SnapshotShardsService extends AbstractLifecycleComponent impl // Runs the tasks that promptly notify shards of aborted snapshots so that resources can be released ASAP private final ThrottledTaskRunner notifyOnAbortTaskRunner; + private final SnapshotShardContextFactory snapshotShardContextFactory; public SnapshotShardsService( Settings settings, ClusterService clusterService, RepositoriesService repositoriesService, TransportService transportService, - IndicesService indicesService + IndicesService indicesService, + SnapshotShardContextFactory snapshotShardContextFactory ) { this.indicesService = indicesService; this.repositoriesService = repositoriesService; this.transportService = transportService; this.clusterService = clusterService; + this.snapshotShardContextFactory = snapshotShardContextFactory; this.threadPool = transportService.getThreadPool(); this.snapshotShutdownProgressTracker = new SnapshotShutdownProgressTracker( () -> clusterService.state().nodes().getLocalNodeId(), @@ -234,6 +235,9 @@ private static boolean isPausingProgressTrackedShutdown(@Nullable SingleNodeShut @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { + if (snapshotShardContextFactory.isStateless()) { + return; + } // abort any snapshots occurring on the soon-to-be closed shard synchronized (shardSnapshots) { for (Map.Entry> snapshotShards : shardSnapshots.entrySet()) { @@ -289,6 +293,20 @@ public Map currentSnapshotShards(Snapsho } } + public void ensureShardSnapshotNotAborted(Snapshot snapshot, ShardId shardId) { + synchronized (shardSnapshots) { + final var current = shardSnapshots.get(snapshot); + if (current == null) { + throw new SnapshotMissingException(snapshot.getRepository(), snapshot.getSnapshotId().getName()); + } + final var indexShardSnapshotStatus = current.get(shardId); + if (indexShardSnapshotStatus == null) { + throw new IndexShardSnapshotException(shardId, "shard snapshot [" + snapshot.getSnapshotId() + "] does not exist"); + } + indexShardSnapshotStatus.ensureNotAborted(); + } + } + /** * Cancels any snapshots that have been removed from the given list of SnapshotsInProgress. 
*/ @@ -590,93 +608,21 @@ private void snapshot( ActionListener.run(resultListener, listener -> { snapshotStatus.updateStatusDescription("has started"); snapshotStatus.ensureNotAborted(); - final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id()); - if (indexShard.routingEntry().primary() == false) { - throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary"); - } - if (indexShard.routingEntry().relocating()) { - // do not snapshot when in the process of relocation of primaries so we won't get conflicts - throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot while relocating"); - } - - final IndexShardState indexShardState = indexShard.state(); - if (indexShardState == IndexShardState.CREATED || indexShardState == IndexShardState.RECOVERING) { - // shard has just been created, or still recovering - throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet"); - } final Repository repository = repositoriesService.repository(snapshot.getProjectId(), snapshot.getRepository()); - SnapshotIndexCommit snapshotIndexCommit = null; - try { - snapshotStatus.updateStatusDescription("acquiring commit reference from IndexShard: triggers a shard flush"); - snapshotIndexCommit = new SnapshotIndexCommit(indexShard.acquireIndexCommitForSnapshot()); - snapshotStatus.updateStatusDescription("commit reference acquired, proceeding with snapshot"); - final var shardStateId = getShardStateId(indexShard, snapshotIndexCommit.indexCommit()); // not aborted so indexCommit() ok - snapshotStatus.addAbortListener(makeAbortListener(indexShard.shardId(), snapshot, snapshotIndexCommit)); - snapshotStatus.ensureNotAborted(); - repository.snapshotShard( - new SnapshotShardContext( - indexShard.store(), - indexShard.mapperService(), - snapshot.getSnapshotId(), - indexId, - snapshotIndexCommit, - shardStateId, - snapshotStatus, - version, - entryStartTime, - listener - ) - ); - snapshotIndexCommit = null; // success - } finally { - if (snapshotIndexCommit != null) { - snapshotIndexCommit.closingBefore(new ActionListener() { - @Override - public void onResponse(Void unused) {} - - @Override - public void onFailure(Exception e) { - // we're already failing exceptionally, and prefer to propagate the original exception instead of this one - logger.warn(Strings.format("exception closing commit for [%s] in [%s]", indexShard.shardId(), snapshot), e); - } - }).onResponse(null); - } - } + snapshotShardContextFactory.asyncCreate( + shardId, + snapshot, + indexId, + snapshotStatus, + version, + entryStartTime, + listener, + repository::snapshotShard + ); }); } - private static ActionListener makeAbortListener( - ShardId shardId, - Snapshot snapshot, - SnapshotIndexCommit snapshotIndexCommit - ) { - return new ActionListener<>() { - @Override - public void onResponse(IndexShardSnapshotStatus.AbortStatus abortStatus) { - if (abortStatus == IndexShardSnapshotStatus.AbortStatus.ABORTED) { - assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.GENERIC, ThreadPool.Names.SNAPSHOT); - snapshotIndexCommit.onAbort(); - } - } - - @Override - public void onFailure(Exception e) { - logger.error(() -> Strings.format("unexpected failure in %s", description()), e); - assert false : e; - } - - @Override - public String toString() { - return description(); - } - - private String description() { - return Strings.format("abort listener for [%s] in [%s]", shardId, snapshot); - } - }; - } - /** * Generates an identifier 
from the current state of a shard that can be used to detect whether a shard's contents * have changed between two snapshots. diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 68eb8bd8cf2cd..da1793b7c8d69 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -200,6 +200,8 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement private final boolean serializeProjectMetadata; + private final boolean statelessSnapshotEnabled; + private final MasterServiceTaskQueue masterServiceTaskQueue; private final ShardSnapshotUpdateCompletionHandler shardSnapshotUpdateCompletionHandler; @@ -217,6 +219,8 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement Setting.Property.Dynamic ); + public static final String STATELESS_SNAPSHOT_ENABLED_SETTING_NAME = "stateless.snapshot.enabled"; + private volatile int maxConcurrentOperations; public SnapshotsService( @@ -228,7 +232,8 @@ public SnapshotsService( TransportService transportService, ActionFilters actionFilters, SystemIndices systemIndices, - boolean serializeProjectMetadata + boolean serializeProjectMetadata, + boolean statelessSnapshotEnabled ) { this.clusterService = clusterService; this.rerouteService = rerouteService; @@ -248,6 +253,7 @@ public SnapshotsService( } this.systemIndices = systemIndices; this.serializeProjectMetadata = serializeProjectMetadata; + this.statelessSnapshotEnabled = statelessSnapshotEnabled; this.masterServiceTaskQueue = clusterService.createTaskQueue("snapshots-service", Priority.NORMAL, new SnapshotTaskExecutor()); this.updateNodeIdsToRemoveQueue = clusterService.createTaskQueue( @@ -1134,7 +1140,8 @@ public ClusterState execute(ClusterState currentState) { currentState.routingTable(projectId), nodes, snapshotsInProgress::isNodeIdForRemoval, - knownFailures + knownFailures, + statelessSnapshotEnabled ); if (shards != null) { final SnapshotsInProgress.Entry updatedSnapshot = snapshotEntry.withShardStates(shards); @@ -1234,7 +1241,8 @@ private static ImmutableOpenMap processWaitingShar RoutingTable routingTable, DiscoveryNodes nodes, Predicate nodeIdRemovalPredicate, - Map knownFailures + Map knownFailures, + boolean statelessSnapshotEnabled ) { assert snapshotEntry.isClone() == false : "clones take a different path"; boolean snapshotChanged = false; @@ -1318,6 +1326,32 @@ private static ImmutableOpenMap processWaitingShar } else { // TODO: Restart snapshot on another node? 
snapshotChanged = true; + if (statelessSnapshotEnabled && shardStatus.state() == ShardState.INIT) { + // The node shut down before the shard snapshot could be paused; attempt to reassign the shard snapshot + IndexRoutingTable indexRoutingTable = routingTable.index(shardId.getIndex()); + if (indexRoutingTable != null) { + IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardId.id()); + if (indexShardRoutingTable != null) { + final var updatedShardSnapshotStatus = initShardSnapshotStatus( + shardStatus.generation(), + indexShardRoutingTable.primaryShard(), + nodeIdRemovalPredicate + ); + if (updatedShardSnapshotStatus.state().completed() == false) { + logger.info( + "restart snapshot of shard {} from departed node [{}] on node [{}], new state [{}]", + shardId, + shardStatus.nodeId(), + updatedShardSnapshotStatus.nodeId(), + updatedShardSnapshotStatus.state() + ); + shards.put(shardId, updatedShardSnapshotStatus); + continue; + } + // If the shard became unassigned at the same time, we let it fall through to the same failure handling as node removal + } + } + } logger.warn("failing snapshot of shard [{}] on departed node [{}]", shardId, shardStatus.nodeId()); final ShardSnapshotStatus failedState = new ShardSnapshotStatus( shardStatus.nodeId(), @@ -3610,6 +3644,8 @@ private void applyShardSnapshotUpdate( "snapshot aborted" ); } else { + // TODO: For stateless snapshots, if the update is PAUSED_FOR_NODE_REMOVAL, we can move it directly back to INIT + // if the shard has relocated and started on a different node. updatedShardSnapshotStatus = shardSnapshotStatusUpdate.updatedState; } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDeciderTests.java index 8d3a5abb2fb4a..0640d6a933eb2 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDeciderTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexVersion; @@ -38,7 +39,7 @@ public class SnapshotInProgressAllocationDeciderTests extends ESTestCase { - private final SnapshotInProgressAllocationDecider decider = new SnapshotInProgressAllocationDecider(); + private final SnapshotInProgressAllocationDecider decider = new SnapshotInProgressAllocationDecider(Settings.EMPTY); private final Index index = new Index(randomIdentifier(), randomUUID()); private final ShardId shardId = new ShardId(index, 0); private final String repositoryName = randomIdentifier(); diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/ShardSnapshotTaskRunnerTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/ShardSnapshotTaskRunnerTests.java index f3c0f419cde7a..ff3308d8e62a9 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/ShardSnapshotTaskRunnerTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/ShardSnapshotTaskRunnerTests.java @@ -25,6 +25,7 @@ import
org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.LocalPrimarySnapshotShardContext; import org.elasticsearch.repositories.SnapshotIndexCommit; import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.snapshots.SnapshotId; @@ -125,7 +126,7 @@ public static SnapshotShardContext dummyContext(final SnapshotId snapshotId, fin Settings.EMPTY ); final var dummyStore = new Store(shardId, indexSettings, new ByteBuffersDirectory(), new DummyShardLock(shardId)); - return new SnapshotShardContext( + return new LocalPrimarySnapshotShardContext( dummyStore, null, snapshotId, diff --git a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java index 3b1eb8c1f39d9..11cf6c15f9fa0 100644 --- a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java @@ -56,10 +56,10 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.LocalPrimarySnapshotShardContext; import org.elasticsearch.repositories.ShardGeneration; import org.elasticsearch.repositories.ShardSnapshotResult; import org.elasticsearch.repositories.SnapshotIndexCommit; -import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; @@ -125,7 +125,7 @@ public void testSnapshotAndRestore() throws IOException { final PlainActionFuture snapshot1Future = new PlainActionFuture<>(); IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null); repository.snapshotShard( - new SnapshotShardContext( + new LocalPrimarySnapshotShardContext( store, null, snapshotId, @@ -168,7 +168,7 @@ public void testSnapshotAndRestore() throws IOException { final PlainActionFuture snapshot2future = new PlainActionFuture<>(); IndexShardSnapshotStatus snapshotStatus2 = IndexShardSnapshotStatus.newInitializing(shardGeneration); repository.snapshotShard( - new SnapshotShardContext( + new LocalPrimarySnapshotShardContext( store, null, incSnapshotId, @@ -309,7 +309,7 @@ protected BlobContainer wrapChild(BlobContainer child) { canErrorForWriteBlob.set(true); shouldErrorForWriteMetadataBlob.set(false); repository.snapshotShard( - new SnapshotShardContext( + new LocalPrimarySnapshotShardContext( store1, null, snapshotId, @@ -344,7 +344,7 @@ protected BlobContainer wrapChild(BlobContainer child) { canErrorForWriteBlob.set(false); shouldErrorForWriteMetadataBlob.set(true); repository.snapshotShard( - new SnapshotShardContext( + new LocalPrimarySnapshotShardContext( store2, null, snapshotId, diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 2fe0071bf53b2..ab74c35aac8c9 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -171,6 +171,7 @@ import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.internal.DocumentParsingProvider; import 
org.elasticsearch.plugins.scanners.StablePluginsRegistry; +import org.elasticsearch.repositories.LocalPrimarySnapshotShardContextFactory; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; @@ -2423,6 +2424,7 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { transportService, actionFilters, EmptySystemIndices.INSTANCE, + false, false ); nodeEnv = new NodeEnvironment(settings, environment); @@ -2500,7 +2502,8 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { clusterService, repositoriesService, transportService, - indicesService + indicesService, + new LocalPrimarySnapshotShardContextFactory(indicesService) ); final ShardStateAction shardStateAction = new ShardStateAction( clusterService, diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index aed1d10342768..be6f410f91d02 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -83,11 +83,11 @@ import org.elasticsearch.indices.recovery.plan.PeerOnlyRecoveryPlannerService; import org.elasticsearch.indices.recovery.plan.RecoveryPlannerService; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.LocalPrimarySnapshotShardContext; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.ShardGeneration; import org.elasticsearch.repositories.ShardSnapshotResult; import org.elasticsearch.repositories.SnapshotIndexCommit; -import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.test.DummyShardLock; @@ -1167,7 +1167,7 @@ protected ShardGeneration snapshotShard(final IndexShard shard, final Snapshot s final ShardGeneration shardGen; try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) { repository.snapshotShard( - new SnapshotShardContext( + new LocalPrimarySnapshotShardContext( shard.store(), shard.mapperService(), snapshot.getSnapshotId(), diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java index c845df8501e45..0479b9f5db192 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java @@ -41,6 +41,7 @@ import org.elasticsearch.repositories.FilterRepository; import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.LocalPrimarySnapshotShardContext; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.SnapshotIndexCommit; import org.elasticsearch.repositories.SnapshotShardContext; @@ -211,7 +212,7 @@ protected void closeInternal() { toClose.add(reader); IndexCommit indexCommit = reader.getIndexCommit(); super.snapshotShard( - new SnapshotShardContext( + new LocalPrimarySnapshotShardContext( tempStore, mapperService, context.snapshotId(), diff --git 
a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java index 655547a3f26ea..45b44867123af 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java @@ -70,12 +70,12 @@ import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.LocalPrimarySnapshotShardContext; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.ShardGeneration; import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.ShardSnapshotResult; import org.elasticsearch.repositories.SnapshotIndexCommit; -import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase; import org.elasticsearch.repositories.fs.FsRepository; @@ -135,7 +135,7 @@ public void testSourceIncomplete() throws IOException { runAsSnapshot( shard.getThreadPool(), () -> repository.snapshotShard( - new SnapshotShardContext( + new LocalPrimarySnapshotShardContext( shard.store(), shard.mapperService(), snapshotId, @@ -186,7 +186,7 @@ public void testSourceIncompleteSyntheticSourceNoDoc() throws IOException { runAsSnapshot( shard.getThreadPool(), () -> repository.snapshotShard( - new SnapshotShardContext( + new LocalPrimarySnapshotShardContext( shard.store(), shard.mapperService(), snapshotId, @@ -230,7 +230,7 @@ public void testIncrementalSnapshot() throws IOException { runAsSnapshot( shard.getThreadPool(), () -> repository.snapshotShard( - new SnapshotShardContext( + new LocalPrimarySnapshotShardContext( shard.store(), shard.mapperService(), snapshotId, @@ -261,7 +261,7 @@ public void testIncrementalSnapshot() throws IOException { runAsSnapshot( shard.getThreadPool(), () -> repository.snapshotShard( - new SnapshotShardContext( + new LocalPrimarySnapshotShardContext( shard.store(), shard.mapperService(), snapshotId, @@ -292,7 +292,7 @@ public void testIncrementalSnapshot() throws IOException { runAsSnapshot( shard.getThreadPool(), () -> repository.snapshotShard( - new SnapshotShardContext( + new LocalPrimarySnapshotShardContext( shard.store(), shard.mapperService(), snapshotId, @@ -355,7 +355,7 @@ public void testRestoreMinimal() throws IOException { final PlainActionFuture future = new PlainActionFuture<>(); runAsSnapshot(shard.getThreadPool(), () -> { repository.snapshotShard( - new SnapshotShardContext( + new LocalPrimarySnapshotShardContext( shard.store(), shard.mapperService(), snapshotId, diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java index 35d71ba23e283..dd13dc439c2e4 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java +++ 
b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java @@ -82,9 +82,9 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.LocalPrimarySnapshotShardContext; import org.elasticsearch.repositories.ShardSnapshotResult; import org.elasticsearch.repositories.SnapshotIndexCommit; -import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.repositories.fs.FsRepository; @@ -620,7 +620,7 @@ private void testDirectories( threadPool.generic().submit(() -> { IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null); repository.snapshotShard( - new SnapshotShardContext( + new LocalPrimarySnapshotShardContext( store, null, snapshotId,
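Taken together, SnapshotShardContextFactory is the seam a stateless distribution would plug into. Below is a rough provider sketch; the class name, package, and method body are hypothetical, while the interface, its isStateless() default, the loadSingletonServiceProvider wiring, and the setting name come from this diff (the listener and consumer generics, shown as raw types above, are assumed to be ActionListener<ShardSnapshotResult> and Consumer<SnapshotShardContext>):

```java
package org.example.stateless; // hypothetical plugin package

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.ShardSnapshotResult;
import org.elasticsearch.repositories.SnapshotShardContext;
import org.elasticsearch.repositories.SnapshotShardContextFactory;
import org.elasticsearch.snapshots.Snapshot;

import java.io.IOException;
import java.util.function.Consumer;

/**
 * Illustrative sketch of a stateless provider. NodeConstruction loads exactly one such
 * implementation via loadSingletonServiceProvider when stateless.snapshot.enabled is true,
 * so a real plugin would also have to make this class discoverable to the plugins service
 * as a SnapshotShardContextFactory service provider.
 */
public class StatelessSnapshotShardContextFactory implements SnapshotShardContextFactory {

    @Override
    public boolean isStateless() {
        // Lets SnapshotShardsService skip the local abort-on-shard-close handling.
        return true;
    }

    @Override
    public void asyncCreate(
        ShardId shardId,
        Snapshot snapshot,
        IndexId indexId,
        IndexShardSnapshotStatus snapshotStatus,
        IndexVersion repositoryMetaVersion,
        long snapshotStartTime,
        ActionListener<ShardSnapshotResult> listener,
        Consumer<SnapshotShardContext> snapshotShardContextConsumer
    ) throws IOException {
        snapshotStatus.updateStatusDescription("building stateless snapshot shard context");
        // A real implementation would resolve the shard's latest commit in the object store,
        // build a SnapshotShardContext subclass around it, and pass it to the consumer
        // (which is repository::snapshotShard in SnapshotShardsService). On any failure the
        // listener must still be completed, e.g.:
        listener.onFailure(new UnsupportedOperationException("illustrative sketch only"));
    }
}
```

When stateless.snapshot.enabled is false, NodeConstruction keeps the existing behaviour by falling back to LocalPrimarySnapshotShardContextFactory, which acquires the commit from the local primary exactly as SnapshotShardsService did before this change.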