Skip to content

Commit 69a089e

Browse files
authored
Fix cluster settings update task acknowledgment (elastic#97111) (elastic#97148)
ClusterUpdateSettingsTask uses a "changed" variable to set the value of the acknowledged flag of the ClusterUpdateSettingsResponse that is returned to the client. The "changed" variable is computed from the cluster state update but there is no guarantee that the update will be acknowledged. In case the update failed to be acked (onAckFailure) it should return the flag appropriately.
1 parent b811d11 commit 69a089e

File tree

3 files changed

+118
-6
lines changed

3 files changed

+118
-6
lines changed

docs/changelog/97111.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 97111
2+
summary: Fix cluster settings update task acknowledgment
3+
area: Cluster Coordination
4+
type: bug
5+
issues: []
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.cluster.settings;
10+
11+
import org.elasticsearch.common.settings.Setting;
12+
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.core.TimeValue;
14+
import org.elasticsearch.logging.LogManager;
15+
import org.elasticsearch.logging.Logger;
16+
import org.elasticsearch.plugins.Plugin;
17+
import org.elasticsearch.test.ESIntegTestCase;
18+
import org.elasticsearch.test.disruption.NetworkDisruption;
19+
import org.elasticsearch.test.transport.MockTransportService;
20+
21+
import java.util.Collection;
22+
import java.util.List;
23+
import java.util.Set;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.atomic.AtomicBoolean;
26+
import java.util.stream.Collectors;
27+
28+
import static org.hamcrest.CoreMatchers.equalTo;
29+
import static org.hamcrest.Matchers.not;
30+
31+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
32+
public class ClusterSettingsUpdateWithFaultyMasterIT extends ESIntegTestCase {
33+
34+
@Override
35+
protected Collection<Class<? extends Plugin>> nodePlugins() {
36+
return List.of(BlockingClusterSettingTestPlugin.class, MockTransportService.TestPlugin.class);
37+
}
38+
39+
public void testClusterSettingsUpdateNotAcknowledged() throws Exception {
40+
final var nodes = internalCluster().startMasterOnlyNodes(3);
41+
final String masterNode = internalCluster().getMasterName();
42+
final String blockedNode = randomValueOtherThan(masterNode, () -> randomFrom(nodes));
43+
assertThat(blockedNode, not(equalTo(internalCluster().getMasterName())));
44+
ensureStableCluster(3);
45+
46+
NetworkDisruption networkDisruption = new NetworkDisruption(
47+
new NetworkDisruption.TwoPartitions(
48+
Set.of(blockedNode),
49+
nodes.stream().filter(n -> n.equals(blockedNode) == false).collect(Collectors.toSet())
50+
),
51+
NetworkDisruption.DISCONNECT
52+
);
53+
internalCluster().setDisruptionScheme(networkDisruption);
54+
55+
logger.debug("--> updating cluster settings");
56+
var future = client(masterNode).admin()
57+
.cluster()
58+
.prepareUpdateSettings()
59+
.setPersistentSettings(Settings.builder().put(BlockingClusterSettingTestPlugin.TEST_BLOCKING_SETTING.getKey(), true).build())
60+
.setMasterNodeTimeout(TimeValue.timeValueMillis(10L))
61+
.execute();
62+
63+
logger.debug("--> waiting for cluster state update to be blocked");
64+
BlockingClusterSettingTestPlugin.blockLatch.await();
65+
66+
logger.debug("--> isolating master eligible node [{}] from other nodes", blockedNode);
67+
networkDisruption.startDisrupting();
68+
69+
logger.debug("--> unblocking cluster state update");
70+
BlockingClusterSettingTestPlugin.releaseLatch.countDown();
71+
72+
assertThat("--> cluster settings update should not be acknowledged", future.get().isAcknowledged(), equalTo(false));
73+
74+
logger.debug("--> stop network disruption");
75+
networkDisruption.stopDisrupting();
76+
ensureStableCluster(3);
77+
}
78+
79+
public static class BlockingClusterSettingTestPlugin extends Plugin {
80+
81+
private static final Logger logger = LogManager.getLogger(BlockingClusterSettingTestPlugin.class);
82+
83+
private static final CountDownLatch blockLatch = new CountDownLatch(1);
84+
private static final CountDownLatch releaseLatch = new CountDownLatch(1);
85+
private static final AtomicBoolean blockOnce = new AtomicBoolean();
86+
87+
public static final Setting<Boolean> TEST_BLOCKING_SETTING = Setting.boolSetting("cluster.test.blocking_setting", false, value -> {
88+
if (blockOnce.compareAndSet(false, true)) {
89+
logger.debug("--> setting validation is now blocking cluster state update");
90+
blockLatch.countDown();
91+
try {
92+
logger.debug("--> setting validation is now waiting for release");
93+
releaseLatch.await();
94+
logger.debug("--> setting validation is done");
95+
} catch (InterruptedException e) {
96+
Thread.currentThread().interrupt();
97+
throw new AssertionError(e);
98+
}
99+
}
100+
}, Setting.Property.NodeScope, Setting.Property.Dynamic);
101+
102+
@Override
103+
public List<Setting<?>> getSettings() {
104+
return List.of(TEST_BLOCKING_SETTING);
105+
}
106+
}
107+
}

server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
159159

160160
@Override
161161
public void onAllNodesAcked() {
162-
if (changed) {
162+
if (reroute) {
163163
reroute(true);
164164
} else {
165165
super.onAllNodesAcked();
@@ -168,16 +168,16 @@ public void onAllNodesAcked() {
168168

169169
@Override
170170
public void onAckFailure(Exception e) {
171-
if (changed) {
172-
reroute(true);
171+
if (reroute) {
172+
reroute(false);
173173
} else {
174174
super.onAckFailure(e);
175175
}
176176
}
177177

178178
@Override
179179
public void onAckTimeout() {
180-
if (changed) {
180+
if (reroute) {
181181
reroute(false);
182182
} else {
183183
super.onAckTimeout();
@@ -239,7 +239,7 @@ public void onFailure(Exception e) {
239239
}
240240

241241
public static class ClusterUpdateSettingsTask extends AckedClusterStateUpdateTask {
242-
protected volatile boolean changed = false;
242+
protected volatile boolean reroute = false;
243243
protected final SettingsUpdater updater;
244244
protected final ClusterUpdateSettingsRequest request;
245245
private final ClusterSettings clusterSettings;
@@ -271,7 +271,7 @@ public ClusterState execute(final ClusterState currentState) {
271271
clusterSettings.upgradeSettings(request.persistentSettings()),
272272
logger
273273
);
274-
changed = clusterState != currentState;
274+
reroute = clusterState != currentState;
275275
return clusterState;
276276
}
277277
}

0 commit comments

Comments
 (0)