diff --git a/docs/changelog/127330.yaml b/docs/changelog/127330.yaml new file mode 100644 index 0000000000000..059c8185c0060 --- /dev/null +++ b/docs/changelog/127330.yaml @@ -0,0 +1,5 @@ +pr: 127330 +summary: Move unpromotable relocations to their own transport action +area: Recovery +type: enhancement +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java index 56a84fe4d48d0..455c53049747f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; @@ -21,6 +22,8 @@ import org.elasticsearch.action.search.OpenPointInTimeRequest; import org.elasticsearch.action.search.TransportClosePointInTimeAction; import org.elasticsearch.action.search.TransportOpenPointInTimeAction; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -38,6 +41,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexService; @@ -48,12 
+52,18 @@
 import org.elasticsearch.index.engine.NoOpEngine;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
+import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.indices.recovery.StatelessUnpromotableRelocationAction;
+import org.elasticsearch.injection.guice.Inject;
+import org.elasticsearch.plugins.ActionPlugin;
 import org.elasticsearch.plugins.ClusterPlugin;
 import org.elasticsearch.plugins.EnginePlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.PluginsService;
 import org.elasticsearch.search.builder.PointInTimeBuilder;
 import org.elasticsearch.snapshots.SnapshotState;
+import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.XContentTestUtils;
 import org.elasticsearch.test.transport.MockTransportService;
@@ -91,7 +101,7 @@ public class ShardRoutingRoleIT extends ESIntegTestCase {
 
     private static final Logger logger = LogManager.getLogger(ShardRoutingRoleIT.class);
 
-    public static class TestPlugin extends Plugin implements ClusterPlugin, EnginePlugin {
+    public static class TestPlugin extends Plugin implements ClusterPlugin, EnginePlugin, ActionPlugin {
 
         volatile int numIndexingCopies = 1;
         static final String NODE_ATTR_UNPROMOTABLE_ONLY = "unpromotableonly";
@@ -112,6 +122,72 @@ public ShardRouting.Role newEmptyRole(int copyIndex) {
             };
         }
 
+        /**
+         * Minimal handler for {@link StatelessUnpromotableRelocationAction#TYPE}.
+         * This is implemented in stateless, but for the tests we need to provide a simple implementation.
+         */
+        public static class TransportTestUnpromotableRelocationAction extends TransportAction<
+            StatelessUnpromotableRelocationAction.Request,
+            ActionResponse.Empty> {
+
+            private final IndicesService indicesService;
+            private final PeerRecoveryTargetService peerRecoveryTargetService;
+
+            @Inject
+            public TransportTestUnpromotableRelocationAction(
+                ActionFilters actionFilters,
+                IndicesService indicesService,
+                PeerRecoveryTargetService peerRecoveryTargetService,
+                TransportService transportService
+            ) {
+                super(
+                    StatelessUnpromotableRelocationAction.TYPE.name(),
+                    actionFilters,
+                    transportService.getTaskManager(),
+                    EsExecutors.DIRECT_EXECUTOR_SERVICE
+                );
+                this.indicesService = indicesService;
+                this.peerRecoveryTargetService = peerRecoveryTargetService;
+            }
+
+            /**
+             * Resolves the ongoing recovery and the local shard named by {@code request}, then takes the shard from
+             * {@code prepareForIndexRecovery()} to the {@code FINALIZE} stage. The steps run inside
+             * {@code ActionListener.completeWith}, whose supplier returns {@link ActionResponse.Empty#INSTANCE}.
+             */
+            @Override
+            protected void doExecute(
+                Task task,
+                StatelessUnpromotableRelocationAction.Request request,
+                ActionListener<ActionResponse.Empty> listener
+            ) {
+                // The recovery reference is only needed while the steps below run; try-with-resources closes it afterwards.
+                try (var recoveryRef = peerRecoveryTargetService.getRecoveryRef(request.getRecoveryId(), request.getShardId())) {
+                    final var indexService = indicesService.indexServiceSafe(request.getShardId().getIndex());
+                    final var indexShard = indexService.getShard(request.getShardId().id());
+                    final var recoveryTarget = recoveryRef.target();
+                    final var recoveryState = recoveryTarget.state();
+
+                    ActionListener.completeWith(listener, () -> {
+                        indexShard.prepareForIndexRecovery();
+                        // Skip unnecessary intermediate stages
+                        recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
+                        recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
+                        indexShard.openEngineAndSkipTranslogRecovery();
+                        recoveryState.getIndex().setFileDetailsComplete();
+                        recoveryState.setStage(RecoveryState.Stage.FINALIZE);
+                        return ActionResponse.Empty.INSTANCE;
+                    });
+                }
+            }
+        }
+
+        /** Registers {@link TransportTestUnpromotableRelocationAction} as the handler for the relocation action type. */
+        @Override
+        public Collection<ActionHandler> getActions() {
+            return List.of(new ActionHandler(StatelessUnpromotableRelocationAction.TYPE, TransportTestUnpromotableRelocationAction.class));
+        }
+
         @Override
         public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
             return List.of(new AllocationDecider() {
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index 359d084342a74..64a30cde799ae 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ 
b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -301,18 +301,21 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi if (indexShard.routingEntry().isPromotableToPrimary() == false) { assert preExistingRequest == null; assert indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot() == false; - ActionListener.run(cleanupOnly.map(v -> { - logger.trace("{} preparing unpromotable shard for recovery", recoveryTarget.shardId()); - indexShard.prepareForIndexRecovery(); - // Skip unnecessary intermediate stages - recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX); - recoveryState.setStage(RecoveryState.Stage.TRANSLOG); - indexShard.openEngineAndSkipTranslogRecovery(); - recoveryState.getIndex().setFileDetailsComplete(); - recoveryState.setStage(RecoveryState.Stage.FINALIZE); - onGoingRecoveries.markRecoveryAsDone(recoveryId); - return null; - }), indexShard::preRecovery); + client.execute( + StatelessUnpromotableRelocationAction.TYPE, + new StatelessUnpromotableRelocationAction.Request( + recoveryId, + indexShard.shardId(), + indexShard.routingEntry().allocationId().getId(), + recoveryTarget.clusterStateVersion() + ), + cleanupOnly.delegateFailure((l, unused) -> { + ActionListener.completeWith(l, () -> { + onGoingRecoveries.markRecoveryAsDone(recoveryId); + return null; + }); + }) + ); return; } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java index 1904c702a3697..95ef0544715ae 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java @@ -167,7 +167,6 @@ public RecoveryRef getRecoverySafe(long id, ShardId shardId) { throw new IndexShardClosedException(shardId); } assert recoveryRef.target().shardId().equals(shardId); - assert 
recoveryRef.target().indexShard().routingEntry().isPromotableToPrimary();
         return recoveryRef;
     }
 
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/StatelessUnpromotableRelocationAction.java b/server/src/main/java/org/elasticsearch/indices/recovery/StatelessUnpromotableRelocationAction.java
new file mode 100644
index 0000000000000..3557a6ce0fa97
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/StatelessUnpromotableRelocationAction.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.indices.recovery;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.index.shard.ShardId;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Declares the {@link ActionType} and {@link Request} of the internal action that {@code PeerRecoveryTargetService}
+ * executes when the recovering shard copy is not promotable to primary.
+ */
+public class StatelessUnpromotableRelocationAction {
+
+    public static final ActionType<ActionResponse.Empty> TYPE = new ActionType<>(
+        "internal:index/shard/recovery/stateless_unpromotable_relocation"
+    );
+
+    /**
+     * Carries the recovery id, the shard id, the allocation id of the target shard copy and a cluster state version.
+     */
+    public static class Request extends ActionRequest {
+        private final long recoveryId;
+        private final ShardId shardId;
+        private final String targetAllocationId;
+        private final long clusterStateVersion;
+
+        public Request(long recoveryId, ShardId shardId, String targetAllocationId, long clusterStateVersion) {
+            this.recoveryId = recoveryId;
+            this.shardId = shardId;
+            this.targetAllocationId = targetAllocationId;
+            this.clusterStateVersion = clusterStateVersion;
+        }
+
+        /** Reads the fields in the same order in which {@link #writeTo(StreamOutput)} writes them. */
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            recoveryId = in.readVLong();
+            shardId = new ShardId(in);
+            targetAllocationId = in.readString();
+            clusterStateVersion = in.readVLong();
+        }
+
+        /** Performs no validation and always returns {@code null}. */
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            // NOTE(review): both longs are written as vlongs, presumably because they are never negative - confirm
+            out.writeVLong(recoveryId);
+            shardId.writeTo(out);
+            out.writeString(targetAllocationId);
+            out.writeVLong(clusterStateVersion);
+        }
+
+        public long getRecoveryId() {
+            return recoveryId;
+        }
+
+        public ShardId getShardId() {
+            return shardId;
+        }
+
+        public long getClusterStateVersion() {
+            return clusterStateVersion;
+        }
+
+        public String getTargetAllocationId() {
+            return targetAllocationId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            Request request = (Request) o;
+            return recoveryId == request.recoveryId
+                && clusterStateVersion == request.clusterStateVersion
+                && Objects.equals(shardId, request.shardId)
+                && Objects.equals(targetAllocationId, request.targetAllocationId);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(recoveryId, shardId, targetAllocationId, clusterStateVersion);
+        }
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/StatelessUnpromotableRelocationActionTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/StatelessUnpromotableRelocationActionTests.java
new file mode 100644
index 0000000000000..4ae1f7b311d7e
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/StatelessUnpromotableRelocationActionTests.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.indices.recovery;
+
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardIdTests;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+/** Runs the {@link AbstractWireSerializingTestCase} checks against {@link StatelessUnpromotableRelocationAction.Request}. */
+public class StatelessUnpromotableRelocationActionTests extends AbstractWireSerializingTestCase<
+    StatelessUnpromotableRelocationAction.Request> {
+    @Override
+    protected Writeable.Reader<StatelessUnpromotableRelocationAction.Request> instanceReader() {
+        return StatelessUnpromotableRelocationAction.Request::new;
+    }
+
+    @Override
+    protected StatelessUnpromotableRelocationAction.Request createTestInstance() {
+        return new StatelessUnpromotableRelocationAction.Request(
+            randomNonNegativeLong(),
+            new ShardId(randomIdentifier(), UUIDs.randomBase64UUID(), randomIntBetween(0, 99)),
+            randomUUID(),
+            randomNonNegativeLong()
+        );
+    }
+
+    /** Returns a copy of {@code instance} in which one randomly chosen field is replaced and the other three are kept. */
+    @Override
+    protected StatelessUnpromotableRelocationAction.Request mutateInstance(StatelessUnpromotableRelocationAction.Request instance)
+        throws IOException {
+        return switch (between(0, 3)) {
+            case 0 -> new StatelessUnpromotableRelocationAction.Request(
+                randomValueOtherThan(instance.getRecoveryId(), ESTestCase::randomNonNegativeLong),
+                instance.getShardId(),
+                instance.getTargetAllocationId(),
+                instance.getClusterStateVersion()
+            );
+            case 1 -> new StatelessUnpromotableRelocationAction.Request(
+                instance.getRecoveryId(),
+                ShardIdTests.mutate(instance.getShardId()),
+                instance.getTargetAllocationId(),
+                instance.getClusterStateVersion()
+            );
+            case 2 -> new StatelessUnpromotableRelocationAction.Request(
+                instance.getRecoveryId(),
+                instance.getShardId(),
+                randomValueOtherThan(instance.getTargetAllocationId(), ESTestCase::randomUUID),
+                instance.getClusterStateVersion()
+            );
+            case 3 -> new StatelessUnpromotableRelocationAction.Request(
+                instance.getRecoveryId(),
+                instance.getShardId(),
+                instance.getTargetAllocationId(),
+                randomValueOtherThan(instance.getClusterStateVersion(), ESTestCase::randomNonNegativeLong)
+            );
+            default -> throw new AssertionError("Illegal randomisation branch");
+        };
+    }
+}