Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new EnableAllocationDecider(clusterSettings));
addAllocationDecider(deciders, new IndexVersionAllocationDecider());
addAllocationDecider(deciders, new NodeVersionAllocationDecider());
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider(settings));
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
addAllocationDecider(deciders, new NodeShutdownAllocationDecider());
addAllocationDecider(deciders, new NodeReplacementAllocationDecider());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,25 @@
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.FixForMultiProject;

import java.util.Objects;

import static org.elasticsearch.snapshots.SnapshotsService.STATELESS_SNAPSHOT_ENABLED_SETTING_NAME;

/**
* This {@link org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider} prevents shards that
* are currently being snapshotted from being moved to other nodes.
*/
public class SnapshotInProgressAllocationDecider extends AllocationDecider {

public static final String NAME = "snapshot_in_progress";
private final boolean statelessSnapshotEnabled;

/**
 * Creates the decider and captures, once at construction time, whether stateless snapshots are enabled.
 *
 * @param settings settings from which the boolean named by {@code STATELESS_SNAPSHOT_ENABLED_SETTING_NAME} is read;
 *                 the flag is treated as {@code false} when the setting is absent
 */
public SnapshotInProgressAllocationDecider(Settings settings) {
this.statelessSnapshotEnabled = settings.getAsBoolean(STATELESS_SNAPSHOT_ENABLED_SETTING_NAME, false);
}

/**
* Returns a {@link Decision} whether the given shard routing can be
Expand All @@ -50,10 +58,19 @@ public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, Routing
return canAllocate(shardRouting, node, allocation);
}

// Reusable YES decisions attributed to this decider (via NAME), each carrying a fixed explanation string.

// Returned from canMove when the stateless snapshot flag is set.
private static final Decision YES_STATELESS_SNAPSHOT_ENABLED = Decision.single(
Decision.Type.YES,
NAME,
"stateless snapshot is enabled"
);
// YES decision explaining that no snapshots are currently running.
private static final Decision YES_NOT_RUNNING = Decision.single(Decision.Type.YES, NAME, "no snapshots are currently running");
// YES decision explaining that the shard in question is not being snapshotted.
private static final Decision YES_NOT_SNAPSHOTTED = Decision.single(Decision.Type.YES, NAME, "the shard is not being snapshotted");

private static Decision canMove(ShardRouting shardRouting, RoutingAllocation allocation) {
private Decision canMove(ShardRouting shardRouting, RoutingAllocation allocation) {
if (statelessSnapshotEnabled) {
return YES_STATELESS_SNAPSHOT_ENABLED;
}

if (allocation.isSimulating()) {
return allocation.decision(Decision.YES, NAME, "allocation is always enabled when simulating");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,10 @@
import org.elasticsearch.plugins.internal.RestExtension;
import org.elasticsearch.plugins.internal.SettingsExtension;
import org.elasticsearch.readiness.ReadinessService;
import org.elasticsearch.repositories.LocalPrimarySnapshotShardContextFactory;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.SnapshotShardContextFactory;
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.reservedstate.ReservedProjectStateHandler;
import org.elasticsearch.reservedstate.ReservedStateHandlerProvider;
Expand Down Expand Up @@ -251,6 +253,7 @@
import static java.util.Collections.newSetFromMap;
import static java.util.function.Predicate.not;
import static org.elasticsearch.core.Types.forciblyCast;
import static org.elasticsearch.snapshots.SnapshotsService.STATELESS_SNAPSHOT_ENABLED_SETTING_NAME;

/**
* Class used to perform all the operations needed to construct a {@link Node} instance.
Expand Down Expand Up @@ -1128,6 +1131,18 @@ public Map<String, String> queryFields() {
);
final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule);

final boolean statelessSnapshotEnabled = settings.getAsBoolean(STATELESS_SNAPSHOT_ENABLED_SETTING_NAME, false);
final SnapshotShardContextFactory snapshotShardContextFactory;
if (statelessSnapshotEnabled) {
snapshotShardContextFactory = pluginsService.loadSingletonServiceProvider(SnapshotShardContextFactory.class, () -> {
throw new IllegalStateException(
STATELESS_SNAPSHOT_ENABLED_SETTING_NAME + " is enabled, but no SnapshotShardContextFactory is registered"
);
});
} else {
snapshotShardContextFactory = new LocalPrimarySnapshotShardContextFactory(indicesService);
}

SnapshotsService snapshotsService = new SnapshotsService(
settings,
clusterService,
Expand All @@ -1137,14 +1152,16 @@ public Map<String, String> queryFields() {
transportService,
actionModule.getActionFilters(),
systemIndices,
projectResolver.supportsMultipleProjects()
projectResolver.supportsMultipleProjects(),
statelessSnapshotEnabled
);
SnapshotShardsService snapshotShardsService = new SnapshotShardsService(
settings,
clusterService,
repositoriesService,
transportService,
indicesService
indicesService,
snapshotShardContextFactory
);

actionModule.getReservedClusterStateService().installProjectStateHandler(new ReservedRepositoryAction(repositoriesService));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.repositories;

import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetadata;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.snapshots.AbortedSnapshotException;
import org.elasticsearch.snapshots.SnapshotId;

import java.io.IOException;
import java.util.Collection;

/**
 * Context holding the state for creating a shard snapshot via {@link Repository#snapshotShard(SnapshotShardContext)},
 * backed by a {@link Store} and a {@link SnapshotIndexCommit}.
 * The wrapped commit reference is released once this instance is completed by invoking either its
 * {@link #onResponse(ShardSnapshotResult)} or {@link #onFailure(Exception)} callback.
 */
public final class LocalPrimarySnapshotShardContext extends SnapshotShardContext {

    private static final Logger logger = LogManager.getLogger(LocalPrimarySnapshotShardContext.class);

    private final Store store;
    private final MapperService mapperService;
    private final SnapshotIndexCommit commitRef;

    /**
     * @param store                 store to be snapshotted
     * @param mapperService         the shard's mapper service
     * @param snapshotId            snapshot id
     * @param indexId               id for the index being snapshotted
     * @param commitRef             commit point reference; it is closed before {@code listener} is notified
     * @param shardStateIdentifier  a unique identifier of the state of the shard that is stored with the shard's snapshot and used
     *                              to detect if the shard has changed between snapshots. If {@code null} is passed as the identifier
     *                              snapshotting will be done by inspecting the physical files referenced by {@code commitRef}
     * @param snapshotStatus        snapshot status
     * @param repositoryMetaVersion version of the updated repository metadata to write
     * @param snapshotStartTime     start time of the snapshot found in
     *                              {@link org.elasticsearch.cluster.SnapshotsInProgress.Entry#startTime()}
     * @param listener              listener invoked on completion
     */
    public LocalPrimarySnapshotShardContext(
        Store store,
        MapperService mapperService,
        SnapshotId snapshotId,
        IndexId indexId,
        SnapshotIndexCommit commitRef,
        @Nullable String shardStateIdentifier,
        IndexShardSnapshotStatus snapshotStatus,
        IndexVersion repositoryMetaVersion,
        final long snapshotStartTime,
        ActionListener<ShardSnapshotResult> listener
    ) {
        super(
            snapshotId,
            indexId,
            shardStateIdentifier,
            snapshotStatus,
            repositoryMetaVersion,
            snapshotStartTime,
            commitRef.closingBefore(listener)
        );
        this.store = store;
        this.mapperService = mapperService;
        this.commitRef = commitRef;
    }

    /** @return the shard id reported by the underlying store */
    @Override
    public ShardId shardId() {
        return store.shardId();
    }

    /** @return the store being snapshotted */
    @Override
    public Store store() {
        return store;
    }

    /** @return the mapper service supplied at construction */
    @Override
    public MapperService mapperService() {
        return mapperService;
    }

    /** @return the index commit held by the wrapped commit reference */
    @Override
    public IndexCommit indexCommit() {
        return commitRef.indexCommit();
    }

    /**
     * Acquires an additional reference on the commit for the duration of some work.
     *
     * @return a releasable that drops the acquired reference at most once
     * @throws IndexShardSnapshotFailedException if no reference could be acquired and the snapshot was not reported as aborted
     */
    @Override
    public Releasable withCommitRef() {
        // Check for abort up front so that no ref is acquired when aborted, even if refs are still available.
        status().ensureNotAborted();
        if (commitRef.tryIncRef() == false) {
            // Re-check for abort first: it is the only expected reason for the ref to be gone already.
            status().ensureNotAborted();
            assert false : "commit ref closed early in state " + status();
            throw new IndexShardSnapshotFailedException(shardId(), "Store got closed concurrently");
        }
        return Releasables.releaseOnce(commitRef::decRef);
    }

    /** @return whether the index metadata of the store's index marks it as a searchable snapshot */
    @Override
    public boolean isSearchableSnapshot() {
        return store.indexSettings().getIndexMetadata().isSearchableSnapshot();
    }

    /**
     * Loads the store metadata for the commit being snapshotted.
     *
     * @throws IndexShardSnapshotFailedException if reading the metadata fails with an {@link IOException}
     */
    @Override
    public Store.MetadataSnapshot metadataSnapshot() {
        final IndexCommit commit = indexCommit();
        logger.trace("[{}] [{}] Loading store metadata using index commit [{}]", shardId(), snapshotId(), commit);
        try {
            return store.getMetadata(commit);
        } catch (IOException ioe) {
            throw new IndexShardSnapshotFailedException(shardId(), "Failed to get store file metadata", ioe);
        }
    }

    /**
     * Lists the names of the files referenced by the commit being snapshotted.
     *
     * @throws IndexShardSnapshotFailedException if the commit cannot list its files due to an {@link IOException}
     */
    @Override
    public Collection<String> fileNames() {
        try {
            return indexCommit().getFileNames();
        } catch (IOException ioe) {
            throw new IndexShardSnapshotFailedException(shardId(), "Failed to get store file names", ioe);
        }
    }

    /**
     * Reads the whole file described by {@code fileInfo} from the store and asserts that its bytes equal the hash recorded in the
     * file's metadata. Mismatches and unexpected states are reported through assertions only.
     *
     * @return always {@code true}
     */
    @Override
    public boolean assertFileContentsMatchHash(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
        if (store.tryIncRef() == false) {
            // The store is already closed, which is only expected once the snapshot has been aborted.
            try {
                status().ensureNotAborted();
                assert false : "if the store is already closed we must have been aborted";
            } catch (Exception e) {
                assert e instanceof AbortedSnapshotException : e;
            }
            return true;
        }

        try (IndexInput input = store.openVerifyingInput(fileInfo.physicalName(), IOContext.READONCE, fileInfo.metadata())) {
            final byte[] contents = new byte[Math.toIntExact(fileInfo.metadata().length())];
            input.readBytes(contents, 0, contents.length);
            assert fileInfo.metadata().hash().bytesEquals(new BytesRef(contents));
        } catch (IOException ioe) {
            throw new AssertionError(ioe);
        } finally {
            store.decRef();
        }
        return true;
    }

    /**
     * Marks the store as corrupted when {@code e} is a corruption exception as determined by
     * {@link Lucene#isCorruptionException(Throwable)}; any other exception is ignored. A failure to mark the store is logged
     * rather than propagated.
     */
    @Override
    public void failStoreIfCorrupted(Exception e) {
        if (Lucene.isCorruptionException(e) == false) {
            return;
        }
        try {
            store.markStoreCorrupted((IOException) e);
        } catch (IOException inner) {
            inner.addSuppressed(e);
            logger.warn("store cannot be marked as corrupted", inner);
        }
    }

    /**
     * Opens a verifying input for {@code file} while holding a commit reference and hands both to the returned reader.
     * If anything fails before the reader is created, whatever was acquired so far is closed and the failure is rethrown.
     */
    @Override
    public SnapshotShardContext.FileReader fileReader(String file, StoreFileMetadata metadata) throws IOException {
        Releasable heldCommitRef = null;
        IndexInput input = null;
        try {
            heldCommitRef = withCommitRef();
            input = store.openVerifyingInput(file, IOContext.DEFAULT, metadata);
            return new IndexInputFileReader(heldCommitRef, input);
        } catch (Exception e) {
            IOUtils.close(e, input, heldCommitRef);
            throw e;
        }
    }
}
Loading