diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSettings.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSettings.java index 1940b54b7ca03..fcd8161f4a54a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSettings.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSettings.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.settings.AbstractContextlessScopedSettings; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -112,7 +113,7 @@ public void validate(final TimeValue settingValue, final Map, Object> @Nullable private volatile TimeValue failuresDefaultRetention; /** We cache the global retention objects, volatile is sufficient we only "write" this values in the settings appliers which - * are executed by {@link org.elasticsearch.common.settings.AbstractScopedSettings#applySettings(Settings)} which is synchronised. + * are executed by {@link AbstractContextlessScopedSettings#applySettings(Settings)} which is synchronised. */ @Nullable private volatile DataStreamGlobalRetention dataGlobalRetention; diff --git a/server/src/main/java/org/elasticsearch/cluster/project/ProjectStateRegistry.java b/server/src/main/java/org/elasticsearch/cluster/project/ProjectStateRegistry.java index 014ee37724cbc..7b65a703400d5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/project/ProjectStateRegistry.java +++ b/server/src/main/java/org/elasticsearch/cluster/project/ProjectStateRegistry.java @@ -175,8 +175,7 @@ public long getProjectsMarkedForDeletionGeneration() { return projectsMarkedForDeletionGeneration; } - // visible for testing - Set knownProjects() { + public Set knownProjects() { return projectsEntries.keySet(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index b4a626faf79b3..80889e51c4429 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -21,11 +21,14 @@ import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.TimeoutClusterStateListener; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.project.ProjectStateRegistry; import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Recorder; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.ProjectScopedSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -78,6 +81,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements public static final String CLUSTER_UPDATE_THREAD_NAME = "clusterApplierService#updateTask"; private final ClusterSettings clusterSettings; + private final ProjectScopedSettings projectScopedSettings; private final ThreadPool threadPool; private volatile TimeValue slowTaskLoggingThreshold; @@ -103,8 +107,18 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements private NodeConnectionsService nodeConnectionsService; - public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { + public ClusterApplierService(String nodeName, ClusterSettings clusterSettings, ThreadPool threadPool) { + this(nodeName, clusterSettings, new ProjectScopedSettings(), threadPool); + } + + public ClusterApplierService( + String nodeName, + ClusterSettings clusterSettings, + ProjectScopedSettings projectScopedSettings, + ThreadPool threadPool + ) { this.clusterSettings = clusterSettings; + this.projectScopedSettings = projectScopedSettings; this.threadPool = threadPool; this.state = new AtomicReference<>(); this.nodeName = nodeName; @@ -518,14 +532,7 @@ private void applyChanges(ClusterState previousClusterState, ClusterState newClu connectToNodesAndWait(newClusterState); } - // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency - if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metadataChanged()) { - logger.debug("applying settings from cluster state with version {}", newClusterState.version()); - final Settings incomingSettings = clusterChangedEvent.state().metadata().settings(); - try (Releasable ignored = stopWatch.record("applying settings")) { - clusterSettings.applySettings(incomingSettings); - } - } + applySettings(clusterChangedEvent, stopWatch); logger.debug("apply cluster state with version {}", newClusterState.version()); callClusterStateAppliers(clusterChangedEvent, stopWatch); @@ -538,6 +545,35 @@ private void applyChanges(ClusterState previousClusterState, ClusterState newClu callClusterStateListeners(clusterChangedEvent, stopWatch); } + private void applySettings(ClusterChangedEvent clusterChangedEvent, Recorder stopWatch) { + // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency + if (clusterChangedEvent.state().blocks().disableStatePersistence() == false) { + if (clusterChangedEvent.metadataChanged()) { + logger.debug("applying settings from cluster state with version {}", clusterChangedEvent.state().version()); + final Settings incomingSettings = clusterChangedEvent.state().metadata().settings(); + try (Releasable ignored = stopWatch.record("applying settings")) { + clusterSettings.applySettings(incomingSettings); + } + } + + ProjectStateRegistry oldProjectStateRegistry = clusterChangedEvent.previousState() + .custom(ProjectStateRegistry.TYPE, ProjectStateRegistry.EMPTY); + ProjectStateRegistry newProjectStateRegistry = clusterChangedEvent.state() + .custom(ProjectStateRegistry.TYPE, ProjectStateRegistry.EMPTY); + if (oldProjectStateRegistry != newProjectStateRegistry) { + for (ProjectId projectId : newProjectStateRegistry.knownProjects()) { + Settings oldProjectSettings = oldProjectStateRegistry.getProjectSettings(projectId); + Settings newProjectSettings = newProjectStateRegistry.getProjectSettings(projectId); + if (newProjectSettings.equals(oldProjectSettings) == false) { + try (Releasable ignored = stopWatch.record("applying project settings")) { + projectScopedSettings.applySettings(projectId, newProjectSettings); + } + } + } + } + } + } + protected void connectToNodesAndWait(ClusterState newClusterState) { // can't wait for an ActionFuture on the cluster applier thread, but we do want to block the thread here, so use a CountDownLatch. final CountDownLatch countDownLatch = new CountDownLatch(1); diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java index 5caf340036156..220ae001dfe33 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java @@ -63,9 +63,14 @@ public ClusterService(Settings settings, ClusterSettings clusterSettings, Thread this( settings, clusterSettings, - new ProjectScopedSettings(settings, Collections.emptySet()), + new ProjectScopedSettings(Collections.emptySet()), new MasterService(settings, clusterSettings, threadPool, taskManager), - new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool) + new ClusterApplierService( + Node.NODE_NAME_SETTING.get(settings), + clusterSettings, + new ProjectScopedSettings(Collections.emptySet()), + threadPool + ) ); } @@ -81,7 +86,7 @@ public ClusterService( clusterSettings, projectScopedSettings, new MasterService(settings, clusterSettings, threadPool, taskManager), - new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool) + new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), clusterSettings, projectScopedSettings, threadPool) ); } @@ -91,7 +96,7 @@ public ClusterService( MasterService masterService, ClusterApplierService clusterApplierService ) { - this(settings, clusterSettings, new ProjectScopedSettings(settings, Collections.emptySet()), masterService, clusterApplierService); + this(settings, clusterSettings, new ProjectScopedSettings(Collections.emptySet()), masterService, clusterApplierService); } public ClusterService( diff --git a/server/src/main/java/org/elasticsearch/common/settings/AbstractContextlessScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/AbstractContextlessScopedSettings.java new file mode 100644 index 0000000000000..be55f3916db2b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/settings/AbstractContextlessScopedSettings.java @@ -0,0 +1,210 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.common.settings; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.TriConsumer; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +public abstract class AbstractContextlessScopedSettings extends AbstractScopedSettings { + + protected final Settings settings; + private Settings lastSettingsApplied; + + public AbstractContextlessScopedSettings( + Settings nodeSettings, + Settings scopeSettings, + AbstractContextlessScopedSettings other, + Logger logger + ) { + super(other, logger); + + this.settings = nodeSettings; + this.lastSettingsApplied = scopeSettings; + } + + public AbstractContextlessScopedSettings(Settings settings, Set> settingsSet, Setting.Property scope) { + super(settingsSet, scope); + + this.settings = settings; + this.lastSettingsApplied = Settings.EMPTY; + } + + /** + * Validates the given settings by running it through all update listeners without applying it. This + * method will not change any settings but will fail if any of the settings can't be applied. + */ + public synchronized Settings validateUpdate(Settings settings) { + final Settings current = Settings.builder().put(this.settings).put(settings).build(); + final Settings previous = Settings.builder().put(this.settings).put(this.lastSettingsApplied).build(); + validateUpdate(current, previous); + + return current; + } + + /** + * Applies the given settings to all the settings consumers or to none of them. The settings + * will be merged with the node settings before they are applied while given settings override existing node + * settings. + * @param newSettings the settings to apply + * @return the unmerged applied settings + */ + public synchronized Settings applySettings(Settings newSettings) { + if (lastSettingsApplied != null && newSettings.equals(lastSettingsApplied)) { + // nothing changed in the settings, ignore + return newSettings; + } + final Settings current = Settings.builder().put(this.settings).put(newSettings).build(); + final Settings previous = Settings.builder().put(this.settings).put(this.lastSettingsApplied).build(); + executeSettingsUpdaters(null, current, previous); + + return lastSettingsApplied = newSettings; + } + + /** + * Returns the value for the given setting. + */ + public T get(Setting setting) { + if (setting.getProperties().contains(scope) == false) { + throw new IllegalArgumentException( + "settings scope doesn't match the setting scope [" + this.scope + "] not in [" + setting.getProperties() + "]" + ); + } + if (get(setting.getKey()) == null) { + throw new IllegalArgumentException("setting " + setting.getKey() + " has not been registered"); + } + return setting.get(this.lastSettingsApplied, settings); + } + + private static TriConsumer wrapIgnoringContext(BiConsumer consumer) { + return (ctx, t, v) -> consumer.accept(t, v); + } + + private static BiConsumer wrapIgnoringContext(Consumer consumer) { + return (ctx, v) -> consumer.accept(v); + } + + /** + * Adds a settings consumer with a predicate that is only evaluated at update time. + *

+ * Note: Only settings registered in {@link SettingsModule} can be changed dynamically. + *

+ * @param The type of the setting's value. + * @param setting The setting for which the updates are to be handled. + * @param consumer A {@link BiConsumer} that will be executed with the updated setting value. + * @param validator an additional validator that is only applied to updates of this setting. + * This is useful to add additional validation to settings at runtime compared to at startup time. + */ + public synchronized void addSettingsUpdateConsumer(Setting setting, Consumer consumer, Consumer validator) { + super.addSettingsUpdateConsumer(setting, wrapIgnoringContext(consumer), validator); + } + + /** + * Adds a settings consumer. + *

+ * Note: Only settings registered in {@link org.elasticsearch.cluster.ClusterModule} can be changed dynamically. + *

+ */ + public synchronized void addSettingsUpdateConsumer(Setting setting, Consumer consumer) { + super.addSettingsUpdateConsumer(setting, wrapIgnoringContext(consumer)); + } + + /** + * Adds a settings consumer that is only executed if any setting in the supplied list of settings is changed. In that case all the + * settings are specified in the argument are returned. + * + * Also automatically adds empty consumers for all settings in order to activate logging + */ + public synchronized void addSettingsUpdateConsumer(Consumer consumer, List> settings) { + super.addSettingsUpdateConsumer(wrapIgnoringContext(consumer), settings); + } + + /** + * Adds a settings consumer that is only executed if any setting in the supplied list of settings is changed. In that case all the + * settings are specified in the argument are returned. The validator is run across all specified settings before the settings are + * applied. + * + * Also automatically adds empty consumers for all settings in order to activate logging + */ + public synchronized void addSettingsUpdateConsumer( + Consumer consumer, + List> settings, + Consumer validator + ) { + super.addSettingsUpdateConsumer(wrapIgnoringContext(consumer), settings, validator); + } + + /** + * Adds a settings consumer for affix settings. Affix settings have a namespace associated to it that needs to be available to the + * consumer in order to be processed correctly. + */ + public synchronized void addAffixUpdateConsumer( + Setting.AffixSetting setting, + BiConsumer consumer, + BiConsumer validator + ) { + super.addAffixUpdateConsumer(setting, wrapIgnoringContext(consumer), validator); + } + + /** + * Adds a affix settings consumer that accepts the settings for a group of settings. The consumer is only + * notified if at least one of the settings change. + *

+ * Note: Only settings registered in {@link SettingsModule} can be changed dynamically. + *

+ */ + public synchronized void addAffixGroupUpdateConsumer(List> settings, BiConsumer consumer) { + super.addAffixGroupUpdateConsumer(settings, wrapIgnoringContext(consumer)); + } + + /** + * Adds a settings consumer for affix settings. Affix settings have a namespace associated to it that needs to be available to the + * consumer in order to be processed correctly. This consumer will get a namespace to value map instead of each individual namespace + * and value as in {@link #addAffixUpdateConsumer(Setting.AffixSetting, BiConsumer, BiConsumer)} + */ + public synchronized void addAffixMapUpdateConsumer( + Setting.AffixSetting setting, + Consumer> consumer, + BiConsumer validator + ) { + super.addAffixMapUpdateConsumer(setting, wrapIgnoringContext(consumer), validator); + } + + /** + * Adds a settings consumer that accepts the values for two settings. The consumer is only notified if one or both settings change + * and if the provided validator succeeded. + *

+ * Note: Only settings registered in {@link SettingsModule} can be changed dynamically. + *

+ * This method registers a compound updater that is useful if two settings are depending on each other. + * The consumer is always provided with both values even if only one of the two changes. + */ + public synchronized void addSettingsUpdateConsumer( + Setting a, + Setting b, + BiConsumer consumer, + BiConsumer validator + ) { + super.addSettingsUpdateConsumer(a, b, wrapIgnoringContext(consumer), validator); + } + + /** + * Adds a settings consumer that accepts the values for two settings. + * See {@link #addSettingsUpdateConsumer(Setting, Setting, BiConsumer, BiConsumer)} for details. + */ + public synchronized void addSettingsUpdateConsumer(Setting a, Setting b, BiConsumer consumer) { + super.addSettingsUpdateConsumer(a, b, wrapIgnoringContext(consumer)); + } +} diff --git a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java index c65f75df663d2..e427c6ed0baa3 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java @@ -14,6 +14,7 @@ import org.apache.lucene.search.spell.LevenshteinDistance; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.TriConsumer; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.set.Sets; @@ -36,28 +37,26 @@ /** * A basic setting service that can be used for per-index and per-cluster settings. * This service offers transactional application of updates settings. + * + * @param type of context for which settings are updated */ -public abstract class AbstractScopedSettings { +public abstract class AbstractScopedSettings { public static final String ARCHIVED_SETTINGS_PREFIX = "archived."; private static final Pattern KEY_PATTERN = Pattern.compile("^(?:[-\\w]+[.])*[-\\w]+$"); private static final Pattern GROUP_KEY_PATTERN = Pattern.compile("^(?:[-\\w]+[.])+$"); private static final Pattern AFFIX_KEY_PATTERN = Pattern.compile("^(?:[-\\w]+[.])+[*](?:[.][-\\w]+)+$"); - private final Logger logger; + protected final Logger logger; - private final Settings settings; - private final List> settingUpdaters = new CopyOnWriteArrayList<>(); + private final List> settingUpdaters = new CopyOnWriteArrayList<>(); private final Map> complexMatchers; private final Map> keySettings; - private final Setting.Property scope; - private Settings lastSettingsApplied; + protected final Setting.Property scope; @SuppressWarnings("this-escape") - protected AbstractScopedSettings(final Settings settings, final Set> settingsSet, final Setting.Property scope) { + protected AbstractScopedSettings(final Set> settingsSet, final Setting.Property scope) { this.logger = LogManager.getLogger(this.getClass()); - this.settings = settings; - this.lastSettingsApplied = Settings.EMPTY; this.scope = scope; Map> complexMatchers = new HashMap<>(); @@ -98,10 +97,8 @@ protected void validateSettingKey(Setting setting) { } } - protected AbstractScopedSettings(Settings nodeSettings, Settings scopeSettings, AbstractScopedSettings other, Logger logger) { + protected AbstractScopedSettings(AbstractScopedSettings other, Logger logger) { this.logger = logger; - this.settings = nodeSettings; - this.lastSettingsApplied = scopeSettings; this.scope = other.scope; complexMatchers = other.complexMatchers; keySettings = other.keySettings; @@ -128,11 +125,9 @@ static boolean isValidAffixKey(String key) { * Validates the given settings by running it through all update listeners without applying it. This * method will not change any settings but will fail if any of the settings can't be applied. */ - public synchronized Settings validateUpdate(Settings settings) { - final Settings current = Settings.builder().put(this.settings).put(settings).build(); - final Settings previous = Settings.builder().put(this.settings).put(this.lastSettingsApplied).build(); + public void validateUpdate(Settings current, Settings previous) { List exceptions = new ArrayList<>(); - for (SettingUpdater settingUpdater : settingUpdaters) { + for (SettingUpdater settingUpdater : settingUpdaters) { try { // ensure running this through the updater / dynamic validator // don't check if the value has changed we wanna test this anyways @@ -144,28 +139,14 @@ public synchronized Settings validateUpdate(Settings settings) { } // here we are exhaustive and record all settings that failed. ExceptionsHelper.rethrowAndSuppress(exceptions); - return current; } - /** - * Applies the given settings to all the settings consumers or to none of them. The settings - * will be merged with the node settings before they are applied while given settings override existing node - * settings. - * @param newSettings the settings to apply - * @return the unmerged applied settings - */ - public synchronized Settings applySettings(Settings newSettings) { - if (lastSettingsApplied != null && newSettings.equals(lastSettingsApplied)) { - // nothing changed in the settings, ignore - return newSettings; - } - final Settings current = Settings.builder().put(this.settings).put(newSettings).build(); - final Settings previous = Settings.builder().put(this.settings).put(this.lastSettingsApplied).build(); + protected final void executeSettingsUpdaters(C context, Settings current, Settings previous) { try { List applyRunnables = new ArrayList<>(); - for (SettingUpdater settingUpdater : settingUpdaters) { + for (SettingUpdater settingUpdater : settingUpdaters) { try { - applyRunnables.add(settingUpdater.updater(current, previous)); + applyRunnables.add(settingUpdater.updater(context, current, previous)); } catch (Exception ex) { logger.warn(() -> "failed to prepareCommit settings for [" + settingUpdater + "]", ex); throw ex; @@ -178,7 +159,6 @@ public synchronized Settings applySettings(Settings newSettings) { logger.warn("failed to apply settings", ex); throw ex; } - return lastSettingsApplied = newSettings; } /** @@ -186,10 +166,13 @@ public synchronized Settings applySettings(Settings newSettings) { *

* Note: Only settings registered in {@link SettingsModule} can be changed dynamically. *

+ * @param The type of the setting's value. + * @param setting The setting for which the updates are to be handled. + * @param consumer A {@link BiConsumer} that will be executed with the updated setting value and context for which update happened. * @param validator an additional validator that is only applied to updates of this setting. * This is useful to add additional validation to settings at runtime compared to at startup time. */ - public synchronized void addSettingsUpdateConsumer(Setting setting, Consumer consumer, Consumer validator) { + public synchronized void addSettingsUpdateConsumer(Setting setting, BiConsumer consumer, Consumer validator) { if (setting != get(setting.getKey())) { throw new IllegalArgumentException("Setting is not registered for key [" + setting.getKey() + "]"); } @@ -202,7 +185,7 @@ public synchronized void addSettingsUpdateConsumer(Setting setting, Consu * * Also automatically adds empty consumers for all settings in order to activate logging */ - public synchronized void addSettingsUpdateConsumer(Consumer consumer, List> settings) { + public synchronized void addSettingsUpdateConsumer(BiConsumer consumer, List> settings) { addSettingsUpdater(Setting.groupedSettingsUpdater(consumer, settings)); } @@ -214,7 +197,7 @@ public synchronized void addSettingsUpdateConsumer(Consumer consumer, * Also automatically adds empty consumers for all settings in order to activate logging */ public synchronized void addSettingsUpdateConsumer( - Consumer consumer, + BiConsumer consumer, List> settings, Consumer validator ) { @@ -227,83 +210,13 @@ public synchronized void addSettingsUpdateConsumer( */ public synchronized void addAffixUpdateConsumer( Setting.AffixSetting setting, - BiConsumer consumer, + TriConsumer consumer, BiConsumer validator ) { ensureSettingIsRegistered(setting); addSettingsUpdater(setting.newAffixUpdater(consumer, logger, validator)); } - /** - * Adds a affix settings consumer that accepts the values for two settings. The consumer is only notified if one or both settings change - * and if the provided validator succeeded. - *

- * Note: Only settings registered in {@link SettingsModule} can be changed dynamically. - *

- * This method registers a compound updater that is useful if two settings are depending on each other. - * The consumer is always provided with both values even if only one of the two changes. - */ - public synchronized void addAffixUpdateConsumer( - Setting.AffixSetting
settingA, - Setting.AffixSetting settingB, - BiConsumer> consumer, - BiConsumer> validator - ) { - // it would be awesome to have a generic way to do that ie. a set of settings that map to an object with a builder - // down the road this would be nice to have! - ensureSettingIsRegistered(settingA); - ensureSettingIsRegistered(settingB); - SettingUpdater, A>> affixUpdaterA = settingA.newAffixUpdater((a, b) -> {}, logger, (a, b) -> {}); - SettingUpdater, B>> affixUpdaterB = settingB.newAffixUpdater((a, b) -> {}, logger, (a, b) -> {}); - - addSettingsUpdater(new SettingUpdater>>() { - - @Override - public boolean hasChanged(Settings current, Settings previous) { - return affixUpdaterA.hasChanged(current, previous) || affixUpdaterB.hasChanged(current, previous); - } - - @Override - public Map> getValue(Settings current, Settings previous) { - Map> map = new HashMap<>(); - BiConsumer aConsumer = (key, value) -> { - assert map.containsKey(key) == false : "duplicate key: " + key; - map.put(key, new Tuple<>(value, settingB.getConcreteSettingForNamespace(key).get(current))); - }; - BiConsumer bConsumer = (key, value) -> { - Tuple abTuple = map.get(key); - if (abTuple != null) { - map.put(key, new Tuple<>(abTuple.v1(), value)); - } else { - assert settingA.getConcreteSettingForNamespace(key) - .get(current) - .equals(settingA.getConcreteSettingForNamespace(key).get(previous)) - : "expected: " - + settingA.getConcreteSettingForNamespace(key).get(current) - + " but was " - + settingA.getConcreteSettingForNamespace(key).get(previous); - map.put(key, new Tuple<>(settingA.getConcreteSettingForNamespace(key).get(current), value)); - } - }; - SettingUpdater, A>> affixUpdaterA = settingA.newAffixUpdater(aConsumer, logger, (a, b) -> {}); - SettingUpdater, B>> affixUpdaterB = settingB.newAffixUpdater(bConsumer, logger, (a, b) -> {}); - affixUpdaterA.apply(current, previous); - affixUpdaterB.apply(current, previous); - for (Map.Entry> entry : map.entrySet()) { - validator.accept(entry.getKey(), entry.getValue()); - } - return Collections.unmodifiableMap(map); - } - - @Override - public void apply(Map> values, Settings current, Settings previous) { - for (Map.Entry> entry : values.entrySet()) { - consumer.accept(entry.getKey(), entry.getValue()); - } - } - }); - } - /** * Adds a affix settings consumer that accepts the settings for a group of settings. The consumer is only * notified if at least one of the settings change. @@ -312,14 +225,17 @@ public void apply(Map> values, Settings current, Settings pr *

*/ @SuppressWarnings("rawtypes") - public synchronized void addAffixGroupUpdateConsumer(List> settings, BiConsumer consumer) { + public synchronized void addAffixGroupUpdateConsumer( + List> settings, + TriConsumer consumer + ) { List affixUpdaters = new ArrayList<>(settings.size()); for (Setting.AffixSetting setting : settings) { ensureSettingIsRegistered(setting); - affixUpdaters.add(setting.newAffixUpdater((a, b) -> {}, logger, (a, b) -> {})); + affixUpdaters.add(setting.newAffixUpdater((ctx, a, b) -> {}, logger, (a, b) -> {})); } - addSettingsUpdater(new SettingUpdater>() { + addSettingsUpdater(new SettingUpdater>() { @Override public boolean hasChanged(Settings current, Settings previous) { @@ -330,8 +246,8 @@ public boolean hasChanged(Settings current, Settings previous) { public Map getValue(Settings current, Settings previous) { Set namespaces = new HashSet<>(); for (Setting.AffixSetting setting : settings) { - SettingUpdater affixUpdaterA = setting.newAffixUpdater((k, v) -> namespaces.add(k), logger, (a, b) -> {}); - affixUpdaterA.apply(current, previous); + SettingUpdater affixUpdaterA = setting.newAffixUpdater((ctx, k, v) -> namespaces.add(k), logger, (a, b) -> {}); + affixUpdaterA.apply(null, current, previous); } Map namespaceToSettings = Maps.newMapWithExpectedSize(namespaces.size()); for (String namespace : namespaces) { @@ -345,15 +261,15 @@ public Map getValue(Settings current, Settings previous) { } @Override - public void apply(Map values, Settings current, Settings previous) { + public void apply(C context, Map values, Settings current, Settings previous) { for (Map.Entry entry : values.entrySet()) { - consumer.accept(entry.getKey(), entry.getValue()); + consumer.apply(context, entry.getKey(), entry.getValue()); } } }); } - private void ensureSettingIsRegistered(Setting.AffixSetting setting) { + protected final void ensureSettingIsRegistered(Setting.AffixSetting setting) { final Setting registeredSetting = this.complexMatchers.get(setting.getKey()); if (setting != registeredSetting) { throw new IllegalArgumentException("Setting is not registered for key [" + setting.getKey() + "]"); @@ -363,11 +279,11 @@ private void ensureSettingIsRegistered(Setting.AffixSetting setting) { /** * Adds a settings consumer for affix settings. Affix settings have a namespace associated to it that needs to be available to the * consumer in order to be processed correctly. This consumer will get a namespace to value map instead of each individual namespace - * and value as in {@link #addAffixUpdateConsumer(Setting.AffixSetting, BiConsumer, BiConsumer)} + * and value as in {@link #addAffixUpdateConsumer(Setting.AffixSetting, TriConsumer, BiConsumer)} */ public synchronized void addAffixMapUpdateConsumer( Setting.AffixSetting setting, - Consumer> consumer, + BiConsumer> consumer, BiConsumer validator ) { final Setting registeredSetting = this.complexMatchers.get(setting.getKey()); @@ -377,15 +293,15 @@ public synchronized void addAffixMapUpdateConsumer( addSettingsUpdater(setting.newAffixMapUpdater(consumer, logger, validator)); } - synchronized void addSettingsUpdater(SettingUpdater updater) { + synchronized void addSettingsUpdater(SettingUpdater updater) { this.settingUpdaters.add(updater); } /** * Adds a settings consumer that accepts the values for two settings. - * See {@link #addSettingsUpdateConsumer(Setting, Setting, BiConsumer, BiConsumer)} for details. + * See {@link #addSettingsUpdateConsumer(Setting, Setting, TriConsumer, BiConsumer)} for details. */ - public synchronized void addSettingsUpdateConsumer(Setting
a, Setting b, BiConsumer consumer) { + public synchronized void addSettingsUpdateConsumer(Setting a, Setting b, TriConsumer consumer) { addSettingsUpdateConsumer(a, b, consumer, (i, j) -> {}); } @@ -401,7 +317,7 @@ public synchronized void addSettingsUpdateConsumer(Setting a, Setting< public synchronized void addSettingsUpdateConsumer( Setting a, Setting b, - BiConsumer consumer, + TriConsumer consumer, BiConsumer validator ) { if (a != get(a.getKey())) { @@ -419,24 +335,10 @@ public synchronized void addSettingsUpdateConsumer( * Note: Only settings registered in {@link org.elasticsearch.cluster.ClusterModule} can be changed dynamically. *

*/ - public synchronized void addSettingsUpdateConsumer(Setting setting, Consumer consumer) { + public synchronized void addSettingsUpdateConsumer(Setting setting, BiConsumer consumer) { addSettingsUpdateConsumer(setting, consumer, (s) -> {}); } - /** - * This methods passes the setting value to a consumer during the initialization and on every setting change - *

- * Note: Only settings registered in {@link org.elasticsearch.cluster.ClusterModule} can be changed dynamically. - *

- */ - public synchronized void initializeAndWatch(Setting setting, Consumer consumer) { - assert setting.getProperties().contains(Setting.Property.Dynamic) - || setting.getProperties().contains(Setting.Property.OperatorDynamic) : "Can only watch dynamic settings"; - assert setting.getProperties().contains(Setting.Property.NodeScope) : "Can only watch node settings"; - consumer.accept(setting.get(settings)); - addSettingsUpdateConsumer(setting, consumer); - } - protected void validateDeprecatedAndRemovedSettingV7(Settings settings, Setting setting) {} /** @@ -617,9 +519,10 @@ void validate(final String key, final Settings settings, final boolean validateV /** * Transactional interface to update settings. * @see Setting + * @param the type of context for which settings are updated * @param the type of the value of the setting */ - public interface SettingUpdater { + public interface SettingUpdater { /** * Returns true if this updaters setting has changed with the current update @@ -638,30 +541,30 @@ public interface SettingUpdater { /** * Applies the given value to the updater. This methods will actually run the update. */ - void apply(T value, Settings current, Settings previous); + void apply(C context, T value, Settings current, Settings previous); /** * Updates this updaters value if it has changed. * @return true iff the value has been updated. */ - default boolean apply(Settings current, Settings previous) { + default boolean apply(C context, Settings current, Settings previous) { if (hasChanged(current, previous)) { T value = getValue(current, previous); - apply(value, current, previous); + apply(context, value, current, previous); return true; } return false; } /** - * Returns a callable runnable that calls {@link #apply(Object, Settings, Settings)} if the settings + * Returns a callable runnable that calls {@link #apply(C, Settings, Settings)} if the settings * actually changed. This allows to defer the update to a later point in time while keeping type safety. * If the value didn't change the returned runnable is a noop. */ - default Runnable updater(Settings current, Settings previous) { + default Runnable updater(C context, Settings current, Settings previous) { if (hasChanged(current, previous)) { T value = getValue(current, previous); - return () -> { apply(value, current, previous); }; + return () -> { apply(context, value, current, previous); }; } return () -> {}; } @@ -741,21 +644,6 @@ public Settings diff(Settings source, Settings defaultSettings) { return builder.build(); } - /** - * Returns the value for the given setting. - */ - public T get(Setting setting) { - if (setting.getProperties().contains(scope) == false) { - throw new IllegalArgumentException( - "settings scope doesn't match the setting scope [" + this.scope + "] not in [" + setting.getProperties() + "]" - ); - } - if (get(setting.getKey()) == null) { - throw new IllegalArgumentException("setting " + setting.getKey() + " has not been registered"); - } - return setting.get(this.lastSettingsApplied, settings); - } - /** * Updates a target settings builder with new, updated or deleted settings from a given settings builder. *

diff --git a/server/src/main/java/org/elasticsearch/common/settings/BaseSettingsUpdater.java b/server/src/main/java/org/elasticsearch/common/settings/BaseSettingsUpdater.java index 22ae72019182b..9f587f6adfe2c 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/BaseSettingsUpdater.java +++ b/server/src/main/java/org/elasticsearch/common/settings/BaseSettingsUpdater.java @@ -17,10 +17,10 @@ import static org.elasticsearch.common.settings.AbstractScopedSettings.ARCHIVED_SETTINGS_PREFIX; -public abstract class BaseSettingsUpdater { - protected final AbstractScopedSettings scopedSettings; +public abstract class BaseSettingsUpdater> { + protected final T scopedSettings; - public BaseSettingsUpdater(AbstractScopedSettings scopedSettings) { + public BaseSettingsUpdater(T scopedSettings) { this.scopedSettings = scopedSettings; } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 1fbc8993cc5aa..97d3976d2f807 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -63,6 +63,7 @@ import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.TriConsumer; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.network.NetworkService; @@ -71,6 +72,7 @@ import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.Tuple; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.HandshakingTransportAddressConnector; import org.elasticsearch.discovery.PeerFinder; @@ -141,13 +143,18 @@ import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.watcher.ResourceWatcherService; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Predicate; /** * Encapsulates all valid cluster level settings. */ -public final class ClusterSettings extends AbstractScopedSettings { +public final class ClusterSettings extends AbstractContextlessScopedSettings { public static ClusterSettings createBuiltInClusterSettings() { return createBuiltInClusterSettings(Settings.EMPTY); @@ -162,7 +169,7 @@ public ClusterSettings(final Settings nodeSettings, final Set> settin addSettingsUpdater(new LoggingSettingUpdater(nodeSettings)); } - private static final class LoggingSettingUpdater implements SettingUpdater { + private static final class LoggingSettingUpdater implements SettingUpdater { final Predicate loggerPredicate = Loggers.LOG_LEVEL_SETTING::match; private final Settings settings; @@ -192,7 +199,7 @@ public Settings getValue(Settings current, Settings previous) { } @Override - public void apply(Settings value, Settings current, Settings previous) { + public void apply(Void context, Settings value, Settings current, Settings previous) { for (String key : value.keySet()) { assert loggerPredicate.test(key); String component = key.substring("logger.".length()); @@ -652,4 +659,104 @@ public void apply(Settings value, Settings current, Settings previous) { WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_SHARD_WRITE_LOAD_POLLING_INTERVAL_SETTING, WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING ); + + /** + * This methods passes the setting value to a consumer during the initialization and on every setting change + *

+ * Note: Only settings registered in {@link org.elasticsearch.cluster.ClusterModule} can be changed dynamically. + *

+ */ + public synchronized void initializeAndWatch(Setting setting, Consumer consumer) { + assert setting.getProperties().contains(Setting.Property.Dynamic) + || setting.getProperties().contains(Setting.Property.OperatorDynamic) : "Can only watch dynamic settings"; + assert setting.getProperties().contains(Setting.Property.NodeScope) : "Can only watch node settings"; + consumer.accept(setting.get(settings)); + addSettingsUpdateConsumer(setting, consumer); + } + + /** + * Adds a affix settings consumer that accepts the values for two settings. The consumer is only notified if one or both settings change + * and if the provided validator succeeded. + *

+ * Note: Only settings registered in {@link SettingsModule} can be changed dynamically. + *

+ * This method registers a compound updater that is useful if two settings are depending on each other. + * The consumer is always provided with both values even if only one of the two changes. + */ + public synchronized void addAffixUpdateConsumer( + Setting.AffixSetting
settingA, + Setting.AffixSetting settingB, + BiConsumer> consumer, + BiConsumer> validator + ) { + // it would be awesome to have a generic way to do that ie. a set of settings that map to an object with a builder + // down the road this would be nice to have! + ensureSettingIsRegistered(settingA); + ensureSettingIsRegistered(settingB); + SettingUpdater, A>> affixUpdaterA = settingA.newAffixUpdater( + (ctx, a, b) -> {}, + logger, + (a, b) -> {} + ); + SettingUpdater, B>> affixUpdaterB = settingB.newAffixUpdater( + (ctx, a, b) -> {}, + logger, + (a, b) -> {} + ); + + addSettingsUpdater(new SettingUpdater>>() { + + @Override + public boolean hasChanged(Settings current, Settings previous) { + return affixUpdaterA.hasChanged(current, previous) || affixUpdaterB.hasChanged(current, previous); + } + + @Override + public Map> getValue(Settings current, Settings previous) { + Map> map = new HashMap<>(); + TriConsumer aConsumer = (ctx, key, value) -> { + assert map.containsKey(key) == false : "duplicate key: " + key; + map.put(key, new Tuple<>(value, settingB.getConcreteSettingForNamespace(key).get(current))); + }; + TriConsumer bConsumer = (ctx, key, value) -> { + Tuple abTuple = map.get(key); + if (abTuple != null) { + map.put(key, new Tuple<>(abTuple.v1(), value)); + } else { + assert settingA.getConcreteSettingForNamespace(key) + .get(current) + .equals(settingA.getConcreteSettingForNamespace(key).get(previous)) + : "expected: " + + settingA.getConcreteSettingForNamespace(key).get(current) + + " but was " + + settingA.getConcreteSettingForNamespace(key).get(previous); + map.put(key, new Tuple<>(settingA.getConcreteSettingForNamespace(key).get(current), value)); + } + }; + SettingUpdater, A>> affixUpdaterA = settingA.newAffixUpdater( + aConsumer, + logger, + (a, b) -> {} + ); + SettingUpdater, B>> affixUpdaterB = settingB.newAffixUpdater( + bConsumer, + logger, + (a, b) -> {} + ); + affixUpdaterA.apply(null, current, previous); + affixUpdaterB.apply(null, current, previous); + for (Map.Entry> entry : map.entrySet()) { + validator.accept(entry.getKey(), entry.getValue()); + } + return Collections.unmodifiableMap(map); + } + + @Override + public void apply(Void context, Map> values, Settings current, Settings previous) { + for (Map.Entry> entry : values.entrySet()) { + consumer.accept(entry.getKey(), entry.getValue()); + } + } + }); + } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index 9f4c5b80ccf23..78c5b8f0350c6 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -55,7 +55,7 @@ * Encapsulates all valid index level settings. * @see Property#IndexScope */ -public final class IndexScopedSettings extends AbstractScopedSettings { +public final class IndexScopedSettings extends AbstractContextlessScopedSettings { public static final Set> BUILT_IN_INDEX_SETTINGS; diff --git a/server/src/main/java/org/elasticsearch/common/settings/ProjectScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ProjectScopedSettings.java index fd74263c67a56..2e1bc24bb921f 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ProjectScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ProjectScopedSettings.java @@ -9,18 +9,61 @@ package org.elasticsearch.common.settings; -import org.elasticsearch.core.FixForMultiProject; +import org.elasticsearch.cluster.metadata.ProjectId; +import java.util.Collections; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; -public class ProjectScopedSettings extends AbstractScopedSettings { - public ProjectScopedSettings(Settings settings, Set> settingsSet) { - super(settings, settingsSet, Setting.Property.ProjectScope); +/** + * Encapsulates project level settings and its update listeners. + */ +public class ProjectScopedSettings extends AbstractScopedSettings { + private final Map projectSettings = new ConcurrentHashMap<>(); + + public ProjectScopedSettings(Set> settingsSet) { + super(settingsSet, Setting.Property.ProjectScope); + } + + public ProjectScopedSettings() { + this(Collections.emptySet()); + } + + /** + * Retrieves a setting value for a specific project. + * + * @param projectId id of the project for which the setting value is to be retrieved + * @param setting the setting whose value needs to be fetched + * @return the value of the specified setting for the given project + */ + public T get(ProjectId projectId, Setting setting) { + return setting.get(projectSettings.getOrDefault(projectId, Settings.EMPTY)); + } + + /** + * Validates the given settings for the given project by running it through all update listeners without applying it. This + * method will not change any settings but will fail if any of the settings can't be applied. + */ + public void validateUpdate(ProjectId projectId, Settings newSettings) { + Settings lastSettingsApplied = projectSettings.getOrDefault(projectId, Settings.EMPTY); + validateUpdate(newSettings, lastSettingsApplied); } - @FixForMultiProject - @Override - public T get(Setting setting) { - throw new UnsupportedOperationException("Not implemented for project scoped settings"); + /** + * Applies the given settings of the project to all the settings consumers or to none of them. + * + * @param projectId id of the project + * @param newSettings the settings to apply + * @return the unmerged applied settings + */ + public Settings applySettings(ProjectId projectId, Settings newSettings) { + return projectSettings.compute(projectId, (id, lastSettingsApplied) -> { + if (lastSettingsApplied == null) { + lastSettingsApplied = Settings.EMPTY; + } + executeSettingsUpdaters(projectId, newSettings, lastSettingsApplied); + return newSettings; + }); } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/Setting.java b/server/src/main/java/org/elasticsearch/common/settings/Setting.java index 1ccaeee626fc9..590ceaca135a2 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/server/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -15,6 +15,7 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.Version; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.TriConsumer; import org.elasticsearch.common.VersionId; import org.elasticsearch.common.logging.DeprecationCategory; import org.elasticsearch.common.unit.ByteSizeValue; @@ -777,7 +778,7 @@ public Set getSettingsDependencies(final String key) { /** * Build a new updater with a noop validator. */ - final AbstractScopedSettings.SettingUpdater newUpdater(Consumer consumer, Logger logger) { + final AbstractScopedSettings.SettingUpdater newUpdater(BiConsumer consumer, Logger logger) { return newUpdater(consumer, logger, (s) -> {}); } @@ -785,9 +786,9 @@ final AbstractScopedSettings.SettingUpdater newUpdater(Consumer consumer, * Build the updater responsible for validating new values, logging the new * value, and eventually setting the value where it belongs. */ - AbstractScopedSettings.SettingUpdater newUpdater(Consumer consumer, Logger logger, Consumer validator) { + AbstractScopedSettings.SettingUpdater newUpdater(BiConsumer consumer, Logger logger, Consumer validator) { if (isDynamic()) { - return new Updater(consumer, logger, validator); + return new Updater<>(consumer, logger, validator); } else { throw new IllegalStateException("setting [" + getKey() + "] is not dynamic"); } @@ -795,17 +796,17 @@ AbstractScopedSettings.SettingUpdater newUpdater(Consumer consumer, Logger /** * Updates settings that depend on each other. - * See {@link AbstractScopedSettings#addSettingsUpdateConsumer(Setting, Setting, BiConsumer)} and its usage for details. + * See {@link AbstractScopedSettings#addSettingsUpdateConsumer(Setting, Setting, TriConsumer)} and its usage for details. */ - static AbstractScopedSettings.SettingUpdater> compoundUpdater( - final BiConsumer consumer, + static AbstractScopedSettings.SettingUpdater> compoundUpdater( + final TriConsumer consumer, final BiConsumer validator, final Setting aSetting, final Setting bSetting, Logger logger ) { - final AbstractScopedSettings.SettingUpdater aSettingUpdater = aSetting.newUpdater(null, logger); - final AbstractScopedSettings.SettingUpdater bSettingUpdater = bSetting.newUpdater(null, logger); + final AbstractScopedSettings.SettingUpdater aSettingUpdater = aSetting.newUpdater(null, logger); + final AbstractScopedSettings.SettingUpdater bSettingUpdater = bSetting.newUpdater(null, logger); return new AbstractScopedSettings.SettingUpdater<>() { @Override public boolean hasChanged(Settings current, Settings previous) { @@ -821,14 +822,14 @@ public Tuple getValue(Settings current, Settings previous) { } @Override - public void apply(Tuple value, Settings current, Settings previous) { + public void apply(C context, Tuple value, Settings current, Settings previous) { if (aSettingUpdater.hasChanged(current, previous)) { logSettingUpdate(aSetting, current, previous, logger); } if (bSettingUpdater.hasChanged(current, previous)) { logSettingUpdate(bSetting, current, previous, logger); } - consumer.accept(value.v1(), value.v2()); + consumer.apply(context, value.v1(), value.v2()); } @Override @@ -838,15 +839,15 @@ public String toString() { }; } - static AbstractScopedSettings.SettingUpdater groupedSettingsUpdater( - Consumer consumer, + static AbstractScopedSettings.SettingUpdater groupedSettingsUpdater( + BiConsumer consumer, final List> configuredSettings ) { return groupedSettingsUpdater(consumer, configuredSettings, (v) -> {}); } - static AbstractScopedSettings.SettingUpdater groupedSettingsUpdater( - Consumer consumer, + static AbstractScopedSettings.SettingUpdater groupedSettingsUpdater( + BiConsumer consumer, final List> configuredSettings, Consumer validator ) { @@ -877,8 +878,8 @@ public Settings getValue(Settings current, Settings previous) { } @Override - public void apply(Settings value, Settings current, Settings previous) { - consumer.accept(value); + public void apply(C context, Settings value, Settings current, Settings previous) { + consumer.accept(context, value); } @Override @@ -964,8 +965,8 @@ public void validate(final String key, final Object value, final Object dependen } } - AbstractScopedSettings.SettingUpdater, T>> newAffixUpdater( - BiConsumer consumer, + AbstractScopedSettings.SettingUpdater, T>> newAffixUpdater( + TriConsumer consumer, Logger logger, BiConsumer validator ) { @@ -977,14 +978,14 @@ public boolean hasChanged(Settings current, Settings previous) { } @Override - public Map, T> getValue(Settings current, Settings previous) { + public Map, T> getValue(Settings current, Settings previous) { // we collect all concrete keys and then delegate to the actual setting for validation and settings extraction - final Map, T> result = new IdentityHashMap<>(); + final Map, T> result = new IdentityHashMap<>(); Stream.concat(matchStream(current), matchStream(previous)).distinct().forEach(aKey -> { String namespace = key.getNamespace(aKey); Setting concreteSetting = getConcreteSetting(namespace, aKey); - AbstractScopedSettings.SettingUpdater updater = concreteSetting.newUpdater( - (v) -> consumer.accept(namespace, v), + AbstractScopedSettings.SettingUpdater updater = concreteSetting.newUpdater( + (c, v) -> consumer.apply(null, namespace, v), logger, (v) -> validator.accept(namespace, v) ); @@ -999,16 +1000,21 @@ public Map, T> getValue(Settings curren } @Override - public void apply(Map, T> value, Settings current, Settings previous) { - for (Map.Entry, T> entry : value.entrySet()) { - entry.getKey().apply(entry.getValue(), current, previous); + public void apply( + C context, + Map, T> value, + Settings current, + Settings previous + ) { + for (Map.Entry, T> entry : value.entrySet()) { + entry.getKey().apply(context, entry.getValue(), current, previous); } } }; } - AbstractScopedSettings.SettingUpdater> newAffixMapUpdater( - Consumer> consumer, + AbstractScopedSettings.SettingUpdater> newAffixMapUpdater( + BiConsumer> consumer, Logger logger, BiConsumer validator ) { @@ -1026,8 +1032,8 @@ public Map getValue(Settings current, Settings previous) { Stream.concat(matchStream(current), matchStream(previous)).distinct().forEach(aKey -> { String namespace = key.getNamespace(aKey); Setting concreteSetting = getConcreteSetting(namespace, aKey); - AbstractScopedSettings.SettingUpdater updater = concreteSetting.newUpdater( - (v) -> {}, + AbstractScopedSettings.SettingUpdater updater = concreteSetting.newUpdater( + (c, v) -> {}, logger, (v) -> validator.accept(namespace, v) ); @@ -1042,9 +1048,9 @@ public Map getValue(Settings current, Settings previous) { } @Override - public void apply(Map value, Settings current, Settings previous) { + public void apply(C context, Map value, Settings current, Settings previous) { Setting.logSettingUpdate(AffixSetting.this, current, previous, logger); - consumer.accept(value); + consumer.accept(context, value); } }; } @@ -1248,8 +1254,8 @@ public void diff(Settings.Builder builder, Settings source, Settings defaultSett } @Override - public AbstractScopedSettings.SettingUpdater newUpdater( - Consumer consumer, + public AbstractScopedSettings.SettingUpdater newUpdater( + BiConsumer consumer, Logger logger, Consumer validator ) { @@ -1283,9 +1289,9 @@ public Settings getValue(Settings current, Settings previous) { } @Override - public void apply(Settings value, Settings current, Settings previous) { + public void apply(C context, Settings value, Settings current, Settings previous) { Setting.logSettingUpdate(GroupSetting.this, current, previous, logger); - consumer.accept(value); + consumer.accept(context, value); } @Override @@ -1296,15 +1302,15 @@ public String toString() { } } - private final class Updater implements AbstractScopedSettings.SettingUpdater { - private final Consumer consumer; + private final class Updater implements AbstractScopedSettings.SettingUpdater { + private final BiConsumer consumer; private final Logger logger; - private final Consumer accept; + private final Consumer validator; - Updater(Consumer consumer, Logger logger, Consumer accept) { + Updater(BiConsumer consumer, Logger logger, Consumer validator) { this.consumer = consumer; this.logger = logger; - this.accept = accept; + this.validator = validator; } @Override @@ -1328,7 +1334,7 @@ public T getValue(Settings current, Settings previous) { final String value = getRaw(previous); try { T inst = get(current); - accept.accept(inst); + validator.accept(inst); return inst; } catch (Exception e) { if (isFiltered()) { @@ -1343,9 +1349,9 @@ public T getValue(Settings current, Settings previous) { } @Override - public void apply(T value, Settings current, Settings previous) { + public void apply(C context, T value, Settings current, Settings previous) { logSettingUpdate(Setting.this, current, previous, logger); - consumer.accept(value); + consumer.accept(context, value); } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/SettingsModule.java b/server/src/main/java/org/elasticsearch/common/settings/SettingsModule.java index c9d1a0bc25778..43791d158892b 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/SettingsModule.java +++ b/server/src/main/java/org/elasticsearch/common/settings/SettingsModule.java @@ -82,7 +82,7 @@ public SettingsModule(Settings settings, List> additionalSettings, Li } this.indexScopedSettings = new IndexScopedSettings(settings, new HashSet<>(this.indexSettings.values())); this.clusterSettings = new ClusterSettings(settings, new HashSet<>(this.nodeSettings.values())); - this.projectScopedSettings = new ProjectScopedSettings(settings, new HashSet<>(this.projectSettings.values())); + this.projectScopedSettings = new ProjectScopedSettings(new HashSet<>(this.projectSettings.values())); Settings indexSettings = settings.filter((s) -> s.startsWith("index.") && clusterSettings.get(s) == null); if (indexSettings.isEmpty() == false) { try { diff --git a/server/src/main/java/org/elasticsearch/common/settings/SettingsUpdater.java b/server/src/main/java/org/elasticsearch/common/settings/SettingsUpdater.java index 07d19c4ddf372..f222080b21206 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/SettingsUpdater.java +++ b/server/src/main/java/org/elasticsearch/common/settings/SettingsUpdater.java @@ -21,7 +21,7 @@ * Updates transient and persistent cluster state settings if there are any changes * due to the update. */ -public final class SettingsUpdater extends BaseSettingsUpdater { +public final class SettingsUpdater extends BaseSettingsUpdater { final Settings.Builder transientUpdates = Settings.builder(); final Settings.Builder persistentUpdates = Settings.builder(); diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index d5dbe23b7994d..0d6b2c8baa2ab 100644 --- a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -72,7 +72,7 @@ public void testScheduling() { final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(); final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(); - final ClusterApplierService clusterApplierService = new ClusterApplierService("test", settings, clusterSettings, threadPool) { + final ClusterApplierService clusterApplierService = new ClusterApplierService("test", clusterSettings, threadPool) { @Override protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { return deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor(); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java index 6ef622948f5c5..c501bbd9b3470 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java @@ -232,7 +232,7 @@ protected ExecutorService createThreadPoolExecutor() { } }; - final var applierService = new ClusterApplierService("master", settings, clusterSettings, threadPool) { + final var applierService = new ClusterApplierService("master", clusterSettings, threadPool) { private final PrioritizedEsThreadPoolExecutor directExecutor = new PrioritizedEsThreadPoolExecutor( "master-service", 1, diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java index 21d547c1593b8..78266e44c22e3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java @@ -145,7 +145,7 @@ public void testAllocate(AllocateUnassignedHandler allocateUnassigned, Consumer< settings, clusterSettings, new FakeThreadPoolMasterService(LOCAL_NODE_ID, threadPool, deterministicTaskQueue::scheduleNow), - new ClusterApplierService(LOCAL_NODE_ID, settings, clusterSettings, threadPool) { + new ClusterApplierService(LOCAL_NODE_ID, clusterSettings, threadPool) { @Override protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { return deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor(); diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java index e6f50ef42365e..12e500eaf6576 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java @@ -18,12 +18,16 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.NoMasterBlockService; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.project.ProjectStateRegistry; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.ProjectScopedSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.TimeValue; @@ -38,6 +42,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -50,12 +55,18 @@ import static org.hamcrest.Matchers.is; public class ClusterApplierServiceTests extends ESTestCase { + private static final Setting PROJECT_SETTING = new Setting<>("test_project_setting", "0", Integer::parseInt, value -> { + if (value < 0) { + throw new IllegalArgumentException("must be positive"); + } + }, Setting.Property.Dynamic, Setting.Property.ProjectScope); private ThreadPool threadPool; private long currentTimeMillis; private boolean allowClusterStateApplicationFailure = false; private ClusterApplierService clusterApplierService; private ClusterSettings clusterSettings; + private ProjectScopedSettings projectScopedSettings; @Before public void setUp() throws Exception { @@ -74,6 +85,7 @@ public long rawRelativeTimeInMillis() { } }; clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + projectScopedSettings = new ProjectScopedSettings(Set.of(PROJECT_SETTING)); allowClusterStateApplicationFailure = false; clusterApplierService = createClusterApplierService(true); } @@ -91,8 +103,8 @@ private ClusterApplierService createClusterApplierService(boolean makeMaster) { final DiscoveryNode localNode = DiscoveryNodeUtils.builder("node1").roles(emptySet()).build(); final ClusterApplierService clusterApplierService = new ClusterApplierService( "test_node", - Settings.builder().put("cluster.name", "ClusterApplierServiceTests").build(), clusterSettings, + projectScopedSettings, threadPool ) { @Override @@ -495,6 +507,124 @@ public void onFailure(Exception e) { assertThat(error.get().getMessage(), containsString("illegal value can't update")); } + public void testClusterStateApplierBubblesUpExceptionsInProjectSettingsApplier() throws InterruptedException { + AtomicReference error = new AtomicReference<>(); + projectScopedSettings.addSettingsUpdateConsumer(PROJECT_SETTING, (c, v) -> {}); + allowClusterStateApplicationFailure = true; + + CountDownLatch latch = new CountDownLatch(1); + clusterApplierService.onNewClusterState( + "test", + () -> ClusterState.builder(clusterApplierService.state()) + .putCustom( + ProjectStateRegistry.TYPE, + ProjectStateRegistry.builder() + .putProjectSettings(randomUniqueProjectId(), Settings.builder().put(PROJECT_SETTING.getKey(), -1).build()) + .build() + ) + .build(), + new ActionListener<>() { + + @Override + public void onResponse(Void ignored) { + latch.countDown(); + fail("should not be called"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(error.compareAndSet(null, e)); + latch.countDown(); + } + } + ); + + latch.await(); + assertNotNull(error.get()); + assertThat(error.get().getMessage(), containsString("illegal value can't update")); + } + + public void testClusterSettingsUpdaterCalledOnSettingsChange() throws InterruptedException { + AtomicReference ref = new AtomicReference<>(); + clusterSettings.addSettingsUpdateConsumer(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING, ref::set); + + CountDownLatch latch = new CountDownLatch(1); + clusterApplierService.onNewClusterState( + "test", + () -> ClusterState.builder(clusterApplierService.state()) + .metadata( + Metadata.builder(clusterApplierService.state().metadata()) + .persistentSettings( + Settings.builder() + .put( + EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), + EnableAllocationDecider.Allocation.NEW_PRIMARIES + ) + .build() + ) + .build() + ) + .build(), + new ActionListener<>() { + + @Override + public void onResponse(Void ignored) { + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + latch.countDown(); + fail("should not be called"); + } + } + ); + + latch.await(); + assertNotNull(ref.get()); + assertThat(ref.get(), is(EnableAllocationDecider.Allocation.NEW_PRIMARIES)); + } + + public void testProjectSettingsUpdaterCalledOnSettingsChange() throws InterruptedException { + AtomicReference ref = new AtomicReference<>(); + ProjectId projectId = randomUniqueProjectId(); + projectScopedSettings.addSettingsUpdateConsumer(PROJECT_SETTING, (c, v) -> { + assertThat(c, is(projectId)); + ref.set(v); + }); + allowClusterStateApplicationFailure = true; + + CountDownLatch latch = new CountDownLatch(1); + clusterApplierService.onNewClusterState( + "test", + () -> ClusterState.builder(clusterApplierService.state()) + .putCustom( + ProjectStateRegistry.TYPE, + ProjectStateRegistry.builder() + .putProjectSettings(projectId, Settings.builder().put(PROJECT_SETTING.getKey(), 42).build()) + .putProjectSettings(randomUniqueProjectId(), Settings.builder().build()) + .build() + ) + .build(), + new ActionListener<>() { + @Override + public void onResponse(Void ignored) { + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + latch.countDown(); + fail("should not be called"); + } + } + ); + + latch.await(); + assertNotNull(ref.get()); + assertThat(ref.get(), is(42)); + } + public void testClusterStateApplierSwallowsExceptionInListener() throws InterruptedException { AtomicReference error = new AtomicReference<>(); AtomicBoolean applierCalled = new AtomicBoolean(); diff --git a/server/src/test/java/org/elasticsearch/common/settings/ScopedSettingsTests.java b/server/src/test/java/org/elasticsearch/common/settings/ScopedSettingsTests.java index 253abcf93dace..b26101e0533b0 100644 --- a/server/src/test/java/org/elasticsearch/common/settings/ScopedSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/common/settings/ScopedSettingsTests.java @@ -148,7 +148,7 @@ public void testNoopSettingsUpdate() { public void testAddConsumer() { Setting testSetting = Setting.intSetting("foo.bar", 1, Property.Dynamic, Property.NodeScope); Setting testSetting2 = Setting.intSetting("foo.bar.baz", 1, Property.Dynamic, Property.NodeScope); - AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, Collections.singleton(testSetting)); + ClusterSettings service = new ClusterSettings(Settings.EMPTY, Collections.singleton(testSetting)); AtomicInteger consumer = new AtomicInteger(); service.addSettingsUpdateConsumer(testSetting, consumer::set); @@ -189,7 +189,7 @@ public void testDependentSettings() { () -> stringSetting ); - AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(intSetting, stringSetting))); + ClusterSettings service = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(intSetting, stringSetting))); IllegalArgumentException iae = expectThrows( IllegalArgumentException.class, @@ -228,7 +228,7 @@ public void validate(final String key, final Object value, final Object dependen } ); - AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(intSetting, stringSetting))); + ClusterSettings service = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(intSetting, stringSetting))); SettingsException iae = expectThrows( SettingsException.class, @@ -266,7 +266,7 @@ public void testDependentSettingsWithFallback() { () -> nameSetting ); - final AbstractScopedSettings service = new ClusterSettings( + final ClusterSettings service = new ClusterSettings( Settings.EMPTY, new HashSet<>(Arrays.asList(nameFallbackSetting, nameSetting, barSetting)) ); @@ -309,7 +309,7 @@ public Iterator> settings() { }; final Setting dependingSetting = Setting.simpleString(prefix + "foo.depending", dependingValidator, scopeProperty); - final AbstractScopedSettings service = nodeSetting + final AbstractContextlessScopedSettings service = nodeSetting ? new ClusterSettings(Settings.EMPTY, Set.of(baseSetting, dependingSetting)) : new IndexScopedSettings(Settings.EMPTY, Set.of(baseSetting, dependingSetting)); @@ -349,7 +349,7 @@ public void testTupleAffixUpdateConsumer() { listSuffix, (k) -> Setting.listSetting(k, Arrays.asList("1"), Integer::parseInt, Property.Dynamic, Property.NodeScope) ); - AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(intSetting, listSetting))); + ClusterSettings service = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(intSetting, listSetting))); Map, Integer>> results = new HashMap<>(); Function listBuilder = g -> (prefix + g + "." + listSuffix); Function intBuilder = g -> (prefix + g + "." + intSuffix); @@ -459,7 +459,7 @@ public void testAffixGroupUpdateConsumer() { listSuffix, (k) -> Setting.listSetting(k, Arrays.asList("1"), Integer::parseInt, Property.Dynamic, Property.NodeScope) ); - AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(intSetting, listSetting))); + ClusterSettings service = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(intSetting, listSetting))); Map results = new HashMap<>(); Function listBuilder = g -> (prefix + g + "." + listSuffix); Function intBuilder = g -> (prefix + g + "." + intSuffix); @@ -551,7 +551,7 @@ public void testAffixUpdateConsumerWithAlias() { "fallback.", (ns, k) -> Setting.simpleString(k, "default", Property.Dynamic, Property.NodeScope) ); - AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(prefixSetting))); + ClusterSettings service = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(prefixSetting))); BiConsumer affixUpdateConsumer = Mockito.mock("affixUpdateConsumer"); service.addAffixUpdateConsumer(prefixSetting, affixUpdateConsumer, (s, v) -> {}); @@ -583,7 +583,7 @@ public void testAddConsumerAffix() { "bar.", (ns, k) -> Setting.boolSetting(k, false, Property.Dynamic, Property.NodeScope) ); - AbstractScopedSettings service = new ClusterSettings( + ClusterSettings service = new ClusterSettings( Settings.EMPTY, new HashSet<>(Arrays.asList(intSetting, listSetting, fallbackSetting)) ); @@ -656,7 +656,7 @@ public void testAddConsumerAffixMap() { "list", (k) -> Setting.listSetting(k, Arrays.asList("1"), Integer::parseInt, Property.Dynamic, Property.NodeScope) ); - AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(intSetting, listSetting))); + ClusterSettings service = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(intSetting, listSetting))); Map> listResults = new HashMap<>(); Map intResults = new HashMap<>(); @@ -731,7 +731,7 @@ public void testAffixMapConsumerNotCalledWithNull() { "other.", (k) -> Setting.intSetting(k, 1, Property.Dynamic, Property.NodeScope) ); - AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(prefixSetting, otherSetting))); + ClusterSettings service = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(prefixSetting, otherSetting))); Map affixResults = new HashMap<>(); Consumer> consumer = (map) -> { @@ -753,7 +753,7 @@ public void testAffixMapConsumerNotCalledWithNull() { public void testApply() { Setting testSetting = Setting.intSetting("foo.bar", 1, Property.Dynamic, Property.NodeScope); Setting testSetting2 = Setting.intSetting("foo.bar.baz", 1, Property.Dynamic, Property.NodeScope); - AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(testSetting, testSetting2))); + ClusterSettings service = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(testSetting, testSetting2))); AtomicInteger consumer = new AtomicInteger(); service.addSettingsUpdateConsumer(testSetting, consumer::set); diff --git a/server/src/test/java/org/elasticsearch/common/settings/SettingTests.java b/server/src/test/java/org/elasticsearch/common/settings/SettingTests.java index 6b7f45f2fc669..41789fcc6f19a 100644 --- a/server/src/test/java/org/elasticsearch/common/settings/SettingTests.java +++ b/server/src/test/java/org/elasticsearch/common/settings/SettingTests.java @@ -105,11 +105,14 @@ public void testByteSizeSettingValidation() { final ByteSizeValue byteSizeValue = byteSizeValueSetting.get(Settings.EMPTY); assertThat(byteSizeValue.getBytes(), equalTo(2048L)); AtomicReference value = new AtomicReference<>(null); - ClusterSettings.SettingUpdater settingUpdater = byteSizeValueSetting.newUpdater(value::set, logger); + ClusterSettings.SettingUpdater settingUpdater = byteSizeValueSetting.newUpdater( + (ctx, newValue) -> value.set(newValue), + logger + ); final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> settingUpdater.apply(Settings.builder().put("a.byte.size", 12).build(), Settings.EMPTY) + () -> settingUpdater.apply(null, Settings.builder().put("a.byte.size", 12).build(), Settings.EMPTY) ); assertThat(e, hasToString(containsString("illegal value can't update [a.byte.size] from [2048b] to [12]"))); assertNotNull(e.getCause()); @@ -117,7 +120,7 @@ public void testByteSizeSettingValidation() { final IllegalArgumentException cause = (IllegalArgumentException) e.getCause(); final String expected = "failed to parse setting [a.byte.size] with value [12] as a size in bytes: unit is missing or unrecognized"; assertThat(cause, hasToString(containsString(expected))); - assertTrue(settingUpdater.apply(Settings.builder().put("a.byte.size", "12b").build(), Settings.EMPTY)); + assertTrue(settingUpdater.apply(null, Settings.builder().put("a.byte.size", "12b").build(), Settings.EMPTY)); assertThat(value.get(), equalTo(ByteSizeValue.ofBytes(12))); } @@ -147,9 +150,12 @@ public void testMemorySize() { assertEquals(memorySizeValue.getBytes(), JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.25, 1.0); AtomicReference value = new AtomicReference<>(null); - ClusterSettings.SettingUpdater settingUpdater = memorySizeValueSetting.newUpdater(value::set, logger); + ClusterSettings.SettingUpdater settingUpdater = memorySizeValueSetting.newUpdater( + (ctx, newValue) -> value.set(newValue), + logger + ); try { - settingUpdater.apply(Settings.builder().put("a.byte.size", 12).build(), Settings.EMPTY); + settingUpdater.apply(null, Settings.builder().put("a.byte.size", 12).build(), Settings.EMPTY); fail("no unit"); } catch (IllegalArgumentException ex) { assertThat(ex, hasToString(containsString("illegal value can't update [a.byte.size] from [25%] to [12]"))); @@ -161,28 +167,38 @@ public void testMemorySize() { assertThat(cause, hasToString(containsString(expected))); } - assertTrue(settingUpdater.apply(Settings.builder().put("a.byte.size", "12b").build(), Settings.EMPTY)); + assertTrue(settingUpdater.apply(null, Settings.builder().put("a.byte.size", "12b").build(), Settings.EMPTY)); assertEquals(ByteSizeValue.ofBytes(12), value.get()); - assertTrue(settingUpdater.apply(Settings.builder().put("a.byte.size", "20%").build(), Settings.EMPTY)); + assertTrue(settingUpdater.apply(null, Settings.builder().put("a.byte.size", "20%").build(), Settings.EMPTY)); assertEquals(ByteSizeValue.ofBytes((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.2)), value.get()); } public void testSimpleUpdate() { Setting booleanSetting = Setting.boolSetting("foo.bar", false, Property.Dynamic, Property.NodeScope); AtomicReference atomicBoolean = new AtomicReference<>(null); - ClusterSettings.SettingUpdater settingUpdater = booleanSetting.newUpdater(atomicBoolean::set, logger); + AtomicReference context = new AtomicReference<>(); + AbstractScopedSettings.SettingUpdater settingUpdater = booleanSetting.newUpdater((ctx, newValue) -> { + context.set(ctx); + atomicBoolean.set(newValue); + }, logger); + + Object ctx = new Object(); Settings build = Settings.builder().put("foo.bar", false).build(); - settingUpdater.apply(build, Settings.EMPTY); + settingUpdater.apply(ctx, build, Settings.EMPTY); assertNull(atomicBoolean.get()); + assertNull(context.get()); + + context.set(null); build = Settings.builder().put("foo.bar", true).build(); - settingUpdater.apply(build, Settings.EMPTY); + settingUpdater.apply(ctx, build, Settings.EMPTY); assertTrue(atomicBoolean.get()); + assertThat(context.get(), is(ctx)); // try update bogus value build = Settings.builder().put("foo.bar", "I am not a boolean").build(); try { - settingUpdater.apply(build, Settings.EMPTY); + settingUpdater.apply(ctx, build, Settings.EMPTY); fail("not a boolean"); } catch (IllegalArgumentException ex) { assertThat(ex, hasToString(containsString("illegal value can't update [foo.bar] from [false] to [I am not a boolean]"))); @@ -198,12 +214,15 @@ public void testSimpleUpdate() { public void testSimpleUpdateOfFilteredSetting() { Setting booleanSetting = Setting.boolSetting("foo.bar", false, Property.Dynamic, Property.Filtered); - AtomicReference atomicBoolean = new AtomicReference<>(null); - ClusterSettings.SettingUpdater settingUpdater = booleanSetting.newUpdater(atomicBoolean::set, logger); + SettingUpdater settingUpdater = booleanSetting.newUpdater( + (ctx, value) -> fail("Listeners should not be called"), + logger + ); + Object ctx = new Object(); // try update bogus value Settings build = Settings.builder().put("foo.bar", "I am not a boolean").build(); - IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> settingUpdater.apply(build, Settings.EMPTY)); + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> settingUpdater.apply(ctx, build, Settings.EMPTY)); assertThat(ex, hasToString(equalTo("java.lang.IllegalArgumentException: illegal value can't update [foo.bar]"))); assertNull(ex.getCause()); } @@ -433,7 +452,7 @@ public void testUpdateNotDynamic() { assertFalse(booleanSetting.isGroupSetting()); AtomicReference atomicBoolean = new AtomicReference<>(null); try { - booleanSetting.newUpdater(atomicBoolean::set, logger); + booleanSetting.newUpdater((ctx, value) -> fail("Listeners shouldn't be called"), logger); fail("not dynamic"); } catch (IllegalStateException ex) { assertEquals("setting [foo.bar] is not dynamic", ex.getMessage()); @@ -443,10 +462,16 @@ public void testUpdateNotDynamic() { public void testUpdaterIsIsolated() { Setting booleanSetting = Setting.boolSetting("foo.bar", false, Property.Dynamic, Property.NodeScope); AtomicReference ab1 = new AtomicReference<>(null); + AtomicReference context = new AtomicReference<>(null); AtomicReference ab2 = new AtomicReference<>(null); - ClusterSettings.SettingUpdater settingUpdater = booleanSetting.newUpdater(ab1::set, logger); - settingUpdater.apply(Settings.builder().put("foo.bar", true).build(), Settings.EMPTY); + SettingUpdater settingUpdater = booleanSetting.newUpdater((ctx, value) -> { + context.set(ctx); + ab1.set(value); + }, logger); + Object ctx = new Object(); + settingUpdater.apply(ctx, Settings.builder().put("foo.bar", true).build(), Settings.EMPTY); assertTrue(ab1.get()); + assertThat(context.get(), is(ctx)); assertNull(ab2.get()); } @@ -488,20 +513,27 @@ public void testComplexType() { Setting setting = new Setting<>("foo.bar", (s) -> "", (s) -> new ComplexType(s), Property.Dynamic, Property.NodeScope); assertFalse(setting.isGroupSetting()); ref.set(setting.get(Settings.EMPTY)); + AtomicReference context = new AtomicReference<>(null); ComplexType type = ref.get(); - ClusterSettings.SettingUpdater settingUpdater = setting.newUpdater(ref::set, logger); - assertFalse(settingUpdater.apply(Settings.EMPTY, Settings.EMPTY)); + ClusterSettings.SettingUpdater settingUpdater = setting.newUpdater((ctx, value) -> { + context.set(ctx); + ref.set(value); + }, logger); + Object ctx = new Object(); + assertFalse(settingUpdater.apply(ctx, Settings.EMPTY, Settings.EMPTY)); assertSame("no update - type has not changed", type, ref.get()); // change from default - assertTrue(settingUpdater.apply(Settings.builder().put("foo.bar", "2").build(), Settings.EMPTY)); + assertTrue(settingUpdater.apply(ctx, Settings.builder().put("foo.bar", "2").build(), Settings.EMPTY)); assertNotSame("update - type has changed", type, ref.get()); assertEquals("2", ref.get().foo); + assertThat(context.get(), is(ctx)); // change back to default... - assertTrue(settingUpdater.apply(Settings.EMPTY, Settings.builder().put("foo.bar", "2").build())); + assertTrue(settingUpdater.apply(ctx, Settings.EMPTY, Settings.builder().put("foo.bar", "2").build())); assertNotSame("update - type has changed", type, ref.get()); assertEquals("", ref.get().foo); + assertThat(context.get(), is(ctx)); } public void testType() { @@ -517,16 +549,22 @@ public void testGroups() { AtomicReference ref = new AtomicReference<>(null); Setting setting = Setting.groupSetting("foo.bar.", Property.Dynamic, Property.NodeScope); assertTrue(setting.isGroupSetting()); - ClusterSettings.SettingUpdater settingUpdater = setting.newUpdater(ref::set, logger); + AtomicReference context = new AtomicReference<>(null); + ClusterSettings.SettingUpdater settingUpdater = setting.newUpdater((ctx, value) -> { + context.set(ctx); + ref.set(value); + }, logger); + Object ctx = new Object(); Settings currentInput = Settings.builder() .put("foo.bar.1.value", "1") .put("foo.bar.2.value", "2") .put("foo.bar.3.value", "3") .build(); Settings previousInput = Settings.EMPTY; - assertTrue(settingUpdater.apply(currentInput, previousInput)); + assertTrue(settingUpdater.apply(ctx, currentInput, previousInput)); assertNotNull(ref.get()); + assertThat(context.get(), is(ctx)); Settings settings = ref.get(); Map asMap = settings.getAsGroups(); assertEquals(3, asMap.size()); @@ -534,28 +572,34 @@ public void testGroups() { assertEquals(asMap.get("2").get("value"), "2"); assertEquals(asMap.get("3").get("value"), "3"); + context.set(null); previousInput = currentInput; currentInput = Settings.builder().put("foo.bar.1.value", "1").put("foo.bar.2.value", "2").put("foo.bar.3.value", "3").build(); Settings current = ref.get(); - assertFalse(settingUpdater.apply(currentInput, previousInput)); + assertFalse(settingUpdater.apply(ctx, currentInput, previousInput)); assertSame(current, ref.get()); + assertNull(context.get()); + context.set(null); previousInput = currentInput; currentInput = Settings.builder().put("foo.bar.1.value", "1").put("foo.bar.2.value", "2").build(); // now update and check that we got it - assertTrue(settingUpdater.apply(currentInput, previousInput)); + assertTrue(settingUpdater.apply(ctx, currentInput, previousInput)); assertNotSame(current, ref.get()); + assertThat(context.get(), is(ctx)); asMap = ref.get().getAsGroups(); assertEquals(2, asMap.size()); assertEquals(asMap.get("1").get("value"), "1"); assertEquals(asMap.get("2").get("value"), "2"); + context.set(null); previousInput = currentInput; currentInput = Settings.builder().put("foo.bar.1.value", "1").put("foo.bar.2.value", "4").build(); // now update and check that we got it - assertTrue(settingUpdater.apply(currentInput, previousInput)); + assertTrue(settingUpdater.apply(ctx, currentInput, previousInput)); assertNotSame(current, ref.get()); + assertThat(context.get(), is(ctx)); asMap = ref.get().getAsGroups(); assertEquals(2, asMap.size()); @@ -565,11 +609,16 @@ public void testGroups() { assertTrue(setting.match("foo.bar.baz")); assertFalse(setting.match("foo.baz.bar")); - ClusterSettings.SettingUpdater predicateSettingUpdater = setting.newUpdater(ref::set, logger, (s) -> { - throw randomBoolean() ? new RuntimeException("anything") : new IllegalArgumentException("illegal"); - }); + SettingUpdater predicateSettingUpdater = setting.newUpdater( + (c, value) -> fail("Listeners shouldn't be called"), + logger, + (s) -> { + throw randomBoolean() ? new RuntimeException("anything") : new IllegalArgumentException("illegal"); + } + ); try { predicateSettingUpdater.apply( + ctx, Settings.builder().put("foo.bar.1.value", "1").put("foo.bar.2.value", "2").build(), Settings.EMPTY ); @@ -587,15 +636,20 @@ public void testGroupKeyExists() { } public void testFilteredGroups() { - AtomicReference ref = new AtomicReference<>(null); Setting setting = Setting.groupSetting("foo.bar.", Property.Filtered, Property.Dynamic); - ClusterSettings.SettingUpdater predicateSettingUpdater = setting.newUpdater(ref::set, logger, (s) -> { - throw randomBoolean() ? new RuntimeException("anything") : new IllegalArgumentException("illegal"); - }); + ClusterSettings.SettingUpdater predicateSettingUpdater = setting.newUpdater( + (ctx, value) -> fail("Listeners shouldn't be called"), + logger, + (s) -> { + throw randomBoolean() ? new RuntimeException("anything") : new IllegalArgumentException("illegal"); + } + ); + Object ctx = new Object(); IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, () -> predicateSettingUpdater.apply( + ctx, Settings.builder().put("foo.bar.1.value", "1").put("foo.bar.2.value", "2").build(), Settings.EMPTY ) @@ -633,64 +687,88 @@ public void testComposite() { Composite c = new Composite(); Setting a = Setting.intSetting("foo.int.bar.a", 1, Property.Dynamic, Property.NodeScope); Setting b = Setting.intSetting("foo.int.bar.b", 1, Property.Dynamic, Property.NodeScope); - ClusterSettings.SettingUpdater> settingUpdater = Setting.compoundUpdater(c::set, c::validate, a, b, logger); - assertFalse(settingUpdater.apply(Settings.EMPTY, Settings.EMPTY)); + AtomicReference context = new AtomicReference<>(null); + ClusterSettings.SettingUpdater> settingUpdater = Setting.compoundUpdater((ctx, value1, value2) -> { + context.set(ctx); + c.set(value1, value2); + }, c::validate, a, b, logger); + + Object ctx = new Object(); + assertFalse(settingUpdater.apply(ctx, Settings.EMPTY, Settings.EMPTY)); assertNull(c.a); assertNull(c.b); + assertNull(context.get()); + context.set(null); Settings build = Settings.builder().put("foo.int.bar.a", 2).build(); - assertTrue(settingUpdater.apply(build, Settings.EMPTY)); + assertTrue(settingUpdater.apply(ctx, build, Settings.EMPTY)); assertEquals(2, c.a.intValue()); assertEquals(1, c.b.intValue()); + assertThat(context.get(), is(ctx)); + context.set(null); Integer aValue = c.a; - assertFalse(settingUpdater.apply(build, build)); + assertFalse(settingUpdater.apply(ctx, build, build)); assertSame(aValue, c.a); + assertNull(context.get()); Settings previous = build; build = Settings.builder().put("foo.int.bar.a", 2).put("foo.int.bar.b", 5).build(); - assertTrue(settingUpdater.apply(build, previous)); + assertTrue(settingUpdater.apply(ctx, build, previous)); assertEquals(2, c.a.intValue()); assertEquals(5, c.b.intValue()); + assertThat(context.get(), is(ctx)); + context.set(null); // reset to default - assertTrue(settingUpdater.apply(Settings.EMPTY, build)); + assertTrue(settingUpdater.apply(ctx, Settings.EMPTY, build)); assertEquals(1, c.a.intValue()); assertEquals(1, c.b.intValue()); - + assertThat(context.get(), is(ctx)); } public void testCompositeValidator() { Composite c = new Composite(); Setting a = Setting.intSetting("foo.int.bar.a", 1, Property.Dynamic, Property.NodeScope); Setting b = Setting.intSetting("foo.int.bar.b", 1, Property.Dynamic, Property.NodeScope); - ClusterSettings.SettingUpdater> settingUpdater = Setting.compoundUpdater(c::set, c::validate, a, b, logger); - assertFalse(settingUpdater.apply(Settings.EMPTY, Settings.EMPTY)); + AtomicReference context = new AtomicReference<>(null); + ClusterSettings.SettingUpdater> settingUpdater = Setting.compoundUpdater((ctx, value1, value2) -> { + context.set(ctx); + c.set(value1, value2); + }, c::validate, a, b, logger); + Object ctx = new Object(); + assertFalse(settingUpdater.apply(ctx, Settings.EMPTY, Settings.EMPTY)); assertNull(c.a); assertNull(c.b); + assertNull(context.get()); + context.set(null); Settings build = Settings.builder().put("foo.int.bar.a", 2).build(); - assertTrue(settingUpdater.apply(build, Settings.EMPTY)); + assertTrue(settingUpdater.apply(ctx, build, Settings.EMPTY)); assertEquals(2, c.a.intValue()); assertEquals(1, c.b.intValue()); + context.set(null); Integer aValue = c.a; - assertFalse(settingUpdater.apply(build, build)); + assertFalse(settingUpdater.apply(ctx, build, build)); assertSame(aValue, c.a); + assertNull(context.get()); Settings previous = build; build = Settings.builder().put("foo.int.bar.a", 2).put("foo.int.bar.b", 5).build(); - assertTrue(settingUpdater.apply(build, previous)); + assertTrue(settingUpdater.apply(ctx, build, previous)); assertEquals(2, c.a.intValue()); assertEquals(5, c.b.intValue()); + assertThat(context.get(), is(ctx)); + context.set(null); Settings invalid = Settings.builder().put("foo.int.bar.a", -2).put("foo.int.bar.b", 5).build(); - IllegalArgumentException exc = expectThrows(IllegalArgumentException.class, () -> settingUpdater.apply(invalid, previous)); + IllegalArgumentException exc = expectThrows(IllegalArgumentException.class, () -> settingUpdater.apply(ctx, invalid, previous)); assertThat(exc.getMessage(), equalTo("boom")); // reset to default - assertTrue(settingUpdater.apply(Settings.EMPTY, build)); + assertTrue(settingUpdater.apply(ctx, Settings.EMPTY, build)); assertEquals(1, c.a.intValue()); assertEquals(1, c.b.intValue()); - + assertThat(context.get(), is(ctx)); } public void testListKeyExists() { @@ -773,24 +851,35 @@ public void testListSettings() { assertTrue(listSetting.exists(builder.build())); AtomicReference> ref = new AtomicReference<>(); - AbstractScopedSettings.SettingUpdater> settingUpdater = listSetting.newUpdater(ref::set, logger); + AtomicReference context = new AtomicReference<>(); + AbstractScopedSettings.SettingUpdater> settingUpdater = listSetting.newUpdater((ctx, newValue) -> { + context.set(ctx); + ref.set(newValue); + }, logger); + Object ctx = new Object(); assertTrue(settingUpdater.hasChanged(builder.build(), Settings.EMPTY)); - settingUpdater.apply(builder.build(), Settings.EMPTY); + settingUpdater.apply(ctx, builder.build(), Settings.EMPTY); assertEquals(input.size(), ref.get().size()); assertArrayEquals(ref.get().toArray(new String[0]), input.toArray(new String[0])); + assertThat(context.get(), is(ctx)); - settingUpdater.apply(Settings.builder().putList("foo.bar", "123").build(), builder.build()); + context.set(null); + settingUpdater.apply(ctx, Settings.builder().putList("foo.bar", "123").build(), builder.build()); assertEquals(1, ref.get().size()); assertArrayEquals(ref.get().toArray(new String[0]), new String[] { "123" }); - settingUpdater.apply(Settings.builder().put("foo.bar", "1,2,3").build(), Settings.builder().putList("foo.bar", "123").build()); + settingUpdater.apply(ctx, Settings.builder().put("foo.bar", "1,2,3").build(), Settings.builder().putList("foo.bar", "123").build()); assertEquals(3, ref.get().size()); assertArrayEquals(ref.get().toArray(new String[0]), new String[] { "1", "2", "3" }); + assertThat(context.get(), is(ctx)); - settingUpdater.apply(Settings.EMPTY, Settings.builder().put("foo.bar", "1,2,3").build()); + context.set(null); + settingUpdater.apply(ctx, Settings.EMPTY, Settings.builder().put("foo.bar", "1,2,3").build()); assertEquals(1, ref.get().size()); assertEquals("foo,bar", ref.get().get(0)); + assertThat(context.get(), is(ctx)); + context.set(null); Setting> otherSettings = Setting.listSetting( "foo.bar", Collections.emptyList(), @@ -1246,27 +1335,24 @@ public void testTimeValueBounds() { public void testSettingsGroupUpdater() { Setting intSetting = Setting.intSetting("prefix.foo", 1, Property.NodeScope, Property.Dynamic); Setting intSetting2 = Setting.intSetting("prefix.same", 1, Property.NodeScope, Property.Dynamic); - AbstractScopedSettings.SettingUpdater updater = Setting.groupedSettingsUpdater( - s -> {}, - Arrays.asList(intSetting, intSetting2) - ); + SettingUpdater updater = Setting.groupedSettingsUpdater((ctx, s) -> {}, Arrays.asList(intSetting, intSetting2)); Settings current = Settings.builder().put("prefix.foo", 123).put("prefix.same", 5555).build(); Settings previous = Settings.builder().put("prefix.foo", 321).put("prefix.same", 5555).build(); - assertTrue(updater.apply(current, previous)); + assertTrue(updater.apply(null, current, previous)); } public void testSettingsGroupUpdaterRemoval() { Setting intSetting = Setting.intSetting("prefix.foo", 1, Property.NodeScope, Property.Dynamic); Setting intSetting2 = Setting.intSetting("prefix.same", 1, Property.NodeScope, Property.Dynamic); - AbstractScopedSettings.SettingUpdater updater = Setting.groupedSettingsUpdater( - s -> {}, + AbstractScopedSettings.SettingUpdater updater = Setting.groupedSettingsUpdater( + (ctx, s) -> {}, Arrays.asList(intSetting, intSetting2) ); Settings current = Settings.builder().put("prefix.same", 5555).build(); Settings previous = Settings.builder().put("prefix.foo", 321).put("prefix.same", 5555).build(); - assertTrue(updater.apply(current, previous)); + assertTrue(updater.apply(null, current, previous)); } public void testSettingsGroupUpdaterWithAffixSetting() { @@ -1281,8 +1367,8 @@ public void testSettingsGroupUpdaterWithAffixSetting() { key -> Setting.simpleString(key, Property.NodeScope, Property.Dynamic) ); - AbstractScopedSettings.SettingUpdater updater = Setting.groupedSettingsUpdater( - s -> {}, + AbstractScopedSettings.SettingUpdater updater = Setting.groupedSettingsUpdater( + (ctx, s) -> {}, Arrays.asList(intSetting, prefixKeySetting, affixSetting) ); @@ -1319,7 +1405,7 @@ public void testSettingsGroupUpdaterWithAffixSetting() { || changeAffixKeySetting || changePrefixKeySetting || removeAffixNamespace; - assertThat(updater.apply(currentSettingsBuilder.build(), previousSettingsBuilder.build()), is(expectedChange)); + assertThat(updater.apply(null, currentSettingsBuilder.build(), previousSettingsBuilder.build()), is(expectedChange)); } public void testAffixNamespacesWithGroupSetting() { @@ -1351,8 +1437,8 @@ public void testGroupSettingUpdaterValidator() { } }; - AbstractScopedSettings.SettingUpdater updater = Setting.groupedSettingsUpdater( - s -> {}, + AbstractScopedSettings.SettingUpdater updater = Setting.groupedSettingsUpdater( + (ctx, s) -> {}, Arrays.asList(affixSetting, fixSetting), validator ); @@ -1421,11 +1507,11 @@ public void testAffixMapUpdateWithNullSettingValue() { key -> Setting.simpleString(key, Property.Dynamic, Property.NodeScope) ); - final Consumer> consumer = (map) -> {}; + final BiConsumer> consumer = (ctx, map) -> {}; final BiConsumer validator = (s1, s2) -> {}; // WHEN creating an affix updater - final SettingUpdater> updater = affixSetting.newAffixMapUpdater(consumer, logger, validator); + final SettingUpdater> updater = affixSetting.newAffixMapUpdater(consumer, logger, validator); // THEN affix updater is always expected to have changed (even when defaults are omitted) assertTrue(updater.hasChanged(current, previous)); diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java index aa0bf049c2c0c..12476e0c5c1d7 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java @@ -101,7 +101,7 @@ private ClusterService createClusterService(final Settings.Builder settingsBuild settings, clusterSettings, new FakeThreadPoolMasterService(initialState.nodes().getLocalNodeId(), threadPool, deterministicTaskQueue::scheduleNow), - new ClusterApplierService(initialState.nodes().getLocalNodeId(), settings, clusterSettings, threadPool) { + new ClusterApplierService(initialState.nodes().getLocalNodeId(), clusterSettings, threadPool) { @Override protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { return deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor(); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 81f4f743d821a..7ce7103b65f2b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -2312,7 +2312,7 @@ private final class TestClusterNode { settings, clusterSettings, masterService, - new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) { + new ClusterApplierService(node.getName(), clusterSettings, threadPool) { @Override protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { return deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor(command -> new Runnable() { diff --git a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java index f9a9baf09898b..0ad880fe2cf42 100644 --- a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java @@ -17,7 +17,6 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.VersionInformation; -import org.elasticsearch.common.settings.AbstractScopedSettings; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -661,7 +660,7 @@ public void testModeSettingsCannotBeUsedWhenInDifferentMode() { Set> clusterSettings = new HashSet<>(); clusterSettings.add(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE); clusterSettings.addAll(restrictedSettings.stream().map(Tuple::v1).toList()); - AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, clusterSettings); + ClusterSettings service = new ClusterSettings(Settings.EMPTY, clusterSettings); // Should validate successfully service.validate(settings, true); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 76e280c987ae1..f43c31c7b0969 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -19,7 +19,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.VersionInformation; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.AbstractScopedSettings; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; @@ -1136,7 +1135,7 @@ public void testRemoteClusterSkipIfDisconnectedSetting() { ); } - AbstractScopedSettings service = new ClusterSettings( + ClusterSettings service = new ClusterSettings( Settings.EMPTY, new HashSet<>(Arrays.asList(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)) ); diff --git a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java index de445f63afe3a..e66e6fa21bc5d 100644 --- a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.VersionInformation; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.AbstractScopedSettings; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Setting; @@ -1216,7 +1215,7 @@ public void testModeSettingsCannotBeUsedWhenInDifferentMode() { Set> clusterSettings = new HashSet<>(); clusterSettings.add(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE); clusterSettings.addAll(restrictedSettings.stream().map(Tuple::v1).toList()); - AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, clusterSettings); + ClusterSettings service = new ClusterSettings(Settings.EMPTY, clusterSettings); // Should validate successfully service.validate(settings, true); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index b8f225aedc0de..438f9de5cb650 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -55,6 +55,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.ProjectScopedSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -1123,11 +1124,12 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable)) ); final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final ProjectScopedSettings projectScopedSettings = new ProjectScopedSettings(Collections.emptySet()); clusterApplierService = new DisruptableClusterApplierService( localNode.getId(), localNode.getEphemeralId(), - settings, clusterSettings, + projectScopedSettings, deterministicTaskQueue, this::onNode, threadPool @@ -1823,13 +1825,13 @@ static class DisruptableClusterApplierService extends ClusterApplierService { DisruptableClusterApplierService( String nodeName, String nodeId, - Settings settings, ClusterSettings clusterSettings, + ProjectScopedSettings projectScopedSettings, DeterministicTaskQueue deterministicTaskQueue, UnaryOperator taskWrapper, ThreadPool threadPool ) { - super(nodeName, settings, clusterSettings, threadPool); + super(nodeName, clusterSettings, projectScopedSettings, threadPool); this.nodeName = nodeName; this.nodeId = nodeId; this.deterministicTaskQueue = deterministicTaskQueue; diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/RealmSettingsTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/RealmSettingsTests.java index 7cfcb89e78085..bdcf33b80bba1 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/RealmSettingsTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/RealmSettingsTests.java @@ -441,7 +441,7 @@ private IllegalArgumentException assertError(String realmType, String realmName, private void validate(Settings settings) { final Set> settingsSet = new HashSet<>(InternalRealmsSettings.getSettings()); - final AbstractScopedSettings validator = new AbstractScopedSettings(settings, settingsSet, Setting.Property.NodeScope) { + final AbstractScopedSettings validator = new AbstractScopedSettings<>(settingsSet, Setting.Property.NodeScope) { }; validator.validate(settings, false); }