Skip to content

Commit d7d9656

Browse files
authored
Add a transport action to confirm nodes have applied cluster state version (elastic#139716)
1 parent e6242aa commit d7d9656

File tree

6 files changed

+564
-0
lines changed

6 files changed

+564
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.admin.cluster.state;
11+
12+
import org.elasticsearch.ElasticsearchTimeoutException;
13+
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
14+
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
15+
import org.elasticsearch.cluster.ClusterState;
16+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
17+
import org.elasticsearch.cluster.coordination.PublicationTransportHandler;
18+
import org.elasticsearch.cluster.metadata.IndexMetadata;
19+
import org.elasticsearch.cluster.node.DiscoveryNode;
20+
import org.elasticsearch.cluster.service.ClusterService;
21+
import org.elasticsearch.common.util.CollectionUtils;
22+
import org.elasticsearch.core.TimeValue;
23+
import org.elasticsearch.index.IndexVersion;
24+
import org.elasticsearch.plugins.Plugin;
25+
import org.elasticsearch.tasks.TaskCancelledException;
26+
import org.elasticsearch.test.ESIntegTestCase;
27+
import org.elasticsearch.test.transport.MockTransportService;
28+
29+
import java.util.Collection;
30+
import java.util.concurrent.CountDownLatch;
31+
import java.util.function.BiConsumer;
32+
33+
@ESIntegTestCase.ClusterScope(numClientNodes = 0)
34+
public class TransportAwaitClusterStateVersionAppliedActionIT extends ESIntegTestCase {
35+
public void testVersionsThatAreAlreadyApplied() {
36+
// Sample some of the nodes for asserts.
37+
internalCluster().ensureAtLeastNumDataNodes(2);
38+
var masterNode = internalCluster().getMasterName();
39+
var node1 = internalCluster().getNonMasterNodeName();
40+
41+
BiConsumer<Long, Collection<DiscoveryNode>> checkAppliedVersion = (version, nodes) -> {
42+
var response = client().execute(
43+
TransportAwaitClusterStateVersionAppliedAction.TYPE,
44+
new AwaitClusterStateVersionAppliedRequest(version, TimeValue.MINUS_ONE, nodes.toArray(new DiscoveryNode[0]))
45+
).actionGet();
46+
assertFalse(response.hasFailures());
47+
assertTrue(response.failures().isEmpty());
48+
assertEquals(internalCluster().numDataAndMasterNodes(), response.getNodes().size());
49+
assertTrue(response.getNodes().stream().anyMatch(r -> r.getNode().getName().equals(masterNode)));
50+
assertTrue(response.getNodes().stream().anyMatch(r -> r.getNode().getName().equals(node1)));
51+
};
52+
53+
var initialState = internalCluster().getInstance(ClusterService.class, node1).state();
54+
// Succeeds because the version is applied already.
55+
checkAppliedVersion.accept(initialState.version(), initialState.nodes().getAllNodes());
56+
57+
dummyClusterStateUpdate(masterNode, null);
58+
awaitClusterState(masterNode, state -> state.version() == initialState.version() + 1);
59+
60+
// We should succeed again since the previous execution succeeded.
61+
checkAppliedVersion.accept(initialState.version(), initialState.nodes().getAllNodes());
62+
}
63+
64+
public void testWaitingForVersion() {
65+
internalCluster().ensureAtLeastNumDataNodes(2);
66+
var masterNode = internalCluster().getMasterName();
67+
var node1 = internalCluster().getNonMasterNodeName();
68+
69+
var initialState = internalCluster().clusterService(masterNode).state();
70+
71+
DiscoveryNode masterDiscoveryNode = initialState.nodes().resolveNode(masterNode);
72+
DiscoveryNode node1DiscoveryNode = initialState.nodes().resolveNode(node1);
73+
74+
var onePlusVersionFuture = client().execute(
75+
TransportAwaitClusterStateVersionAppliedAction.TYPE,
76+
new AwaitClusterStateVersionAppliedRequest(initialState.version() + 1, TimeValue.MINUS_ONE, masterDiscoveryNode)
77+
);
78+
79+
// Note that here we are waiting for two updates.
80+
var twoPlusVersionFuture = client().execute(
81+
TransportAwaitClusterStateVersionAppliedAction.TYPE,
82+
new AwaitClusterStateVersionAppliedRequest(
83+
initialState.version() + 2,
84+
TimeValue.MINUS_ONE,
85+
masterDiscoveryNode,
86+
node1DiscoveryNode
87+
)
88+
);
89+
90+
assertFalse(onePlusVersionFuture.isDone());
91+
assertFalse(twoPlusVersionFuture.isDone());
92+
93+
// Let's submit one cluster state update.
94+
var updateStarted = new CountDownLatch(2);
95+
dummyClusterStateUpdate(masterNode, updateStarted);
96+
97+
var onePlusVersionResponse = onePlusVersionFuture.actionGet();
98+
// We should only get a response once the state was updated.
99+
assertEquals(1, updateStarted.getCount());
100+
101+
assertFalse(onePlusVersionResponse.hasFailures());
102+
assertTrue(onePlusVersionResponse.failures().isEmpty());
103+
assertEquals(1, onePlusVersionResponse.getNodes().size());
104+
assertTrue(onePlusVersionResponse.getNodes().stream().anyMatch(r -> r.getNode().getName().equals(masterNode)));
105+
106+
// But the future waiting for two updates is still not done.
107+
assertFalse(twoPlusVersionFuture.isDone());
108+
109+
// Submit second update.
110+
dummyClusterStateUpdate(masterNode, updateStarted);
111+
112+
var twoPlusVersionResponse = twoPlusVersionFuture.actionGet();
113+
assertEquals(0, updateStarted.getCount());
114+
115+
assertFalse(twoPlusVersionResponse.hasFailures());
116+
assertTrue(twoPlusVersionResponse.failures().isEmpty());
117+
assertEquals(2, twoPlusVersionResponse.getNodes().size());
118+
assertTrue(twoPlusVersionResponse.getNodes().stream().anyMatch(r -> r.getNode().getName().equals(masterNode)));
119+
assertTrue(twoPlusVersionResponse.getNodes().stream().anyMatch(r -> r.getNode().getName().equals(node1)));
120+
}
121+
122+
public void testNodeNotProcessingClusterState() throws InterruptedException {
123+
internalCluster().ensureAtLeastNumDataNodes(2);
124+
var masterNode = internalCluster().getMasterName();
125+
var node1 = internalCluster().getNonMasterNodeName();
126+
127+
var initialState = internalCluster().clusterService(masterNode).state();
128+
129+
DiscoveryNode masterDiscoveryNode = initialState.nodes().resolveNode(masterNode);
130+
DiscoveryNode node1DiscoveryNode = initialState.nodes().resolveNode(node1);
131+
132+
// Wait for the future version of the cluster state.
133+
var future = client().execute(
134+
TransportAwaitClusterStateVersionAppliedAction.TYPE,
135+
new AwaitClusterStateVersionAppliedRequest(
136+
initialState.version() + 1,
137+
TimeValue.MINUS_ONE,
138+
masterDiscoveryNode,
139+
node1DiscoveryNode
140+
)
141+
);
142+
143+
assertFalse(future.isDone());
144+
145+
// Now we'll block node1 from processing cluster state updates.
146+
var clusterStatePublishLatch = new CountDownLatch(1);
147+
final var node1TransportService = MockTransportService.getInstance(node1);
148+
node1TransportService.addRequestHandlingBehavior(
149+
PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME,
150+
(handler, request, channel, task) -> {
151+
if (clusterStatePublishLatch.getCount() > 0) {
152+
clusterStatePublishLatch.await();
153+
}
154+
handler.messageReceived(request, channel, task);
155+
}
156+
);
157+
158+
// And publish a new state.
159+
var publishLatch = new CountDownLatch(1);
160+
dummyClusterStateUpdate(masterNode, publishLatch);
161+
publishLatch.await();
162+
163+
try {
164+
// We don't get a response since we are waiting for node1.
165+
assertThrows(ElasticsearchTimeoutException.class, () -> future.actionGet(TimeValue.timeValueMillis(500)));
166+
} finally {
167+
clusterStatePublishLatch.countDown();
168+
}
169+
170+
// Once node1 gets the new cluster state we get a response.
171+
var response = future.actionGet();
172+
assertFalse(response.hasFailures());
173+
assertTrue(response.failures().isEmpty());
174+
assertEquals(2, response.getNodes().size());
175+
assertTrue(response.getNodes().stream().anyMatch(r -> r.getNode().getName().equals(masterNode)));
176+
assertTrue(response.getNodes().stream().anyMatch(r -> r.getNode().getName().equals(node1)));
177+
}
178+
179+
public void testTimeout() {
180+
var currentState = internalCluster().getInstance(ClusterService.class).state();
181+
182+
var response = client().execute(
183+
TransportAwaitClusterStateVersionAppliedAction.TYPE,
184+
new AwaitClusterStateVersionAppliedRequest(
185+
currentState.version() + 100,
186+
TimeValue.timeValueMillis(100),
187+
currentState.nodes().getAllNodes().toArray(new DiscoveryNode[0])
188+
)
189+
).actionGet();
190+
191+
assertEquals(internalCluster().numDataAndMasterNodes(), response.failures().size());
192+
// The structure is FailedNodeException -> RemoteTransportException -> ElasticsearchTimeoutException
193+
assertTrue(response.failures().get(0).getCause().getCause() instanceof ElasticsearchTimeoutException);
194+
assertEquals(0, response.getNodes().size());
195+
}
196+
197+
public void testCancellation() {
198+
var currentState = internalCluster().getInstance(ClusterService.class).state();
199+
200+
var future = client().execute(
201+
TransportAwaitClusterStateVersionAppliedAction.TYPE,
202+
new AwaitClusterStateVersionAppliedRequest(
203+
currentState.version() + 100,
204+
TimeValue.MINUS_ONE,
205+
currentState.nodes().getAllNodes().toArray(new DiscoveryNode[0])
206+
)
207+
);
208+
209+
var tasks = client().admin()
210+
.cluster()
211+
.prepareListTasks()
212+
.setActions(TransportAwaitClusterStateVersionAppliedAction.TYPE.name())
213+
.get()
214+
.getTasks();
215+
assertEquals(1, tasks.size());
216+
var thisTask = tasks.get(0);
217+
218+
assertFalse(future.isDone());
219+
220+
var cancelRequest = new CancelTasksRequest().setTargetTaskId(thisTask.taskId()).setReason("cancelled");
221+
client().execute(TransportCancelTasksAction.TYPE, cancelRequest);
222+
223+
assertThrows(TaskCancelledException.class, () -> future.actionGet(SAFE_AWAIT_TIMEOUT));
224+
}
225+
226+
@Override
227+
protected Collection<Class<? extends Plugin>> nodePlugins() {
228+
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
229+
}
230+
231+
private void dummyClusterStateUpdate(String masterNode, CountDownLatch latch) {
232+
internalCluster().clusterService(masterNode).submitUnbatchedStateUpdateTask("test", new ClusterStateUpdateTask() {
233+
@Override
234+
public ClusterState execute(ClusterState currentState) {
235+
if (latch != null) {
236+
latch.countDown();
237+
}
238+
return ClusterState.builder(currentState)
239+
.metadata(
240+
currentState.metadata()
241+
.withAddedIndex(
242+
IndexMetadata.builder(randomIdentifier())
243+
.settings(indexSettings(IndexVersion.current(), randomIdentifier(), 1, 0))
244+
.build()
245+
)
246+
)
247+
.build();
248+
}
249+
250+
@Override
251+
public void onFailure(Exception e) {
252+
fail(e);
253+
}
254+
});
255+
}
256+
}

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.elasticsearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus;
6767
import org.elasticsearch.action.admin.cluster.snapshots.status.TransportSnapshotsStatusAction;
6868
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
69+
import org.elasticsearch.action.admin.cluster.state.TransportAwaitClusterStateVersionAppliedAction;
6970
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
7071
import org.elasticsearch.action.admin.cluster.stats.ExtendedSearchUsageLongCounter;
7172
import org.elasticsearch.action.admin.cluster.stats.ExtendedSearchUsageMetric;
@@ -814,6 +815,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
814815
actions.register(TransportShardFlushAction.TYPE, TransportShardFlushAction.class);
815816
actions.register(TransportShardRefreshAction.TYPE, TransportShardRefreshAction.class);
816817
actions.register(TransportPrevalidateShardPathAction.TYPE, TransportPrevalidateShardPathAction.class);
818+
actions.register(TransportAwaitClusterStateVersionAppliedAction.TYPE, TransportAwaitClusterStateVersionAppliedAction.class);
817819

818820
// desired nodes
819821
actions.register(GetDesiredNodesAction.INSTANCE, TransportGetDesiredNodesAction.class);
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.admin.cluster.state;
11+
12+
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
13+
import org.elasticsearch.cluster.node.DiscoveryNode;
14+
import org.elasticsearch.common.Strings;
15+
import org.elasticsearch.core.TimeValue;
16+
import org.elasticsearch.tasks.CancellableTask;
17+
import org.elasticsearch.tasks.Task;
18+
import org.elasticsearch.tasks.TaskId;
19+
20+
import java.util.Map;
21+
import java.util.Objects;
22+
23+
public class AwaitClusterStateVersionAppliedRequest extends BaseNodesRequest {
24+
private final long clusterStateVersion;
25+
private final TimeValue nodeTimeout;
26+
27+
/// Creates a new instance of the request.
28+
/// @param clusterStateVersion a version that will be awaited on the provided set of nodes
29+
/// @param nodeTimeout a timeout for the cluster state observer awaiting application of the cluster state version on every node.
30+
/// Use [TimeValue#MINUS_ONE] as a "no timeout" value.
31+
/// @param concreteNodes nodes to use when checking if a cluster state version is applied
32+
public AwaitClusterStateVersionAppliedRequest(long clusterStateVersion, TimeValue nodeTimeout, DiscoveryNode... concreteNodes) {
33+
super(concreteNodes);
34+
this.clusterStateVersion = clusterStateVersion;
35+
this.nodeTimeout = Objects.requireNonNull(nodeTimeout);
36+
}
37+
38+
public long clusterStateVersion() {
39+
return clusterStateVersion;
40+
}
41+
42+
public TimeValue nodeTimeout() {
43+
return nodeTimeout;
44+
}
45+
46+
@Override
47+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
48+
return new CancellableTask(id, type, action, "", parentTaskId, headers) {
49+
@Override
50+
public String getDescription() {
51+
return Strings.format("waiting for cluster state version=%s to be applied", clusterStateVersion);
52+
}
53+
};
54+
}
55+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.admin.cluster.state;
11+
12+
import org.elasticsearch.action.FailedNodeException;
13+
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
14+
import org.elasticsearch.cluster.ClusterName;
15+
import org.elasticsearch.common.io.stream.StreamInput;
16+
import org.elasticsearch.common.io.stream.StreamOutput;
17+
18+
import java.io.IOException;
19+
import java.util.List;
20+
21+
public class AwaitClusterStateVersionAppliedResponse extends BaseNodesResponse<
22+
TransportAwaitClusterStateVersionAppliedAction.NodeResponse> {
23+
public AwaitClusterStateVersionAppliedResponse(StreamInput in) throws IOException {
24+
super(in);
25+
}
26+
27+
public AwaitClusterStateVersionAppliedResponse(
28+
ClusterName clusterName,
29+
List<TransportAwaitClusterStateVersionAppliedAction.NodeResponse> nodeResponses,
30+
List<FailedNodeException> failures
31+
) {
32+
super(clusterName, nodeResponses, failures);
33+
}
34+
35+
@Override
36+
protected List<TransportAwaitClusterStateVersionAppliedAction.NodeResponse> readNodesFrom(StreamInput in) throws IOException {
37+
return in.readCollectionAsList(TransportAwaitClusterStateVersionAppliedAction.NodeResponse::new);
38+
}
39+
40+
@Override
41+
protected void writeNodesTo(StreamOutput out, List<TransportAwaitClusterStateVersionAppliedAction.NodeResponse> nodes)
42+
throws IOException {
43+
out.writeCollection(nodes);
44+
}
45+
}

0 commit comments

Comments
 (0)