Skip to content

Commit 68987b4

Browse files
committed
Make TransportLocalClusterStateAction wait for cluster to unblock
This makes `TransportLocalClusterStateAction` wait for a new cluster state that is not blocked when the current state has a retryable block. Because that wait must be bounded, we need a timeout (again). For consistency's sake, we're reusing the REST param `master_timeout` for this timeout as well.
1 parent ccdc562 commit 68987b4

File tree

12 files changed

+401
-78
lines changed

12 files changed

+401
-78
lines changed

rest-api-spec/src/main/resources/rest-api-spec/api/cat.aliases.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@
6565
],
6666
"default": "all",
6767
"description":"Whether to expand wildcard expression to concrete indices that are open, closed or both."
68+
},
69+
"master_timeout":{
70+
"type":"time",
71+
"description":"Timeout for waiting for new cluster state in case it is blocked"
6872
}
6973
}
7074
}

rest-api-spec/src/main/resources/rest-api-spec/api/indices.exists_alias.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@
6161
],
6262
"default":"all",
6363
"description":"Whether to expand wildcard expression to concrete indices that are open, closed or both."
64+
},
65+
"master_timeout":{
66+
"type":"time",
67+
"description":"Timeout for waiting for new cluster state in case it is blocked"
6468
}
6569
}
6670
}

rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_alias.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@
7979
],
8080
"default": "all",
8181
"description":"Whether to expand wildcard expression to concrete indices that are open, closed or both."
82+
},
83+
"master_timeout":{
84+
"type":"time",
85+
"description":"Timeout for waiting for new cluster state in case it is blocked"
8286
}
8387
}
8488
}

server/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@
146146
exports org.elasticsearch.action.support.master;
147147
exports org.elasticsearch.action.support.master.info;
148148
exports org.elasticsearch.action.support.nodes;
149+
exports org.elasticsearch.action.support.local;
149150
exports org.elasticsearch.action.support.replication;
150151
exports org.elasticsearch.action.support.single.instance;
151152
exports org.elasticsearch.action.support.single.shard;

server/src/main/java/org/elasticsearch/action/admin/indices/alias/get/GetAliasesRequest.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,20 @@
88
*/
99
package org.elasticsearch.action.admin.indices.alias.get;
1010

11-
import org.elasticsearch.action.ActionRequest;
1211
import org.elasticsearch.action.ActionRequestValidationException;
1312
import org.elasticsearch.action.AliasesRequest;
1413
import org.elasticsearch.action.support.IndicesOptions;
15-
import org.elasticsearch.action.support.TransportAction;
14+
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
15+
import org.elasticsearch.action.support.master.MasterNodeRequest;
1616
import org.elasticsearch.common.Strings;
17-
import org.elasticsearch.common.io.stream.StreamOutput;
17+
import org.elasticsearch.core.TimeValue;
1818
import org.elasticsearch.tasks.CancellableTask;
1919
import org.elasticsearch.tasks.Task;
2020
import org.elasticsearch.tasks.TaskId;
2121

22-
import java.io.IOException;
2322
import java.util.Map;
2423

25-
public class GetAliasesRequest extends ActionRequest implements AliasesRequest {
24+
public class GetAliasesRequest extends LocalClusterStateRequest implements AliasesRequest {
2625

2726
public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandHidden();
2827

@@ -31,18 +30,24 @@ public class GetAliasesRequest extends ActionRequest implements AliasesRequest {
3130
private String[] indices = Strings.EMPTY_ARRAY;
3231
private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
3332

33+
@Deprecated
3434
public GetAliasesRequest(String... aliases) {
35-
this.aliases = aliases;
36-
this.originalAliases = aliases;
35+
this(MasterNodeRequest.TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, aliases);
3736
}
3837

38+
@Deprecated
3939
public GetAliasesRequest() {
4040
this(Strings.EMPTY_ARRAY);
4141
}
4242

43-
@Override
44-
public void writeTo(StreamOutput out) throws IOException {
45-
TransportAction.localOnly();
43+
public GetAliasesRequest(TimeValue clusterUpdateTimeout, String... aliases) {
44+
super(clusterUpdateTimeout);
45+
this.aliases = aliases;
46+
this.originalAliases = aliases;
47+
}
48+
49+
public GetAliasesRequest(TimeValue clusterUpdateTimeout) {
50+
this(clusterUpdateTimeout, Strings.EMPTY_ARRAY);
4651
}
4752

4853
@Override

server/src/main/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.action.support.ActionFilters;
13-
import org.elasticsearch.action.support.TransportLocalClusterStateAction;
13+
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
1414
import org.elasticsearch.cluster.ClusterState;
1515
import org.elasticsearch.cluster.block.ClusterBlockException;
1616
import org.elasticsearch.cluster.block.ClusterBlockLevel;

server/src/main/java/org/elasticsearch/action/support/TransportLocalClusterStateAction.java

Lines changed: 0 additions & 64 deletions
This file was deleted.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.support.local;
11+
12+
import org.elasticsearch.action.ActionRequest;
13+
import org.elasticsearch.action.support.TransportAction;
14+
import org.elasticsearch.common.io.stream.StreamOutput;
15+
import org.elasticsearch.core.TimeValue;
16+
17+
import java.io.IOException;
18+
import java.util.Objects;
19+
20+
/**
21+
* A base request for actions that are executed locally on the node that receives the request.
22+
*/
23+
public abstract class LocalClusterStateRequest extends ActionRequest {
24+
25+
private final TimeValue clusterUpdateTimeout;
26+
27+
protected LocalClusterStateRequest(TimeValue clusterUpdateTimeout) {
28+
this.clusterUpdateTimeout = Objects.requireNonNull(clusterUpdateTimeout);
29+
}
30+
31+
@Override
32+
public void writeTo(StreamOutput out) throws IOException {
33+
TransportAction.localOnly();
34+
}
35+
36+
public TimeValue clusterUpdateTimeout() {
37+
return clusterUpdateTimeout;
38+
}
39+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.support.local;
11+
12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.ElasticsearchTimeoutException;
15+
import org.elasticsearch.action.ActionListener;
16+
import org.elasticsearch.action.ActionResponse;
17+
import org.elasticsearch.action.ActionRunnable;
18+
import org.elasticsearch.action.support.ActionFilters;
19+
import org.elasticsearch.action.support.TransportAction;
20+
import org.elasticsearch.cluster.ClusterState;
21+
import org.elasticsearch.cluster.ClusterStateObserver;
22+
import org.elasticsearch.cluster.block.ClusterBlockException;
23+
import org.elasticsearch.cluster.service.ClusterService;
24+
import org.elasticsearch.common.util.concurrent.EsExecutors;
25+
import org.elasticsearch.core.TimeValue;
26+
import org.elasticsearch.node.NodeClosedException;
27+
import org.elasticsearch.tasks.CancellableTask;
28+
import org.elasticsearch.tasks.Task;
29+
import org.elasticsearch.tasks.TaskManager;
30+
31+
import java.util.concurrent.Executor;
32+
33+
import static org.elasticsearch.common.Strings.format;
34+
35+
/**
36+
* Analogue of {@link org.elasticsearch.action.support.master.TransportMasterNodeReadAction} except that it runs on the local node rather
37+
* than delegating to the master.
38+
*/
39+
public abstract class TransportLocalClusterStateAction<Request extends LocalClusterStateRequest, Response extends ActionResponse> extends
40+
TransportAction<Request, Response> {
41+
42+
private static final Logger logger = LogManager.getLogger(TransportLocalClusterStateAction.class);
43+
44+
protected final ClusterService clusterService;
45+
protected final Executor executor;
46+
47+
protected TransportLocalClusterStateAction(
48+
String actionName,
49+
ActionFilters actionFilters,
50+
TaskManager taskManager,
51+
ClusterService clusterService,
52+
Executor executor
53+
) {
54+
// TODO replace DIRECT_EXECUTOR_SERVICE when removing workaround for https://github.com/elastic/elasticsearch/issues/97916
55+
super(actionName, actionFilters, taskManager, EsExecutors.DIRECT_EXECUTOR_SERVICE);
56+
this.clusterService = clusterService;
57+
this.executor = executor;
58+
}
59+
60+
protected abstract ClusterBlockException checkBlock(Request request, ClusterState state);
61+
62+
@Override
63+
protected final void doExecute(Task task, Request request, ActionListener<Response> listener) {
64+
final var state = clusterService.state();
65+
final var clusterBlockException = checkBlock(request, state);
66+
if (clusterBlockException != null) {
67+
if (clusterBlockException.retryable() == false) {
68+
listener.onFailure(clusterBlockException);
69+
} else {
70+
waitForClusterUnblock(task, request, listener, state, clusterBlockException);
71+
}
72+
return;
73+
}
74+
75+
innerDoExecute(task, request, listener, state);
76+
}
77+
78+
private void innerDoExecute(Task task, Request request, ActionListener<Response> listener, ClusterState state) {
79+
if (task instanceof CancellableTask cancellableTask && cancellableTask.notifyIfCancelled(listener)) {
80+
return;
81+
}
82+
// Workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can
83+
executor.execute(ActionRunnable.wrap(listener, l -> localClusterStateOperation(task, request, state, l)));
84+
}
85+
86+
private void waitForClusterUnblock(
87+
Task task,
88+
Request request,
89+
ActionListener<Response> listener,
90+
ClusterState initialState,
91+
ClusterBlockException exception
92+
) {
93+
var observer = new ClusterStateObserver(
94+
initialState,
95+
clusterService,
96+
request.clusterUpdateTimeout(),
97+
logger,
98+
clusterService.threadPool().getThreadContext()
99+
);
100+
observer.waitForNextChange(new ClusterStateObserver.Listener() {
101+
@Override
102+
public void onNewClusterState(ClusterState state) {
103+
logger.trace("retrying with cluster state version [{}]", state.version());
104+
innerDoExecute(task, request, listener, state);
105+
}
106+
107+
@Override
108+
public void onClusterServiceClose() {
109+
listener.onFailure(new NodeClosedException(clusterService.localNode()));
110+
}
111+
112+
@Override
113+
public void onTimeout(TimeValue timeout) {
114+
logger.debug(
115+
() -> format("timed out while waiting for cluster to unblock in [%s] (timeout [%s])", actionName, timeout),
116+
exception
117+
);
118+
listener.onFailure(new ElasticsearchTimeoutException("timed out while waiting for cluster to unblock", exception));
119+
}
120+
}, clusterState -> isTaskCancelled(task) || checkBlock(request, clusterState) == null);
121+
}
122+
123+
private boolean isTaskCancelled(Task task) {
124+
return task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled();
125+
}
126+
127+
protected abstract void localClusterStateOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)
128+
throws Exception;
129+
}

server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetAliasesAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.rest.RestRequest;
2828
import org.elasticsearch.rest.RestResponse;
2929
import org.elasticsearch.rest.RestStatus;
30+
import org.elasticsearch.rest.RestUtils;
3031
import org.elasticsearch.rest.Scope;
3132
import org.elasticsearch.rest.ServerlessScope;
3233
import org.elasticsearch.rest.action.RestBuilderListener;
@@ -207,7 +208,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
207208

208209
final boolean namesProvided = request.hasParam("name");
209210
final String[] aliases = request.paramAsStringArrayOrEmptyIfAll("name");
210-
final GetAliasesRequest getAliasesRequest = new GetAliasesRequest(aliases);
211+
final var masterNodeTimeout = RestUtils.getMasterNodeTimeout(request);
212+
final GetAliasesRequest getAliasesRequest = new GetAliasesRequest(masterNodeTimeout, aliases);
211213
final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
212214
getAliasesRequest.indices(indices);
213215
getAliasesRequest.indicesOptions(IndicesOptions.fromRequest(request, getAliasesRequest.indicesOptions()));

0 commit comments

Comments
 (0)