Commit dd50942

[8.x] Reprocess operator file settings on service start (#114295) (#115198)
* Reprocess operator file settings on service start (#114295)

  Changes `FileSettingsService` to reprocess file settings on every restart or master node change, even if the versions in the settings file and in the cluster-state metadata match. If the file version is lower than the metadata version, processing is still skipped to avoid applying stale settings. This makes it easier for consumers of file settings to change their behavior with respect to the file settings contents. For instance, an update to how role mappings are stored will automatically apply on the next restart, without the need to manually increment the file settings version to force reprocessing.

  Relates: ES-9628

* Backport 114295

---------

Co-authored-by: Elastic Machine <[email protected]>
1 parent 50e63ed commit dd50942
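
For context, the rule described in the commit message can be summarized in a short sketch: on service start (node restart or master node change) the file is reprocessed even when its version equals the version recorded in cluster-state metadata, while a strictly lower file version is still skipped; on an ordinary file change only a strictly higher version is applied. This is an illustrative sketch only; the class and method names below are hypothetical and not part of the actual FileSettingsService code.

// Hypothetical sketch of the reprocessing rule; names are illustrative only.
final class FileSettingsReprocessingSketch {

    private FileSettingsReprocessingSketch() {}

    /**
     * Decide whether the settings file should be applied.
     *
     * @param fileVersion     version declared in the settings file metadata
     * @param metadataVersion version currently recorded in cluster-state metadata
     * @param onServiceStart  true on node restart or master node change
     */
    static boolean shouldProcess(long fileVersion, long metadataVersion, boolean onServiceStart) {
        if (onServiceStart) {
            // Reprocess even when versions match, but never apply a stale (lower-version) file.
            return fileVersion >= metadataVersion;
        }
        // Ordinary file-change handling: only strictly newer versions are applied.
        return fileVersion > metadataVersion;
    }
}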

17 files changed, +762 -115 lines changed


docs/changelog/114295.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 114295
+summary: "Reprocess operator file settings when settings service starts, due to node restart or master node change"
+area: Infra/Settings
+type: enhancement
+issues: [ ]

FileSettingsRoleMappingUpgradeIT.java

Lines changed: 111 additions & 0 deletions
@@ -0,0 +1,111 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.upgrades;
+
+import com.carrotsearch.randomizedtesting.annotations.Name;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.core.SuppressForbidden;
+import org.elasticsearch.test.XContentTestUtils;
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.cluster.local.distribution.DistributionType;
+import org.elasticsearch.test.cluster.util.resource.Resource;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestRule;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+
+public class FileSettingsRoleMappingUpgradeIT extends ParameterizedRollingUpgradeTestCase {
+
+    private static final String settingsJSON = """
+        {
+             "metadata": {
+                 "version": "1",
+                 "compatibility": "8.4.0"
+             },
+             "state": {
+                 "role_mappings": {
+                       "everyone_kibana": {
+                          "enabled": true,
+                          "roles": [ "kibana_user" ],
+                          "rules": { "field": { "username": "*" } }
+                       }
+                 }
+             }
+        }""";
+
+    private static final TemporaryFolder repoDirectory = new TemporaryFolder();
+
+    private static final ElasticsearchCluster cluster = ElasticsearchCluster.local()
+        .distribution(DistributionType.DEFAULT)
+        .version(getOldClusterTestVersion())
+        .nodes(NODE_NUM)
+        .setting("path.repo", new Supplier<>() {
+            @Override
+            @SuppressForbidden(reason = "TemporaryFolder only has io.File methods, not nio.File")
+            public String get() {
+                return repoDirectory.getRoot().getPath();
+            }
+        })
+        .setting("xpack.security.enabled", "true")
+        // workaround to avoid having to set up clients and authorization headers
+        .setting("xpack.security.authc.anonymous.roles", "superuser")
+        .configFile("operator/settings.json", Resource.fromString(settingsJSON))
+        .build();
+
+    @ClassRule
+    public static TestRule ruleChain = RuleChain.outerRule(repoDirectory).around(cluster);
+
+    public FileSettingsRoleMappingUpgradeIT(@Name("upgradedNodes") int upgradedNodes) {
+        super(upgradedNodes);
+    }
+
+    @Override
+    protected ElasticsearchCluster getUpgradeCluster() {
+        return cluster;
+    }
+
+    @Before
+    public void checkVersions() {
+        assumeTrue(
+            "Only relevant when upgrading from a version before role mappings were stored in cluster state",
+            oldClusterHasFeature("gte_v8.4.0") && oldClusterHasFeature("gte_v8.15.0") == false
+        );
+    }
+
+    public void testRoleMappingsAppliedOnUpgrade() throws IOException {
+        if (isOldCluster()) {
+            Request clusterStateRequest = new Request("GET", "/_cluster/state/metadata");
+            List<Object> roleMappings = new XContentTestUtils.JsonMapView(entityAsMap(client().performRequest(clusterStateRequest))).get(
+                "metadata.role_mappings.role_mappings"
+            );
+            assertThat(roleMappings, is(nullValue()));
+        } else if (isUpgradedCluster()) {
+            // the nodes have all been upgraded. Check they re-processed the role mappings in the settings file on
+            // upgrade
+            Request clusterStateRequest = new Request("GET", "/_cluster/state/metadata");
+            List<Object> roleMappings = new XContentTestUtils.JsonMapView(entityAsMap(client().performRequest(clusterStateRequest))).get(
+                "metadata.role_mappings.role_mappings"
+            );
+            assertThat(roleMappings, is(not(nullValue())));
+            assertThat(roleMappings.size(), equalTo(1));
+        }
+    }
+}

server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java

Lines changed: 124 additions & 3 deletions
@@ -25,6 +25,7 @@
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.junit.Before;

 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -40,6 +41,7 @@
 import static org.elasticsearch.test.NodeRoles.dataOnlyNode;
 import static org.elasticsearch.test.NodeRoles.masterNode;
 import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -50,7 +52,12 @@
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
 public class FileSettingsServiceIT extends ESIntegTestCase {

-    private static final AtomicLong versionCounter = new AtomicLong(1);
+    private final AtomicLong versionCounter = new AtomicLong(1);
+
+    @Before
+    public void resetVersionCounter() {
+        versionCounter.set(1);
+    }

     private static final String testJSON = """
         {
@@ -102,15 +109,29 @@ public class FileSettingsServiceIT extends ESIntegTestCase {
         }
     }""";

+    private static final String testOtherErrorJSON = """
+        {
+             "metadata": {
+                 "version": "%s",
+                 "compatibility": "8.4.0"
+             },
+             "state": {
+                 "bad_cluster_settings": {
+                     "search.allow_expensive_queries": "false"
+                 }
+             }
+        }""";
+
     private void assertMasterNode(Client client, String node) {
         assertThat(
             client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState().nodes().getMasterNode().getName(),
             equalTo(node)
         );
     }

-    public static void writeJSONFile(String node, String json, AtomicLong versionCounter, Logger logger) throws Exception {
-        long version = versionCounter.incrementAndGet();
+    public static void writeJSONFile(String node, String json, AtomicLong versionCounter, Logger logger, boolean incrementVersion)
+        throws Exception {
+        long version = incrementVersion ? versionCounter.incrementAndGet() : versionCounter.get();

         FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node);

@@ -124,6 +145,15 @@ public static void writeJSONFile(String node, String json, AtomicLong versionCou
         logger.info("--> After writing new settings file: [{}]", settingsFileContent);
     }

+    public static void writeJSONFile(String node, String json, AtomicLong versionCounter, Logger logger) throws Exception {
+        writeJSONFile(node, json, versionCounter, logger, true);
+    }
+
+    public static void writeJSONFileWithoutVersionIncrement(String node, String json, AtomicLong versionCounter, Logger logger)
+        throws Exception {
+        writeJSONFile(node, json, versionCounter, logger, false);
+    }
+
     private Tuple<CountDownLatch, AtomicLong> setupCleanupClusterStateListener(String node) {
         ClusterService clusterService = internalCluster().clusterService(node);
         CountDownLatch savedClusterState = new CountDownLatch(1);
@@ -171,7 +201,10 @@ public void clusterChanged(ClusterChangedEvent event) {
     private void assertClusterStateSaveOK(CountDownLatch savedClusterState, AtomicLong metadataVersion, String expectedBytesPerSec)
         throws Exception {
         assertTrue(savedClusterState.await(20, TimeUnit.SECONDS));
+        assertExpectedRecoveryBytesSettingAndVersion(metadataVersion, expectedBytesPerSec);
+    }

+    private static void assertExpectedRecoveryBytesSettingAndVersion(AtomicLong metadataVersion, String expectedBytesPerSec) {
         final ClusterStateResponse clusterStateResponse = clusterAdmin().state(
             new ClusterStateRequest(TEST_REQUEST_TIMEOUT).waitForMetadataVersion(metadataVersion.get())
         ).actionGet();
@@ -337,6 +370,77 @@ public void testErrorSaved() throws Exception {
         assertClusterStateNotSaved(savedClusterState.v1(), savedClusterState.v2());
     }

+    public void testErrorCanRecoverOnRestart() throws Exception {
+        internalCluster().setBootstrapMasterNodeIndex(0);
+        logger.info("--> start data node / non master node");
+        String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s"));
+        FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode);
+
+        assertFalse(dataFileSettingsService.watching());
+
+        logger.info("--> start master node");
+        final String masterNode = internalCluster().startMasterOnlyNode(
+            Settings.builder().put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s").build()
+        );
+        assertMasterNode(internalCluster().nonMasterClient(), masterNode);
+        var savedClusterState = setupClusterStateListenerForError(masterNode);
+
+        FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);
+
+        assertTrue(masterFileSettingsService.watching());
+        assertFalse(dataFileSettingsService.watching());
+
+        writeJSONFile(masterNode, testErrorJSON, versionCounter, logger);
+        AtomicLong metadataVersion = savedClusterState.v2();
+        assertClusterStateNotSaved(savedClusterState.v1(), metadataVersion);
+        assertHasErrors(metadataVersion, "not_cluster_settings");
+
+        // write valid json without version increment to simulate ES being able to process settings after a restart (usually, this would be
+        // due to a code change)
+        writeJSONFileWithoutVersionIncrement(masterNode, testJSON, versionCounter, logger);
+        internalCluster().restartNode(masterNode);
+        ensureGreen();
+
+        // we don't know the exact metadata version to wait for so rely on an assertBusy instead
+        assertBusy(() -> assertExpectedRecoveryBytesSettingAndVersion(metadataVersion, "50mb"));
+        assertBusy(() -> assertNoErrors(metadataVersion));
+    }
+
+    public void testNewErrorOnRestartReprocessing() throws Exception {
+        internalCluster().setBootstrapMasterNodeIndex(0);
+        logger.info("--> start data node / non master node");
+        String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s"));
+        FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode);
+
+        assertFalse(dataFileSettingsService.watching());
+
+        logger.info("--> start master node");
+        final String masterNode = internalCluster().startMasterOnlyNode(
+            Settings.builder().put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s").build()
+        );
+        assertMasterNode(internalCluster().nonMasterClient(), masterNode);
+        var savedClusterState = setupClusterStateListenerForError(masterNode);
+
+        FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);
+
+        assertTrue(masterFileSettingsService.watching());
+        assertFalse(dataFileSettingsService.watching());
+
+        writeJSONFile(masterNode, testErrorJSON, versionCounter, logger);
+        AtomicLong metadataVersion = savedClusterState.v2();
+        assertClusterStateNotSaved(savedClusterState.v1(), metadataVersion);
+        assertHasErrors(metadataVersion, "not_cluster_settings");
+
+        // write json with new error without version increment to simulate ES failing to process settings after a restart for a new reason
+        // (usually, this would be due to a code change)
+        writeJSONFileWithoutVersionIncrement(masterNode, testOtherErrorJSON, versionCounter, logger);
+        assertHasErrors(metadataVersion, "not_cluster_settings");
+        internalCluster().restartNode(masterNode);
+        ensureGreen();
+
+        assertBusy(() -> assertHasErrors(metadataVersion, "bad_cluster_settings"));
+    }
+
     public void testSettingsAppliedOnMasterReElection() throws Exception {
         internalCluster().setBootstrapMasterNodeIndex(0);
         logger.info("--> start master node");
@@ -383,4 +487,21 @@ public void testSettingsAppliedOnMasterReElection() throws Exception {
         assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "43mb");
     }

+    private void assertHasErrors(AtomicLong waitForMetadataVersion, String expectedError) {
+        var errorMetadata = getErrorMetadata(waitForMetadataVersion);
+        assertThat(errorMetadata, is(notNullValue()));
+        assertThat(errorMetadata.errors(), containsInAnyOrder(containsString(expectedError)));
+    }
+
+    private void assertNoErrors(AtomicLong waitForMetadataVersion) {
+        var errorMetadata = getErrorMetadata(waitForMetadataVersion);
+        assertThat(errorMetadata, is(nullValue()));
+    }
+
+    private ReservedStateErrorMetadata getErrorMetadata(AtomicLong waitForMetadataVersion) {
+        final ClusterStateResponse clusterStateResponse = clusterAdmin().state(
+            new ClusterStateRequest(TEST_REQUEST_TIMEOUT).waitForMetadataVersion(waitForMetadataVersion.get())
+        ).actionGet();
+        return clusterStateResponse.getState().getMetadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE).errorMetadata();
+    }
 }

server/src/main/java/org/elasticsearch/common/file/AbstractFileWatchingService.java

Lines changed: 21 additions & 1 deletion
@@ -77,6 +77,15 @@ public AbstractFileWatchingService(Path watchedFile) {

     protected abstract void processInitialFileMissing() throws InterruptedException, ExecutionException, IOException;

+    /**
+     * Defaults to generic {@link #processFileChanges()} behavior.
+     * An implementation can override this to define different file handling when the file is processed during
+     * initial service start.
+     */
+    protected void processFileOnServiceStart() throws IOException, ExecutionException, InterruptedException {
+        processFileChanges();
+    }
+
     public final void addFileChangedListener(FileChangedListener listener) {
         eventListeners.add(listener);
     }
@@ -174,7 +183,7 @@ protected final void watcherThread() {

         if (Files.exists(path)) {
             logger.debug("found initial operator settings file [{}], applying...", path);
-            processSettingsAndNotifyListeners();
+            processSettingsOnServiceStartAndNotifyListeners();
         } else {
             processInitialFileMissing();
             // Notify everyone we don't have any initial file settings
@@ -290,6 +299,17 @@ final WatchKey enableDirectoryWatcher(WatchKey previousKey, Path settingsDir) th
         } while (true);
     }

+    void processSettingsOnServiceStartAndNotifyListeners() throws InterruptedException {
+        try {
+            processFileOnServiceStart();
+            for (var listener : eventListeners) {
+                listener.watchedFileChanged();
+            }
+        } catch (IOException | ExecutionException e) {
+            logger.error(() -> "Error processing watched file: " + watchedFile(), e);
+        }
+    }
+
     void processSettingsAndNotifyListeners() throws InterruptedException {
         try {
             processFileChanges();
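
As a usage note on the hook added above: processFileOnServiceStart() defaults to the generic processFileChanges() path, and a subclass can override it to treat the initial start differently. The sketch below only illustrates that pattern; it does not extend the real AbstractFileWatchingService, and every name other than the two hook methods is hypothetical.

// Self-contained illustration of the override pattern; not the real AbstractFileWatchingService.
abstract class WatchingServiceSketch {

    // Analogous to processFileChanges(): handles an ordinary modification of the watched file.
    protected abstract void processFileChanges();

    // Analogous to the new hook: called when the file is found during service start.
    // The default simply delegates to the generic change-handling path.
    protected void processFileOnServiceStart() {
        processFileChanges();
    }
}

class LenientStartupWatcher extends WatchingServiceSketch {

    @Override
    protected void processFileChanges() {
        // apply the file only when its version is strictly newer (illustrative)
    }

    @Override
    protected void processFileOnServiceStart() {
        // on restart or master change, also re-apply a file whose version is unchanged (illustrative)
    }
}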

server/src/main/java/org/elasticsearch/reservedstate/service/ErrorState.java

Lines changed: 16 additions & 3 deletions
@@ -15,9 +15,22 @@

 import static org.elasticsearch.ExceptionsHelper.stackTrace;

-record ErrorState(String namespace, Long version, List<String> errors, ReservedStateErrorMetadata.ErrorKind errorKind) {
-    ErrorState(String namespace, Long version, Exception e, ReservedStateErrorMetadata.ErrorKind errorKind) {
-        this(namespace, version, List.of(stackTrace(e)), errorKind);
+record ErrorState(
+    String namespace,
+    Long version,
+    ReservedStateVersionCheck versionCheck,
+    List<String> errors,
+    ReservedStateErrorMetadata.ErrorKind errorKind
+) {
+
+    ErrorState(
+        String namespace,
+        Long version,
+        ReservedStateVersionCheck versionCheck,
+        Exception e,
+        ReservedStateErrorMetadata.ErrorKind errorKind
+    ) {
+        this(namespace, version, versionCheck, List.of(stackTrace(e)), errorKind);
     }

     public String toString() {
