Skip to content

Commit ebd4291

Browse files
committed
Add waiting for master
1 parent 758cc44 commit ebd4291

File tree

13 files changed

+221
-27
lines changed

13 files changed

+221
-27
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateActionDisruptionIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public void testNonLocalRequestAlwaysFindsMaster() throws Exception {
5252
runRepeatedlyWhileChangingMaster(() -> {
5353
final ClusterStateRequestBuilder clusterStateRequestBuilder = clusterAdmin().prepareState(TimeValue.timeValueMillis(100))
5454
.clear()
55+
.setWaitForMaster(true)
5556
.setNodes(true)
5657
.setBlocks(true);
5758
final ClusterStateResponse clusterStateResponse;
@@ -98,6 +99,7 @@ public void testNonLocalRequestAlwaysFindsMasterAndWaitsForMetadata() throws Exc
9899
.cluster()
99100
.prepareState(TimeValue.timeValueMillis(100))
100101
.clear()
102+
.setWaitForMaster(true)
101103
.setNodes(true)
102104
.setMetadata(true)
103105
.setBlocks(true)

server/src/internalClusterTest/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,15 @@ public void testSimpleOnlyMasterNodeElection() throws IOException {
3737
logger.info("--> start data node / non master node");
3838
internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s"));
3939
try {
40-
assertThat(clusterAdmin().prepareState(TimeValue.timeValueMillis(100)).get().getState().nodes().getMasterNodeId(), nullValue());
40+
assertThat(
41+
clusterAdmin().prepareState(TimeValue.timeValueMillis(100))
42+
.setWaitForMaster(true)
43+
.get()
44+
.getState()
45+
.nodes()
46+
.getMasterNodeId(),
47+
nullValue()
48+
);
4149
fail("should not be able to find master");
4250
} catch (MasterNotDiscoveredException e) {
4351
// all is well, no master elected
@@ -49,6 +57,7 @@ public void testSimpleOnlyMasterNodeElection() throws IOException {
4957
.admin()
5058
.cluster()
5159
.prepareState(TEST_REQUEST_TIMEOUT)
60+
.setWaitForMaster(true)
5261
.get()
5362
.getState()
5463
.nodes()
@@ -74,7 +83,15 @@ public void testSimpleOnlyMasterNodeElection() throws IOException {
7483
internalCluster().stopCurrentMasterNode();
7584

7685
try {
77-
assertThat(clusterAdmin().prepareState(TimeValue.timeValueMillis(100)).get().getState().nodes().getMasterNodeId(), nullValue());
86+
assertThat(
87+
clusterAdmin().prepareState(TimeValue.timeValueMillis(100))
88+
.setWaitForMaster(true)
89+
.get()
90+
.getState()
91+
.nodes()
92+
.getMasterNodeId(),
93+
nullValue()
94+
);
7895
fail("should not be able to find master");
7996
} catch (MasterNotDiscoveredException e) {
8097
// all is well, no master elected
@@ -89,6 +106,7 @@ public void testSimpleOnlyMasterNodeElection() throws IOException {
89106
.admin()
90107
.cluster()
91108
.prepareState(TEST_REQUEST_TIMEOUT)
109+
.setWaitForMaster(true)
92110
.get()
93111
.getState()
94112
.nodes()
@@ -115,7 +133,15 @@ public void testElectOnlyBetweenMasterNodes() throws Exception {
115133
logger.info("--> start data node / non master node");
116134
internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s"));
117135
try {
118-
assertThat(clusterAdmin().prepareState(TimeValue.timeValueMillis(100)).get().getState().nodes().getMasterNodeId(), nullValue());
136+
assertThat(
137+
clusterAdmin().prepareState(TimeValue.timeValueMillis(100))
138+
.setWaitForMaster(true)
139+
.get()
140+
.getState()
141+
.nodes()
142+
.getMasterNodeId(),
143+
nullValue()
144+
);
119145
fail("should not be able to find master");
120146
} catch (MasterNotDiscoveredException e) {
121147
// all is well, no master elected
@@ -127,6 +153,7 @@ public void testElectOnlyBetweenMasterNodes() throws Exception {
127153
.admin()
128154
.cluster()
129155
.prepareState(TEST_REQUEST_TIMEOUT)
156+
.setWaitForMaster(true)
130157
.get()
131158
.getState()
132159
.nodes()
@@ -186,6 +213,7 @@ public void testElectOnlyBetweenMasterNodes() throws Exception {
186213
.admin()
187214
.cluster()
188215
.prepareState(TEST_REQUEST_TIMEOUT)
216+
.setWaitForMaster(true)
189217
.get()
190218
.getState()
191219
.nodes()
@@ -198,6 +226,7 @@ public void testElectOnlyBetweenMasterNodes() throws Exception {
198226
.admin()
199227
.cluster()
200228
.prepareState(TEST_REQUEST_TIMEOUT)
229+
.setWaitForMaster(true)
201230
.get()
202231
.getState()
203232
.nodes()

server/src/internalClusterTest/java/org/elasticsearch/readiness/ReadinessClusterIT.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,13 +113,24 @@ protected Collection<Class<? extends Plugin>> getMockPlugins() {
113113

114114
private void assertMasterNode(Client client, String node) {
115115
assertThat(
116-
client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState().nodes().getMasterNode().getName(),
116+
client.admin()
117+
.cluster()
118+
.prepareState(TEST_REQUEST_TIMEOUT)
119+
.setWaitForMaster(true)
120+
.get()
121+
.getState()
122+
.nodes()
123+
.getMasterNode()
124+
.getName(),
117125
equalTo(node)
118126
);
119127
}
120128

121129
private void expectMasterNotFound() {
122-
expectThrows(MasterNotDiscoveredException.class, clusterAdmin().prepareState(TimeValue.timeValueMillis(100)));
130+
expectThrows(
131+
MasterNotDiscoveredException.class,
132+
clusterAdmin().prepareState(TimeValue.timeValueMillis(100)).setWaitForMaster(true)
133+
);
123134
}
124135

125136
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/108613")
@@ -194,8 +205,6 @@ public Settings onNodeStopped(String nodeName) throws Exception {
194205
internalCluster().restartNode(masterNode, new InternalTestCluster.RestartCallback() {
195206
@Override
196207
public Settings onNodeStopped(String nodeName) throws Exception {
197-
expectMasterNotFound();
198-
199208
logger.info("--> master node [{}] stopped", nodeName);
200209

201210
for (String dataNode : dataNodes) {

server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/ComponentTemplatesFileSettingsIT.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,16 @@ public class ComponentTemplatesFileSettingsIT extends ESIntegTestCase {
357357

358358
private void assertMasterNode(Client client, String node) throws ExecutionException, InterruptedException {
359359
assertThat(
360-
client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).execute().get().getState().nodes().getMasterNode().getName(),
360+
client.admin()
361+
.cluster()
362+
.prepareState(TEST_REQUEST_TIMEOUT)
363+
.setWaitForMaster(true)
364+
.execute()
365+
.get()
366+
.getState()
367+
.nodes()
368+
.getMasterNode()
369+
.getName(),
361370
equalTo(node)
362371
);
363372
}

server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,15 @@ public void resetVersionCounter() {
127127

128128
private void assertMasterNode(Client client, String node) {
129129
assertThat(
130-
client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState().nodes().getMasterNode().getName(),
130+
client.admin()
131+
.cluster()
132+
.prepareState(TEST_REQUEST_TIMEOUT)
133+
.setWaitForMaster(true)
134+
.get()
135+
.getState()
136+
.nodes()
137+
.getMasterNode()
138+
.getName(),
131139
equalTo(node)
132140
);
133141
}

server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/RepositoriesFileSettingsIT.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,16 @@ public class RepositoriesFileSettingsIT extends ESIntegTestCase {
9494

9595
private void assertMasterNode(Client client, String node) throws ExecutionException, InterruptedException {
9696
assertThat(
97-
client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).execute().get().getState().nodes().getMasterNode().getName(),
97+
client.admin()
98+
.cluster()
99+
.prepareState(TEST_REQUEST_TIMEOUT)
100+
.setWaitForMaster(true)
101+
.execute()
102+
.get()
103+
.getState()
104+
.nodes()
105+
.getMasterNode()
106+
.getName(),
98107
equalTo(node)
99108
);
100109
}

server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequestBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,4 +108,9 @@ public ClusterStateRequestBuilder setWaitForTimeOut(TimeValue waitForTimeout) {
108108
request.waitForTimeout(waitForTimeout);
109109
return this;
110110
}
111+
112+
public ClusterStateRequestBuilder setWaitForMaster(boolean waitForMaster) {
113+
request.waitForMaster(waitForMaster);
114+
return this;
115+
}
111116
}

server/src/main/java/org/elasticsearch/action/support/local/LocalClusterStateRequest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ public abstract class LocalClusterStateRequest extends ActionRequest {
3131
*/
3232
private final TimeValue masterTimeout;
3333

34+
private boolean waitForMaster = false;
35+
3436
protected LocalClusterStateRequest(TimeValue masterTimeout) {
3537
this.masterTimeout = Objects.requireNonNull(masterTimeout);
3638
}
@@ -57,4 +59,13 @@ public void writeTo(StreamOutput out) throws IOException {
5759
public TimeValue masterTimeout() {
5860
return masterTimeout;
5961
}
62+
63+
public LocalClusterStateRequest waitForMaster(boolean waitForMaster) {
64+
this.waitForMaster = waitForMaster;
65+
return this;
66+
}
67+
68+
public boolean waitForMaster() {
69+
return waitForMaster;
70+
}
6071
}

server/src/main/java/org/elasticsearch/action/support/local/TransportLocalClusterStateAction.java

Lines changed: 104 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,17 @@
2222
import org.elasticsearch.cluster.block.ClusterBlockException;
2323
import org.elasticsearch.cluster.service.ClusterService;
2424
import org.elasticsearch.common.util.concurrent.EsExecutors;
25+
import org.elasticsearch.core.Strings;
2526
import org.elasticsearch.core.TimeValue;
27+
import org.elasticsearch.discovery.MasterNotDiscoveredException;
2628
import org.elasticsearch.node.NodeClosedException;
2729
import org.elasticsearch.tasks.CancellableTask;
2830
import org.elasticsearch.tasks.Task;
31+
import org.elasticsearch.tasks.TaskCancelledException;
2932
import org.elasticsearch.tasks.TaskManager;
3033

3134
import java.util.concurrent.Executor;
35+
import java.util.function.Predicate;
3236

3337
import static org.elasticsearch.common.Strings.format;
3438

@@ -65,15 +69,19 @@ protected abstract void localClusterStateOperation(Task task, Request request, C
6569
@Override
6670
protected final void doExecute(Task task, Request request, ActionListener<Response> listener) {
6771
final var state = clusterService.state();
68-
final var clusterBlockException = checkBlock(request, state);
69-
if (clusterBlockException != null) {
70-
if (clusterBlockException.retryable() == false) {
71-
listener.onFailure(clusterBlockException);
72+
if (request.waitForMaster() && state.nodes().getMasterNode() == null) {
73+
waitForClusterUnblock(task, request, listener, state, null);
74+
} else {
75+
final var clusterBlockException = checkBlock(request, state);
76+
if (clusterBlockException != null) {
77+
if (clusterBlockException.retryable() == false) {
78+
listener.onFailure(clusterBlockException);
79+
} else {
80+
waitForClusterUnblock(task, request, listener, state, clusterBlockException);
81+
}
7282
} else {
73-
waitForClusterUnblock(task, request, listener, state, clusterBlockException);
83+
innerDoExecute(task, request, listener, state);
7484
}
75-
} else {
76-
innerDoExecute(task, request, listener, state);
7785
}
7886
}
7987

@@ -113,16 +121,99 @@ public void onClusterServiceClose() {
113121

114122
@Override
115123
public void onTimeout(TimeValue timeout) {
116-
logger.debug(
117-
() -> format("timed out while waiting for cluster to unblock in [%s] (timeout [%s])", actionName, timeout),
118-
exception
124+
logger.debug(() -> format("timed out while retrying [%s] after failure (timeout [%s])", actionName, timeout), exception);
125+
final var timeoutException = initialState.nodes().getMasterNode() == null
126+
? new MasterNotDiscoveredException()
127+
: new ElasticsearchTimeoutException("timed out while waiting for cluster to unblock", exception);
128+
listener.onFailure(timeoutException);
129+
}
130+
}, clusterState -> isTaskCancelled(task) || isClusterStateReady(request, clusterState));
131+
}
132+
133+
private boolean isClusterStateReady(Request request, ClusterState clusterState) {
134+
if (request.waitForMaster() && clusterState.nodes().getMasterNode() == null) {
135+
return false;
136+
}
137+
return checkBlock(request, clusterState) == null;
138+
}
139+
140+
private class AsyncSingleAction {
141+
142+
private final ActionListener<Response> listener;
143+
private final Request request;
144+
private final Task task;
145+
private final long startTime;
146+
private ClusterStateObserver observer;
147+
148+
AsyncSingleAction(Task task, Request request, ActionListener<Response> listener) {
149+
this.task = task;
150+
this.request = request;
151+
this.listener = listener;
152+
this.startTime = clusterService.threadPool().relativeTimeInMillis();
153+
}
154+
155+
protected void doStart(ClusterState clusterState) {
156+
if (isTaskCancelled()) {
157+
listener.onFailure(new TaskCancelledException("Task was cancelled"));
158+
return;
159+
}
160+
if (clusterState.nodes().getMasterNode() == null) {
161+
return;
162+
}
163+
}
164+
165+
private void retry(long currentStateVersion, final Throwable failure, final Predicate<ClusterState> statePredicate) {
166+
if (observer == null) {
167+
final TimeValue timeout;
168+
if (request.masterTimeout().millis() < 0) {
169+
timeout = null;
170+
} else {
171+
final long remainingTimeoutMS = request.masterTimeout().millis() - (clusterService.threadPool().relativeTimeInMillis()
172+
- startTime);
173+
if (remainingTimeoutMS <= 0) {
174+
logger.debug(() -> "timed out before retrying [" + actionName + "] after failure", failure);
175+
listener.onFailure(new MasterNotDiscoveredException(failure));
176+
return;
177+
}
178+
timeout = TimeValue.timeValueMillis(remainingTimeoutMS);
179+
}
180+
this.observer = new ClusterStateObserver(
181+
currentStateVersion,
182+
clusterService.getClusterApplierService(),
183+
timeout,
184+
logger,
185+
clusterService.threadPool().getThreadContext()
119186
);
120-
listener.onFailure(new ElasticsearchTimeoutException("timed out while waiting for cluster to unblock", exception));
121187
}
122-
}, clusterState -> isTaskCancelled(task) || checkBlock(request, clusterState) == null);
188+
observer.waitForNextChange(new ClusterStateObserver.Listener() {
189+
@Override
190+
public void onNewClusterState(ClusterState state) {
191+
logger.trace("retrying with cluster state version [{}]", state.version());
192+
doStart(state);
193+
}
194+
195+
@Override
196+
public void onClusterServiceClose() {
197+
listener.onFailure(new NodeClosedException(clusterService.localNode()));
198+
}
199+
200+
@Override
201+
public void onTimeout(TimeValue timeout) {
202+
logger.debug(
203+
() -> Strings.format("timed out while retrying [%s] after failure (timeout [%s])", actionName, timeout),
204+
failure
205+
);
206+
listener.onFailure(new MasterNotDiscoveredException(failure));
207+
}
208+
}, clusterState -> isTaskCancelled() || statePredicate.test(clusterState));
209+
}
210+
211+
private boolean isTaskCancelled() {
212+
return task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled();
213+
}
123214
}
124215

125-
private boolean isTaskCancelled(Task task) {
216+
private static boolean isTaskCancelled(Task task) {
126217
return task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled();
127218
}
128219
}

test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1231,7 +1231,7 @@ protected final void doEnsureClusterStateConsistency(NamedWriteableRegistry name
12311231
}
12321232
try (RefCountingListener refCountingListener = new RefCountingListener(future)) {
12331233
SubscribableListener.<ClusterStateResponse>newForked(
1234-
l -> client().admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).all().execute(l)
1234+
l -> client().admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).setWaitForMaster(true).all().execute(l)
12351235
).andThenAccept(masterStateResponse -> {
12361236
byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterStateResponse.getState());
12371237
// remove local node reference

0 commit comments

Comments
 (0)