Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.concurrent.ThrottledIterator;
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
Expand Down Expand Up @@ -1733,6 +1732,7 @@ int sizeInBytes() {

@Override
public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
final long repositoryStateId = finalizeSnapshotContext.repositoryStateId();
final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations();
final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo();
Expand Down Expand Up @@ -1762,14 +1762,7 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new
SubscribableListener

// Get the current RepositoryData
.<RepositoryData>newForked(
listener -> getRepositoryData(
// TODO we might already be on a SNAPSHOT thread, make it so that we're always on a SNAPSHOT thread here and then we
// can avoid a little more forking below
EsExecutors.DIRECT_EXECUTOR_SERVICE,
listener
)
)
.<RepositoryData>newForked(listener -> getRepositoryData(executor, listener))

// Identify and write the missing metadata
.<MetadataWriteResult>andThen((l, existingRepositoryData) -> {
Expand Down Expand Up @@ -1837,13 +1830,12 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new
}));
}

// Write the SnapshotInfo blob
executor.execute(
ActionRunnable.run(
allMetaListeners.acquire(),
() -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress)
)
);
// Write the SnapshotInfo blob to the repo (we're already on a SNAPSHOT thread so no need to fork this)
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
ActionListener.completeWith(allMetaListeners.acquire(), () -> {
SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress);
return null;
});

// TODO fail fast if any metadata write fails
// TODO clean up successful metadata writes on failure (needs care, we must not clobber another node concurrently
Expand All @@ -1853,7 +1845,7 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new

// Update the root blob
.<RootBlobUpdateResult>andThen((l, metadataWriteResult) -> {
// unlikely, but in theory we could still be on the thread which called finalizeSnapshot - TODO must fork to SNAPSHOT here
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
final var snapshotDetails = SnapshotDetails.fromSnapshotInfo(snapshotInfo);
final var existingRepositoryData = metadataWriteResult.existingRepositoryData();
writeIndexGen(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Predicates;
Expand Down Expand Up @@ -1399,9 +1401,34 @@ private void leaveRepoLoop(String repository) {
}

private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, RepositoryData repositoryData) {
assert currentlyFinalizing.contains(snapshot.getRepository());
assert repositoryOperations.assertNotQueued(snapshot);
try {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new SnapshotFinalization(snapshot, metadata, repositoryData));
}

/**
* Implements the finalization process for a snapshot: does some preparatory calculations, builds a {@link SnapshotInfo} and a
* {@link FinalizeSnapshotContext}, calls {@link Repository#finalizeSnapshot} and handles the outcome by notifying waiting listeners
* and triggering the next snapshot-related activity (another finalization, a batch of deletes, etc.)
*/
// This only really makes sense to run against a BlobStoreRepository, and the division of work between this class and
// BlobStoreRepository#finalizeSnapshot is kind of awkward and artificial; TODO consolidate all this stuff into one place and simplify
private class SnapshotFinalization extends AbstractRunnable {

private final Snapshot snapshot;
private final Metadata metadata;
private final RepositoryData repositoryData;

SnapshotFinalization(Snapshot snapshot, Metadata metadata, RepositoryData repositoryData) {
this.snapshot = snapshot;
this.metadata = metadata;
this.repositoryData = repositoryData;
}

@Override
protected void doRun() {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
assert currentlyFinalizing.contains(snapshot.getRepository());
assert repositoryOperations.assertNotQueued(snapshot);

SnapshotsInProgress.Entry entry = SnapshotsInProgress.get(clusterService.state()).snapshot(snapshot);
final String failure = entry.failure();
logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure);
Expand Down Expand Up @@ -1429,7 +1456,9 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit
final ListenableFuture<Metadata> metadataListener = new ListenableFuture<>();
final Repository repo = repositoriesService.repository(snapshot.getRepository());
if (entry.isClone()) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(metadataListener, () -> {
// This listener is kinda unnecessary since we now always complete it synchronously. It's only here to catch exceptions.
// TODO simplify this.
ActionListener.completeWith(metadataListener, () -> {
final Metadata existing = repo.getSnapshotGlobalMetadata(entry.source());
final Metadata.Builder metaBuilder = Metadata.builder(existing);
final Set<Index> existingIndices = new HashSet<>();
Expand All @@ -1451,11 +1480,12 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit
);
metaBuilder.dataStreams(dataStreamsToCopy, dataStreamAliasesToCopy);
return metaBuilder.build();
}));
});
} else {
metadataListener.onResponse(metadata);
}
metadataListener.addListener(ActionListener.wrap(meta -> {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
final Metadata metaForSnapshot = metadataForSnapshot(entry, meta);

final Map<String, SnapshotInfo.IndexSnapshotDetails> indexSnapshotDetails = Maps.newMapWithExpectedSize(
Expand Down Expand Up @@ -1555,7 +1585,20 @@ public void onFailure(Exception e) {
shardGenerations
)
));
} catch (Exception e) {
}

@Override
public void onRejection(Exception e) {
if (e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) {
logger.debug("failing finalization of {} due to shutdown", snapshot);
handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);
} else {
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
logger.error(Strings.format("unexpected failure finalizing %s", snapshot), e);
assert false : new AssertionError("unexpected failure finalizing " + snapshot, e);
handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);
Expand Down