Skip to content

Commit 88eeb8a

Browse files
authored
Refactor SnapshotInfo dataflow in finalization (#124336)
There's no need to have a `SnapshotInfo` consumer to run at the end of finalization, we only pass it the value we already calculated earlier. This replaces it with a bare `Runnable` instead.
1 parent 7949f87 commit 88eeb8a

File tree

8 files changed

+21
-34
lines changed

8 files changed

+21
-34
lines changed

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -376,10 +376,10 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotConte
376376
finalizeSnapshotContext.snapshotInfo(),
377377
finalizeSnapshotContext.repositoryMetaVersion(),
378378
wrapWithWeakConsistencyProtection(ActionListener.runAfter(finalizeSnapshotContext, () -> metadataDone.onResponse(null))),
379-
info -> metadataDone.addListener(new ActionListener<>() {
379+
() -> metadataDone.addListener(new ActionListener<>() {
380380
@Override
381381
public void onResponse(Void unused) {
382-
finalizeSnapshotContext.onDone(info);
382+
finalizeSnapshotContext.onDone();
383383
}
384384

385385
@Override

server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import java.util.Map;
2323
import java.util.Set;
24-
import java.util.function.Consumer;
2524

2625
/**
2726
* Context for finalizing a snapshot.
@@ -44,7 +43,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener<Repo
4443

4544
private final IndexVersion repositoryMetaVersion;
4645

47-
private final Consumer<SnapshotInfo> onDone;
46+
private final Runnable onDone;
4847

4948
/**
5049
* @param updatedShardGenerations updated shard generations
@@ -64,7 +63,7 @@ public FinalizeSnapshotContext(
6463
SnapshotInfo snapshotInfo,
6564
IndexVersion repositoryMetaVersion,
6665
ActionListener<RepositoryData> listener,
67-
Consumer<SnapshotInfo> onDone
66+
Runnable onDone
6867
) {
6968
super(listener);
7069
this.updatedShardGenerations = updatedShardGenerations;
@@ -113,8 +112,8 @@ public ClusterState updatedClusterState(ClusterState state) {
113112
return updatedState;
114113
}
115114

116-
public void onDone(SnapshotInfo snapshotInfo) {
117-
onDone.accept(snapshotInfo);
115+
public void onDone() {
116+
onDone.run();
118117
}
119118

120119
@Override

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1894,7 +1894,6 @@ public String toString() {
18941894
rootBlobUpdateResult.oldRepositoryData(),
18951895
rootBlobUpdateResult.newRepositoryData(),
18961896
finalizeSnapshotContext,
1897-
snapshotInfo,
18981897
writeShardGens
18991898
);
19001899
})
@@ -1913,7 +1912,6 @@ private void cleanupOldMetadata(
19131912
RepositoryData existingRepositoryData,
19141913
RepositoryData updatedRepositoryData,
19151914
FinalizeSnapshotContext finalizeSnapshotContext,
1916-
SnapshotInfo snapshotInfo,
19171915
boolean writeShardGenerations
19181916
) {
19191917
final Set<String> toDelete = new HashSet<>();
@@ -1962,11 +1960,11 @@ public void onFailure(Exception e) {
19621960

19631961
@Override
19641962
public void onAfter() {
1965-
finalizeSnapshotContext.onDone(snapshotInfo);
1963+
finalizeSnapshotContext.onDone();
19661964
}
19671965
});
19681966
} else {
1969-
finalizeSnapshotContext.onDone(snapshotInfo);
1967+
finalizeSnapshotContext.onDone();
19701968
}
19711969
}
19721970

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1529,11 +1529,11 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit
15291529
shardGenerations
15301530
)
15311531
),
1532-
snInfo -> snapshotListeners.addListener(new ActionListener<>() {
1532+
() -> snapshotListeners.addListener(new ActionListener<>() {
15331533
@Override
15341534
public void onResponse(List<ActionListener<SnapshotInfo>> actionListeners) {
1535-
completeListenersIgnoringException(actionListeners, snInfo);
1536-
logger.info("snapshot [{}] completed with state [{}]", snapshot, snInfo.state());
1535+
completeListenersIgnoringException(actionListeners, snapshotInfo);
1536+
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
15371537
}
15381538

15391539
@Override

server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ public void testSnapshotWithConflictingName() throws Exception {
193193
),
194194
IndexVersion.current(),
195195
listener,
196-
info -> {}
196+
() -> {}
197197
)
198198
)
199199
);

test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,7 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Map<St
551551
snapshotInfo,
552552
SnapshotsService.OLD_SNAPSHOT_FORMAT,
553553
listener,
554-
info -> {}
554+
() -> {}
555555
)
556556
)
557557
);

x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import org.apache.lucene.search.TopDocs;
2323
import org.apache.lucene.util.Bits;
2424
import org.elasticsearch.ExceptionsHelper;
25-
import org.elasticsearch.action.ActionListener;
2625
import org.elasticsearch.action.index.IndexRequest;
26+
import org.elasticsearch.action.support.ActionTestUtils;
2727
import org.elasticsearch.action.support.PlainActionFuture;
2828
import org.elasticsearch.cluster.metadata.IndexMetadata;
2929
import org.elasticsearch.cluster.metadata.MappingMetadata;
@@ -68,7 +68,6 @@
6868
import org.elasticsearch.repositories.FinalizeSnapshotContext;
6969
import org.elasticsearch.repositories.IndexId;
7070
import org.elasticsearch.repositories.Repository;
71-
import org.elasticsearch.repositories.RepositoryData;
7271
import org.elasticsearch.repositories.ShardGeneration;
7372
import org.elasticsearch.repositories.ShardGenerations;
7473
import org.elasticsearch.repositories.ShardSnapshotResult;
@@ -87,6 +86,7 @@
8786
import java.nio.file.Path;
8887
import java.util.Collections;
8988
import java.util.concurrent.Callable;
89+
import java.util.concurrent.CountDownLatch;
9090
import java.util.concurrent.ExecutionException;
9191
import java.util.stream.Collectors;
9292

@@ -356,7 +356,7 @@ public void testRestoreMinimal() throws IOException {
356356
)
357357
);
358358
future.actionGet();
359-
final PlainActionFuture<SnapshotInfo> finFuture = new PlainActionFuture<>();
359+
final CountDownLatch finishedLatch = new CountDownLatch(2);
360360
final ShardGenerations shardGenerations = ShardGenerations.builder()
361361
.put(indexId, 0, indexShardSnapshotStatus.generation())
362362
.build();
@@ -380,21 +380,11 @@ public void testRestoreMinimal() throws IOException {
380380
Collections.emptyMap()
381381
),
382382
IndexVersion.current(),
383-
new ActionListener<>() {
384-
@Override
385-
public void onResponse(RepositoryData repositoryData) {
386-
// nothing will resolve in the onDone callback below
387-
}
388-
389-
@Override
390-
public void onFailure(Exception e) {
391-
finFuture.onFailure(e);
392-
}
393-
},
394-
finFuture::onResponse
383+
ActionTestUtils.assertNoFailureListener(ignored -> finishedLatch.countDown()),
384+
finishedLatch::countDown
395385
)
396386
);
397-
finFuture.actionGet();
387+
safeAwait(finishedLatch);
398388
});
399389
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
400390
assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());

x-pack/plugin/slm/src/internalClusterTest/java/org/elasticsearch/xpack/slm/SLMStatDisruptionIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,11 +244,11 @@ public void finalizeSnapshot(FinalizeSnapshotContext fsc) {
244244
fsc.snapshotInfo(),
245245
fsc.repositoryMetaVersion(),
246246
fsc,
247-
snapshotInfo -> {
247+
() -> {
248248
// run the passed lambda before calling the usual callback
249249
// this is where the cluster can be restarted before SLM is called back with the snapshotInfo
250250
beforeResponseRunnable.run();
251-
fsc.onDone(snapshotInfo);
251+
fsc.onDone();
252252
}
253253
);
254254
super.finalizeSnapshot(newFinalizeContext);

0 commit comments

Comments
 (0)