Skip to content

Commit 037d2f1

Browse files
committed
Add waiting for master
1 parent 1b81ef6 commit 037d2f1

File tree

14 files changed

+222 -28 lines changed

14 files changed

+222 -28 lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateActionDisruptionIT.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,7 @@ public void testNonLocalRequestAlwaysFindsMaster() throws Exception {
5252
runRepeatedlyWhileChangingMaster(() -> {
5353
final ClusterStateRequestBuilder clusterStateRequestBuilder = clusterAdmin().prepareState(TimeValue.timeValueMillis(100))
5454
.clear()
55+
.setWaitForMaster(true)
5556
.setNodes(true)
5657
.setBlocks(true);
5758
final ClusterStateResponse clusterStateResponse;
@@ -98,6 +99,7 @@ public void testNonLocalRequestAlwaysFindsMasterAndWaitsForMetadata() throws Exc
9899
.cluster()
99100
.prepareState(TimeValue.timeValueMillis(100))
100101
.clear()
102+
.setWaitForMaster(true)
101103
.setNodes(true)
102104
.setMetadata(true)
103105
.setBlocks(true)

server/src/internalClusterTest/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java

Lines changed: 32 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,15 @@ public void testSimpleOnlyMasterNodeElection() throws IOException {
3737
logger.info("--> start data node / non master node");
3838
internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s"));
3939
try {
40-
assertThat(clusterAdmin().prepareState(TimeValue.timeValueMillis(100)).get().getState().nodes().getMasterNodeId(), nullValue());
40+
assertThat(
41+
clusterAdmin().prepareState(TimeValue.timeValueMillis(100))
42+
.setWaitForMaster(true)
43+
.get()
44+
.getState()
45+
.nodes()
46+
.getMasterNodeId(),
47+
nullValue()
48+
);
4149
fail("should not be able to find master");
4250
} catch (MasterNotDiscoveredException e) {
4351
// all is well, no master elected
@@ -49,6 +57,7 @@ public void testSimpleOnlyMasterNodeElection() throws IOException {
4957
.admin()
5058
.cluster()
5159
.prepareState(TEST_REQUEST_TIMEOUT)
60+
.setWaitForMaster(true)
5261
.get()
5362
.getState()
5463
.nodes()
@@ -74,7 +83,15 @@ public void testSimpleOnlyMasterNodeElection() throws IOException {
7483
internalCluster().stopCurrentMasterNode();
7584

7685
try {
77-
assertThat(clusterAdmin().prepareState(TimeValue.timeValueMillis(100)).get().getState().nodes().getMasterNodeId(), nullValue());
86+
assertThat(
87+
clusterAdmin().prepareState(TimeValue.timeValueMillis(100))
88+
.setWaitForMaster(true)
89+
.get()
90+
.getState()
91+
.nodes()
92+
.getMasterNodeId(),
93+
nullValue()
94+
);
7895
fail("should not be able to find master");
7996
} catch (MasterNotDiscoveredException e) {
8097
// all is well, no master elected
@@ -89,6 +106,7 @@ public void testSimpleOnlyMasterNodeElection() throws IOException {
89106
.admin()
90107
.cluster()
91108
.prepareState(TEST_REQUEST_TIMEOUT)
109+
.setWaitForMaster(true)
92110
.get()
93111
.getState()
94112
.nodes()
@@ -115,7 +133,15 @@ public void testElectOnlyBetweenMasterNodes() throws Exception {
115133
logger.info("--> start data node / non master node");
116134
internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s"));
117135
try {
118-
assertThat(clusterAdmin().prepareState(TimeValue.timeValueMillis(100)).get().getState().nodes().getMasterNodeId(), nullValue());
136+
assertThat(
137+
clusterAdmin().prepareState(TimeValue.timeValueMillis(100))
138+
.setWaitForMaster(true)
139+
.get()
140+
.getState()
141+
.nodes()
142+
.getMasterNodeId(),
143+
nullValue()
144+
);
119145
fail("should not be able to find master");
120146
} catch (MasterNotDiscoveredException e) {
121147
// all is well, no master elected
@@ -127,6 +153,7 @@ public void testElectOnlyBetweenMasterNodes() throws Exception {
127153
.admin()
128154
.cluster()
129155
.prepareState(TEST_REQUEST_TIMEOUT)
156+
.setWaitForMaster(true)
130157
.get()
131158
.getState()
132159
.nodes()
@@ -186,6 +213,7 @@ public void testElectOnlyBetweenMasterNodes() throws Exception {
186213
.admin()
187214
.cluster()
188215
.prepareState(TEST_REQUEST_TIMEOUT)
216+
.setWaitForMaster(true)
189217
.get()
190218
.getState()
191219
.nodes()
@@ -198,6 +226,7 @@ public void testElectOnlyBetweenMasterNodes() throws Exception {
198226
.admin()
199227
.cluster()
200228
.prepareState(TEST_REQUEST_TIMEOUT)
229+
.setWaitForMaster(true)
201230
.get()
202231
.getState()
203232
.nodes()

server/src/internalClusterTest/java/org/elasticsearch/readiness/ReadinessClusterIT.java

Lines changed: 13 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -112,13 +112,24 @@ protected Collection<Class<? extends Plugin>> getMockPlugins() {
112112

113113
private void assertMasterNode(Client client, String node) {
114114
assertThat(
115-
client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState().nodes().getMasterNode().getName(),
115+
client.admin()
116+
.cluster()
117+
.prepareState(TEST_REQUEST_TIMEOUT)
118+
.setWaitForMaster(true)
119+
.get()
120+
.getState()
121+
.nodes()
122+
.getMasterNode()
123+
.getName(),
116124
equalTo(node)
117125
);
118126
}
119127

120128
private void expectMasterNotFound() {
121-
expectThrows(MasterNotDiscoveredException.class, clusterAdmin().prepareState(TimeValue.timeValueMillis(100)));
129+
expectThrows(
130+
MasterNotDiscoveredException.class,
131+
clusterAdmin().prepareState(TimeValue.timeValueMillis(100)).setWaitForMaster(true)
132+
);
122133
}
123134

124135
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/108613")
@@ -193,8 +204,6 @@ public Settings onNodeStopped(String nodeName) throws Exception {
193204
internalCluster().restartNode(masterNode, new InternalTestCluster.RestartCallback() {
194205
@Override
195206
public Settings onNodeStopped(String nodeName) throws Exception {
196-
expectMasterNotFound();
197-
198207
logger.info("--> master node [{}] stopped", nodeName);
199208

200209
for (String dataNode : dataNodes) {

server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/ComponentTemplatesFileSettingsIT.java

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -357,7 +357,16 @@ public class ComponentTemplatesFileSettingsIT extends ESIntegTestCase {
357357

358358
private void assertMasterNode(Client client, String node) throws ExecutionException, InterruptedException {
359359
assertThat(
360-
client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).execute().get().getState().nodes().getMasterNode().getName(),
360+
client.admin()
361+
.cluster()
362+
.prepareState(TEST_REQUEST_TIMEOUT)
363+
.setWaitForMaster(true)
364+
.execute()
365+
.get()
366+
.getState()
367+
.nodes()
368+
.getMasterNode()
369+
.getName(),
361370
equalTo(node)
362371
);
363372
}

server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -127,7 +127,15 @@ public void resetVersionCounter() {
127127

128128
private void assertMasterNode(Client client, String node) {
129129
assertThat(
130-
client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState().nodes().getMasterNode().getName(),
130+
client.admin()
131+
.cluster()
132+
.prepareState(TEST_REQUEST_TIMEOUT)
133+
.setWaitForMaster(true)
134+
.get()
135+
.getState()
136+
.nodes()
137+
.getMasterNode()
138+
.getName(),
131139
equalTo(node)
132140
);
133141
}

server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/RepositoriesFileSettingsIT.java

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -94,7 +94,16 @@ public class RepositoriesFileSettingsIT extends ESIntegTestCase {
9494

9595
private void assertMasterNode(Client client, String node) throws ExecutionException, InterruptedException {
9696
assertThat(
97-
client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).execute().get().getState().nodes().getMasterNode().getName(),
97+
client.admin()
98+
.cluster()
99+
.prepareState(TEST_REQUEST_TIMEOUT)
100+
.setWaitForMaster(true)
101+
.execute()
102+
.get()
103+
.getState()
104+
.nodes()
105+
.getMasterNode()
106+
.getName(),
98107
equalTo(node)
99108
);
100109
}

server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequestBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -108,4 +108,9 @@ public ClusterStateRequestBuilder setWaitForTimeOut(TimeValue waitForTimeout) {
108108
request.waitForTimeout(waitForTimeout);
109109
return this;
110110
}
111+
112+
public ClusterStateRequestBuilder setWaitForMaster(boolean waitForMaster) {
113+
request.waitForMaster(waitForMaster);
114+
return this;
115+
}
111116
}

server/src/main/java/org/elasticsearch/action/support/local/LocalClusterStateRequest.java

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,8 @@ public abstract class LocalClusterStateRequest extends ActionRequest {
3232
*/
3333
private final TimeValue masterTimeout;
3434

35+
private boolean waitForMaster = false;
36+
3537
protected LocalClusterStateRequest(TimeValue masterTimeout) {
3638
this.masterTimeout = Objects.requireNonNull(masterTimeout);
3739
}
@@ -74,4 +76,13 @@ public ActionRequestValidationException validate() {
7476
public TimeValue masterTimeout() {
7577
return masterTimeout;
7678
}
79+
80+
public LocalClusterStateRequest waitForMaster(boolean waitForMaster) {
81+
this.waitForMaster = waitForMaster;
82+
return this;
83+
}
84+
85+
public boolean waitForMaster() {
86+
return waitForMaster;
87+
}
7788
}

server/src/main/java/org/elasticsearch/action/support/local/TransportLocalClusterStateAction.java

Lines changed: 104 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -23,13 +23,17 @@
2323
import org.elasticsearch.cluster.service.ClusterService;
2424
import org.elasticsearch.common.util.concurrent.EsExecutors;
2525
import org.elasticsearch.core.FixForMultiProject;
26+
import org.elasticsearch.core.Strings;
2627
import org.elasticsearch.core.TimeValue;
28+
import org.elasticsearch.discovery.MasterNotDiscoveredException;
2729
import org.elasticsearch.node.NodeClosedException;
2830
import org.elasticsearch.tasks.CancellableTask;
2931
import org.elasticsearch.tasks.Task;
32+
import org.elasticsearch.tasks.TaskCancelledException;
3033
import org.elasticsearch.tasks.TaskManager;
3134

3235
import java.util.concurrent.Executor;
36+
import java.util.function.Predicate;
3337

3438
import static org.elasticsearch.common.Strings.format;
3539

@@ -70,15 +74,19 @@ protected final void doExecute(Task task, Request request, ActionListener<Respon
7074
request.setParentTask(clusterService.localNode().getId(), task.getId());
7175
}
7276
final var state = clusterService.state();
73-
final var clusterBlockException = checkBlock(request, state);
74-
if (clusterBlockException != null) {
75-
if (clusterBlockException.retryable() == false) {
76-
listener.onFailure(clusterBlockException);
77+
if (request.waitForMaster() && state.nodes().getMasterNode() == null) {
78+
waitForClusterUnblock(task, request, listener, state, null);
79+
} else {
80+
final var clusterBlockException = checkBlock(request, state);
81+
if (clusterBlockException != null) {
82+
if (clusterBlockException.retryable() == false) {
83+
listener.onFailure(clusterBlockException);
84+
} else {
85+
waitForClusterUnblock(task, request, listener, state, clusterBlockException);
86+
}
7787
} else {
78-
waitForClusterUnblock(task, request, listener, state, clusterBlockException);
88+
innerDoExecute(task, request, listener, state);
7989
}
80-
} else {
81-
innerDoExecute(task, request, listener, state);
8290
}
8391
}
8492

@@ -118,16 +126,99 @@ public void onClusterServiceClose() {
118126

119127
@Override
120128
public void onTimeout(TimeValue timeout) {
121-
logger.debug(
122-
() -> format("timed out while waiting for cluster to unblock in [%s] (timeout [%s])", actionName, timeout),
123-
exception
129+
logger.debug(() -> format("timed out while retrying [%s] after failure (timeout [%s])", actionName, timeout), exception);
130+
final var timeoutException = initialState.nodes().getMasterNode() == null
131+
? new MasterNotDiscoveredException()
132+
: new ElasticsearchTimeoutException("timed out while waiting for cluster to unblock", exception);
133+
listener.onFailure(timeoutException);
134+
}
135+
}, clusterState -> isTaskCancelled(task) || isClusterStateReady(request, clusterState));
136+
}
137+
138+
private boolean isClusterStateReady(Request request, ClusterState clusterState) {
139+
if (request.waitForMaster() && clusterState.nodes().getMasterNode() == null) {
140+
return false;
141+
}
142+
return checkBlock(request, clusterState) == null;
143+
}
144+
145+
private class AsyncSingleAction {
146+
147+
private final ActionListener<Response> listener;
148+
private final Request request;
149+
private final Task task;
150+
private final long startTime;
151+
private ClusterStateObserver observer;
152+
153+
AsyncSingleAction(Task task, Request request, ActionListener<Response> listener) {
154+
this.task = task;
155+
this.request = request;
156+
this.listener = listener;
157+
this.startTime = clusterService.threadPool().relativeTimeInMillis();
158+
}
159+
160+
protected void doStart(ClusterState clusterState) {
161+
if (isTaskCancelled()) {
162+
listener.onFailure(new TaskCancelledException("Task was cancelled"));
163+
return;
164+
}
165+
if (clusterState.nodes().getMasterNode() == null) {
166+
return;
167+
}
168+
}
169+
170+
private void retry(long currentStateVersion, final Throwable failure, final Predicate<ClusterState> statePredicate) {
171+
if (observer == null) {
172+
final TimeValue timeout;
173+
if (request.masterTimeout().millis() < 0) {
174+
timeout = null;
175+
} else {
176+
final long remainingTimeoutMS = request.masterTimeout().millis() - (clusterService.threadPool().relativeTimeInMillis()
177+
- startTime);
178+
if (remainingTimeoutMS <= 0) {
179+
logger.debug(() -> "timed out before retrying [" + actionName + "] after failure", failure);
180+
listener.onFailure(new MasterNotDiscoveredException(failure));
181+
return;
182+
}
183+
timeout = TimeValue.timeValueMillis(remainingTimeoutMS);
184+
}
185+
this.observer = new ClusterStateObserver(
186+
currentStateVersion,
187+
clusterService.getClusterApplierService(),
188+
timeout,
189+
logger,
190+
clusterService.threadPool().getThreadContext()
124191
);
125-
listener.onFailure(new ElasticsearchTimeoutException("timed out while waiting for cluster to unblock", exception));
126192
}
127-
}, clusterState -> isTaskCancelled(task) || checkBlock(request, clusterState) == null);
193+
observer.waitForNextChange(new ClusterStateObserver.Listener() {
194+
@Override
195+
public void onNewClusterState(ClusterState state) {
196+
logger.trace("retrying with cluster state version [{}]", state.version());
197+
doStart(state);
198+
}
199+
200+
@Override
201+
public void onClusterServiceClose() {
202+
listener.onFailure(new NodeClosedException(clusterService.localNode()));
203+
}
204+
205+
@Override
206+
public void onTimeout(TimeValue timeout) {
207+
logger.debug(
208+
() -> Strings.format("timed out while retrying [%s] after failure (timeout [%s])", actionName, timeout),
209+
failure
210+
);
211+
listener.onFailure(new MasterNotDiscoveredException(failure));
212+
}
213+
}, clusterState -> isTaskCancelled() || statePredicate.test(clusterState));
214+
}
215+
216+
private boolean isTaskCancelled() {
217+
return task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled();
218+
}
128219
}
129220

130-
private boolean isTaskCancelled(Task task) {
221+
private static boolean isTaskCancelled(Task task) {
131222
return task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled();
132223
}
133224
}

server/src/test/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -206,7 +206,7 @@ private ClusterStateResponse executeAction(ProjectResolver projectResolver, Clus
206206
projectResolver
207207
);
208208
final PlainActionFuture<ClusterStateResponse> future = new PlainActionFuture<>();
209-
action.masterOperation(task, request, state, future);
209+
action.localClusterStateOperation(task, request, state, future);
210210
return future.get();
211211
}
212212

0 commit comments

Comments
 (0)