diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java index ae45491f69195..587d16cb4db41 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java @@ -2154,7 +2154,7 @@ public void testDeleteIndexWithOutOfOrderFinalization() { final Map> otherIndexSnapshotListeners = indexNames.stream() .collect(Collectors.toMap(k -> k, k -> new SubscribableListener<>())); masterTransportService.addRequestHandlingBehavior( - SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, + TransportUpdateSnapshotStatusAction.NAME, (handler, request, channel, task) -> { final var indexName = request.shardId().getIndexName(); if (indexName.equals(indexToDelete)) { diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index f052af8b8a651..96a194e54b1d5 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -1063,7 +1063,7 @@ public void onRequestSent( TransportRequest request, TransportRequestOptions finalOptions ) { - if (blocked.get() && action.equals(SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME)) { + if (blocked.get() && action.equals(TransportUpdateSnapshotStatusAction.NAME)) { throw new AssertionError("Node had no assigned shard snapshots so it shouldn't send out shard state updates"); } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java index d14d543da05eb..1641d03970826 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java @@ -127,7 +127,7 @@ public void testConsistencyCheckerLogging() { // allow cluster states to go through normally until we see a shard snapshot update final var shardUpdateSeen = new AtomicBoolean(); MockTransportService.getInstance(masterNode) - .addRequestHandlingBehavior(SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, (handler, request, channel, task) -> { + .addRequestHandlingBehavior(TransportUpdateSnapshotStatusAction.NAME, (handler, request, channel, task) -> { shardUpdateSeen.set(true); handler.messageReceived(request, channel, task); }); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java index c1d9977993b49..e8109fe29839f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java @@ -207,7 +207,7 @@ public void testRemoveNodeAndFailoverMasterDuringSnapshot() throws Exception { final var masterName = internalCluster().getMasterName(); final var masterTransportService = MockTransportService.getInstance(masterName); masterTransportService.addRequestHandlingBehavior( - SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, + TransportUpdateSnapshotStatusAction.NAME, (handler, request, channel, task) -> masterTransportService.getThreadPool().generic().execute(() -> { safeAwait(snapshotStatusUpdateBarrier); safeAwait(snapshotStatusUpdateBarrier); @@ -405,7 +405,7 @@ public void testAbortSnapshotWhileRemovingNode() throws Exception { final var updateSnapshotStatusBarrier = new CyclicBarrier(2); final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName()); masterTransportService.addRequestHandlingBehavior( - SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, + TransportUpdateSnapshotStatusAction.NAME, (handler, request, channel, task) -> masterTransportService.getThreadPool().generic().execute(() -> { safeAwait(updateSnapshotStatusBarrier); safeAwait(updateSnapshotStatusBarrier); @@ -455,7 +455,7 @@ public void testShutdownWhileSuccessInFlight() throws Exception { final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName()); masterTransportService.addRequestHandlingBehavior( - SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, + TransportUpdateSnapshotStatusAction.NAME, (handler, request, channel, task) -> putShutdownForRemovalMetadata( clusterService, primaryNode, @@ -559,7 +559,7 @@ public void testSnapshotShutdownProgressTracker() throws Exception { final CountDownLatch snapshotStatusUpdateLatch = new CountDownLatch(1); final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName()); masterTransportService.addRequestHandlingBehavior( - SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, + TransportUpdateSnapshotStatusAction.NAME, (handler, request, channel, task) -> masterTransportService.getThreadPool().generic().execute(() -> { safeAwait(snapshotStatusUpdateLatch); try { diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 51091c5f0d886..7a3bbfdf2609d 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -405,6 +405,7 @@ import org.elasticsearch.rest.action.synonyms.RestGetSynonymsSetsAction; import org.elasticsearch.rest.action.synonyms.RestPutSynonymRuleAction; import org.elasticsearch.rest.action.synonyms.RestPutSynonymsAction; +import org.elasticsearch.snapshots.TransportUpdateSnapshotStatusAction; import org.elasticsearch.tasks.Task; import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.threadpool.ThreadPool; @@ -672,6 +673,7 @@ public void reg actions.register(TransportDeleteSnapshotAction.TYPE, TransportDeleteSnapshotAction.class); actions.register(TransportCreateSnapshotAction.TYPE, TransportCreateSnapshotAction.class); actions.register(TransportCloneSnapshotAction.TYPE, TransportCloneSnapshotAction.class); + actions.register(TransportUpdateSnapshotStatusAction.TYPE, TransportUpdateSnapshotStatusAction.class); actions.register(TransportRestoreSnapshotAction.TYPE, TransportRestoreSnapshotAction.class); actions.register(TransportSnapshotsStatusAction.TYPE, TransportSnapshotsStatusAction.class); actions.register(SnapshottableFeaturesAction.INSTANCE, TransportSnapshottableFeaturesAction.class); diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index d2fd9b8ad0b58..7f31bc2264458 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -1135,7 +1135,6 @@ public Map queryFields() { clusterModule.getIndexNameExpressionResolver(), repositoriesService, transportService, - actionModule.getActionFilters(), systemIndices, projectResolver.supportsMultipleProjects() ); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index c4de1c10f0cca..1eb5ede5848a2 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -905,7 +905,7 @@ public void onFailure(Exception e) { updateResultListener, (req, reqListener) -> transportService.sendRequest( transportService.getLocalNode(), - SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, + TransportUpdateSnapshotStatusAction.NAME, req, new ActionListenerResponseHandler<>( reqListener.map(res -> null), diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 7c64eeefb77c2..66472cbd39cfb 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -14,17 +14,14 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; -import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.action.support.SubscribableListener; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; @@ -37,7 +34,6 @@ import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.elasticsearch.cluster.SnapshotsInProgress.ShardState; -import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamAlias; @@ -93,7 +89,6 @@ import org.elasticsearch.repositories.ShardGeneration; import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.ShardSnapshotResult; -import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -150,8 +145,6 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement private static final Logger logger = LogManager.getLogger(SnapshotsService.class); - public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status"; - public static final String NO_FEATURE_STATES_VALUE = "none"; private final ClusterService clusterService; @@ -180,10 +173,6 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement /** Set of currently initializing clone operations */ private final Set initializingClones = Collections.synchronizedSet(new HashSet<>()); - private final UpdateSnapshotStatusAction updateSnapshotStatusHandler; - - private final TransportService transportService; - private final OngoingRepositoryOperations repositoryOperations = new OngoingRepositoryOperations(); private final SystemIndices systemIndices; @@ -216,7 +205,6 @@ public SnapshotsService( IndexNameExpressionResolver indexNameExpressionResolver, RepositoriesService repositoriesService, TransportService transportService, - ActionFilters actionFilters, SystemIndices systemIndices, boolean serializeProjectMetadata ) { @@ -225,10 +213,7 @@ public SnapshotsService( this.indexNameExpressionResolver = indexNameExpressionResolver; this.repositoriesService = repositoriesService; this.threadPool = transportService.getThreadPool(); - this.transportService = transportService; - // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService. - this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction(transportService, clusterService, threadPool, actionFilters); if (DiscoveryNode.isMasterNode(settings)) { // addLowPriorityApplier to make sure that Repository will be created before snapshot clusterService.addLowPriorityApplier(this); @@ -592,7 +577,7 @@ private void runReadyClone( repoShardId, shardStatusBefore.generation(), ActionListener.wrap( - shardSnapshotResult -> innerUpdateSnapshotState( + shardSnapshotResult -> createAndSubmitRequestToUpdateSnapshotState( target, null, repoShardId, @@ -613,7 +598,7 @@ private void runReadyClone( () -> currentlyCloning.remove(repoShardId) ) ), - e -> innerUpdateSnapshotState( + e -> createAndSubmitRequestToUpdateSnapshotState( target, null, repoShardId, @@ -2256,8 +2241,7 @@ private void addListener(Snapshot snapshot, ActionListener listene @Override protected void doStart() { - assert this.updateSnapshotStatusHandler != null; - assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null; + } @Override @@ -2834,7 +2818,7 @@ public String toString() { } } - private void innerUpdateSnapshotState( + public void createAndSubmitRequestToUpdateSnapshotState( Snapshot snapshot, ShardId shardId, RepositoryShardId repoShardId, @@ -2896,50 +2880,6 @@ private void startExecutableClones(List entries) { } } - private class UpdateSnapshotStatusAction extends TransportMasterNodeAction< - UpdateIndexShardSnapshotStatusRequest, - ActionResponse.Empty> { - UpdateSnapshotStatusAction( - TransportService transportService, - ClusterService clusterService, - ThreadPool threadPool, - ActionFilters actionFilters - ) { - super( - UPDATE_SNAPSHOT_STATUS_ACTION_NAME, - false, - transportService, - clusterService, - threadPool, - actionFilters, - UpdateIndexShardSnapshotStatusRequest::new, - in -> ActionResponse.Empty.INSTANCE, - EsExecutors.DIRECT_EXECUTOR_SERVICE - ); - } - - @Override - protected void masterOperation( - Task task, - UpdateIndexShardSnapshotStatusRequest request, - ClusterState state, - ActionListener listener - ) { - innerUpdateSnapshotState( - request.snapshot(), - request.shardId(), - null, - request.status(), - listener.map(v -> ActionResponse.Empty.INSTANCE) - ); - } - - @Override - protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest request, ClusterState state) { - return null; - } - } - /** * Cluster state update task that removes all {@link SnapshotsInProgress.Entry} and {@link SnapshotDeletionsInProgress.Entry} for a * given repository from the cluster state and afterwards fails all relevant listeners in {@link #snapshotCompletionListeners} and diff --git a/server/src/main/java/org/elasticsearch/snapshots/TransportUpdateSnapshotStatusAction.java b/server/src/main/java/org/elasticsearch/snapshots/TransportUpdateSnapshotStatusAction.java new file mode 100644 index 0000000000000..192151a0371be --- /dev/null +++ b/server/src/main/java/org/elasticsearch/snapshots/TransportUpdateSnapshotStatusAction.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +public final class TransportUpdateSnapshotStatusAction extends TransportMasterNodeAction< + UpdateIndexShardSnapshotStatusRequest, + ActionResponse.Empty> { + public static final String NAME = "internal:cluster/snapshot/update_snapshot_status"; + public static final ActionType TYPE = new ActionType<>(NAME); + + private final SnapshotsService snapshotsService; + + @Inject + public TransportUpdateSnapshotStatusAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + SnapshotsService snapshotsService, + ActionFilters actionFilters + ) { + super( + NAME, + false, + transportService, + clusterService, + threadPool, + actionFilters, + UpdateIndexShardSnapshotStatusRequest::new, + in -> ActionResponse.Empty.INSTANCE, + EsExecutors.DIRECT_EXECUTOR_SERVICE + ); + this.snapshotsService = snapshotsService; + } + + @Override + protected void masterOperation( + Task task, + UpdateIndexShardSnapshotStatusRequest request, + ClusterState state, + ActionListener listener + ) { + snapshotsService.createAndSubmitRequestToUpdateSnapshotState( + request.snapshot(), + request.shardId(), + null, + request.status(), + listener.map(v -> ActionResponse.Empty.INSTANCE) + ); + } + + @Override + protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest request, ClusterState state) { + return null; + } +} diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index a23a6f3367351..e44291eacbc06 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -1050,7 +1050,7 @@ private void sendLocalRequest(long requestId, final String action, final Transpo @SuppressWarnings("unchecked") final RequestHandlerRegistry reg = (RequestHandlerRegistry) getRequestHandler(action); if (reg == null) { - assert false : action; + assert false : "Action [" + action + "] not found"; throw new ActionNotFoundTransportException("Action [" + action + "] not found"); } final Executor executor = reg.getExecutor(); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 5f51bca466857..832f4d5f890bb 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1287,7 +1287,7 @@ public void testSnapshotCompletedByNodeLeft() { // A transport interceptor that throttles the shard snapshot status updates to run one at a time, for more interesting interleavings final TransportInterceptor throttlingInterceptor = new TransportInterceptor() { private final ThrottledTaskRunner runner = new ThrottledTaskRunner( - SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME + "-throttle", + TransportUpdateSnapshotStatusAction.NAME + "-throttle", 1, SnapshotResiliencyTests.this::scheduleNow ); @@ -1299,7 +1299,7 @@ public TransportRequestHandler interceptHandler( boolean forceExecution, TransportRequestHandler actualHandler ) { - if (action.equals(SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME)) { + if (action.equals(TransportUpdateSnapshotStatusAction.NAME)) { return (request, channel, task) -> ActionListener.run( new ChannelActionListener<>(channel), l -> runner.enqueueTask( @@ -1456,7 +1456,7 @@ public TransportRequestHandler interceptHandler( boolean forceExecution, TransportRequestHandler actualHandler ) { - if (action.equals(SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME)) { + if (action.equals(TransportUpdateSnapshotStatusAction.NAME)) { return (request, channel, task) -> ActionListener.run( ActionTestUtils.assertNoFailureListener(new ChannelActionListener<>(channel)::onResponse), l -> { @@ -2413,7 +2413,6 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { client, List.of() ); - final ActionFilters actionFilters = new ActionFilters(emptySet()); snapshotsService = new SnapshotsService( settings, clusterService, @@ -2421,7 +2420,6 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { indexNameExpressionResolver, repositoriesService, transportService, - actionFilters, EmptySystemIndices.INSTANCE, false ); @@ -2510,7 +2508,12 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { threadPool ); nodeConnectionsService = new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService); + final ActionFilters actionFilters = new ActionFilters(emptySet()); Map, TransportAction> actions = new HashMap<>(); + actions.put( + TransportUpdateSnapshotStatusAction.TYPE, + new TransportUpdateSnapshotStatusAction(transportService, clusterService, threadPool, snapshotsService, actionFilters) + ); actions.put( GlobalCheckpointSyncAction.TYPE, new GlobalCheckpointSyncAction( diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index dbf062932e750..233c7278eed98 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -638,6 +638,7 @@ public class Constants { "internal:cluster/master_history/get", "internal:cluster/coordination_diagnostics/info", "internal:cluster/formation/info", + "internal:cluster/snapshot/update_snapshot_status", "internal:gateway/local/started_shards", "internal:admin/indices/prevalidate_shard_path", "internal:index/metadata/migration_version/update",