Skip to content

Commit e144bef

Browse files
authored
New TransportBroadcastUnpromotableAction action (#93600)
Introduces: (1) a new action that can be used to broadcast a request to the unpromotable shards of a given IndexShardRoutingTable; (2) a new hook in ReplicationOperation for custom logic that runs when the primary operation completes — if the hook fails, the failure is added to the shard failures of the replication operation; (3) the refresh action now uses the new hook to broadcast the unpromotable refresh action to all unpromotable shards. Fixes ES-5454. Fixes ES-5212.
1 parent 4d117c5 commit e144bef

File tree

15 files changed

+885
-93
lines changed

15 files changed

+885
-93
lines changed

docs/changelog/93600.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 93600
2+
summary: New `TransportBroadcastUnpromotableAction` action
3+
area: CRUD
4+
type: feature
5+
issues: []

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.logging.log4j.Logger;
1313
import org.elasticsearch.action.ActionResponse;
1414
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
15+
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
1516
import org.elasticsearch.action.admin.indices.refresh.TransportUnpromotableShardRefreshAction;
1617
import org.elasticsearch.action.search.ClosePointInTimeAction;
1718
import org.elasticsearch.action.search.ClosePointInTimeRequest;
@@ -51,6 +52,7 @@
5152
import org.elasticsearch.test.ESIntegTestCase;
5253
import org.elasticsearch.test.XContentTestUtils;
5354
import org.elasticsearch.test.transport.MockTransportService;
55+
import org.elasticsearch.transport.ConnectTransportException;
5456
import org.elasticsearch.transport.TransportService;
5557

5658
import java.io.IOException;
@@ -269,7 +271,7 @@ private static void installMockTransportVerifications(RoutingTableWatcher routin
269271
connection.sendRequest(requestId, action, request, options);
270272
});
271273
mockTransportService.addRequestHandlingBehavior(
272-
TransportUnpromotableShardRefreshAction.NAME,
274+
TransportUnpromotableShardRefreshAction.NAME + "[u]",
273275
(handler, request, channel, task) -> {
274276
// Skip handling the request and send an immediate empty response
275277
channel.sendResponse(ActionResponse.Empty.INSTANCE);
@@ -690,6 +692,63 @@ public void testRefreshOfUnpromotableShards() throws Exception {
690692
}
691693
}
692694

695+
public void testRefreshFailsIfUnpromotableDisconnects() throws Exception {
696+
var routingTableWatcher = new RoutingTableWatcher();
697+
var additionalNumberOfNodesWithUnpromotableShards = 1;
698+
routingTableWatcher.numReplicas = routingTableWatcher.numIndexingCopies + additionalNumberOfNodesWithUnpromotableShards - 1;
699+
internalCluster().ensureAtLeastNumDataNodes(routingTableWatcher.numIndexingCopies + 1);
700+
final String nodeWithUnpromotableOnly = internalCluster().startDataOnlyNode(
701+
Settings.builder().put("node.attr." + TestPlugin.NODE_ATTR_UNPROMOTABLE_ONLY, "true").build()
702+
);
703+
installMockTransportVerifications(routingTableWatcher);
704+
getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies;
705+
706+
final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
707+
try {
708+
// verify the correct number of shard copies of each role as the routing table evolves
709+
masterClusterService.addListener(routingTableWatcher);
710+
711+
createIndex(
712+
INDEX_NAME,
713+
Settings.builder()
714+
.put(routingTableWatcher.getIndexSettings())
715+
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), false)
716+
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)
717+
.build()
718+
);
719+
ensureGreen(INDEX_NAME);
720+
assertEngineTypes();
721+
722+
indexRandom(false, INDEX_NAME, randomIntBetween(1, 10));
723+
724+
for (var transportService : internalCluster().getInstances(TransportService.class)) {
725+
MockTransportService mockTransportService = (MockTransportService) transportService;
726+
mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
727+
if (action.equals(TransportUnpromotableShardRefreshAction.NAME + "[u]")
728+
&& nodeWithUnpromotableOnly.equals(connection.getNode().getName())) {
729+
logger.info("--> preventing {} request by throwing ConnectTransportException", action);
730+
throw new ConnectTransportException(connection.getNode(), "DISCONNECT: prevented " + action + " request");
731+
}
732+
connection.sendRequest(requestId, action, request, options);
733+
});
734+
}
735+
736+
RefreshResponse response = client().admin().indices().prepareRefresh(INDEX_NAME).execute().actionGet();
737+
assertThat(
738+
"each unpromotable replica shard should be added to the shard failures",
739+
response.getFailedShards(),
740+
equalTo((routingTableWatcher.numReplicas - (routingTableWatcher.numIndexingCopies - 1)) * routingTableWatcher.numShards)
741+
);
742+
assertThat(
743+
"the total shards is incremented with the unpromotable shard failures",
744+
response.getTotalShards(),
745+
equalTo(response.getSuccessfulShards() + response.getFailedShards())
746+
);
747+
} finally {
748+
masterClusterService.removeListener(routingTableWatcher);
749+
}
750+
}
751+
693752
public void testNodesWithUnpromotableShardsNeverGetReplicationActions() throws Exception {
694753
var routingTableWatcher = new RoutingTableWatcher();
695754
var additionalNumberOfNodesWithUnpromotableShards = randomIntBetween(1, 3);

server/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@
139139
exports org.elasticsearch.action.support;
140140
exports org.elasticsearch.action.support.broadcast;
141141
exports org.elasticsearch.action.support.broadcast.node;
142+
exports org.elasticsearch.action.support.broadcast.unpromotable;
142143
exports org.elasticsearch.action.support.master;
143144
exports org.elasticsearch.action.support.master.info;
144145
exports org.elasticsearch.action.support.nodes;
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.admin.indices.refresh;
10+
11+
import org.elasticsearch.action.support.replication.ReplicationRequest;
12+
import org.elasticsearch.common.io.stream.StreamInput;
13+
import org.elasticsearch.index.engine.Engine;
14+
import org.elasticsearch.index.shard.ShardId;
15+
16+
import java.io.IOException;
17+
18+
/**
19+
* A request that is sent to the promotable replicas of a primary shard
20+
*/
21+
public class ShardRefreshReplicaRequest extends ReplicationRequest<ShardRefreshReplicaRequest> {
22+
23+
/**
24+
* Holds the refresh result of the primary shard. This will be used by {@link TransportShardRefreshAction} to construct a
25+
* {@link UnpromotableShardRefreshRequest} to broadcast to the unpromotable replicas. The refresh result is not serialized to maintain
26+
* backwards compatibility for the refresh requests to promotable replicas which do not need the refresh result. For this reason, the
27+
* field is package-private.
28+
*/
29+
final Engine.RefreshResult primaryRefreshResult;
30+
31+
public ShardRefreshReplicaRequest(StreamInput in) throws IOException {
32+
super(in);
33+
primaryRefreshResult = Engine.RefreshResult.NO_REFRESH;
34+
}
35+
36+
public ShardRefreshReplicaRequest(ShardId shardId, Engine.RefreshResult primaryRefreshResult) {
37+
super(shardId);
38+
this.primaryRefreshResult = primaryRefreshResult;
39+
}
40+
41+
@Override
42+
public String toString() {
43+
return "ShardRefreshReplicaRequest{" + shardId + "}";
44+
}
45+
46+
}

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java

Lines changed: 45 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,15 @@
1010

1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.action.ActionListenerResponseHandler;
13+
import org.elasticsearch.action.ActionResponse;
1314
import org.elasticsearch.action.ActionType;
1415
import org.elasticsearch.action.support.ActionFilters;
15-
import org.elasticsearch.action.support.RefCountingListener;
1616
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
17+
import org.elasticsearch.action.support.replication.ReplicationOperation;
1718
import org.elasticsearch.action.support.replication.ReplicationResponse;
1819
import org.elasticsearch.action.support.replication.TransportReplicationAction;
19-
import org.elasticsearch.cluster.ClusterState;
2020
import org.elasticsearch.cluster.action.shard.ShardStateAction;
21-
import org.elasticsearch.cluster.node.DiscoveryNode;
22-
import org.elasticsearch.cluster.routing.ShardRouting;
21+
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
2322
import org.elasticsearch.cluster.service.ClusterService;
2423
import org.elasticsearch.common.inject.Inject;
2524
import org.elasticsearch.common.io.stream.StreamInput;
@@ -28,19 +27,14 @@
2827
import org.elasticsearch.indices.IndicesService;
2928
import org.elasticsearch.logging.LogManager;
3029
import org.elasticsearch.logging.Logger;
31-
import org.elasticsearch.tasks.Task;
3230
import org.elasticsearch.threadpool.ThreadPool;
33-
import org.elasticsearch.transport.TransportRequestOptions;
34-
import org.elasticsearch.transport.TransportResponse;
3531
import org.elasticsearch.transport.TransportService;
3632

3733
import java.io.IOException;
38-
import java.util.function.Predicate;
39-
import java.util.stream.Collectors;
4034

4135
public class TransportShardRefreshAction extends TransportReplicationAction<
4236
BasicReplicationRequest,
43-
BasicReplicationRequest,
37+
ShardRefreshReplicaRequest,
4438
ReplicationResponse> {
4539

4640
private static final Logger logger = LogManager.getLogger(TransportShardRefreshAction.class);
@@ -69,10 +63,11 @@ public TransportShardRefreshAction(
6963
shardStateAction,
7064
actionFilters,
7165
BasicReplicationRequest::new,
72-
BasicReplicationRequest::new,
66+
ShardRefreshReplicaRequest::new,
7367
ThreadPool.Names.REFRESH
7468
);
75-
new TransportUnpromotableShardRefreshAction(transportService, actionFilters, indicesService);
69+
// registers the unpromotable variant of the shard refresh action
70+
new TransportUnpromotableShardRefreshAction(clusterService, transportService, actionFilters, indicesService);
7671
}
7772

7873
@Override
@@ -84,53 +79,53 @@ protected ReplicationResponse newResponseInstance(StreamInput in) throws IOExcep
8479
protected void shardOperationOnPrimary(
8580
BasicReplicationRequest shardRequest,
8681
IndexShard primary,
87-
ActionListener<PrimaryResult<BasicReplicationRequest, ReplicationResponse>> listener
82+
ActionListener<PrimaryResult<ShardRefreshReplicaRequest, ReplicationResponse>> listener
8883
) {
89-
try (var listeners = new RefCountingListener(listener.map(v -> new PrimaryResult<>(shardRequest, new ReplicationResponse())))) {
90-
var refreshResult = primary.refresh(SOURCE_API);
84+
ActionListener.completeWith(listener, () -> {
85+
ShardRefreshReplicaRequest replicaRequest = new ShardRefreshReplicaRequest(shardRequest.shardId(), primary.refresh(SOURCE_API));
86+
replicaRequest.setParentTask(shardRequest.getParentTask());
9187
logger.trace("{} refresh request executed on primary", primary.shardId());
92-
93-
// Forward the request to all nodes that hold unpromotable replica shards
94-
final ClusterState clusterState = clusterService.state();
95-
final Task parentTaskId = taskManager.getTask(shardRequest.getParentTask().getId());
96-
clusterState.routingTable()
97-
.shardRoutingTable(shardRequest.shardId())
98-
.assignedShards()
99-
.stream()
100-
.filter(Predicate.not(ShardRouting::isPromotableToPrimary))
101-
.map(ShardRouting::currentNodeId)
102-
.collect(Collectors.toUnmodifiableSet())
103-
.forEach(nodeId -> {
104-
final DiscoveryNode node = clusterState.nodes().get(nodeId);
105-
UnpromotableShardRefreshRequest request = new UnpromotableShardRefreshRequest(
106-
primary.shardId(),
107-
refreshResult.generation()
108-
);
109-
logger.trace("forwarding refresh request [{}] to node [{}]", request, node);
110-
transportService.sendChildRequest(
111-
node,
112-
TransportUnpromotableShardRefreshAction.NAME,
113-
request,
114-
parentTaskId,
115-
TransportRequestOptions.EMPTY,
116-
new ActionListenerResponseHandler<>(
117-
listeners.acquire(ignored -> {}),
118-
(in) -> TransportResponse.Empty.INSTANCE,
119-
ThreadPool.Names.REFRESH
120-
)
121-
);
122-
});
123-
} catch (Exception e) {
124-
listener.onFailure(e);
125-
}
88+
return new PrimaryResult<>(replicaRequest, new ReplicationResponse());
89+
});
12690
}
12791

12892
@Override
129-
protected void shardOperationOnReplica(BasicReplicationRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
93+
protected void shardOperationOnReplica(ShardRefreshReplicaRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
13094
ActionListener.completeWith(listener, () -> {
13195
replica.refresh(SOURCE_API);
13296
logger.trace("{} refresh request executed on replica", replica.shardId());
13397
return new ReplicaResult();
13498
});
13599
}
100+
101+
@Override
102+
protected ReplicationOperation.Replicas<ShardRefreshReplicaRequest> newReplicasProxy() {
103+
return new UnpromotableReplicasRefreshProxy();
104+
}
105+
106+
protected class UnpromotableReplicasRefreshProxy extends ReplicasProxy {
107+
108+
@Override
109+
public void onPrimaryOperationComplete(
110+
ShardRefreshReplicaRequest replicaRequest,
111+
IndexShardRoutingTable indexShardRoutingTable,
112+
ActionListener<Void> listener
113+
) {
114+
assert replicaRequest.primaryRefreshResult.refreshed() : "primary has not refreshed";
115+
UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
116+
indexShardRoutingTable,
117+
replicaRequest.primaryRefreshResult.generation()
118+
);
119+
transportService.sendRequest(
120+
transportService.getLocalNode(),
121+
TransportUnpromotableShardRefreshAction.NAME,
122+
unpromotableReplicaRequest,
123+
new ActionListenerResponseHandler<>(
124+
listener.delegateFailure((l, r) -> l.onResponse(null)),
125+
(in) -> ActionResponse.Empty.INSTANCE,
126+
ThreadPool.Names.REFRESH
127+
)
128+
);
129+
}
130+
}
136131
}

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,37 +11,42 @@
1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.action.ActionResponse;
1313
import org.elasticsearch.action.support.ActionFilters;
14-
import org.elasticsearch.action.support.HandledTransportAction;
14+
import org.elasticsearch.action.support.broadcast.unpromotable.TransportBroadcastUnpromotableAction;
15+
import org.elasticsearch.cluster.service.ClusterService;
1516
import org.elasticsearch.common.inject.Inject;
16-
import org.elasticsearch.index.engine.Engine;
1717
import org.elasticsearch.index.shard.IndexShard;
1818
import org.elasticsearch.indices.IndicesService;
1919
import org.elasticsearch.tasks.Task;
2020
import org.elasticsearch.threadpool.ThreadPool;
2121
import org.elasticsearch.transport.TransportService;
2222

23-
public class TransportUnpromotableShardRefreshAction extends HandledTransportAction<UnpromotableShardRefreshRequest, ActionResponse.Empty> {
24-
public static final String NAME = RefreshAction.NAME + "[u]";
23+
public class TransportUnpromotableShardRefreshAction extends TransportBroadcastUnpromotableAction<UnpromotableShardRefreshRequest> {
24+
25+
public static final String NAME = RefreshAction.NAME + "/unpromotable";
2526

2627
private final IndicesService indicesService;
2728

2829
@Inject
2930
public TransportUnpromotableShardRefreshAction(
31+
ClusterService clusterService,
3032
TransportService transportService,
3133
ActionFilters actionFilters,
3234
IndicesService indicesService
3335
) {
34-
super(NAME, transportService, actionFilters, UnpromotableShardRefreshRequest::new, ThreadPool.Names.REFRESH);
36+
super(NAME, clusterService, transportService, actionFilters, UnpromotableShardRefreshRequest::new, ThreadPool.Names.REFRESH);
3537
this.indicesService = indicesService;
3638
}
3739

3840
@Override
39-
protected void doExecute(Task task, UnpromotableShardRefreshRequest request, ActionListener<ActionResponse.Empty> responseListener) {
41+
protected void unpromotableShardOperation(
42+
Task task,
43+
UnpromotableShardRefreshRequest request,
44+
ActionListener<ActionResponse.Empty> responseListener
45+
) {
4046
ActionListener.run(responseListener, listener -> {
41-
assert request.getSegmentGeneration() != Engine.RefreshResult.UNKNOWN_GENERATION
42-
: "The request segment is " + request.getSegmentGeneration();
43-
IndexShard shard = indicesService.indexServiceSafe(request.getShardId().getIndex()).getShard(request.getShardId().id());
47+
IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
4448
shard.waitForSegmentGeneration(request.getSegmentGeneration(), listener.map(l -> ActionResponse.Empty.INSTANCE));
4549
});
4650
}
51+
4752
}

0 commit comments

Comments
 (0)