Skip to content

Commit c5c3615

Browse files
authored
Move unpromotable relocations to its own transport action (elastic#127330)
Relates ES-10339
1 parent 898dd47 commit c5c3615

File tree

6 files changed

+250
-14
lines changed

6 files changed

+250
-14
lines changed

docs/changelog/127330.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127330
2+
summary: Move unpromotable relocations to its own transport action
3+
area: Recovery
4+
type: enhancement
5+
issues: []

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.action.ActionListener;
1415
import org.elasticsearch.action.ActionResponse;
1516
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
1617
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
@@ -21,6 +22,8 @@
2122
import org.elasticsearch.action.search.OpenPointInTimeRequest;
2223
import org.elasticsearch.action.search.TransportClosePointInTimeAction;
2324
import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
25+
import org.elasticsearch.action.support.ActionFilters;
26+
import org.elasticsearch.action.support.TransportAction;
2427
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
2528
import org.elasticsearch.cluster.ClusterChangedEvent;
2629
import org.elasticsearch.cluster.ClusterState;
@@ -38,6 +41,7 @@
3841
import org.elasticsearch.common.settings.ClusterSettings;
3942
import org.elasticsearch.common.settings.Settings;
4043
import org.elasticsearch.common.util.CollectionUtils;
44+
import org.elasticsearch.common.util.concurrent.EsExecutors;
4145
import org.elasticsearch.core.Nullable;
4246
import org.elasticsearch.core.TimeValue;
4347
import org.elasticsearch.index.IndexService;
@@ -48,12 +52,18 @@
4852
import org.elasticsearch.index.engine.NoOpEngine;
4953
import org.elasticsearch.index.shard.IndexShard;
5054
import org.elasticsearch.indices.IndicesService;
55+
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
56+
import org.elasticsearch.indices.recovery.RecoveryState;
57+
import org.elasticsearch.indices.recovery.StatelessUnpromotableRelocationAction;
58+
import org.elasticsearch.injection.guice.Inject;
59+
import org.elasticsearch.plugins.ActionPlugin;
5160
import org.elasticsearch.plugins.ClusterPlugin;
5261
import org.elasticsearch.plugins.EnginePlugin;
5362
import org.elasticsearch.plugins.Plugin;
5463
import org.elasticsearch.plugins.PluginsService;
5564
import org.elasticsearch.search.builder.PointInTimeBuilder;
5665
import org.elasticsearch.snapshots.SnapshotState;
66+
import org.elasticsearch.tasks.Task;
5767
import org.elasticsearch.test.ESIntegTestCase;
5868
import org.elasticsearch.test.XContentTestUtils;
5969
import org.elasticsearch.test.transport.MockTransportService;
@@ -91,7 +101,7 @@ public class ShardRoutingRoleIT extends ESIntegTestCase {
91101

92102
private static final Logger logger = LogManager.getLogger(ShardRoutingRoleIT.class);
93103

94-
public static class TestPlugin extends Plugin implements ClusterPlugin, EnginePlugin {
104+
public static class TestPlugin extends Plugin implements ClusterPlugin, EnginePlugin, ActionPlugin {
95105

96106
volatile int numIndexingCopies = 1;
97107
static final String NODE_ATTR_UNPROMOTABLE_ONLY = "unpromotableonly";
@@ -112,6 +122,61 @@ public ShardRouting.Role newEmptyRole(int copyIndex) {
112122
};
113123
}
114124

125+
// This is implemented in stateless, but for the tests we need to provide a simple implementation
126+
public static class TransportTestUnpromotableRelocationAction extends TransportAction<
127+
StatelessUnpromotableRelocationAction.Request,
128+
ActionResponse.Empty> {
129+
130+
private final IndicesService indicesService;
131+
private final PeerRecoveryTargetService peerRecoveryTargetService;
132+
133+
@Inject
134+
public TransportTestUnpromotableRelocationAction(
135+
ActionFilters actionFilters,
136+
IndicesService indicesService,
137+
PeerRecoveryTargetService peerRecoveryTargetService,
138+
TransportService transportService
139+
) {
140+
super(
141+
StatelessUnpromotableRelocationAction.TYPE.name(),
142+
actionFilters,
143+
transportService.getTaskManager(),
144+
EsExecutors.DIRECT_EXECUTOR_SERVICE
145+
);
146+
this.indicesService = indicesService;
147+
this.peerRecoveryTargetService = peerRecoveryTargetService;
148+
}
149+
150+
@Override
151+
protected void doExecute(
152+
Task task,
153+
StatelessUnpromotableRelocationAction.Request request,
154+
ActionListener<ActionResponse.Empty> listener
155+
) {
156+
try (var recoveryRef = peerRecoveryTargetService.getRecoveryRef(request.getRecoveryId(), request.getShardId())) {
157+
final var indexService = indicesService.indexServiceSafe(request.getShardId().getIndex());
158+
final var indexShard = indexService.getShard(request.getShardId().id());
159+
final var recoveryTarget = recoveryRef.target();
160+
final var recoveryState = recoveryTarget.state();
161+
162+
ActionListener.completeWith(listener, () -> {
163+
indexShard.prepareForIndexRecovery();
164+
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
165+
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
166+
indexShard.openEngineAndSkipTranslogRecovery();
167+
recoveryState.getIndex().setFileDetailsComplete();
168+
recoveryState.setStage(RecoveryState.Stage.FINALIZE);
169+
return ActionResponse.Empty.INSTANCE;
170+
});
171+
}
172+
}
173+
}
174+
175+
@Override
176+
public Collection<ActionHandler> getActions() {
177+
return List.of(new ActionHandler(StatelessUnpromotableRelocationAction.TYPE, TransportTestUnpromotableRelocationAction.class));
178+
}
179+
115180
@Override
116181
public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
117182
return List.of(new AllocationDecider() {

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -301,18 +301,21 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
301301
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
302302
assert preExistingRequest == null;
303303
assert indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot() == false;
304-
ActionListener.run(cleanupOnly.map(v -> {
305-
logger.trace("{} preparing unpromotable shard for recovery", recoveryTarget.shardId());
306-
indexShard.prepareForIndexRecovery();
307-
// Skip unnecessary intermediate stages
308-
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
309-
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
310-
indexShard.openEngineAndSkipTranslogRecovery();
311-
recoveryState.getIndex().setFileDetailsComplete();
312-
recoveryState.setStage(RecoveryState.Stage.FINALIZE);
313-
onGoingRecoveries.markRecoveryAsDone(recoveryId);
314-
return null;
315-
}), indexShard::preRecovery);
304+
client.execute(
305+
StatelessUnpromotableRelocationAction.TYPE,
306+
new StatelessUnpromotableRelocationAction.Request(
307+
recoveryId,
308+
indexShard.shardId(),
309+
indexShard.routingEntry().allocationId().getId(),
310+
recoveryTarget.clusterStateVersion()
311+
),
312+
cleanupOnly.delegateFailure((l, unused) -> {
313+
ActionListener.completeWith(l, () -> {
314+
onGoingRecoveries.markRecoveryAsDone(recoveryId);
315+
return null;
316+
});
317+
})
318+
);
316319
return;
317320
}
318321

server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,6 @@ public RecoveryRef getRecoverySafe(long id, ShardId shardId) {
167167
throw new IndexShardClosedException(shardId);
168168
}
169169
assert recoveryRef.target().shardId().equals(shardId);
170-
assert recoveryRef.target().indexShard().routingEntry().isPromotableToPrimary();
171170
return recoveryRef;
172171
}
173172

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.indices.recovery;
11+
12+
import org.elasticsearch.action.ActionRequest;
13+
import org.elasticsearch.action.ActionRequestValidationException;
14+
import org.elasticsearch.action.ActionResponse;
15+
import org.elasticsearch.action.ActionType;
16+
import org.elasticsearch.common.io.stream.StreamInput;
17+
import org.elasticsearch.common.io.stream.StreamOutput;
18+
import org.elasticsearch.index.shard.ShardId;
19+
20+
import java.io.IOException;
21+
import java.util.Objects;
22+
23+
public class StatelessUnpromotableRelocationAction {
24+
25+
public static final ActionType<ActionResponse.Empty> TYPE = new ActionType<>(
26+
"internal:index/shard/recovery/stateless_unpromotable_relocation"
27+
);
28+
29+
public static class Request extends ActionRequest {
30+
private final long recoveryId;
31+
private final ShardId shardId;
32+
private final String targetAllocationId;
33+
private final long clusterStateVersion;
34+
35+
public Request(long recoveryId, ShardId shardId, String targetAllocationId, long clusterStateVersion) {
36+
this.recoveryId = recoveryId;
37+
this.shardId = shardId;
38+
this.targetAllocationId = targetAllocationId;
39+
this.clusterStateVersion = clusterStateVersion;
40+
}
41+
42+
public Request(StreamInput in) throws IOException {
43+
super(in);
44+
recoveryId = in.readVLong();
45+
shardId = new ShardId(in);
46+
targetAllocationId = in.readString();
47+
clusterStateVersion = in.readVLong();
48+
}
49+
50+
@Override
51+
public ActionRequestValidationException validate() {
52+
return null;
53+
}
54+
55+
@Override
56+
public void writeTo(StreamOutput out) throws IOException {
57+
super.writeTo(out);
58+
out.writeVLong(recoveryId);
59+
shardId.writeTo(out);
60+
out.writeString(targetAllocationId);
61+
out.writeVLong(clusterStateVersion);
62+
}
63+
64+
public long getRecoveryId() {
65+
return recoveryId;
66+
}
67+
68+
public ShardId getShardId() {
69+
return shardId;
70+
}
71+
72+
public long getClusterStateVersion() {
73+
return clusterStateVersion;
74+
}
75+
76+
public String getTargetAllocationId() {
77+
return targetAllocationId;
78+
}
79+
80+
@Override
81+
public boolean equals(Object o) {
82+
if (o == null || getClass() != o.getClass()) return false;
83+
Request request = (Request) o;
84+
return recoveryId == request.recoveryId
85+
&& clusterStateVersion == request.clusterStateVersion
86+
&& Objects.equals(shardId, request.shardId)
87+
&& Objects.equals(targetAllocationId, request.targetAllocationId);
88+
}
89+
90+
@Override
91+
public int hashCode() {
92+
return Objects.hash(recoveryId, shardId, targetAllocationId, clusterStateVersion);
93+
}
94+
}
95+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.indices.recovery;
11+
12+
import org.elasticsearch.common.UUIDs;
13+
import org.elasticsearch.common.io.stream.Writeable;
14+
import org.elasticsearch.index.shard.ShardId;
15+
import org.elasticsearch.index.shard.ShardIdTests;
16+
import org.elasticsearch.test.AbstractWireSerializingTestCase;
17+
import org.elasticsearch.test.ESTestCase;
18+
19+
import java.io.IOException;
20+
21+
public class StatelessUnpromotableRelocationActionTests extends AbstractWireSerializingTestCase<
22+
StatelessUnpromotableRelocationAction.Request> {
23+
@Override
24+
protected Writeable.Reader<StatelessUnpromotableRelocationAction.Request> instanceReader() {
25+
return StatelessUnpromotableRelocationAction.Request::new;
26+
}
27+
28+
@Override
29+
protected StatelessUnpromotableRelocationAction.Request createTestInstance() {
30+
return new StatelessUnpromotableRelocationAction.Request(
31+
randomNonNegativeLong(),
32+
new ShardId(randomIdentifier(), UUIDs.randomBase64UUID(), randomIntBetween(0, 99)),
33+
randomUUID(),
34+
randomNonNegativeLong()
35+
);
36+
}
37+
38+
@Override
39+
protected StatelessUnpromotableRelocationAction.Request mutateInstance(StatelessUnpromotableRelocationAction.Request instance)
40+
throws IOException {
41+
return switch (between(0, 3)) {
42+
case 0 -> new StatelessUnpromotableRelocationAction.Request(
43+
randomValueOtherThan(instance.getRecoveryId(), ESTestCase::randomNonNegativeLong),
44+
instance.getShardId(),
45+
instance.getTargetAllocationId(),
46+
instance.getClusterStateVersion()
47+
);
48+
case 1 -> new StatelessUnpromotableRelocationAction.Request(
49+
instance.getRecoveryId(),
50+
ShardIdTests.mutate(instance.getShardId()),
51+
instance.getTargetAllocationId(),
52+
instance.getClusterStateVersion()
53+
);
54+
case 2 -> new StatelessUnpromotableRelocationAction.Request(
55+
instance.getRecoveryId(),
56+
instance.getShardId(),
57+
randomValueOtherThan(instance.getTargetAllocationId(), ESTestCase::randomUUID),
58+
instance.getClusterStateVersion()
59+
);
60+
case 3 -> new StatelessUnpromotableRelocationAction.Request(
61+
instance.getRecoveryId(),
62+
instance.getShardId(),
63+
instance.getTargetAllocationId(),
64+
randomValueOtherThan(instance.getClusterStateVersion(), ESTestCase::randomNonNegativeLong)
65+
);
66+
default -> throw new AssertionError("Illegal randomisation branch");
67+
};
68+
}
69+
}

0 commit comments

Comments
 (0)