From 37dd5246f90f35a3de70da221f013d57595e6151 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Wed, 23 Apr 2025 23:39:53 +0200 Subject: [PATCH 1/7] Move unpromotable relocation to its own transport action Relates ES-10339 --- .../recovery/PeerRecoveryTargetService.java | 38 +++++--- .../recovery/RecoveriesCollection.java | 1 - ...StatelessUnpromotableRelocationAction.java | 95 +++++++++++++++++++ ...lessUnpromotableRelocationActionTests.java | 69 ++++++++++++++ 4 files changed, 190 insertions(+), 13 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/indices/recovery/StatelessUnpromotableRelocationAction.java create mode 100644 server/src/test/java/org/elasticsearch/indices/recovery/StatelessUnpromotableRelocationActionTests.java diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 359d084342a74..94af0a8090f91 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -301,18 +301,32 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi if (indexShard.routingEntry().isPromotableToPrimary() == false) { assert preExistingRequest == null; assert indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot() == false; - ActionListener.run(cleanupOnly.map(v -> { - logger.trace("{} preparing unpromotable shard for recovery", recoveryTarget.shardId()); - indexShard.prepareForIndexRecovery(); - // Skip unnecessary intermediate stages - recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX); - recoveryState.setStage(RecoveryState.Stage.TRANSLOG); - indexShard.openEngineAndSkipTranslogRecovery(); - recoveryState.getIndex().setFileDetailsComplete(); - recoveryState.setStage(RecoveryState.Stage.FINALIZE); - onGoingRecoveries.markRecoveryAsDone(recoveryId); - return null; - }), indexShard::preRecovery); + try (onCompletion) { + client.execute( + StatelessUnpromotableRelocationAction.TYPE, + new StatelessUnpromotableRelocationAction.Request( + recoveryId, + indexShard.shardId(), + indexShard.routingEntry().allocationId().getId(), + recoveryTarget.clusterStateVersion() + ), + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) { + onGoingRecoveries.markRecoveryAsDone(recoveryId); + } + + @Override + public void onFailure(Exception e) { + onGoingRecoveries.failRecovery( + recoveryId, + new RecoveryFailedException(recoveryState, "failed to recover unpromotable shard", e), + true + ); + } + } + ); + } return; } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java index 1904c702a3697..95ef0544715ae 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java @@ -167,7 +167,6 @@ public RecoveryRef getRecoverySafe(long id, ShardId shardId) { throw new IndexShardClosedException(shardId); } assert recoveryRef.target().shardId().equals(shardId); - assert recoveryRef.target().indexShard().routingEntry().isPromotableToPrimary(); return recoveryRef; } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/StatelessUnpromotableRelocationAction.java b/server/src/main/java/org/elasticsearch/indices/recovery/StatelessUnpromotableRelocationAction.java new file mode 100644 index 0000000000000..3557a6ce0fa97 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/recovery/StatelessUnpromotableRelocationAction.java @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.indices.recovery; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.Objects; + +public class StatelessUnpromotableRelocationAction { + + public static final ActionType TYPE = new ActionType<>( + "internal:index/shard/recovery/stateless_unpromotable_relocation" + ); + + public static class Request extends ActionRequest { + private final long recoveryId; + private final ShardId shardId; + private final String targetAllocationId; + private final long clusterStateVersion; + + public Request(long recoveryId, ShardId shardId, String targetAllocationId, long clusterStateVersion) { + this.recoveryId = recoveryId; + this.shardId = shardId; + this.targetAllocationId = targetAllocationId; + this.clusterStateVersion = clusterStateVersion; + } + + public Request(StreamInput in) throws IOException { + super(in); + recoveryId = in.readVLong(); + shardId = new ShardId(in); + targetAllocationId = in.readString(); + clusterStateVersion = in.readVLong(); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVLong(recoveryId); + shardId.writeTo(out); + out.writeString(targetAllocationId); + out.writeVLong(clusterStateVersion); + } + + public long getRecoveryId() { + return recoveryId; + } + + public ShardId getShardId() { + return shardId; + } + + public long getClusterStateVersion() { + return clusterStateVersion; + } + + public String getTargetAllocationId() { + return targetAllocationId; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return recoveryId == request.recoveryId + && clusterStateVersion == request.clusterStateVersion + && Objects.equals(shardId, request.shardId) + && Objects.equals(targetAllocationId, request.targetAllocationId); + } + + @Override + public int hashCode() { + return Objects.hash(recoveryId, shardId, targetAllocationId, clusterStateVersion); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/StatelessUnpromotableRelocationActionTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/StatelessUnpromotableRelocationActionTests.java new file mode 100644 index 0000000000000..4ae1f7b311d7e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/indices/recovery/StatelessUnpromotableRelocationActionTests.java @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.indices.recovery; + +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardIdTests; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +public class StatelessUnpromotableRelocationActionTests extends AbstractWireSerializingTestCase< + StatelessUnpromotableRelocationAction.Request> { + @Override + protected Writeable.Reader instanceReader() { + return StatelessUnpromotableRelocationAction.Request::new; + } + + @Override + protected StatelessUnpromotableRelocationAction.Request createTestInstance() { + return new StatelessUnpromotableRelocationAction.Request( + randomNonNegativeLong(), + new ShardId(randomIdentifier(), UUIDs.randomBase64UUID(), randomIntBetween(0, 99)), + randomUUID(), + randomNonNegativeLong() + ); + } + + @Override + protected StatelessUnpromotableRelocationAction.Request mutateInstance(StatelessUnpromotableRelocationAction.Request instance) + throws IOException { + return switch (between(0, 3)) { + case 0 -> new StatelessUnpromotableRelocationAction.Request( + randomValueOtherThan(instance.getRecoveryId(), ESTestCase::randomNonNegativeLong), + instance.getShardId(), + instance.getTargetAllocationId(), + instance.getClusterStateVersion() + ); + case 1 -> new StatelessUnpromotableRelocationAction.Request( + instance.getRecoveryId(), + ShardIdTests.mutate(instance.getShardId()), + instance.getTargetAllocationId(), + instance.getClusterStateVersion() + ); + case 2 -> new StatelessUnpromotableRelocationAction.Request( + instance.getRecoveryId(), + instance.getShardId(), + randomValueOtherThan(instance.getTargetAllocationId(), ESTestCase::randomUUID), + instance.getClusterStateVersion() + ); + case 3 -> new StatelessUnpromotableRelocationAction.Request( + instance.getRecoveryId(), + instance.getShardId(), + instance.getTargetAllocationId(), + randomValueOtherThan(instance.getClusterStateVersion(), ESTestCase::randomNonNegativeLong) + ); + default -> throw new AssertionError("Illegal randomisation branch"); + }; + } +} From 03bbbe76db62161e8f7098b252f4013c405037a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Thu, 24 Apr 2025 16:12:22 +0200 Subject: [PATCH 2/7] Update docs/changelog/127330.yaml --- docs/changelog/127330.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/127330.yaml diff --git a/docs/changelog/127330.yaml b/docs/changelog/127330.yaml new file mode 100644 index 0000000000000..059c8185c0060 --- /dev/null +++ b/docs/changelog/127330.yaml @@ -0,0 +1,5 @@ +pr: 127330 +summary: Move unpromotable relocations to its own transport action +area: Recovery +type: enhancement +issues: [] From 9ef9621e12f490c1e8df53f768a57392df94edf7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Thu, 24 Apr 2025 17:43:02 +0200 Subject: [PATCH 3/7] Use cleanupOnly listener instead --- .../recovery/PeerRecoveryTargetService.java | 54 ++++++------------- 1 file changed, 17 insertions(+), 37 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 94af0a8090f91..da4e3cd4dfc03 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -333,43 +333,23 @@ public void onFailure(Exception e) { if (indexShard.routingEntry().isSearchable() == false && recoveryState.getPrimary()) { assert preExistingRequest == null; assert indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot() == false; - try (onCompletion) { - client.execute( - StatelessPrimaryRelocationAction.TYPE, - new StatelessPrimaryRelocationAction.Request( - recoveryId, - indexShard.shardId(), - transportService.getLocalNode(), - indexShard.routingEntry().allocationId().getId(), - recoveryTarget.clusterStateVersion() - ), - new ActionListener<>() { - @Override - public void onResponse(ActionResponse.Empty ignored) { - onGoingRecoveries.markRecoveryAsDone(recoveryId); - } - - @Override - public void onFailure(Exception e) { - final var cause = ExceptionsHelper.unwrapCause(e); - final var sendShardFailure = - // these indicate the source shard has already failed, which will independently notify the master and fail - // the target shard - false == (cause instanceof ShardNotFoundException - || cause instanceof IndexNotFoundException - || cause instanceof AlreadyClosedException); - - // TODO retries? See RecoveryResponseHandler#handleException - onGoingRecoveries.failRecovery( - recoveryId, - new RecoveryFailedException(recoveryState, null, e), - sendShardFailure - ); - } - } - ); - return; - } + client.execute( + StatelessPrimaryRelocationAction.TYPE, + new StatelessPrimaryRelocationAction.Request( + recoveryId, + indexShard.shardId(), + transportService.getLocalNode(), + indexShard.routingEntry().allocationId().getId(), + recoveryTarget.clusterStateVersion() + ), + cleanupOnly.delegateFailure((l, unused) -> { + ActionListener.completeWith(l, () -> { + onGoingRecoveries.markRecoveryAsDone(recoveryId); + return null; + }); + }) + ); + return; } record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, String actionName, TransportRequest requestToSend) {} From 7aa53320076b36de6540e1360f86313f52bb8f49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Thu, 24 Apr 2025 19:20:20 +0200 Subject: [PATCH 4/7] Revert "Use cleanupOnly listener instead" This reverts commit 9ef9621e12f490c1e8df53f768a57392df94edf7. --- .../recovery/PeerRecoveryTargetService.java | 54 +++++++++++++------ 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index da4e3cd4dfc03..94af0a8090f91 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -333,23 +333,43 @@ public void onFailure(Exception e) { if (indexShard.routingEntry().isSearchable() == false && recoveryState.getPrimary()) { assert preExistingRequest == null; assert indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot() == false; - client.execute( - StatelessPrimaryRelocationAction.TYPE, - new StatelessPrimaryRelocationAction.Request( - recoveryId, - indexShard.shardId(), - transportService.getLocalNode(), - indexShard.routingEntry().allocationId().getId(), - recoveryTarget.clusterStateVersion() - ), - cleanupOnly.delegateFailure((l, unused) -> { - ActionListener.completeWith(l, () -> { - onGoingRecoveries.markRecoveryAsDone(recoveryId); - return null; - }); - }) - ); - return; + try (onCompletion) { + client.execute( + StatelessPrimaryRelocationAction.TYPE, + new StatelessPrimaryRelocationAction.Request( + recoveryId, + indexShard.shardId(), + transportService.getLocalNode(), + indexShard.routingEntry().allocationId().getId(), + recoveryTarget.clusterStateVersion() + ), + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty ignored) { + onGoingRecoveries.markRecoveryAsDone(recoveryId); + } + + @Override + public void onFailure(Exception e) { + final var cause = ExceptionsHelper.unwrapCause(e); + final var sendShardFailure = + // these indicate the source shard has already failed, which will independently notify the master and fail + // the target shard + false == (cause instanceof ShardNotFoundException + || cause instanceof IndexNotFoundException + || cause instanceof AlreadyClosedException); + + // TODO retries? See RecoveryResponseHandler#handleException + onGoingRecoveries.failRecovery( + recoveryId, + new RecoveryFailedException(recoveryState, null, e), + sendShardFailure + ); + } + } + ); + return; + } } record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, String actionName, TransportRequest requestToSend) {} From c691b69d2f17e12c0ec75d86c5d1eceb3e414ae1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Thu, 24 Apr 2025 19:23:58 +0200 Subject: [PATCH 5/7] Use cleanupOnly listener instead --- .../recovery/PeerRecoveryTargetService.java | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 94af0a8090f91..60a1f2555cdcb 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -310,21 +310,12 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi indexShard.routingEntry().allocationId().getId(), recoveryTarget.clusterStateVersion() ), - new ActionListener<>() { - @Override - public void onResponse(ActionResponse.Empty empty) { + cleanupOnly.delegateFailure((l, unused) -> { + ActionListener.completeWith(l, () -> { onGoingRecoveries.markRecoveryAsDone(recoveryId); - } - - @Override - public void onFailure(Exception e) { - onGoingRecoveries.failRecovery( - recoveryId, - new RecoveryFailedException(recoveryState, "failed to recover unpromotable shard", e), - true - ); - } - } + return null; + }); + }) ); } return; From 93cc8aac86c7d520097a029e7ef0c4a22d9a36ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Thu, 24 Apr 2025 19:24:54 +0200 Subject: [PATCH 6/7] More changes... --- .../recovery/PeerRecoveryTargetService.java | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 60a1f2555cdcb..64a30cde799ae 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -301,23 +301,21 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi if (indexShard.routingEntry().isPromotableToPrimary() == false) { assert preExistingRequest == null; assert indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot() == false; - try (onCompletion) { - client.execute( - StatelessUnpromotableRelocationAction.TYPE, - new StatelessUnpromotableRelocationAction.Request( - recoveryId, - indexShard.shardId(), - indexShard.routingEntry().allocationId().getId(), - recoveryTarget.clusterStateVersion() - ), - cleanupOnly.delegateFailure((l, unused) -> { - ActionListener.completeWith(l, () -> { - onGoingRecoveries.markRecoveryAsDone(recoveryId); - return null; - }); - }) - ); - } + client.execute( + StatelessUnpromotableRelocationAction.TYPE, + new StatelessUnpromotableRelocationAction.Request( + recoveryId, + indexShard.shardId(), + indexShard.routingEntry().allocationId().getId(), + recoveryTarget.clusterStateVersion() + ), + cleanupOnly.delegateFailure((l, unused) -> { + ActionListener.completeWith(l, () -> { + onGoingRecoveries.markRecoveryAsDone(recoveryId); + return null; + }); + }) + ); return; } From 03d52fb3fcf3621bd83c12b630eaaa58f5093695 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Wed, 30 Apr 2025 22:35:46 +0200 Subject: [PATCH 7/7] Add unpromotable transport action implementation for tests --- .../cluster/routing/ShardRoutingRoleIT.java | 67 ++++++++++++++++++- 1 file changed, 66 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java index 56a84fe4d48d0..455c53049747f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; @@ -21,6 +22,8 @@ import org.elasticsearch.action.search.OpenPointInTimeRequest; import org.elasticsearch.action.search.TransportClosePointInTimeAction; import org.elasticsearch.action.search.TransportOpenPointInTimeAction; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -38,6 +41,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexService; @@ -48,12 +52,18 @@ import org.elasticsearch.index.engine.NoOpEngine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.indices.recovery.StatelessUnpromotableRelocationAction; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.EnginePlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.snapshots.SnapshotState; +import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.XContentTestUtils; import org.elasticsearch.test.transport.MockTransportService; @@ -91,7 +101,7 @@ public class ShardRoutingRoleIT extends ESIntegTestCase { private static final Logger logger = LogManager.getLogger(ShardRoutingRoleIT.class); - public static class TestPlugin extends Plugin implements ClusterPlugin, EnginePlugin { + public static class TestPlugin extends Plugin implements ClusterPlugin, EnginePlugin, ActionPlugin { volatile int numIndexingCopies = 1; static final String NODE_ATTR_UNPROMOTABLE_ONLY = "unpromotableonly"; @@ -112,6 +122,61 @@ public ShardRouting.Role newEmptyRole(int copyIndex) { }; } + // This is implemented in stateless, but for the tests we need to provide a simple implementation + public static class TransportTestUnpromotableRelocationAction extends TransportAction< + StatelessUnpromotableRelocationAction.Request, + ActionResponse.Empty> { + + private final IndicesService indicesService; + private final PeerRecoveryTargetService peerRecoveryTargetService; + + @Inject + public TransportTestUnpromotableRelocationAction( + ActionFilters actionFilters, + IndicesService indicesService, + PeerRecoveryTargetService peerRecoveryTargetService, + TransportService transportService + ) { + super( + StatelessUnpromotableRelocationAction.TYPE.name(), + actionFilters, + transportService.getTaskManager(), + EsExecutors.DIRECT_EXECUTOR_SERVICE + ); + this.indicesService = indicesService; + this.peerRecoveryTargetService = peerRecoveryTargetService; + } + + @Override + protected void doExecute( + Task task, + StatelessUnpromotableRelocationAction.Request request, + ActionListener listener + ) { + try (var recoveryRef = peerRecoveryTargetService.getRecoveryRef(request.getRecoveryId(), request.getShardId())) { + final var indexService = indicesService.indexServiceSafe(request.getShardId().getIndex()); + final var indexShard = indexService.getShard(request.getShardId().id()); + final var recoveryTarget = recoveryRef.target(); + final var recoveryState = recoveryTarget.state(); + + ActionListener.completeWith(listener, () -> { + indexShard.prepareForIndexRecovery(); + recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX); + recoveryState.setStage(RecoveryState.Stage.TRANSLOG); + indexShard.openEngineAndSkipTranslogRecovery(); + recoveryState.getIndex().setFileDetailsComplete(); + recoveryState.setStage(RecoveryState.Stage.FINALIZE); + return ActionResponse.Empty.INSTANCE; + }); + } + } + } + + @Override + public Collection getActions() { + return List.of(new ActionHandler(StatelessUnpromotableRelocationAction.TYPE, TransportTestUnpromotableRelocationAction.class)); + } + @Override public Collection createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { return List.of(new AllocationDecider() {