diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index dc8254f751330..d3fae8adb466d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -78,7 +78,6 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.concurrent.ThrottledIterator; import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; @@ -1738,6 +1737,7 @@ int sizeInBytes() { @Override public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) { + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT); final long repositoryStateId = finalizeSnapshotContext.repositoryStateId(); final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations(); final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo(); @@ -1767,14 +1767,7 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new SubscribableListener // Get the current RepositoryData - .newForked( - listener -> getRepositoryData( - // TODO we might already be on a SNAPSHOT thread, make it so that we're always on a SNAPSHOT thread here and then we - // can avoid a little more forking below - EsExecutors.DIRECT_EXECUTOR_SERVICE, - listener - ) - ) + .newForked(listener -> getRepositoryData(executor, listener)) // Identify and write the missing metadata .andThen((l, existingRepositoryData) -> { @@ -1842,13 +1835,12 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new })); } - // Write the SnapshotInfo blob - executor.execute( - ActionRunnable.run( - allMetaListeners.acquire(), - () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress) - ) - ); + // Write the SnapshotInfo blob to the repo (we're already on a SNAPSHOT thread so no need to fork this) + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT); + ActionListener.completeWith(allMetaListeners.acquire(), () -> { + SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress); + return null; + }); // TODO fail fast if any metadata write fails // TODO clean up successful metadata writes on failure (needs care, we must not clobber another node concurrently @@ -1858,7 +1850,7 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new // Update the root blob .andThen((l, metadataWriteResult) -> { - // unlikely, but in theory we could still be on the thread which called finalizeSnapshot - TODO must fork to SNAPSHOT here + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT); final var snapshotDetails = SnapshotDetails.fromSnapshotInfo(snapshotInfo); final var existingRepositoryData = metadataWriteResult.existingRepositoryData(); writeIndexGen( diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index cce2eaa8d60ed..10ebd2e66c304 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -71,7 +71,9 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Nullable; @@ -1398,9 +1400,34 @@ private void leaveRepoLoop(String repository) { } private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, RepositoryData repositoryData) { - assert currentlyFinalizing.contains(snapshot.getRepository()); - assert repositoryOperations.assertNotQueued(snapshot); - try { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new SnapshotFinalization(snapshot, metadata, repositoryData)); + } + + /** + * Implements the finalization process for a snapshot: does some preparatory calculations, builds a {@link SnapshotInfo} and a + * {@link FinalizeSnapshotContext}, calls {@link Repository#finalizeSnapshot} and handles the outcome by notifying waiting listeners + * and triggering the next snapshot-related activity (another finalization, a batch of deletes, etc.) + */ + // This only really makes sense to run against a BlobStoreRepository, and the division of work between this class and + // BlobStoreRepository#finalizeSnapshot is kind of awkward and artificial; TODO consolidate all this stuff into one place and simplify + private class SnapshotFinalization extends AbstractRunnable { + + private final Snapshot snapshot; + private final Metadata metadata; + private final RepositoryData repositoryData; + + SnapshotFinalization(Snapshot snapshot, Metadata metadata, RepositoryData repositoryData) { + this.snapshot = snapshot; + this.metadata = metadata; + this.repositoryData = repositoryData; + } + + @Override + protected void doRun() { + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT); + assert currentlyFinalizing.contains(snapshot.getRepository()); + assert repositoryOperations.assertNotQueued(snapshot); + SnapshotsInProgress.Entry entry = SnapshotsInProgress.get(clusterService.state()).snapshot(snapshot); final String failure = entry.failure(); logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure); @@ -1428,7 +1455,9 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit final ListenableFuture metadataListener = new ListenableFuture<>(); final Repository repo = repositoriesService.repository(snapshot.getRepository()); if (entry.isClone()) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(metadataListener, () -> { + // This listener is kinda unnecessary since we now always complete it synchronously. It's only here to catch exceptions. + // TODO simplify this. + ActionListener.completeWith(metadataListener, () -> { final Metadata existing = repo.getSnapshotGlobalMetadata(entry.source()); final Metadata.Builder metaBuilder = Metadata.builder(existing); final Set existingIndices = new HashSet<>(); @@ -1450,11 +1479,12 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit ); metaBuilder.dataStreams(dataStreamsToCopy, dataStreamAliasesToCopy); return metaBuilder.build(); - })); + }); } else { metadataListener.onResponse(metadata); } metadataListener.addListener(ActionListener.wrap(meta -> { + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT); final Metadata metaForSnapshot = metadataForSnapshot(entry, meta); final Map indexSnapshotDetails = Maps.newMapWithExpectedSize( @@ -1554,7 +1584,20 @@ public void onFailure(Exception e) { shardGenerations ) )); - } catch (Exception e) { + } + + @Override + public void onRejection(Exception e) { + if (e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) { + logger.debug("failing finalization of {} due to shutdown", snapshot); + handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY); + } else { + onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { logger.error(Strings.format("unexpected failure finalizing %s", snapshot), e); assert false : new AssertionError("unexpected failure finalizing " + snapshot, e); handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);