Skip to content

Commit eeedb98

Browse files
authored
Make cluster health API cancellable (#96990)
This API can be quite heavy in large clusters, and might spam the `MANAGEMENT` threadpool queue with work for clients that have long since given up. This commit adds some basic cancellability checks to reduce the problem. Backport of #96551 to 7.17.
1 parent e1995a7 commit eeedb98

File tree

7 files changed

+163
-28
lines changed

7 files changed

+163
-28
lines changed

docs/changelog/96551.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 96551
2+
summary: Make cluster health API cancellable
3+
area: Distributed
4+
type: bug
5+
issues: []
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.http;
10+
11+
import org.apache.http.client.methods.HttpGet;
12+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
13+
import org.elasticsearch.action.support.PlainActionFuture;
14+
import org.elasticsearch.client.Cancellable;
15+
import org.elasticsearch.client.Request;
16+
import org.elasticsearch.client.Response;
17+
import org.elasticsearch.cluster.ClusterState;
18+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
19+
import org.elasticsearch.cluster.service.ClusterService;
20+
import org.elasticsearch.common.Priority;
21+
22+
import java.util.concurrent.CancellationException;
23+
import java.util.concurrent.CyclicBarrier;
24+
25+
import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener;
26+
import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled;
27+
import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefixOnMaster;
28+
29+
public class ClusterHealthRestCancellationIT extends HttpSmokeTestCase {
30+
31+
public void testClusterHealthRestCancellation() throws Exception {
32+
33+
final CyclicBarrier barrier = new CyclicBarrier(2);
34+
35+
internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
36+
.submitStateUpdateTask("blocking", new ClusterStateUpdateTask() {
37+
@Override
38+
public ClusterState execute(ClusterState currentState) {
39+
safeAwait(barrier);
40+
safeAwait(barrier);
41+
return currentState;
42+
}
43+
44+
@Override
45+
public void onFailure(String source, Exception e) {
46+
throw new AssertionError(e);
47+
}
48+
});
49+
50+
final Request clusterHealthRequest = new Request(HttpGet.METHOD_NAME, "/_cluster/health");
51+
clusterHealthRequest.addParameter("wait_for_events", Priority.LANGUID.toString());
52+
53+
final PlainActionFuture<Response> future = new PlainActionFuture<>();
54+
logger.info("--> sending cluster state request");
55+
final Cancellable cancellable = getRestClient().performRequestAsync(clusterHealthRequest, wrapAsRestResponseListener(future));
56+
57+
safeAwait(barrier);
58+
59+
awaitTaskWithPrefixOnMaster(ClusterHealthAction.NAME);
60+
61+
logger.info("--> cancelling cluster health request");
62+
cancellable.cancel();
63+
expectThrows(CancellationException.class, future::actionGet);
64+
65+
logger.info("--> checking cluster health task cancelled");
66+
assertAllCancellableTasksAreCancelled(ClusterHealthAction.NAME);
67+
68+
safeAwait(barrier);
69+
}
70+
71+
}

server/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,12 @@
1919
import org.elasticsearch.common.io.stream.StreamInput;
2020
import org.elasticsearch.common.io.stream.StreamOutput;
2121
import org.elasticsearch.core.TimeValue;
22+
import org.elasticsearch.tasks.CancellableTask;
23+
import org.elasticsearch.tasks.Task;
24+
import org.elasticsearch.tasks.TaskId;
2225

2326
import java.io.IOException;
27+
import java.util.Map;
2428
import java.util.Objects;
2529
import java.util.concurrent.TimeUnit;
2630

@@ -264,6 +268,11 @@ public ActionRequestValidationException validate() {
264268
return null;
265269
}
266270

271+
@Override
272+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
273+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
274+
}
275+
267276
public enum Level {
268277
CLUSTER,
269278
INDICES,

server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.core.TimeValue;
3535
import org.elasticsearch.index.IndexNotFoundException;
3636
import org.elasticsearch.node.NodeClosedException;
37+
import org.elasticsearch.tasks.CancellableTask;
3738
import org.elasticsearch.tasks.Task;
3839
import org.elasticsearch.threadpool.ThreadPool;
3940
import org.elasticsearch.transport.TransportService;
@@ -91,28 +92,42 @@ protected void masterOperation(
9192
final ClusterState unusedState,
9293
final ActionListener<ClusterHealthResponse> listener
9394
) {
95+
assert task instanceof CancellableTask;
96+
final CancellableTask cancellableTask = (CancellableTask) task;
9497

9598
final int waitCount = getWaitCount(request);
9699

97100
if (request.waitForEvents() != null) {
98-
waitForEventsAndExecuteHealth(request, listener, waitCount, threadPool.relativeTimeInMillis() + request.timeout().millis());
101+
waitForEventsAndExecuteHealth(
102+
cancellableTask,
103+
request,
104+
listener,
105+
waitCount,
106+
threadPool.relativeTimeInMillis() + request.timeout().millis()
107+
);
99108
} else {
100109
executeHealth(
110+
cancellableTask,
101111
request,
102112
clusterService.state(),
103113
listener,
104114
waitCount,
105-
clusterState -> listener.onResponse(getResponse(request, clusterState, waitCount, TimeoutState.OK))
115+
clusterState -> sendResponse(cancellableTask, request, clusterState, waitCount, TimeoutState.OK, listener)
106116
);
107117
}
108118
}
109119

110120
private void waitForEventsAndExecuteHealth(
121+
final CancellableTask task,
111122
final ClusterHealthRequest request,
112123
final ActionListener<ClusterHealthResponse> listener,
113124
final int waitCount,
114125
final long endTimeRelativeMillis
115126
) {
127+
if (task.notifyIfCancelled(listener)) {
128+
return;
129+
}
130+
116131
assert request.waitForEvents() != null;
117132
if (request.local()) {
118133
clusterService.submitStateUpdateTask(
@@ -129,11 +144,12 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
129144
final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis);
130145
request.timeout(newTimeout);
131146
executeHealth(
147+
task,
132148
request,
133149
clusterService.state(),
134150
listener,
135151
waitCount,
136-
observedState -> waitForEventsAndExecuteHealth(request, listener, waitCount, endTimeRelativeMillis)
152+
observedState -> waitForEventsAndExecuteHealth(task, request, listener, waitCount, endTimeRelativeMillis)
137153
);
138154
}
139155

@@ -166,11 +182,12 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
166182
assert newState.stateUUID().equals(appliedState.stateUUID())
167183
: newState.stateUUID() + " vs " + appliedState.stateUUID();
168184
executeHealth(
185+
task,
169186
request,
170187
appliedState,
171188
listener,
172189
waitCount,
173-
observedState -> waitForEventsAndExecuteHealth(request, listener, waitCount, endTimeRelativeMillis)
190+
observedState -> waitForEventsAndExecuteHealth(task, request, listener, waitCount, endTimeRelativeMillis)
174191
);
175192
}
176193

@@ -187,7 +204,7 @@ public void onNoLongerMaster(String source) {
187204
@Override
188205
public void onFailure(String source, Exception e) {
189206
if (e instanceof ProcessClusterEventTimeoutException) {
190-
listener.onResponse(getResponse(request, clusterService.state(), waitCount, TimeoutState.TIMED_OUT));
207+
sendResponse(task, request, clusterService.state(), waitCount, TimeoutState.TIMED_OUT, listener);
191208
} else {
192209
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
193210
listener.onFailure(e);
@@ -199,21 +216,25 @@ public void onFailure(String source, Exception e) {
199216
}
200217

201218
private void executeHealth(
219+
final CancellableTask task,
202220
final ClusterHealthRequest request,
203221
final ClusterState currentState,
204222
final ActionListener<ClusterHealthResponse> listener,
205223
final int waitCount,
206224
final Consumer<ClusterState> onNewClusterStateAfterDelay
207225
) {
226+
if (task.notifyIfCancelled(listener)) {
227+
return;
228+
}
208229

209230
if (request.timeout().millis() == 0) {
210-
listener.onResponse(getResponse(request, currentState, waitCount, TimeoutState.ZERO_TIMEOUT));
231+
sendResponse(task, request, currentState, waitCount, TimeoutState.ZERO_TIMEOUT, listener);
211232
return;
212233
}
213234

214235
final Predicate<ClusterState> validationPredicate = newState -> validateRequest(request, newState, waitCount);
215236
if (validationPredicate.test(currentState)) {
216-
listener.onResponse(getResponse(request, currentState, waitCount, TimeoutState.OK));
237+
sendResponse(task, request, currentState, waitCount, TimeoutState.OK, listener);
217238
} else {
218239
final ClusterStateObserver observer = new ClusterStateObserver(
219240
currentState,
@@ -235,7 +256,7 @@ public void onClusterServiceClose() {
235256

236257
@Override
237258
public void onTimeout(TimeValue timeout) {
238-
listener.onResponse(getResponse(request, observer.setAndGetObservedState(), waitCount, TimeoutState.TIMED_OUT));
259+
sendResponse(task, request, observer.setAndGetObservedState(), waitCount, TimeoutState.TIMED_OUT, listener);
239260
}
240261
};
241262
observer.waitForNextChange(stateListener, validationPredicate, request.timeout());
@@ -282,27 +303,32 @@ private enum TimeoutState {
282303
ZERO_TIMEOUT
283304
}
284305

285-
private ClusterHealthResponse getResponse(
306+
private void sendResponse(
307+
final CancellableTask task,
286308
final ClusterHealthRequest request,
287-
ClusterState clusterState,
309+
final ClusterState clusterState,
288310
final int waitFor,
289-
TimeoutState timeoutState
311+
final TimeoutState timeoutState,
312+
final ActionListener<ClusterHealthResponse> listener
290313
) {
291-
ClusterHealthResponse response = clusterHealth(
292-
request,
293-
clusterState,
294-
clusterService.getMasterService().numberOfPendingTasks(),
295-
allocationService.getNumberOfInFlightFetches(),
296-
clusterService.getMasterService().getMaxTaskWaitTime()
297-
);
298-
int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
299-
boolean valid = (readyCounter == waitFor);
300-
assert valid || (timeoutState != TimeoutState.OK);
301-
// If valid && timeoutState == TimeoutState.ZERO_TIMEOUT then we immediately found **and processed** a valid state, so we don't
302-
// consider this a timeout. However if timeoutState == TimeoutState.TIMED_OUT then we didn't process a valid state (perhaps we
303-
// failed on wait_for_events) so this does count as a timeout.
304-
response.setTimedOut(valid == false || timeoutState == TimeoutState.TIMED_OUT);
305-
return response;
314+
ActionListener.completeWith(listener, () -> {
315+
task.ensureNotCancelled();
316+
ClusterHealthResponse response = clusterHealth(
317+
request,
318+
clusterState,
319+
clusterService.getMasterService().numberOfPendingTasks(),
320+
allocationService.getNumberOfInFlightFetches(),
321+
clusterService.getMasterService().getMaxTaskWaitTime()
322+
);
323+
int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
324+
boolean valid = (readyCounter == waitFor);
325+
assert valid || (timeoutState != TimeoutState.OK);
326+
// If valid && timeoutState == TimeoutState.ZERO_TIMEOUT then we immediately found **and processed** a valid state, so we don't
327+
// consider this a timeout. However if timeoutState == TimeoutState.TIMED_OUT then we didn't process a valid state (perhaps we
328+
// failed on wait_for_events) so this does count as a timeout.
329+
response.setTimedOut(valid == false || timeoutState == TimeoutState.TIMED_OUT);
330+
return response;
331+
});
306332
}
307333

308334
static int prepareResponse(

server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterHealthAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.common.Strings;
1818
import org.elasticsearch.rest.BaseRestHandler;
1919
import org.elasticsearch.rest.RestRequest;
20+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
2021
import org.elasticsearch.rest.action.RestStatusToXContentListener;
2122

2223
import java.io.IOException;
@@ -50,7 +51,9 @@ public boolean allowSystemIndexAccessByDefault() {
5051
@Override
5152
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
5253
final ClusterHealthRequest clusterHealthRequest = fromRequest(request);
53-
return channel -> client.admin().cluster().health(clusterHealthRequest, new RestStatusToXContentListener<>(channel));
54+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin()
55+
.cluster()
56+
.health(clusterHealthRequest, new RestStatusToXContentListener<>(channel));
5457
}
5558

5659
public static ClusterHealthRequest fromRequest(final RestRequest request) {

server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212

1313
import org.elasticsearch.Version;
1414
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
1516
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
1617
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
1718
import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
1819
import org.elasticsearch.action.support.ActionFilters;
20+
import org.elasticsearch.action.support.ActionTestUtils;
1921
import org.elasticsearch.action.support.IndicesOptions;
2022
import org.elasticsearch.action.support.PlainActionFuture;
2123
import org.elasticsearch.cluster.ClusterName;
@@ -40,6 +42,8 @@
4042
import org.elasticsearch.common.settings.Settings;
4143
import org.elasticsearch.common.util.set.Sets;
4244
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
45+
import org.elasticsearch.tasks.CancellableTask;
46+
import org.elasticsearch.tasks.TaskId;
4347
import org.elasticsearch.test.ESTestCase;
4448
import org.elasticsearch.test.gateway.TestGatewayAllocator;
4549
import org.elasticsearch.test.transport.CapturingTransport;
@@ -156,7 +160,12 @@ public void testClusterHealthWaitsForClusterStateApplication() throws Interrupte
156160
new AllocationService(null, new TestGatewayAllocator(), null, null, null)
157161
);
158162
PlainActionFuture<ClusterHealthResponse> listener = new PlainActionFuture<>();
159-
action.execute(new ClusterHealthRequest().waitForGreenStatus(), listener);
163+
ActionTestUtils.execute(
164+
action,
165+
new CancellableTask(1, "direct", ClusterHealthAction.NAME, "", TaskId.EMPTY_TASK_ID, Collections.emptyMap()),
166+
new ClusterHealthRequest().waitForGreenStatus(),
167+
listener
168+
);
160169

161170
assertFalse(listener.isDone());
162171

test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@
139139
import java.util.Set;
140140
import java.util.TimeZone;
141141
import java.util.concurrent.CopyOnWriteArrayList;
142+
import java.util.concurrent.CyclicBarrier;
142143
import java.util.concurrent.ExecutorService;
143144
import java.util.concurrent.TimeUnit;
144145
import java.util.concurrent.atomic.AtomicInteger;
@@ -1824,4 +1825,15 @@ public String toString() {
18241825
return String.format(Locale.ROOT, "%s: %s", level.name(), message);
18251826
}
18261827
}
1828+
1829+
public static void safeAwait(CyclicBarrier barrier) {
1830+
try {
1831+
barrier.await(10, TimeUnit.SECONDS);
1832+
} catch (InterruptedException e) {
1833+
Thread.currentThread().interrupt();
1834+
throw new AssertionError("unexpected", e);
1835+
} catch (Exception e) {
1836+
throw new AssertionError("unexpected", e);
1837+
}
1838+
}
18271839
}

0 commit comments

Comments
 (0)