Skip to content

Commit 0d57996

Browse files
Extract UpdateSnapshotStatusAction to its own file (#132522)
Pull a Transport*Action out of SnapshotsService in order to simplify and focus the SnapshotsService code. Relates ES-11650
1 parent 6196ef5 commit 0d57996

File tree

12 files changed

+100
-79
lines changed

12 files changed

+100
-79
lines changed

server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2154,7 +2154,7 @@ public void testDeleteIndexWithOutOfOrderFinalization() {
21542154
final Map<String, SubscribableListener<Void>> otherIndexSnapshotListeners = indexNames.stream()
21552155
.collect(Collectors.toMap(k -> k, k -> new SubscribableListener<>()));
21562156
masterTransportService.<UpdateIndexShardSnapshotStatusRequest>addRequestHandlingBehavior(
2157-
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
2157+
TransportUpdateSnapshotStatusAction.NAME,
21582158
(handler, request, channel, task) -> {
21592159
final var indexName = request.shardId().getIndexName();
21602160
if (indexName.equals(indexToDelete)) {

server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1063,7 +1063,7 @@ public void onRequestSent(
10631063
TransportRequest request,
10641064
TransportRequestOptions finalOptions
10651065
) {
1066-
if (blocked.get() && action.equals(SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME)) {
1066+
if (blocked.get() && action.equals(TransportUpdateSnapshotStatusAction.NAME)) {
10671067
throw new AssertionError("Node had no assigned shard snapshots so it shouldn't send out shard state updates");
10681068
}
10691069
}

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public void testConsistencyCheckerLogging() {
127127
// allow cluster states to go through normally until we see a shard snapshot update
128128
final var shardUpdateSeen = new AtomicBoolean();
129129
MockTransportService.getInstance(masterNode)
130-
.addRequestHandlingBehavior(SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, (handler, request, channel, task) -> {
130+
.addRequestHandlingBehavior(TransportUpdateSnapshotStatusAction.NAME, (handler, request, channel, task) -> {
131131
shardUpdateSeen.set(true);
132132
handler.messageReceived(request, channel, task);
133133
});

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ public void testRemoveNodeAndFailoverMasterDuringSnapshot() throws Exception {
207207
final var masterName = internalCluster().getMasterName();
208208
final var masterTransportService = MockTransportService.getInstance(masterName);
209209
masterTransportService.addRequestHandlingBehavior(
210-
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
210+
TransportUpdateSnapshotStatusAction.NAME,
211211
(handler, request, channel, task) -> masterTransportService.getThreadPool().generic().execute(() -> {
212212
safeAwait(snapshotStatusUpdateBarrier);
213213
safeAwait(snapshotStatusUpdateBarrier);
@@ -405,7 +405,7 @@ public void testAbortSnapshotWhileRemovingNode() throws Exception {
405405
final var updateSnapshotStatusBarrier = new CyclicBarrier(2);
406406
final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName());
407407
masterTransportService.addRequestHandlingBehavior(
408-
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
408+
TransportUpdateSnapshotStatusAction.NAME,
409409
(handler, request, channel, task) -> masterTransportService.getThreadPool().generic().execute(() -> {
410410
safeAwait(updateSnapshotStatusBarrier);
411411
safeAwait(updateSnapshotStatusBarrier);
@@ -455,7 +455,7 @@ public void testShutdownWhileSuccessInFlight() throws Exception {
455455
final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
456456
final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName());
457457
masterTransportService.addRequestHandlingBehavior(
458-
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
458+
TransportUpdateSnapshotStatusAction.NAME,
459459
(handler, request, channel, task) -> putShutdownForRemovalMetadata(
460460
clusterService,
461461
primaryNode,
@@ -559,7 +559,7 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
559559
final CountDownLatch snapshotStatusUpdateLatch = new CountDownLatch(1);
560560
final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName());
561561
masterTransportService.addRequestHandlingBehavior(
562-
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
562+
TransportUpdateSnapshotStatusAction.NAME,
563563
(handler, request, channel, task) -> masterTransportService.getThreadPool().generic().execute(() -> {
564564
safeAwait(snapshotStatusUpdateLatch);
565565
try {

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,7 @@
405405
import org.elasticsearch.rest.action.synonyms.RestGetSynonymsSetsAction;
406406
import org.elasticsearch.rest.action.synonyms.RestPutSynonymRuleAction;
407407
import org.elasticsearch.rest.action.synonyms.RestPutSynonymsAction;
408+
import org.elasticsearch.snapshots.TransportUpdateSnapshotStatusAction;
408409
import org.elasticsearch.tasks.Task;
409410
import org.elasticsearch.telemetry.TelemetryProvider;
410411
import org.elasticsearch.threadpool.ThreadPool;
@@ -672,6 +673,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
672673
actions.register(TransportDeleteSnapshotAction.TYPE, TransportDeleteSnapshotAction.class);
673674
actions.register(TransportCreateSnapshotAction.TYPE, TransportCreateSnapshotAction.class);
674675
actions.register(TransportCloneSnapshotAction.TYPE, TransportCloneSnapshotAction.class);
676+
actions.register(TransportUpdateSnapshotStatusAction.TYPE, TransportUpdateSnapshotStatusAction.class);
675677
actions.register(TransportRestoreSnapshotAction.TYPE, TransportRestoreSnapshotAction.class);
676678
actions.register(TransportSnapshotsStatusAction.TYPE, TransportSnapshotsStatusAction.class);
677679
actions.register(SnapshottableFeaturesAction.INSTANCE, TransportSnapshottableFeaturesAction.class);

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1135,7 +1135,6 @@ public Map<String, String> queryFields() {
11351135
clusterModule.getIndexNameExpressionResolver(),
11361136
repositoriesService,
11371137
transportService,
1138-
actionModule.getActionFilters(),
11391138
systemIndices,
11401139
projectResolver.supportsMultipleProjects()
11411140
);

server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -905,7 +905,7 @@ public void onFailure(Exception e) {
905905
updateResultListener,
906906
(req, reqListener) -> transportService.sendRequest(
907907
transportService.getLocalNode(),
908-
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
908+
TransportUpdateSnapshotStatusAction.NAME,
909909
req,
910910
new ActionListenerResponseHandler<>(
911911
reqListener.map(res -> null),

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 4 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,14 @@
1414
import org.apache.logging.log4j.Logger;
1515
import org.elasticsearch.ExceptionsHelper;
1616
import org.elasticsearch.action.ActionListener;
17-
import org.elasticsearch.action.ActionResponse;
1817
import org.elasticsearch.action.ActionRunnable;
1918
import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest;
2019
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
2120
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
22-
import org.elasticsearch.action.support.ActionFilters;
2321
import org.elasticsearch.action.support.ContextPreservingActionListener;
2422
import org.elasticsearch.action.support.GroupedActionListener;
2523
import org.elasticsearch.action.support.RefCountingRunnable;
2624
import org.elasticsearch.action.support.SubscribableListener;
27-
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
2825
import org.elasticsearch.cluster.ClusterChangedEvent;
2926
import org.elasticsearch.cluster.ClusterState;
3027
import org.elasticsearch.cluster.ClusterStateApplier;
@@ -37,7 +34,6 @@
3734
import org.elasticsearch.cluster.SnapshotsInProgress;
3835
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
3936
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
40-
import org.elasticsearch.cluster.block.ClusterBlockException;
4137
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
4238
import org.elasticsearch.cluster.metadata.DataStream;
4339
import org.elasticsearch.cluster.metadata.DataStreamAlias;
@@ -93,7 +89,6 @@
9389
import org.elasticsearch.repositories.ShardGeneration;
9490
import org.elasticsearch.repositories.ShardGenerations;
9591
import org.elasticsearch.repositories.ShardSnapshotResult;
96-
import org.elasticsearch.tasks.Task;
9792
import org.elasticsearch.threadpool.ThreadPool;
9893
import org.elasticsearch.transport.TransportService;
9994

@@ -150,8 +145,6 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
150145

151146
private static final Logger logger = LogManager.getLogger(SnapshotsService.class);
152147

153-
public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status";
154-
155148
public static final String NO_FEATURE_STATES_VALUE = "none";
156149

157150
private final ClusterService clusterService;
@@ -180,10 +173,6 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
180173
/** Set of currently initializing clone operations */
181174
private final Set<Snapshot> initializingClones = Collections.synchronizedSet(new HashSet<>());
182175

183-
private final UpdateSnapshotStatusAction updateSnapshotStatusHandler;
184-
185-
private final TransportService transportService;
186-
187176
private final OngoingRepositoryOperations repositoryOperations = new OngoingRepositoryOperations();
188177

189178
private final SystemIndices systemIndices;
@@ -216,7 +205,6 @@ public SnapshotsService(
216205
IndexNameExpressionResolver indexNameExpressionResolver,
217206
RepositoriesService repositoriesService,
218207
TransportService transportService,
219-
ActionFilters actionFilters,
220208
SystemIndices systemIndices,
221209
boolean serializeProjectMetadata
222210
) {
@@ -225,10 +213,7 @@ public SnapshotsService(
225213
this.indexNameExpressionResolver = indexNameExpressionResolver;
226214
this.repositoriesService = repositoriesService;
227215
this.threadPool = transportService.getThreadPool();
228-
this.transportService = transportService;
229216

230-
// The constructor of UpdateSnapshotStatusAction will register itself to the TransportService.
231-
this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction(transportService, clusterService, threadPool, actionFilters);
232217
if (DiscoveryNode.isMasterNode(settings)) {
233218
// addLowPriorityApplier to make sure that Repository will be created before snapshot
234219
clusterService.addLowPriorityApplier(this);
@@ -592,7 +577,7 @@ private void runReadyClone(
592577
repoShardId,
593578
shardStatusBefore.generation(),
594579
ActionListener.wrap(
595-
shardSnapshotResult -> innerUpdateSnapshotState(
580+
shardSnapshotResult -> createAndSubmitRequestToUpdateSnapshotState(
596581
target,
597582
null,
598583
repoShardId,
@@ -613,7 +598,7 @@ private void runReadyClone(
613598
() -> currentlyCloning.remove(repoShardId)
614599
)
615600
),
616-
e -> innerUpdateSnapshotState(
601+
e -> createAndSubmitRequestToUpdateSnapshotState(
617602
target,
618603
null,
619604
repoShardId,
@@ -2256,8 +2241,7 @@ private void addListener(Snapshot snapshot, ActionListener<SnapshotInfo> listene
22562241

22572242
@Override
22582243
protected void doStart() {
2259-
assert this.updateSnapshotStatusHandler != null;
2260-
assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null;
2244+
22612245
}
22622246

22632247
@Override
@@ -2834,7 +2818,7 @@ public String toString() {
28342818
}
28352819
}
28362820

2837-
private void innerUpdateSnapshotState(
2821+
public void createAndSubmitRequestToUpdateSnapshotState(
28382822
Snapshot snapshot,
28392823
ShardId shardId,
28402824
RepositoryShardId repoShardId,
@@ -2896,50 +2880,6 @@ private void startExecutableClones(List<SnapshotsInProgress.Entry> entries) {
28962880
}
28972881
}
28982882

2899-
private class UpdateSnapshotStatusAction extends TransportMasterNodeAction<
2900-
UpdateIndexShardSnapshotStatusRequest,
2901-
ActionResponse.Empty> {
2902-
UpdateSnapshotStatusAction(
2903-
TransportService transportService,
2904-
ClusterService clusterService,
2905-
ThreadPool threadPool,
2906-
ActionFilters actionFilters
2907-
) {
2908-
super(
2909-
UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
2910-
false,
2911-
transportService,
2912-
clusterService,
2913-
threadPool,
2914-
actionFilters,
2915-
UpdateIndexShardSnapshotStatusRequest::new,
2916-
in -> ActionResponse.Empty.INSTANCE,
2917-
EsExecutors.DIRECT_EXECUTOR_SERVICE
2918-
);
2919-
}
2920-
2921-
@Override
2922-
protected void masterOperation(
2923-
Task task,
2924-
UpdateIndexShardSnapshotStatusRequest request,
2925-
ClusterState state,
2926-
ActionListener<ActionResponse.Empty> listener
2927-
) {
2928-
innerUpdateSnapshotState(
2929-
request.snapshot(),
2930-
request.shardId(),
2931-
null,
2932-
request.status(),
2933-
listener.map(v -> ActionResponse.Empty.INSTANCE)
2934-
);
2935-
}
2936-
2937-
@Override
2938-
protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest request, ClusterState state) {
2939-
return null;
2940-
}
2941-
}
2942-
29432883
/**
29442884
* Cluster state update task that removes all {@link SnapshotsInProgress.Entry} and {@link SnapshotDeletionsInProgress.Entry} for a
29452885
* given repository from the cluster state and afterwards fails all relevant listeners in {@link #snapshotCompletionListeners} and
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.snapshots;
11+
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.ActionResponse;
14+
import org.elasticsearch.action.ActionType;
15+
import org.elasticsearch.action.support.ActionFilters;
16+
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
17+
import org.elasticsearch.cluster.ClusterState;
18+
import org.elasticsearch.cluster.block.ClusterBlockException;
19+
import org.elasticsearch.cluster.service.ClusterService;
20+
import org.elasticsearch.common.util.concurrent.EsExecutors;
21+
import org.elasticsearch.injection.guice.Inject;
22+
import org.elasticsearch.tasks.Task;
23+
import org.elasticsearch.threadpool.ThreadPool;
24+
import org.elasticsearch.transport.TransportService;
25+
26+
public final class TransportUpdateSnapshotStatusAction extends TransportMasterNodeAction<
27+
UpdateIndexShardSnapshotStatusRequest,
28+
ActionResponse.Empty> {
29+
public static final String NAME = "internal:cluster/snapshot/update_snapshot_status";
30+
public static final ActionType<ActionResponse.Empty> TYPE = new ActionType<>(NAME);
31+
32+
private final SnapshotsService snapshotsService;
33+
34+
@Inject
35+
public TransportUpdateSnapshotStatusAction(
36+
TransportService transportService,
37+
ClusterService clusterService,
38+
ThreadPool threadPool,
39+
SnapshotsService snapshotsService,
40+
ActionFilters actionFilters
41+
) {
42+
super(
43+
NAME,
44+
false,
45+
transportService,
46+
clusterService,
47+
threadPool,
48+
actionFilters,
49+
UpdateIndexShardSnapshotStatusRequest::new,
50+
in -> ActionResponse.Empty.INSTANCE,
51+
EsExecutors.DIRECT_EXECUTOR_SERVICE
52+
);
53+
this.snapshotsService = snapshotsService;
54+
}
55+
56+
@Override
57+
protected void masterOperation(
58+
Task task,
59+
UpdateIndexShardSnapshotStatusRequest request,
60+
ClusterState state,
61+
ActionListener<ActionResponse.Empty> listener
62+
) {
63+
snapshotsService.createAndSubmitRequestToUpdateSnapshotState(
64+
request.snapshot(),
65+
request.shardId(),
66+
null,
67+
request.status(),
68+
listener.map(v -> ActionResponse.Empty.INSTANCE)
69+
);
70+
}
71+
72+
@Override
73+
protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest request, ClusterState state) {
74+
return null;
75+
}
76+
}

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1050,7 +1050,7 @@ private void sendLocalRequest(long requestId, final String action, final Transpo
10501050
@SuppressWarnings("unchecked")
10511051
final RequestHandlerRegistry<TransportRequest> reg = (RequestHandlerRegistry<TransportRequest>) getRequestHandler(action);
10521052
if (reg == null) {
1053-
assert false : action;
1053+
assert false : "Action [" + action + "] not found";
10541054
throw new ActionNotFoundTransportException("Action [" + action + "] not found");
10551055
}
10561056
final Executor executor = reg.getExecutor();

0 commit comments

Comments
 (0)